change server API: "release" instead of "deallocate"

This commit is contained in:
Brian Warner 2016-05-12 17:46:15 -07:00
parent 2c2cf29564
commit 85dc0fd41b
5 changed files with 53 additions and 53 deletions

View File

@ -12,8 +12,8 @@ MB = 1000*1000
CHANNEL_EXPIRATION_TIME = 3*DAY
EXPIRATION_CHECK_PERIOD = 2*HOUR
ALLOCATE = u"_allocate"
DEALLOCATE = u"_deallocate"
CLAIM = u"_claim"
RELEASE = u"_release"
class Channel:
def __init__(self, app, db, welcome, blur_usage, log_requests,
@ -38,7 +38,7 @@ class Channel:
" WHERE `appid`=? AND `channelid`=?"
" ORDER BY `server_rx` ASC",
(self._appid, self._channelid)).fetchall():
if row["phase"] in (u"_allocate", u"_deallocate"):
if row["phase"] in (CLAIM, RELEASE):
continue
messages.append({"phase": row["phase"], "body": row["body"],
"server_rx": row["server_rx"], "id": row["msgid"]})
@ -66,16 +66,16 @@ class Channel:
server_rx, msgid))
db.commit()
def allocate(self, side):
self._add_message(side, ALLOCATE, None, time.time(), None)
def claim(self, side):
self._add_message(side, CLAIM, None, time.time(), None)
def add_message(self, side, phase, body, server_rx, msgid):
self._add_message(side, phase, body, server_rx, msgid)
self.broadcast_message(phase, body, server_rx, msgid)
return self.get_messages() # for rendezvous_web.py POST /add
def deallocate(self, side, mood):
self._add_message(side, DEALLOCATE, mood, time.time(), None)
def release(self, side, mood):
self._add_message(side, RELEASE, mood, time.time(), None)
db = self._db
seen = set([row["side"] for row in
db.execute("SELECT `side` FROM `messages`"
@ -85,7 +85,7 @@ class Channel:
db.execute("SELECT `side` FROM `messages`"
" WHERE `appid`=? AND `channelid`=?"
" AND `phase`=?",
(self._appid, self._channelid, DEALLOCATE))])
(self._appid, self._channelid, RELEASE))])
if seen - freed:
return False
self.delete_and_summarize()
@ -127,7 +127,7 @@ class Channel:
started = min([m["server_rx"] for m in messages])
# 'total_time' is how long the channel was occupied. That ends now,
# both for channels that got pruned for inactivity, and for channels
# that got pruned because of two DEALLOCATE messages
# that got pruned because of two RELEASE messages
total_time = delete_time - started
if len(all_sides) == 1:
@ -147,9 +147,9 @@ class Channel:
# now, were all sides closed? If not, this is "pruney"
A_deallocs = [m for m in messages
if m["phase"] == DEALLOCATE and m["side"] == A_side]
if m["phase"] == RELEASE and m["side"] == A_side]
B_deallocs = [m for m in messages
if m["phase"] == DEALLOCATE and m["side"] == B_side]
if m["phase"] == RELEASE and m["side"] == B_side]
if not A_deallocs or not B_deallocs:
return (started, "pruney", total_time, None)
@ -202,31 +202,31 @@ class AppNamespace:
self._appid = appid
self._channels = {}
def get_allocated(self):
def get_claimed(self):
db = self._db
c = db.execute("SELECT DISTINCT `channelid` FROM `messages`"
" WHERE `appid`=?", (self._appid,))
return set([row["channelid"] for row in c.fetchall()])
def find_available_channelid(self):
allocated = self.get_allocated()
claimed = self.get_claimed()
for size in range(1,4): # stick to 1-999 for now
available = set()
for cid in range(10**(size-1), 10**size):
if cid not in allocated:
if cid not in claimed:
available.add(cid)
if available:
return random.choice(list(available))
# ouch, 999 currently allocated. Try random ones for a while.
# ouch, 999 currently claimed. Try random ones for a while.
for tries in range(1000):
cid = random.randrange(1000, 1000*1000)
if cid not in allocated:
if cid not in claimed:
return cid
raise ValueError("unable to find a free channel-id")
def allocate_channel(self, channelid, side):
def claim_channel(self, channelid, side):
channel = self.get_channel(channelid)
channel.allocate(side)
channel.claim(side)
return channel
def get_channel(self, channelid):
@ -248,7 +248,7 @@ class AppNamespace:
self._channels.pop(channelid)
if self._log_requests:
log.msg("freed+killed #%d, now have %d DB channels, %d live" %
(channelid, len(self.get_allocated()), len(self._channels)))
(channelid, len(self.get_claimed()), len(self._channels)))
def prune_old_channels(self):
# For now, pruning is logged even if log_requests is False, to debug
@ -259,7 +259,7 @@ class AppNamespace:
log.msg(" channel prune begins")
# a channel is deleted when there are no listeners and there have
# been no messages added in CHANNEL_EXPIRATION_TIME seconds
channels = set(self.get_allocated()) # these have messages
channels = set(self.get_claimed()) # these have messages
channels.update(self._channels) # these might have listeners
for channelid in channels:
log.msg(" channel prune checking %d" % channelid)

