From 26c7008c23f6a28e966c28f030db215e9a291945 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Fri, 13 Nov 2015 18:22:37 -0800 Subject: [PATCH] DB: use 'messages' to track allocations, not 'allocations' This removes the 'allocations' table entirely, and cleans up the way we prune old messages. This should make it easier to summarize each connection (for usage stats) when it gets deallocated, as well as making pruning more reliable. --- src/wormhole/db-schemas/v1.sql | 9 +- src/wormhole/servers/relay_server.py | 147 ++++++++++++++++++--------- 2 files changed, 102 insertions(+), 54 deletions(-) diff --git a/src/wormhole/db-schemas/v1.sql b/src/wormhole/db-schemas/v1.sql index 9f7872d..0fa51c3 100644 --- a/src/wormhole/db-schemas/v1.sql +++ b/src/wormhole/db-schemas/v1.sql @@ -13,15 +13,8 @@ CREATE TABLE `messages` `channelid` INTEGER, `side` VARCHAR, `phase` VARCHAR, -- not numeric, more of a PAKE-phase indicator string + -- phase="_allocate" and "_deallocate" are used internally `body` VARCHAR, `when` INTEGER ); CREATE INDEX `messages_idx` ON `messages` (`appid`, `channelid`); - -CREATE TABLE `allocations` -( - `appid` VARCHAR, - `channelid` INTEGER, - `side` VARCHAR -); -CREATE INDEX `allocations_idx` ON `allocations` (`channelid`); diff --git a/src/wormhole/servers/relay_server.py b/src/wormhole/servers/relay_server.py index 747320c..4f6fe73 100644 --- a/src/wormhole/servers/relay_server.py +++ b/src/wormhole/servers/relay_server.py @@ -13,6 +13,9 @@ MB = 1000*1000 CHANNEL_EXPIRATION_TIME = 3*DAY EXPIRATION_CHECK_PERIOD = 2*HOUR +ALLOCATE = u"_allocate" +DEALLOCATE = u"_deallocate" + def json_response(request, data): request.setHeader(b"content-type", b"application/json; charset=utf-8") return (json.dumps(data)+"\n").encode("utf-8") @@ -167,9 +170,14 @@ class Deallocator(RelayResource): appid = data["appid"] channelid = int(data["channelid"]) side = data["side"] + if not isinstance(side, type(u"")): + raise TypeError("side must be string, not '%s'" % type(side)) + mood = data.get("mood") #print("DEALLOCATE", appid, channelid, side) + app = self._relay.get_app(appid) - deleted = app.maybe_free_child(channelid, side) + channel = app.get_channel(channelid) + deleted = channel.deallocate(side, mood) response = {"status": "waiting"} if deleted: response = {"status": "deleted"} @@ -193,6 +201,8 @@ class Channel: " WHERE `appid`=? AND `channelid`=?" " ORDER BY `when` ASC", (self._appid, self._channelid)).fetchall(): + if row["phase"] in (u"_allocate", u"_deallocate"): + continue messages.append({"phase": row["phase"], "body": row["body"]}) data = {"welcome": self._welcome, "messages": messages} return data @@ -204,6 +214,8 @@ class Channel: " WHERE `appid`=? AND `channelid`=?" " ORDER BY `when` ASC", (self._appid, self._channelid)).fetchall(): + if row["phase"] in (u"_allocate", u"_deallocate"): + continue yield json.dumps({"phase": row["phase"], "body": row["body"]}) def remove_listener(self, listener): self._listeners.discard(listener) @@ -213,21 +225,76 @@ class Channel: for listener in self._listeners: listener(data) - def add_message(self, side, phase, body): + def _add_message(self, side, phase, body): db = self._db db.execute("INSERT INTO `messages`" " (`appid`, `channelid`, `side`, `phase`, `body`, `when`)" " VALUES (?,?,?,?, ?,?)", (self._appid, self._channelid, side, phase, body, time.time())) - db.execute("INSERT INTO `allocations`" - " (`appid`, `channelid`, `side`)" - " VALUES (?,?,?)", - (self._appid, self._channelid, side)) db.commit() + + def allocate(self, side): + self._add_message(side, ALLOCATE, None) + + def add_message(self, side, phase, body): + self._add_message(side, phase, body) self.broadcast_message(phase, body) return self.get_messages() + def deallocate(self, side, mood): + self._add_message(side, DEALLOCATE, mood) + db = self._db + seen = set([row["side"] for row in + db.execute("SELECT `side` FROM `messages`" + " WHERE `appid`=? AND `channelid`=?", + (self._appid, self._channelid))]) + freed = set([row["side"] for row in + db.execute("SELECT `side` FROM `messages`" + " WHERE `appid`=? AND `channelid`=?" + " AND `phase`=?", + (self._appid, self._channelid, DEALLOCATE))]) + if seen - freed: + return False + self.delete_and_summarize() + return True + + def is_idle(self): + if self._listeners: + return False + c = self._db.execute("SELECT `when` FROM `messages`" + " WHERE `appid`=? AND `channelid`=?" + " ORDER BY `when` DESC LIMIT 1", + (self._appid, self._channelid)) + rows = c.fetchall() + if not rows: + return True + old = time.time() - CHANNEL_EXPIRATION_TIME + if rows[0]["when"] < old: + return True + return False + + def delete_and_summarize(self): + # TODO: summarize usage, write into DB + db = self._db + db.execute("DELETE FROM `messages`" + " WHERE `appid`=? AND `channelid`=?", + (self._appid, self._channelid)) + db.commit() + + # It'd be nice to shut down any EventSource listeners here. But we + # don't hang on to the EventsProtocol, so we can't really shut it + # down here: any listeners will stick around until they shut down + # from the client side. That will keep the Channel object in memory, + # but it won't be reachable from the AppNamespace, so no further + # messages will be sent to it. Eventually, when they close the TCP + # connection, self.remove_listener() will be called, ep.sendEvent + # will be removed from self._listeners, breaking the circular + # reference, and everything will get freed. + + self._app.free_channel(self._channelid) + + class AppNamespace: def __init__(self, db, welcome, appid): self._db = db @@ -237,7 +304,7 @@ class AppNamespace: def get_allocated(self): db = self._db - c = db.execute("SELECT DISTINCT `channelid` FROM `allocations`" + c = db.execute("SELECT DISTINCT `channelid` FROM `messages`" " WHERE `appid`=?", (self._appid,)) return set([row["channelid"] for row in c.fetchall()]) @@ -258,10 +325,9 @@ class AppNamespace: raise ValueError("unable to find a free channel-id") def allocate_channel(self, channelid, side): - db = self._db - db.execute("INSERT INTO `allocations` VALUES (?,?,?)", - (self._appid, channelid, side)) - db.commit() + channel = self.get_channel(channelid) + channel.allocate(side) + return channel def get_channel(self, channelid): assert isinstance(channelid, int) @@ -271,46 +337,28 @@ class AppNamespace: self._appid, channelid) return self._channels[channelid] - def maybe_free_child(self, channelid, side): - db = self._db - db.execute("DELETE FROM `allocations`" - " WHERE `appid`=? AND `channelid`=? AND `side`=?", - (self._appid, channelid, side)) - db.commit() - remaining = db.execute("SELECT COUNT(*) FROM `allocations`" - " WHERE `appid`=? AND `channelid`=?", - (self._appid, channelid)).fetchone()[0] - if remaining: - return False - self._free_child(channelid) - return True + def free_channel(self, channelid): + # called from Channel.delete_and_summarize(), which deletes any + # messages - def _free_child(self, channelid): - db = self._db - db.execute("DELETE FROM `allocations`" - " WHERE `appid`=? AND `channelid`=?", - (self._appid, channelid)) - db.execute("DELETE FROM `messages`" - " WHERE `appid`=? AND `channelid`=?", - (self._appid, channelid)) - db.commit() if channelid in self._channels: self._channels.pop(channelid) log.msg("freed+killed #%d, now have %d DB channels, %d live" % (channelid, len(self.get_allocated()), len(self._channels))) def prune_old_channels(self): - db = self._db - old = time.time() - CHANNEL_EXPIRATION_TIME - for channelid in self.get_allocated(): - c = db.execute("SELECT `when` FROM `messages`" - " WHERE `appid`=? AND `channelid`=?" - " ORDER BY `when` DESC LIMIT 1", - (self._appid, channelid)) - rows = c.fetchall() - if not rows or (rows[0]["when"] < old): - log.msg("expiring %d" % channelid) - self._free_child(channelid) + log.msg(" channel prune begins") + # a channel is deleted when there are no listeners and there have + # been no messages added in CHANNEL_EXPIRATION_TIME seconds + channels = set(self.get_allocated()) # these have messages + channels.update(self._channels) # these might have listeners + for channelid in channels: + log.msg(" channel prune checking %d" % channelid) + channel = self.get_channel(channelid) + if channel.is_idle(): + log.msg(" channel prune expiring %d" % channelid) + channel.delete_and_summarize() # calls self.free_channel + log.msg(" channel prune done, %r left" % (self._channels.keys(),)) return bool(self._channels) class Relay(resource.Resource, service.MultiService): @@ -345,7 +393,14 @@ class Relay(resource.Resource, service.MultiService): return self._apps[appid] def prune(self): - for appid in list(self._apps): - still_active = self._apps[appid].prune_old_channels() + log.msg("beginning app prune") + c = self._db.execute("SELECT DISTINCT `appid` FROM `messages`") + apps = set([row["appid"] for row in c.fetchall()]) # these have messages + apps.update(self._apps) # these might have listeners + for appid in apps: + log.msg(" app prune checking %r" % (appid,)) + still_active = self.get_app(appid).prune_old_channels() if not still_active: + log.msg("prune pops app %r" % (appid,)) self._apps.pop(appid) + log.msg("app prune ends, %d remaining apps" % len(self._apps))