diff --git a/src/wormhole/db-schemas/v1.sql b/src/wormhole/db-schemas/v1.sql index 9f7872d..0fa51c3 100644 --- a/src/wormhole/db-schemas/v1.sql +++ b/src/wormhole/db-schemas/v1.sql @@ -13,15 +13,8 @@ CREATE TABLE `messages` `channelid` INTEGER, `side` VARCHAR, `phase` VARCHAR, -- not numeric, more of a PAKE-phase indicator string + -- phase="_allocate" and "_deallocate" are used internally `body` VARCHAR, `when` INTEGER ); CREATE INDEX `messages_idx` ON `messages` (`appid`, `channelid`); - -CREATE TABLE `allocations` -( - `appid` VARCHAR, - `channelid` INTEGER, - `side` VARCHAR -); -CREATE INDEX `allocations_idx` ON `allocations` (`channelid`); diff --git a/src/wormhole/servers/relay_server.py b/src/wormhole/servers/relay_server.py index 747320c..4f6fe73 100644 --- a/src/wormhole/servers/relay_server.py +++ b/src/wormhole/servers/relay_server.py @@ -13,6 +13,9 @@ MB = 1000*1000 CHANNEL_EXPIRATION_TIME = 3*DAY EXPIRATION_CHECK_PERIOD = 2*HOUR +ALLOCATE = u"_allocate" +DEALLOCATE = u"_deallocate" + def json_response(request, data): request.setHeader(b"content-type", b"application/json; charset=utf-8") return (json.dumps(data)+"\n").encode("utf-8") @@ -167,9 +170,14 @@ class Deallocator(RelayResource): appid = data["appid"] channelid = int(data["channelid"]) side = data["side"] + if not isinstance(side, type(u"")): + raise TypeError("side must be string, not '%s'" % type(side)) + mood = data.get("mood") #print("DEALLOCATE", appid, channelid, side) + app = self._relay.get_app(appid) - deleted = app.maybe_free_child(channelid, side) + channel = app.get_channel(channelid) + deleted = channel.deallocate(side, mood) response = {"status": "waiting"} if deleted: response = {"status": "deleted"} @@ -193,6 +201,8 @@ class Channel: " WHERE `appid`=? AND `channelid`=?" " ORDER BY `when` ASC", (self._appid, self._channelid)).fetchall(): + if row["phase"] in (u"_allocate", u"_deallocate"): + continue messages.append({"phase": row["phase"], "body": row["body"]}) data = {"welcome": self._welcome, "messages": messages} return data @@ -204,6 +214,8 @@ class Channel: " WHERE `appid`=? AND `channelid`=?" " ORDER BY `when` ASC", (self._appid, self._channelid)).fetchall(): + if row["phase"] in (u"_allocate", u"_deallocate"): + continue yield json.dumps({"phase": row["phase"], "body": row["body"]}) def remove_listener(self, listener): self._listeners.discard(listener) @@ -213,21 +225,76 @@ class Channel: for listener in self._listeners: listener(data) - def add_message(self, side, phase, body): + def _add_message(self, side, phase, body): db = self._db db.execute("INSERT INTO `messages`" " (`appid`, `channelid`, `side`, `phase`, `body`, `when`)" " VALUES (?,?,?,?, ?,?)", (self._appid, self._channelid, side, phase, body, time.time())) - db.execute("INSERT INTO `allocations`" - " (`appid`, `channelid`, `side`)" - " VALUES (?,?,?)", - (self._appid, self._channelid, side)) db.commit() + + def allocate(self, side): + self._add_message(side, ALLOCATE, None) + + def add_message(self, side, phase, body): + self._add_message(side, phase, body) self.broadcast_message(phase, body) return self.get_messages() + def deallocate(self, side, mood): + self._add_message(side, DEALLOCATE, mood) + db = self._db + seen = set([row["side"] for row in + db.execute("SELECT `side` FROM `messages`" + " WHERE `appid`=? AND `channelid`=?", + (self._appid, self._channelid))]) + freed = set([row["side"] for row in + db.execute("SELECT `side` FROM `messages`" + " WHERE `appid`=? AND `channelid`=?" + " AND `phase`=?", + (self._appid, self._channelid, DEALLOCATE))]) + if seen - freed: + return False + self.delete_and_summarize() + return True + + def is_idle(self): + if self._listeners: + return False + c = self._db.execute("SELECT `when` FROM `messages`" + " WHERE `appid`=? AND `channelid`=?" + " ORDER BY `when` DESC LIMIT 1", + (self._appid, self._channelid)) + rows = c.fetchall() + if not rows: + return True + old = time.time() - CHANNEL_EXPIRATION_TIME + if rows[0]["when"] < old: + return True + return False + + def delete_and_summarize(self): + # TODO: summarize usage, write into DB + db = self._db + db.execute("DELETE FROM `messages`" + " WHERE `appid`=? AND `channelid`=?", + (self._appid, self._channelid)) + db.commit() + + # It'd be nice to shut down any EventSource listeners here. But we + # don't hang on to the EventsProtocol, so we can't really shut it + # down here: any listeners will stick around until they shut down + # from the client side. That will keep the Channel object in memory, + # but it won't be reachable from the AppNamespace, so no further + # messages will be sent to it. Eventually, when they close the TCP + # connection, self.remove_listener() will be called, ep.sendEvent + # will be removed from self._listeners, breaking the circular + # reference, and everything will get freed. + + self._app.free_channel(self._channelid) + + class AppNamespace: def __init__(self, db, welcome, appid): self._db = db @@ -237,7 +304,7 @@ class AppNamespace: def get_allocated(self): db = self._db - c = db.execute("SELECT DISTINCT `channelid` FROM `allocations`" + c = db.execute("SELECT DISTINCT `channelid` FROM `messages`" " WHERE `appid`=?", (self._appid,)) return set([row["channelid"] for row in c.fetchall()]) @@ -258,10 +325,9 @@ class AppNamespace: raise ValueError("unable to find a free channel-id") def allocate_channel(self, channelid, side): - db = self._db - db.execute("INSERT INTO `allocations` VALUES (?,?,?)", - (self._appid, channelid, side)) - db.commit() + channel = self.get_channel(channelid) + channel.allocate(side) + return channel def get_channel(self, channelid): assert isinstance(channelid, int) @@ -271,46 +337,28 @@ class AppNamespace: self._appid, channelid) return self._channels[channelid] - def maybe_free_child(self, channelid, side): - db = self._db - db.execute("DELETE FROM `allocations`" - " WHERE `appid`=? AND `channelid`=? AND `side`=?", - (self._appid, channelid, side)) - db.commit() - remaining = db.execute("SELECT COUNT(*) FROM `allocations`" - " WHERE `appid`=? AND `channelid`=?", - (self._appid, channelid)).fetchone()[0] - if remaining: - return False - self._free_child(channelid) - return True + def free_channel(self, channelid): + # called from Channel.delete_and_summarize(), which deletes any + # messages - def _free_child(self, channelid): - db = self._db - db.execute("DELETE FROM `allocations`" - " WHERE `appid`=? AND `channelid`=?", - (self._appid, channelid)) - db.execute("DELETE FROM `messages`" - " WHERE `appid`=? AND `channelid`=?", - (self._appid, channelid)) - db.commit() if channelid in self._channels: self._channels.pop(channelid) log.msg("freed+killed #%d, now have %d DB channels, %d live" % (channelid, len(self.get_allocated()), len(self._channels))) def prune_old_channels(self): - db = self._db - old = time.time() - CHANNEL_EXPIRATION_TIME - for channelid in self.get_allocated(): - c = db.execute("SELECT `when` FROM `messages`" - " WHERE `appid`=? AND `channelid`=?" - " ORDER BY `when` DESC LIMIT 1", - (self._appid, channelid)) - rows = c.fetchall() - if not rows or (rows[0]["when"] < old): - log.msg("expiring %d" % channelid) - self._free_child(channelid) + log.msg(" channel prune begins") + # a channel is deleted when there are no listeners and there have + # been no messages added in CHANNEL_EXPIRATION_TIME seconds + channels = set(self.get_allocated()) # these have messages + channels.update(self._channels) # these might have listeners + for channelid in channels: + log.msg(" channel prune checking %d" % channelid) + channel = self.get_channel(channelid) + if channel.is_idle(): + log.msg(" channel prune expiring %d" % channelid) + channel.delete_and_summarize() # calls self.free_channel + log.msg(" channel prune done, %r left" % (self._channels.keys(),)) return bool(self._channels) class Relay(resource.Resource, service.MultiService): @@ -345,7 +393,14 @@ class Relay(resource.Resource, service.MultiService): return self._apps[appid] def prune(self): - for appid in list(self._apps): - still_active = self._apps[appid].prune_old_channels() + log.msg("beginning app prune") + c = self._db.execute("SELECT DISTINCT `appid` FROM `messages`") + apps = set([row["appid"] for row in c.fetchall()]) # these have messages + apps.update(self._apps) # these might have listeners + for appid in apps: + log.msg(" app prune checking %r" % (appid,)) + still_active = self.get_app(appid).prune_old_channels() if not still_active: + log.msg("prune pops app %r" % (appid,)) self._apps.pop(appid) + log.msg("app prune ends, %d remaining apps" % len(self._apps))