diff --git a/src/wormhole/server/db-schemas/v2.sql b/src/wormhole/server/db-schemas/v2.sql index a3a3485..83751ce 100644 --- a/src/wormhole/server/db-schemas/v2.sql +++ b/src/wormhole/server/db-schemas/v2.sql @@ -18,12 +18,14 @@ CREATE TABLE `nameplates` `mailbox_id` VARCHAR, -- really a foreign key `side1` VARCHAR, -- side name, or NULL `side2` VARCHAR, -- side name, or NULL + `crowded` BOOLEAN, -- at some point, three or more sides were involved + `updated` INTEGER, -- time of last activity, used for pruning -- timing data `started` INTEGER, -- time when nameplace was opened - `second` INTEGER, -- time when second side opened - `closed` INTEGER -- time when closed + `second` INTEGER -- time when second side opened ); CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `id`); +CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`); CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); -- Clients exchange messages through a "mailbox", which has a long (randomly @@ -34,10 +36,11 @@ CREATE TABLE `mailboxes` `id` VARCHAR, `side1` VARCHAR -- side name, or NULL `side2` VARCHAR -- side name, or NULL + `crowded` BOOLEAN, -- at some point, three or more sides were involved + `first_mood` VARCHAR, -- timing data for the mailbox itself `started` INTEGER, -- time when opened - `second` INTEGER, -- time when second side opened - `closed` INTEGER -- time when closed + `second` INTEGER -- time when second side opened ); CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); @@ -55,20 +58,22 @@ CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`); CREATE TABLE `nameplate_usage` ( + `app_id` VARCHAR, `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `total_time` INTEGER, -- seconds from open to last close - `result` VARCHAR, -- happy, lonely, pruney, crowded + `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None + `total_time` INTEGER, -- seconds from open to last close/prune + `result` VARCHAR -- happy, lonely, pruney, crowded -- nameplate moods: -- "happy": two sides open and close -- "lonely": one side opens and closes (no response from 2nd side) -- "pruney": channels which get pruned for inactivity -- "crowded": three or more sides were involved - `waiting_time` INTEGER -- seconds from start to 2nd side appearing, or None ); -CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`started`); +CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`); CREATE TABLE `mailbox_usage` ( + `app_id` VARCHAR, `started` INTEGER, -- seconds since epoch, rounded to "blur time" `total_time` INTEGER, -- seconds from open to last close `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None @@ -81,7 +86,7 @@ CREATE TABLE `mailbox_usage` -- "pruney": channels which get pruned for inactivity -- "crowded": three or more sides were involved ); -CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`started`); +CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`); CREATE TABLE `transit_usage` ( diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 6571f09..76a8fed 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -1,5 +1,6 @@ from __future__ import print_function -import time, random +import os, time, random, base64 +from collections import namedtuple from twisted.python import log from twisted.application import service, internet @@ -22,83 +23,78 @@ def make_sides(sides): def generate_mailbox_id(): return base64.b32encode(os.urandom(8)).lower().strip("=") -# Unlike Channels, these instances are ephemeral, and are created and -# destroyed casually. -class Nameplate: - def __init__(self, app_id, db, id, mailbox_id): - self._app_id = app_id - self._db = db - self._id = id - self._mailbox_id = mailbox_id - def get_id(self): - return self._id +SideResult = namedtuple("SideResult", ["changed", "empty", "side1", "side2"]) +Unchanged = SideResult(changed=False, empty=False, side1=None, side2=None) +class CrowdedError(Exception): + pass - def get_mailbox_id(self): - return self._mailbox_id +def add_side(row, new_side): + old_sides = [s for s in [row["side1"], row["side2"]] if s] + if new_side in old_sides: + return Unchanged + if len(old_sides) == 2: + raise CrowdedError("too many sides for this thing") + return SideResult(changed=True, empty=False, + side1=old_sides[0], side2=new_side) - def claim(self, side, when): - db = self._db - sides = get_sides(db.execute("SELECT `side1`, `side2` FROM `nameplates`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, self._id)).fetchone()) - old_sides = len(sides) - sides.add(side) - if len(sides) > 2: - # XXX: crowded: bail - pass - sides12 = make_sides(sides) - db.execute("UPDATE `nameplates` SET `side1`=?, `side2`=?" - " WHERE `app_id`=? AND `id`=?", - (sides12[0], sides12[1], self._app_id, self._id)) - if old_sides == 0: - db.execute("UPDATE `mailboxes` SET `nameplate_started`=?" - " WHERE `app_id`=? AND `id`=?", - (when, self._app_id, self._mailbox_id)) - else: - db.execute("UPDATE `mailboxes` SET `nameplate_second`=?" - " WHERE `app_id`=? AND `id`=?", - (when, self._app_id, self._mailbox_id)) - db.commit() +def remove_side(row, side): + old_sides = [s for s in [row["side1"], row["side2"]] if s] + if side not in old_sides: + return Unchanged + remaining_sides = old_sides[:] + remaining_sides.remove(side) + if remaining_sides: + return SideResult(changed=True, empty=False, side1=remaining_sides[0], + side2=None) + return SideResult(changed=True, empty=True, side1=None, side2=None) - def release(self, side, when): - db = self._db - sides = get_sides(db.execute("SELECT `side1`, `side2` FROM `nameplates`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, self._id)).fetchone()) - sides.discard(side) - sides12 = make_sides(sides) - db.execute("UPDATE `nameplates` SET `side1`=?, `side2`=?" - " WHERE `app_id`=? AND `id`=?", - (sides12[0], sides12[1], self._app_id, self._id)) - if len(sides) == 0: - db.execute("UPDATE `mailboxes` SET `nameplate_closed`=?" - " WHERE `app_id`=? AND `id`=?", - (when, self._app_id, self._mailbox_id)) - db.commit() +Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"]) +TransitUsage = namedtuple("TransitUsage", + ["started", "waiting_time", "total_time", + "total_bytes", "result"]) class Mailbox: - def __init__(self, app, db, blur_usage, log_requests, app_id, channelid): + def __init__(self, app, db, blur_usage, log_requests, app_id, mailbox_id): self._app = app self._db = db self._blur_usage = blur_usage self._log_requests = log_requests self._app_id = app_id - self._channelid = channelid + self._mailbox_id = mailbox_id self._listeners = {} # handle -> (send_f, stop_f) # "handle" is a hashable object, for deregistration # send_f() takes a JSONable object, stop_f() has no args - def get_channelid(self): - return self._channelid + def open(self, side, when): + # requires caller to db.commit() + assert isinstance(side, type(u"")), type(side) + db = self._db + row = db.execute("SELECT * FROM `mailboxes`" + " WHERE `app_id`=? AND `id`=?", + (self._app_id, self._mailbox_id)).fetchone() + try: + sr = add_side(row, side) + except CrowdedError: + db.execute("UPDATE `mailboxes` SET `crowded`=?" + " WHERE `app_id`=? AND `id`=?", + (True, self._app_id, self._mailbox_id)) + db.commit() + raise + if sr.changed: + db.execute("UPDATE `mailboxes` SET" + " `side1`=?, `side2`=?, `second`=?" + " WHERE `app_id`=? AND `id`=?", + (sr.side1, sr.side2, when, + self._app_id, self._mailbox_id)) def get_messages(self): messages = [] db = self._db for row in db.execute("SELECT * FROM `messages`" - " WHERE `app_id`=? AND `channelid`=?" + " WHERE `app_id`=? AND `mailbox_id`=?" " ORDER BY `server_rx` ASC", - (self._app_id, self._channelid)).fetchall(): + (self._app_id, self._mailbox_id)).fetchall(): if row["phase"] in (CLAIM, RELEASE): continue messages.append({"phase": row["phase"], "body": row["body"], @@ -120,45 +116,107 @@ class Mailbox: def _add_message(self, side, phase, body, server_rx, msgid): db = self._db db.execute("INSERT INTO `messages`" - " (`app_id`, `channelid`, `side`, `phase`, `body`," + " (`app_id`, `mailbox_id`, `side`, `phase`, `body`," " `server_rx`, `msgid`)" " VALUES (?,?,?,?,?, ?,?)", - (self._app_id, self._channelid, side, phase, body, + (self._app_id, self._mailbox_id, side, phase, body, server_rx, msgid)) db.commit() - def claim(self, side): - self._add_message(side, CLAIM, None, time.time(), None) - def add_message(self, side, phase, body, server_rx, msgid): self._add_message(side, phase, body, server_rx, msgid) self.broadcast_message(phase, body, server_rx, msgid) return self.get_messages() # for rendezvous_web.py POST /add - def release(self, side, mood): - self._add_message(side, RELEASE, mood, time.time(), None) + def close(self, side, mood, when): + assert isinstance(side, type(u"")), type(side) db = self._db - seen = set([row["side"] for row in - db.execute("SELECT `side` FROM `messages`" - " WHERE `app_id`=? AND `channelid`=?", - (self._app_id, self._channelid))]) - freed = set([row["side"] for row in - db.execute("SELECT `side` FROM `messages`" - " WHERE `app_id`=? AND `channelid`=?" - " AND `phase`=?", - (self._app_id, self._channelid, RELEASE))]) - if seen - freed: - return False - self.delete_and_summarize() - return True + row = db.execute("SELECT * FROM `mailboxes`" + " WHERE `app_id`=? AND `id`=?", + (self._app_id, self._mailbox_id)).fetchone() + if not row: + return + sr = remove_side(row, side) + if sr.empty: + rows = db.execute("SELECT DISTINCT(side) FROM `messages`" + " WHERE `app_id`=? AND `id`=?", + (self._app_id, self._mailbox_id)).fetchall() + num_sides = len(rows) + self._summarize_and_store(row, num_sides, mood, when, pruned=False) + self._delete() + db.commit() + elif sr.changed: + db.execute("UPDATE `mailboxes`" + " SET `side1`=?, `side2`=?, `first_mood`=?" + " WHERE `app_id`=? AND `id`=?", + (sr.side1, sr.side2, mood, + self._app_id, self._mailbox_id)) + db.commit() + + def _delete(self): + # requires caller to db.commit() + self._db.execute("DELETE FROM `mailboxes`" + " WHERE `app_id`=? AND `id`=?", + (self._app_id, self._mailbox_id)) + self._db.execute("DELETE FROM `messages`" + " WHERE `app_id`=? AND `mailbox_id`=?", + (self._app_id, self._mailbox_id)) + + # Shut down any listeners, just in case they're still lingering + # around. + for (send_f, stop_f) in self._listeners.values(): + stop_f() + + self._app.free_mailbox(self._mailbox_id) + + def _summarize_and_store(self, row, num_sides, second_mood, delete_time, + pruned): + u = self._summarize(row, num_sides, second_mood, delete_time, pruned) + self._db.execute("INSERT INTO `mailbox_usage`" + " (`app_id`, " + " `started`, `total_time`, `waiting_time`, `result`)" + " VALUES (?, ?,?,?,?)", + (self._app_id, + u.started, u.total_time, u.waiting_time, u.result)) + + def _summarize(self, row, num_sides, second_mood, delete_time, pruned): + started = row["started"] + if self._blur_usage: + started = self._blur_usage * (started // self._blur_usage) + waiting_time = None + if row["second"]: + waiting_time = row["second"] - row["started"] + total_time = delete_time - row["started"] + + if len(num_sides) == 0: + result = u"quiet" + elif len(num_sides) == 1: + result = u"lonely" + else: + result = u"happy" + + moods = set([row["first_mood"], second_mood]) + if u"lonely" in moods: + result = u"lonely" + if u"errory" in moods: + result = u"errory" + if u"scary" in moods: + result = u"scary" + if pruned: + result = u"pruney" + if row["crowded"]: + result = u"crowded" + + return Usage(started=started, waiting_time=waiting_time, + total_time=total_time, result=result) def is_idle(self): if self._listeners: return False c = self._db.execute("SELECT `server_rx` FROM `messages`" - " WHERE `app_id`=? AND `channelid`=?" + " WHERE `app_id`=? AND `mailbox_id`=?" " ORDER BY `server_rx` DESC LIMIT 1", - (self._app_id, self._channelid)) + (self._app_id, self._mailbox_id)) rows = c.fetchall() if not rows: return True @@ -167,88 +225,6 @@ class Mailbox: return True return False - def _store_summary(self, summary): - (started, result, total_time, waiting_time) = summary - if self._blur_usage: - started = self._blur_usage * (started // self._blur_usage) - self._db.execute("INSERT INTO `usage`" - " (`type`, `started`, `result`," - " `total_time`, `waiting_time`)" - " VALUES (?,?,?, ?,?)", - (u"rendezvous", started, result, - total_time, waiting_time)) - self._db.commit() - - def _summarize(self, messages, delete_time): - all_sides = set([m["side"] for m in messages]) - if len(all_sides) == 0: - log.msg("_summarize was given zero messages") # shouldn't happen - return - - started = min([m["server_rx"] for m in messages]) - # 'total_time' is how long the channel was occupied. That ends now, - # both for channels that got pruned for inactivity, and for channels - # that got pruned because of two RELEASE messages - total_time = delete_time - started - - if len(all_sides) == 1: - return (started, "lonely", total_time, None) - if len(all_sides) > 2: - # TODO: it'll be useful to have more detail here - return (started, "crowded", total_time, None) - - # exactly two sides were involved - A_side = sorted(messages, key=lambda m: m["server_rx"])[0]["side"] - B_side = list(all_sides - set([A_side]))[0] - - # How long did the first side wait until the second side showed up? - first_A = min([m["server_rx"] for m in messages if m["side"] == A_side]) - first_B = min([m["server_rx"] for m in messages if m["side"] == B_side]) - waiting_time = first_B - first_A - - # now, were all sides closed? If not, this is "pruney" - A_deallocs = [m for m in messages - if m["phase"] == RELEASE and m["side"] == A_side] - B_deallocs = [m for m in messages - if m["phase"] == RELEASE and m["side"] == B_side] - if not A_deallocs or not B_deallocs: - return (started, "pruney", total_time, None) - - # ok, both sides closed. figure out the mood - A_mood = A_deallocs[0]["body"] # maybe None - B_mood = B_deallocs[0]["body"] # maybe None - mood = "quiet" - if A_mood == u"happy" and B_mood == u"happy": - mood = "happy" - if A_mood == u"lonely" or B_mood == u"lonely": - mood = "lonely" - if A_mood == u"errory" or B_mood == u"errory": - mood = "errory" - if A_mood == u"scary" or B_mood == u"scary": - mood = "scary" - return (started, mood, total_time, waiting_time) - - def delete_and_summarize(self): - db = self._db - c = self._db.execute("SELECT * FROM `messages`" - " WHERE `app_id`=? AND `channelid`=?" - " ORDER BY `server_rx`", - (self._app_id, self._channelid)) - messages = c.fetchall() - summary = self._summarize(messages, time.time()) - self._store_summary(summary) - db.execute("DELETE FROM `messages`" - " WHERE `app_id`=? AND `channelid`=?", - (self._app_id, self._channelid)) - db.commit() - - # Shut down any listeners, just in case they're still lingering - # around. - for (send_f, stop_f) in self._listeners.values(): - stop_f() - - self._app.free_channel(self._channelid) - def _shutdown(self): # used at test shutdown to accelerate client disconnects for (send_f, stop_f) in self._listeners.values(): @@ -261,7 +237,7 @@ class AppNamespace: self._blur_usage = blur_usage self._log_requests = log_requests self._app_id = app_id - self._channels = {} + self._mailboxes = {} def get_nameplate_ids(self): db = self._db @@ -315,15 +291,19 @@ class AppNamespace: (self._app_id, nameplate_id)).fetchone() if row: mailbox_id = row["mailbox_id"] - sides = [row["side1"], row["sides2"]] - if side not in sides: - if sides[0] and sides[1]: - raise XXXERROR("crowded") - sides[1] = side - db.execute("UPDATE `nameplates` SET " - "`side1`=?, `side2`=?, `mailbox_id`=?, `second`=?" + try: + sr = add_side(row, side) + except CrowdedError: + db.execute("UPDATE `nameplates` SET `crowded`=?" " WHERE `app_id`=? AND `id`=?", - (sides[0], sides[1], mailbox_id, when, + (True, self._app_id, nameplate_id)) + db.commit() + raise + if sr.changed: + db.execute("UPDATE `nameplates` SET" + " `side1`=?, `side2`=?, `updated`=?, `second`=?" + " WHERE `app_id`=? AND `id`=?", + (sr.side1, sr.side2, when, when, self._app_id, nameplate_id)) else: if self._log_requests: @@ -331,9 +311,11 @@ class AppNamespace: (nameplate_id, self._app_id)) mailbox_id = generate_mailbox_id() db.execute("INSERT INTO `nameplates`" - " (`app_id`, `id`, `mailbox_id`, `side1`, `started`)" - " VALUES(?,?,?,?,?)", - (self._app_id, nameplate_id, mailbox_id, side, when)) + " (`app_id`, `id`, `mailbox_id`, `side1`," + " `updated`, `started`)" + " VALUES(?,?,?,?, ?,?)", + (self._app_id, nameplate_id, mailbox_id, side, + when, when)) db.commit() return mailbox_id @@ -351,75 +333,120 @@ class AppNamespace: (self._app_id, nameplate_id)).fetchone() if not row: return - sides = get_sides(row) - if side not in sides: - return - sides.discard(side) - if sides: - s12 = make_sides(sides) - db.execute("UPDATE `nameplates` SET `side1`=?, `side2`=?" - " WHERE `app_id`=? AND `id`=?", - (s12[0], s12[1], self._app_id, nameplate_id)) - else: + sr = remove_side(row, side) + if sr.empty: db.execute("DELETE FROM `nameplates`" " WHERE `app_id`=? AND `id`=?", (self._app_id, nameplate_id)) - self._summarize_nameplate(row) + self._summarize_nameplate_and_store(row, when, pruned=False) + db.commit() + elif sr.changed: + db.execute("UPDATE `nameplates`" + " SET `side1`=?, `side2`=?, `updated`=?" + " WHERE `app_id`=? AND `id`=?", + (sr.side1, sr.side2, when, + self._app_id, nameplate_id)) + db.commit() - def open_mailbox(self, channelid, side): - assert isinstance(channelid, type(u"")), type(channelid) - channel = self.get_channel(channelid) - channel.claim(side) - return channel - # some of this overlaps with open() on a new mailbox - db.execute("INSERT INTO `mailboxes`" - " (`app_id`, `id`, `nameplate_started`, `started`)" - " VALUES(?,?,?,?)", - (self._app_id, mailbox_id, when, when)) + def _summarize_nameplate_and_store(self, row, delete_time, pruned): + # requires caller to db.commit() + u = self._summarize_nameplate_usage(row, delete_time, pruned) + self._db.execute("INSERT INTO `nameplate_usage`" + " (`app_id`," + " `started`, `total_time`, `waiting_time`, `result`)" + " VALUES (?, ?,?,?,?)", + (self._app_id, + u.started, u.total_time, u.waiting_time, u.result)) - def get_channel(self, channelid): - assert isinstance(channelid, type(u"")) - if not channelid in self._channels: + def _summarize_nameplate_usage(self, row, delete_time, pruned): + started = row["started"] + if self._blur_usage: + started = self._blur_usage * (started // self._blur_usage) + waiting_time = None + if row["second"]: + waiting_time = row["second"] - row["started"] + total_time = delete_time - row["started"] + result = u"lonely" + if pruned: + result = u"pruney" + if row["second"]: + result = u"happy" + if row["crowded"]: + result = u"crowded" + return Usage(started=started, waiting_time=waiting_time, + total_time=total_time, result=result) + + def _prune_nameplate(self, row, delete_time): + # requires caller to db.commit() + db = self._db + db.execute("DELETE FROM `nameplates` WHERE `app_id`=? AND `id`=?", + (self._app_id, row["id"])) + self._summarize_nameplate_and_store(row, delete_time, pruned=True) + # TODO: make a Nameplate object, keep track of when there's a + # websocket that's watching it, don't prune a nameplate that someone + # is watching, even if they started watching a long time ago + + def prune_nameplates(self, old): + db = self._db + for row in db.execute("SELECT * FROM `nameplates`" + " WHERE `updated` < ?", + (old,)).fetchall(): + self._prune_nameplate(row) + count = db.execute("SELECT COUNT(*) FROM `nameplates`").fetchone()[0] + return count + + def open_mailbox(self, mailbox_id, side, when): + assert isinstance(mailbox_id, type(u"")), type(mailbox_id) + db = self._db + if not mailbox_id in self._mailboxes: if self._log_requests: - log.msg("spawning #%s for app_id %s" % (channelid, self._app_id)) - self._channels[channelid] = Channel(self, self._db, - self._blur_usage, - self._log_requests, - self._app_id, channelid) - return self._channels[channelid] + log.msg("spawning #%s for app_id %s" % (mailbox_id, + self._app_id)) + db.execute("INSERT INTO `mailboxes`" + " (`app_id`, `id`, `started`)" + " VALUES(?,?,?)", + (self._app_id, mailbox_id, when)) + self._mailboxes[mailbox_id] = Mailbox(self, self._db, + self._blur_usage, + self._log_requests, + self._app_id, mailbox_id) + mailbox = self._mailboxes[mailbox_id] + mailbox.open(side) + db.commit() + return mailbox - def free_channel(self, channelid): - # called from Channel.delete_and_summarize(), which deletes any + def free_mailbox(self, mailbox_id): + # called from Mailbox.delete_and_summarize(), which deletes any # messages - if channelid in self._channels: - self._channels.pop(channelid) + if mailbox_id in self._mailboxes: + self._mailboxes.pop(mailbox_id) if self._log_requests: - log.msg("freed+killed #%s, now have %d DB channels, %d live" % - (channelid, len(self.get_claimed()), len(self._channels))) + log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" % + (mailbox_id, len(self.get_claimed()), len(self._mailboxes))) - def prune_old_channels(self): + def prune_mailboxes(self, old): # For now, pruning is logged even if log_requests is False, to debug # the pruning process, and since pruning is triggered by a timer - # instead of by user action. It does reveal which channels were + # instead of by user action. It does reveal which mailboxes were # present when the pruning process began, though, so in the log run # it should do less logging. log.msg(" channel prune begins") # a channel is deleted when there are no listeners and there have # been no messages added in CHANNEL_EXPIRATION_TIME seconds - channels = set(self.get_claimed()) # these have messages - channels.update(self._channels) # these might have listeners - for channelid in channels: - log.msg(" channel prune checking %d" % channelid) - channel = self.get_channel(channelid) + mailboxes = set(self.get_claimed()) # these have messages + mailboxes.update(self._mailboxes) # these might have listeners + for mailbox_id in mailboxes: + log.msg(" channel prune checking %d" % mailbox_id) + channel = self.get_channel(mailbox_id) if channel.is_idle(): - log.msg(" channel prune expiring %d" % channelid) + log.msg(" channel prune expiring %d" % mailbox_id) channel.delete_and_summarize() # calls self.free_channel - log.msg(" channel prune done, %r left" % (self._channels.keys(),)) - return bool(self._channels) + log.msg(" channel prune done, %r left" % (self._mailboxes.keys(),)) + return bool(self._mailboxes) def _shutdown(self): - for channel in self._channels.values(): + for channel in self._mailboxes.values(): channel._shutdown() class Rendezvous(service.MultiService): @@ -427,7 +454,7 @@ class Rendezvous(service.MultiService): service.MultiService.__init__(self) self._db = db self._welcome = welcome - self._blur_usage = blur_usage + self._blur_usage = None log_requests = blur_usage is None self._log_requests = log_requests self._apps = {} @@ -449,15 +476,18 @@ class Rendezvous(service.MultiService): self._log_requests, app_id) return self._apps[app_id] - def prune(self): - # As with AppNamespace.prune_old_channels, we log for now. + def prune(self, old=None): + # As with AppNamespace.prune_old_mailboxes, we log for now. log.msg("beginning app prune") + if old is None: + old = time.time() - CHANNEL_EXPIRATION_TIME c = self._db.execute("SELECT DISTINCT `app_id` FROM `messages`") apps = set([row["app_id"] for row in c.fetchall()]) # these have messages apps.update(self._apps) # these might have listeners for app_id in apps: log.msg(" app prune checking %r" % (app_id,)) - still_active = self.get_app(app_id).prune_old_channels() + app = self.get_app(app_id) + still_active = app.prune_nameplates(old) + app.prune_mailboxes(old) if not still_active: log.msg("prune pops app %r" % (app_id,)) self._apps.pop(app_id) diff --git a/src/wormhole/server/rendezvous_websocket.py b/src/wormhole/server/rendezvous_websocket.py index 612b1f3..0f6d5d2 100644 --- a/src/wormhole/server/rendezvous_websocket.py +++ b/src/wormhole/server/rendezvous_websocket.py @@ -2,6 +2,7 @@ import json, time from twisted.internet import reactor from twisted.python import log from autobahn.twisted import websocket +from .rendezvous import CrowdedError # The WebSocket allows the client to send "commands" to the server, and the # server to send "responses" to the client. Note that commands and responses @@ -84,7 +85,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): self._app = None self._side = None self._did_allocate = False # only one allocate() per websocket - self._channels = {} # channel-id -> Channel (claimed) + self._mailbox = None def onConnect(self, request): rv = self.factory.rendezvous @@ -122,11 +123,11 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): return self.handle_release(msg, server_rx) if mtype == "open": - return self.handle_open(msg) + return self.handle_open(msg, server_rx) if mtype == "add": return self.handle_add(msg, server_rx) if mtype == "close": - return self.handle_close(msg) + return self.handle_close(msg, server_rx) raise Error("Unknown type") except Error as e: @@ -154,7 +155,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): def handle_allocate(self, server_rx): if self._did_allocate: - raise Error("You already allocated one channel, don't be greedy") + raise Error("You already allocated one mailbox, don't be greedy") nameplate_id = self._app.allocate_nameplate(self._side, server_rx) assert isinstance(nameplate_id, type(u"")) self._did_allocate = True @@ -164,56 +165,52 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): if "nameplate" not in msg: raise Error("claim requires 'nameplate'") nameplate_id = msg["nameplate"] + assert isinstance(nameplate_id, type(u"")), type(nameplate_id) self._nameplate_id = nameplate_id - assert isinstance(nameplate_id, type(u"")), type(nameplate) - mailbox_id = self._app.claim_nameplate(nameplate_id, self._side, - server_rx) + try: + mailbox_id = self._app.claim_nameplate(nameplate_id, self._side, + server_rx) + except CrowdedError: + raise Error("crowded") self.send("mailbox", mailbox=mailbox_id) def handle_release(self, server_rx): if not self._nameplate_id: raise Error("must claim a nameplate before releasing it") - - deleted = self._app.release_nameplate(self._nameplate_id, - self._side, server_rx) + self._app.release_nameplate(self._nameplate_id, self._side, server_rx) self._nameplate_id = None - def handle_open(self, msg): - channelid = msg["channelid"] - if channelid not in self._channels: - raise Error("must claim channel before watching") - assert isinstance(channelid, type(u"")) - channel = self._channels[channelid] + def handle_open(self, msg, server_rx): + if self._mailbox: + raise Error("you already have a mailbox open") + mailbox_id = msg["mailbox_id"] + assert isinstance(mailbox_id, type(u"")) + self._mailbox = self._app.open_mailbox(mailbox_id, self._side, + server_rx) def _send(event): - self.send("message", channelid=channelid, message=event) + self.send("message", message=event) def _stop(): self._reactor.callLater(0, self.transport.loseConnection) - for old_message in channel.add_listener(self, _send, _stop): + for old_message in self._mailbox.add_listener(self, _send, _stop): _send(old_message) def handle_add(self, msg, server_rx): - channelid = msg["channelid"] - if channelid not in self._channels: - raise Error("must claim channel before adding") - assert isinstance(channelid, type(u"")) - channel = self._channels[channelid] + if not self._mailbox: + raise Error("must open mailbox before adding") if "phase" not in msg: raise Error("missing 'phase'") if "body" not in msg: raise Error("missing 'body'") msgid = msg.get("id") # optional - channel.add_message(self._side, msg["phase"], msg["body"], - server_rx, msgid) + self._mailbox.add_message(self._side, msg["phase"], msg["body"], + server_rx, msgid) - def handle_close(self, msg): - channelid = msg["channelid"] - if channelid not in self._channels: - raise Error("must claim channel before releasing") - assert isinstance(channelid, type(u"")) - channel = self._channels[channelid] - deleted = channel.release(self._side, msg.get("mood")) - del self._channels[channelid] + def handle_close(self, msg, server_rx): + if not self._mailbox: + raise Error("must open mailbox before closing") + deleted = self._mailbox.close(self._side, msg.get("mood"), server_rx) + self._mailbox = None self.send("released", status="deleted" if deleted else "waiting") def send(self, mtype, **kwargs):