View File

@ -97,8 +97,8 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
return self.handle_watch(self._channel, msg)
if mtype == "add":
return self.handle_add(self._channel, msg, server_rx)
if mtype == "deallocate":
return self.handle_deallocate(self._channel, msg)
if mtype == "release":
return self.handle_release(self._channel, msg)
raise Error("Unknown type")
except Error as e:
@ -126,14 +126,14 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
self._side = msg["side"]
def handle_list(self):
channelids = sorted(self._app.get_allocated())
channelids = sorted(self._app.get_claimed())
self.send("channelids", channelids=channelids)
def handle_allocate(self):
if self._channel:
raise Error("Already bound to a channelid")
channelid = self._app.find_available_channelid()
self._channel = self._app.allocate_channel(channelid, self._side)
self._channel = self._app.claim_channel(channelid, self._side)
self.send("allocated", channelid=channelid)
def handle_claim(self, msg):
@ -144,7 +144,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
old_cid = self._channel.get_channelid()
if msg["channelid"] != old_cid:
raise Error("Already bound to channelid %d" % old_cid)
self._channel = self._app.allocate_channel(msg["channelid"], self._side)
self._channel = self._app.claim_channel(msg["channelid"], self._side)
def handle_watch(self, channel, msg):
if self._watching:
@ -166,9 +166,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
channel.add_message(self._side, msg["phase"], msg["body"],
server_rx, msgid)
def handle_deallocate(self, channel, msg):
deleted = channel.deallocate(self._side, msg.get("mood"))
self.send("deallocated", status="deleted" if deleted else "waiting")
def handle_release(self, channel, msg):
deleted = channel.release(self._side, msg.get("mood"))
self.send("released", status="deleted" if deleted else "waiting")
def send(self, mtype, **kwargs):
kwargs["type"] = mtype

View File

@ -453,7 +453,7 @@ class Cleanup(ServerBase, unittest.TestCase):
yield send_d
yield receive_d
cids = self._rendezvous.get_app(cmd_send.APPID).get_allocated()
cids = self._rendezvous.get_app(cmd_send.APPID).get_claimed()
self.assertEqual(len(cids), 0)
@inlineCallbacks
@ -482,6 +482,6 @@ class Cleanup(ServerBase, unittest.TestCase):
yield self.assertFailure(send_d, WrongPasswordError)
yield self.assertFailure(receive_d, WrongPasswordError)
cids = self._rendezvous.get_app(cmd_send.APPID).get_allocated()
cids = self._rendezvous.get_app(cmd_send.APPID).get_claimed()
self.assertEqual(len(cids), 0)

View File

