From 85dc0fd41b4a3c99be58ff47f970e79ee8072666 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Thu, 12 May 2016 17:46:15 -0700 Subject: [PATCH] change server API: "release" instead of "deallocate" --- src/wormhole/server/rendezvous.py | 40 ++++++++++----------- src/wormhole/server/rendezvous_websocket.py | 16 ++++----- src/wormhole/test/test_scripts.py | 4 +-- src/wormhole/test/test_server.py | 28 +++++++-------- src/wormhole/twisted/transcribe.py | 18 +++++----- 5 files changed, 53 insertions(+), 53 deletions(-) diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 3401008..9f20b61 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -12,8 +12,8 @@ MB = 1000*1000 CHANNEL_EXPIRATION_TIME = 3*DAY EXPIRATION_CHECK_PERIOD = 2*HOUR -ALLOCATE = u"_allocate" -DEALLOCATE = u"_deallocate" +CLAIM = u"_claim" +RELEASE = u"_release" class Channel: def __init__(self, app, db, welcome, blur_usage, log_requests, @@ -38,7 +38,7 @@ class Channel: " WHERE `appid`=? AND `channelid`=?" " ORDER BY `server_rx` ASC", (self._appid, self._channelid)).fetchall(): - if row["phase"] in (u"_allocate", u"_deallocate"): + if row["phase"] in (CLAIM, RELEASE): continue messages.append({"phase": row["phase"], "body": row["body"], "server_rx": row["server_rx"], "id": row["msgid"]}) @@ -66,16 +66,16 @@ class Channel: server_rx, msgid)) db.commit() - def allocate(self, side): - self._add_message(side, ALLOCATE, None, time.time(), None) + def claim(self, side): + self._add_message(side, CLAIM, None, time.time(), None) def add_message(self, side, phase, body, server_rx, msgid): self._add_message(side, phase, body, server_rx, msgid) self.broadcast_message(phase, body, server_rx, msgid) return self.get_messages() # for rendezvous_web.py POST /add - def deallocate(self, side, mood): - self._add_message(side, DEALLOCATE, mood, time.time(), None) + def release(self, side, mood): + self._add_message(side, RELEASE, mood, time.time(), None) db = self._db seen = set([row["side"] for row in db.execute("SELECT `side` FROM `messages`" @@ -85,7 +85,7 @@ class Channel: db.execute("SELECT `side` FROM `messages`" " WHERE `appid`=? AND `channelid`=?" " AND `phase`=?", - (self._appid, self._channelid, DEALLOCATE))]) + (self._appid, self._channelid, RELEASE))]) if seen - freed: return False self.delete_and_summarize() @@ -127,7 +127,7 @@ class Channel: started = min([m["server_rx"] for m in messages]) # 'total_time' is how long the channel was occupied. That ends now, # both for channels that got pruned for inactivity, and for channels - # that got pruned because of two DEALLOCATE messages + # that got pruned because of two RELEASE messages total_time = delete_time - started if len(all_sides) == 1: @@ -147,9 +147,9 @@ class Channel: # now, were all sides closed? If not, this is "pruney" A_deallocs = [m for m in messages - if m["phase"] == DEALLOCATE and m["side"] == A_side] + if m["phase"] == RELEASE and m["side"] == A_side] B_deallocs = [m for m in messages - if m["phase"] == DEALLOCATE and m["side"] == B_side] + if m["phase"] == RELEASE and m["side"] == B_side] if not A_deallocs or not B_deallocs: return (started, "pruney", total_time, None) @@ -202,31 +202,31 @@ class AppNamespace: self._appid = appid self._channels = {} - def get_allocated(self): + def get_claimed(self): db = self._db c = db.execute("SELECT DISTINCT `channelid` FROM `messages`" " WHERE `appid`=?", (self._appid,)) return set([row["channelid"] for row in c.fetchall()]) def find_available_channelid(self): - allocated = self.get_allocated() + claimed = self.get_claimed() for size in range(1,4): # stick to 1-999 for now available = set() for cid in range(10**(size-1), 10**size): - if cid not in allocated: + if cid not in claimed: available.add(cid) if available: return random.choice(list(available)) - # ouch, 999 currently allocated. Try random ones for a while. + # ouch, 999 currently claimed. Try random ones for a while. for tries in range(1000): cid = random.randrange(1000, 1000*1000) - if cid not in allocated: + if cid not in claimed: return cid raise ValueError("unable to find a free channel-id") - def allocate_channel(self, channelid, side): + def claim_channel(self, channelid, side): channel = self.get_channel(channelid) - channel.allocate(side) + channel.claim(side) return channel def get_channel(self, channelid): @@ -248,7 +248,7 @@ class AppNamespace: self._channels.pop(channelid) if self._log_requests: log.msg("freed+killed #%d, now have %d DB channels, %d live" % - (channelid, len(self.get_allocated()), len(self._channels))) + (channelid, len(self.get_claimed()), len(self._channels))) def prune_old_channels(self): # For now, pruning is logged even if log_requests is False, to debug @@ -259,7 +259,7 @@ class AppNamespace: log.msg(" channel prune begins") # a channel is deleted when there are no listeners and there have # been no messages added in CHANNEL_EXPIRATION_TIME seconds - channels = set(self.get_allocated()) # these have messages + channels = set(self.get_claimed()) # these have messages channels.update(self._channels) # these might have listeners for channelid in channels: log.msg(" channel prune checking %d" % channelid) diff --git a/src/wormhole/server/rendezvous_websocket.py b/src/wormhole/server/rendezvous_websocket.py index 6f21b44..3cecd12 100644 --- a/src/wormhole/server/rendezvous_websocket.py +++ b/src/wormhole/server/rendezvous_websocket.py @@ -97,8 +97,8 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): return self.handle_watch(self._channel, msg) if mtype == "add": return self.handle_add(self._channel, msg, server_rx) - if mtype == "deallocate": - return self.handle_deallocate(self._channel, msg) + if mtype == "release": + return self.handle_release(self._channel, msg) raise Error("Unknown type") except Error as e: @@ -126,14 +126,14 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): self._side = msg["side"] def handle_list(self): - channelids = sorted(self._app.get_allocated()) + channelids = sorted(self._app.get_claimed()) self.send("channelids", channelids=channelids) def handle_allocate(self): if self._channel: raise Error("Already bound to a channelid") channelid = self._app.find_available_channelid() - self._channel = self._app.allocate_channel(channelid, self._side) + self._channel = self._app.claim_channel(channelid, self._side) self.send("allocated", channelid=channelid) def handle_claim(self, msg): @@ -144,7 +144,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): old_cid = self._channel.get_channelid() if msg["channelid"] != old_cid: raise Error("Already bound to channelid %d" % old_cid) - self._channel = self._app.allocate_channel(msg["channelid"], self._side) + self._channel = self._app.claim_channel(msg["channelid"], self._side) def handle_watch(self, channel, msg): if self._watching: @@ -166,9 +166,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): channel.add_message(self._side, msg["phase"], msg["body"], server_rx, msgid) - def handle_deallocate(self, channel, msg): - deleted = channel.deallocate(self._side, msg.get("mood")) - self.send("deallocated", status="deleted" if deleted else "waiting") + def handle_release(self, channel, msg): + deleted = channel.release(self._side, msg.get("mood")) + self.send("released", status="deleted" if deleted else "waiting") def send(self, mtype, **kwargs): kwargs["type"] = mtype diff --git a/src/wormhole/test/test_scripts.py b/src/wormhole/test/test_scripts.py index b39f2c5..e70cc1a 100644 --- a/src/wormhole/test/test_scripts.py +++ b/src/wormhole/test/test_scripts.py @@ -453,7 +453,7 @@ class Cleanup(ServerBase, unittest.TestCase): yield send_d yield receive_d - cids = self._rendezvous.get_app(cmd_send.APPID).get_allocated() + cids = self._rendezvous.get_app(cmd_send.APPID).get_claimed() self.assertEqual(len(cids), 0) @inlineCallbacks @@ -482,6 +482,6 @@ class Cleanup(ServerBase, unittest.TestCase): yield self.assertFailure(send_d, WrongPasswordError) yield self.assertFailure(receive_d, WrongPasswordError) - cids = self._rendezvous.get_app(cmd_send.APPID).get_allocated() + cids = self._rendezvous.get_app(cmd_send.APPID).get_claimed() self.assertEqual(len(cids), 0) diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index c52bd50..8708dd1 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -210,7 +210,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase): yield c1.sync() self.assertEqual(list(self._rendezvous._apps.keys()), [u"appid"]) app = self._rendezvous.get_app(u"appid") - self.assertEqual(app.get_allocated(), set()) + self.assertEqual(app.get_claimed(), set()) c1.send(u"list") msg = yield c1.next_non_ack() self.assertEqual(msg["type"], u"channelids") @@ -221,7 +221,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase): self.assertEqual(msg["type"], u"allocated") cid = msg["channelid"] self.failUnlessIsInstance(cid, int) - self.assertEqual(app.get_allocated(), set([cid])) + self.assertEqual(app.get_claimed(), set([cid])) channel = app.get_channel(cid) self.assertEqual(channel.get_messages(), []) @@ -230,11 +230,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase): self.assertEqual(msg["type"], u"channelids") self.assertEqual(msg["channelids"], [cid]) - c1.send(u"deallocate") + c1.send(u"release") msg = yield c1.next_non_ack() - self.assertEqual(msg["type"], u"deallocated") + self.assertEqual(msg["type"], u"released") self.assertEqual(msg["status"], u"deleted") - self.assertEqual(app.get_allocated(), set()) + self.assertEqual(app.get_claimed(), set()) c1.send(u"list") msg = yield c1.next_non_ack() @@ -249,13 +249,13 @@ class WebSocketAPI(ServerBase, unittest.TestCase): c1.send(u"bind", appid=u"appid", side=u"side") yield c1.sync() app = self._rendezvous.get_app(u"appid") - self.assertEqual(app.get_allocated(), set()) + self.assertEqual(app.get_claimed(), set()) c1.send(u"allocate") msg = yield c1.next_non_ack() self.assertEqual(msg["type"], u"allocated") cid = msg["channelid"] self.failUnlessIsInstance(cid, int) - self.assertEqual(app.get_allocated(), set([cid])) + self.assertEqual(app.get_claimed(), set([cid])) channel = app.get_channel(cid) self.assertEqual(channel.get_messages(), []) @@ -268,7 +268,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase): c2.send(u"add", phase="1", body="") yield c2.sync() - self.assertEqual(app.get_allocated(), set([cid])) + self.assertEqual(app.get_claimed(), set([cid])) self.assertEqual(strip_messages(channel.get_messages()), [{"phase": "1", "body": ""}]) @@ -282,14 +282,14 @@ class WebSocketAPI(ServerBase, unittest.TestCase): self.assertEqual(msg["type"], u"channelids") self.assertEqual(msg["channelids"], [cid]) - c1.send(u"deallocate") + c1.send(u"release") msg = yield c1.next_non_ack() - self.assertEqual(msg["type"], u"deallocated") + self.assertEqual(msg["type"], u"released") self.assertEqual(msg["status"], u"waiting") - c2.send(u"deallocate") + c2.send(u"release") msg = yield c2.next_non_ack() - self.assertEqual(msg["type"], u"deallocated") + self.assertEqual(msg["type"], u"released") self.assertEqual(msg["status"], u"deleted") c2.send(u"list") @@ -420,8 +420,8 @@ class WebSocketAPI(ServerBase, unittest.TestCase): class Summary(unittest.TestCase): def test_summarize(self): c = rendezvous.Channel(None, None, None, None, False, None, None) - A = rendezvous.ALLOCATE - D = rendezvous.DEALLOCATE + A = rendezvous.CLAIM + D = rendezvous.RELEASE messages = [{"server_rx": 1, "side": "a", "phase": A}] self.failUnlessEqual(c._summarize(messages, 2), diff --git a/src/wormhole/twisted/transcribe.py b/src/wormhole/twisted/transcribe.py index 248d1b9..64d83c3 100644 --- a/src/wormhole/twisted/transcribe.py +++ b/src/wormhole/twisted/transcribe.py @@ -84,7 +84,7 @@ class _Wormhole: self._sleepers = [] self._confirmation_failed = False self._closed = False - self._deallocated_status = None + self._released_status = None self._timing_started = self._timing.add("wormhole") self._ws = None self._ws_t = None # timing Event @@ -535,7 +535,7 @@ class _Wormhole: raise TypeError(type(mood)) with self._timing.add("API close"): - yield self._deallocate(mood) + yield self._release(mood) # TODO: mark WebSocket as don't-reconnect self._ws.transport.loseConnection() # probably flushes del self._ws @@ -544,17 +544,17 @@ class _Wormhole: returnValue(f) @inlineCallbacks - def _deallocate(self, mood): - with self._timing.add("deallocate"): - yield self._ws_send(u"deallocate", mood=mood) - while self._deallocated_status is None: + def _release(self, mood): + with self._timing.add("release"): + yield self._ws_send(u"release", mood=mood) + while self._released_status is None: yield self._sleep(wake_on_error=False) # TODO: set a timeout, don't wait forever for an ack # TODO: if the connection is lost, let it go - returnValue(self._deallocated_status) + returnValue(self._released_status) - def _ws_handle_deallocated(self, msg): - self._deallocated_status = msg["status"] + def _ws_handle_released(self, msg): + self._released_status = msg["status"] self._wakeup() def wormhole(appid, relay_url, reactor, tor_manager=None, timing=None):