From ffb1a9b9c9014feb7d3683f2f32057bf39928567 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Fri, 24 Jun 2016 17:35:23 -0700 Subject: [PATCH] change pruning algorithm The new approach runs every 10 minutes and keeps a nameplate/mailbox/messages "channel" alive if the mailbox has been updated within 11 minutes, or if there has been an attached listener within that time. Also remove the "nameplates.updated" column. Now we only track "updated" timestamps on the "mailboxes" table, and a new mailbox will preserve any attached nameplate. --- .../server/db-schemas/upgrade-to-v3.sql | 4 +- src/wormhole/server/db-schemas/v3.sql | 4 +- src/wormhole/server/rendezvous.py | 108 +++++++----------- src/wormhole/test/test_server.py | 10 +- 4 files changed, 46 insertions(+), 80 deletions(-) diff --git a/src/wormhole/server/db-schemas/upgrade-to-v3.sql b/src/wormhole/server/db-schemas/upgrade-to-v3.sql index 0a01581..4919206 100644 --- a/src/wormhole/server/db-schemas/upgrade-to-v3.sql +++ b/src/wormhole/server/db-schemas/upgrade-to-v3.sql @@ -12,11 +12,9 @@ CREATE TABLE `nameplates` `app_id` VARCHAR, `name` VARCHAR, `mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`), - `request_id` VARCHAR, -- from 'allocate' message, for future deduplication - `updated` INTEGER -- time of last activity, used for pruning + `request_id` VARCHAR -- from 'allocate' message, for future deduplication ); CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`); -CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`); CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); diff --git a/src/wormhole/server/db-schemas/v3.sql b/src/wormhole/server/db-schemas/v3.sql index 09afd41..1274a76 100644 --- a/src/wormhole/server/db-schemas/v3.sql +++ b/src/wormhole/server/db-schemas/v3.sql @@ -17,11 +17,9 @@ CREATE TABLE `nameplates` `app_id` VARCHAR, `name` VARCHAR, `mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`), - `request_id` VARCHAR, -- from 'allocate' message, for future deduplication - `updated` INTEGER -- time of last activity, used for pruning + `request_id` VARCHAR -- from 'allocate' message, for future deduplication ); CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`); -CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`); CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 696e8d7..bc97f68 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -6,12 +6,9 @@ from twisted.application import service, internet SECONDS = 1.0 MINUTE = 60*SECONDS -HOUR = 60*MINUTE -DAY = 24*HOUR -MB = 1000*1000 -CHANNEL_EXPIRATION_TIME = 2*HOUR -EXPIRATION_CHECK_PERIOD = 1*HOUR +CHANNEL_EXPIRATION_TIME = 11*MINUTE +EXPIRATION_CHECK_PERIOD = 10*MINUTE def generate_mailbox_id(): return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii") @@ -203,9 +200,9 @@ class AppNamespace: mailbox_id = generate_mailbox_id() self._add_mailbox(mailbox_id, side, when) # ensure row exists sql = ("INSERT INTO `nameplates`" - " (`app_id`, `name`, `mailbox_id`, `updated`)" - " VALUES(?,?,?,?)") - npid = db.execute(sql, (self._app_id, name, mailbox_id, when) + " (`app_id`, `name`, `mailbox_id`)" + " VALUES(?,?,?)") + npid = db.execute(sql, (self._app_id, name, mailbox_id) ).lastrowid else: npid = row["id"] @@ -219,8 +216,6 @@ class AppNamespace: " (`nameplates_id`, `claimed`, `side`, `added`)" " VALUES(?,?,?,?)", (npid, True, side, when)) - db.execute("UPDATE `nameplates` SET `updated`=? WHERE `id`=?", - (when, npid)) db.commit() self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError @@ -398,10 +393,11 @@ class AppNamespace: # allowed to disconnect for up to 9 minutes without losing the # channel (nameplate, mailbox, and messages). - # Each time a client does something, the "updated" field is updated. - # If a client is subscribed to the mailbox when pruning check runs, - # the "updated" field is updated. After that check, if the "updated" - # field is "old", the channel is deleted. + # Each time a client does something, the mailbox.updated field is + # updated with the current timestamp. If a client is subscribed to + # the mailbox when pruning check runs, the "updated" field is also + # updated. After that check, if the "updated" field is "old", the + # channel is deleted. # For now, pruning is logged even if log_requests is False, to debug # the pruning process, and since pruning is triggered by a timer @@ -411,58 +407,33 @@ class AppNamespace: log.msg(" prune begins (%s)" % self._app_id) db = self._db modified = False - # for all `mailboxes`: classify as new or old - OLD = 0; NEW = 1 - all_mailboxes = {} + + for mailbox in self._mailboxes.values(): + if mailbox.has_listeners(): + mailbox._touch(now) + db.commit() # make sure the updates are visible below + + new_mailboxes = set() + old_mailboxes = set() for row in db.execute("SELECT * FROM `mailboxes` WHERE `app_id`=?", (self._app_id,)).fetchall(): mailbox_id = row["id"] if row["updated"] > old: - which = NEW + new_mailboxes.add(mailbox_id) else: - which = OLD - all_mailboxes[mailbox_id] = which - #log.msg(" 2: all_mailboxes", all_mailboxes) + old_mailboxes.add(mailbox_id) + #log.msg(" 2: mailboxes:", new_mailboxes, old_mailboxes) - # for all mailbox objects with active listeners: - # classify the mailbox as new - for mailbox_id in self._mailboxes: - #log.msg(" -5: checking", mailbox_id, self._mailboxes[mailbox_id]) - if self._mailboxes[mailbox_id].has_listeners(): - all_mailboxes[mailbox_id] = NEW - #log.msg(" 5: all_mailboxes", all_mailboxes) - - # for all `nameplates`: - # classify as new or old - # if the linked mailbox exists: - # if it is new: - # classify nameplate as new - # if it is old: - # if the nameplate is new: - # classify mailbox as new - old_nameplate_ids = [] + old_nameplates = set() for row in db.execute("SELECT * FROM `nameplates` WHERE `app_id`=?", (self._app_id,)).fetchall(): npid = row["id"] - if row["updated"] > old: - which = NEW - else: - which = OLD mailbox_id = row["mailbox_id"] - if mailbox_id in all_mailboxes: - if all_mailboxes[mailbox_id] == NEW: - which = NEW - else: - if which == NEW: - all_mailboxes[mailbox_id] = NEW - if which == OLD: - old_nameplate_ids.append(npid) - #log.msg(" 6: old_nameplate_ids", old_nameplate_ids) + if mailbox_id in old_mailboxes: + old_nameplates.add(npid) + #log.msg(" 3: old_nameplates", old_nameplates) - # delete all old nameplates - # invariant check: if there is a linked mailbox, it is old - - for npid in old_nameplate_ids: + for npid in old_nameplates: log.msg(" deleting nameplate", npid) side_rows = db.execute("SELECT * FROM `nameplate_sides`" " WHERE `nameplates_id`=?", @@ -476,20 +447,19 @@ class AppNamespace: # delete all messages for old mailboxes # delete all old mailboxes - for mailbox_id, which in all_mailboxes.items(): - if which == OLD: - log.msg(" deleting mailbox", mailbox_id) - side_rows = db.execute("SELECT * FROM `mailbox_sides`" - " WHERE `mailbox_id`=?", - (mailbox_id,)).fetchall() - db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?", - (mailbox_id,)) - db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?", - (mailbox_id,)) - db.execute("DELETE FROM `mailboxes` WHERE `id`=?", - (mailbox_id,)) - self._summarize_mailbox_and_store(side_rows, now, pruned=True) - modified = True + for mailbox_id in old_mailboxes: + log.msg(" deleting mailbox", mailbox_id) + side_rows = db.execute("SELECT * FROM `mailbox_sides`" + " WHERE `mailbox_id`=?", + (mailbox_id,)).fetchall() + db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?", + (mailbox_id,)) + db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?", + (mailbox_id,)) + db.execute("DELETE FROM `mailboxes` WHERE `id`=?", + (mailbox_id,)) + self._summarize_mailbox_and_store(side_rows, now, pruned=True) + modified = True if modified: db.commit() diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index 96f49d6..611f2c3 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -391,7 +391,7 @@ class Prune(unittest.TestCase): def test_lots(self): OLD = "old"; NEW = "new" - for nameplate in [None, OLD, NEW]: + for nameplate in [False, True]: for mailbox in [OLD, NEW]: for has_listeners in [False, True]: self.one(nameplate, mailbox, has_listeners) @@ -417,8 +417,8 @@ class Prune(unittest.TestCase): mailbox_survives = False mbid = "mbid" - if nameplate is not None: - mbid = app.claim_nameplate("npid", "side1", when[nameplate]) + if nameplate: + mbid = app.claim_nameplate("npid", "side1", when[mailbox]) mb = app.open_mailbox(mbid, "side1", when[mailbox]) # the pruning algorithm doesn't care about the age of messages, @@ -430,8 +430,8 @@ class Prune(unittest.TestCase): if has_listeners: mb.add_listener("handle", None, None) - if (nameplate == NEW or mailbox == NEW or has_listeners): - if nameplate is not None: + if (mailbox == NEW or has_listeners): + if nameplate: nameplate_survives = True mailbox_survives = True messages_survive = mailbox_survives