@ -210,7 +210,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
yield c1.sync()
self.assertEqual(list(self._rendezvous._apps.keys()), [u"appid"])
app = self._rendezvous.get_app(u"appid")
self.assertEqual(app.get_allocated(), set())
self.assertEqual(app.get_claimed(), set())
c1.send(u"list")
msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"channelids")
@ -221,7 +221,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
self.assertEqual(msg["type"], u"allocated")
cid = msg["channelid"]
self.failUnlessIsInstance(cid, int)
self.assertEqual(app.get_allocated(), set([cid]))
self.assertEqual(app.get_claimed(), set([cid]))
channel = app.get_channel(cid)
self.assertEqual(channel.get_messages(), [])
@ -230,11 +230,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
self.assertEqual(msg["type"], u"channelids")
self.assertEqual(msg["channelids"], [cid])
c1.send(u"deallocate")
c1.send(u"release")
msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"deallocated")
self.assertEqual(msg["type"], u"released")
self.assertEqual(msg["status"], u"deleted")
self.assertEqual(app.get_allocated(), set())
self.assertEqual(app.get_claimed(), set())
c1.send(u"list")
msg = yield c1.next_non_ack()
@ -249,13 +249,13 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
c1.send(u"bind", appid=u"appid", side=u"side")
yield c1.sync()
app = self._rendezvous.get_app(u"appid")
self.assertEqual(app.get_allocated(), set())
self.assertEqual(app.get_claimed(), set())
c1.send(u"allocate")
msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"allocated")
cid = msg["channelid"]
self.failUnlessIsInstance(cid, int)
self.assertEqual(app.get_allocated(), set([cid]))
self.assertEqual(app.get_claimed(), set([cid]))
channel = app.get_channel(cid)
self.assertEqual(channel.get_messages(), [])
@ -268,7 +268,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
c2.send(u"add", phase="1", body="")
yield c2.sync()
self.assertEqual(app.get_allocated(), set([cid]))
self.assertEqual(app.get_claimed(), set([cid]))
self.assertEqual(strip_messages(channel.get_messages()),
[{"phase": "1", "body": ""}])
@ -282,14 +282,14 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
self.assertEqual(msg["type"], u"channelids")
self.assertEqual(msg["channelids"], [cid])
c1.send(u"deallocate")
c1.send(u"release")
msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"deallocated")
self.assertEqual(msg["type"], u"released")
self.assertEqual(msg["status"], u"waiting")
c2.send(u"deallocate")
c2.send(u"release")
msg = yield c2.next_non_ack()
self.assertEqual(msg["type"], u"deallocated")
self.assertEqual(msg["type"], u"released")
self.assertEqual(msg["status"], u"deleted")
c2.send(u"list")
@ -420,8 +420,8 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
class Summary(unittest.TestCase):
def test_summarize(self):
c = rendezvous.Channel(None, None, None, None, False, None, None)
A = rendezvous.ALLOCATE
D = rendezvous.DEALLOCATE
A = rendezvous.CLAIM
D = rendezvous.RELEASE
messages = [{"server_rx": 1, "side": "a", "phase": A}]
self.failUnlessEqual(c._summarize(messages, 2),

View File

@ -84,7 +84,7 @@ class _Wormhole:
self._sleepers = []
self._confirmation_failed = False
self._closed = False
self._deallocated_status = None
self._released_status = None
self._timing_started = self._timing.add("wormhole")
self._ws = None
self._ws_t = None # timing Event
@ -535,7 +535,7 @@ class _Wormhole:
raise TypeError(type(mood))
with self._timing.add("API close"):
yield self._deallocate(mood)
yield self._release(mood)
# TODO: mark WebSocket as don't-reconnect
self._ws.transport.loseConnection() # probably flushes
del self._ws
@ -544,17 +544,17 @@ class _Wormhole:
returnValue(f)
@inlineCallbacks
def _deallocate(self, mood):
with self._timing.add("deallocate"):
yield self._ws_send(u"deallocate", mood=mood)
while self._deallocated_status is None:
def _release(self, mood):
with self._timing.add("release"):
yield self._ws_send(u"release", mood=mood)
while self._released_status is None:
yield self._sleep(wake_on_error=False)
# TODO: set a timeout, don't wait forever for an ack
# TODO: if the connection is lost, let it go
returnValue(self._deallocated_status)
returnValue(self._released_status)
def _ws_handle_deallocated(self, msg):
self._deallocated_status = msg["status"]
def _ws_handle_released(self, msg):
self._released_status = msg["status"]
self._wakeup()
def wormhole(appid, relay_url, reactor, tor_manager=None, timing=None):