From eebc9ebd54b369c344516f7ef6d41966de801a91 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Thu, 26 May 2016 22:43:29 -0700 Subject: [PATCH] rewrite pruning, add full tests Apparently it was broken: the first time the LoopingCall fired, it would throw an exception, and never try again. Now it should be fixed. --- src/wormhole/server/rendezvous.py | 329 +++++++++++++++++++----------- src/wormhole/test/test_server.py | 181 +++++++++++++++- 2 files changed, 391 insertions(+), 119 deletions(-) diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 96470f6..8633df9 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -52,11 +52,9 @@ SidedMessage = namedtuple("SidedMessage", ["side", "phase", "body", "server_rx", "msg_id"]) class Mailbox: - def __init__(self, app, db, blur_usage, log_requests, app_id, mailbox_id): + def __init__(self, app, db, app_id, mailbox_id): self._app = app self._db = db - self._blur_usage = blur_usage - self._log_requests = log_requests self._app_id = app_id self._mailbox_id = mailbox_id self._listeners = {} # handle -> (send_f, stop_f) @@ -99,12 +97,16 @@ class Mailbox: return messages def add_listener(self, handle, send_f, stop_f): + # TODO: update 'updated' self._listeners[handle] = (send_f, stop_f) return self.get_messages() def remove_listener(self, handle): self._listeners.pop(handle) + def has_listeners(self): + return bool(self._listeners) + def broadcast_message(self, sm): for (send_f, stop_f) in self._listeners.values(): send_f(sm) @@ -133,11 +135,8 @@ class Mailbox: return sr = remove_side(row, side) if sr.empty: - rows = db.execute("SELECT DISTINCT(`side`) FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?", - (self._app_id, self._mailbox_id)).fetchall() - num_sides = len(rows) - self._summarize_and_store(row, num_sides, mood, when, pruned=False) + self._app._summarize_mailbox_and_store(self._mailbox_id, row, + mood, when, pruned=False) self._delete() db.commit() elif sr.changed: @@ -164,61 +163,8 @@ class Mailbox: self._app.free_mailbox(self._mailbox_id) - def _summarize_and_store(self, row, num_sides, second_mood, delete_time, - pruned): - u = self._summarize(row, num_sides, second_mood, delete_time, pruned) - self._db.execute("INSERT INTO `mailbox_usage`" - " (`app_id`, " - " `started`, `total_time`, `waiting_time`, `result`)" - " VALUES (?, ?,?,?,?)", - (self._app_id, - u.started, u.total_time, u.waiting_time, u.result)) - - def _summarize(self, row, num_sides, second_mood, delete_time, pruned): - started = row["started"] - if self._blur_usage: - started = self._blur_usage * (started // self._blur_usage) - waiting_time = None - if row["second"]: - waiting_time = row["second"] - row["started"] - total_time = delete_time - row["started"] - - if num_sides == 0: - result = u"quiet" - elif num_sides == 1: - result = u"lonely" - else: - result = u"happy" - - moods = set([row["first_mood"], second_mood]) - if u"lonely" in moods: - result = u"lonely" - if u"errory" in moods: - result = u"errory" - if u"scary" in moods: - result = u"scary" - if pruned: - result = u"pruney" - if row["crowded"]: - result = u"crowded" - - return Usage(started=started, waiting_time=waiting_time, - total_time=total_time, result=result) - - def is_idle(self): - if self._listeners: - return False - c = self._db.execute("SELECT `server_rx` FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?" - " ORDER BY `server_rx` DESC LIMIT 1", - (self._app_id, self._mailbox_id)) - rows = c.fetchall() - if not rows: - return True - old = time.time() - CHANNEL_EXPIRATION_TIME - if rows[0]["server_rx"] < old: - return True - return False + def is_active(self): + return bool(self._listeners) def _shutdown(self): # used at test shutdown to accelerate client disconnects @@ -226,14 +172,22 @@ class Mailbox: stop_f() class AppNamespace: - def __init__(self, db, welcome, blur_usage, log_requests, app_id): + def __init__(self, db, blur_usage, log_requests, app_id): self._db = db - self._welcome = welcome self._blur_usage = blur_usage self._log_requests = log_requests self._app_id = app_id self._mailboxes = {} + def is_active(self): + # An idle AppNamespace does not need to be kept in memory: it can be + # reconstructed from the DB if needed. And active one must be kept + # alive. + for mb in self._mailboxes.values(): + if mb.is_active(): + return True + return False + def get_nameplate_ids(self): db = self._db # TODO: filter this to numeric ids? @@ -265,7 +219,7 @@ class AppNamespace: del mailbox_id # ignored, they'll learn it from claim() return nameplate_id - def claim_nameplate(self, nameplate_id, side, when): + def claim_nameplate(self, nameplate_id, side, when, _test_mailbox_id=None): # when we're done: # * there will be one row for the nameplate # * side1 or side2 will be populated @@ -298,7 +252,10 @@ class AppNamespace: if self._log_requests: log.msg("creating nameplate#%s for app_id %s" % (nameplate_id, self._app_id)) - mailbox_id = generate_mailbox_id() + if _test_mailbox_id is not None: # for unit tests + mailbox_id = _test_mailbox_id + else: + mailbox_id = generate_mailbox_id() db.execute("INSERT INTO `nameplates`" " (`app_id`, `id`, `mailbox_id`, `side1`, `crowded`," " `updated`, `started`)" @@ -365,25 +322,6 @@ class AppNamespace: return Usage(started=started, waiting_time=waiting_time, total_time=total_time, result=result) - def _prune_nameplate(self, row, delete_time): - # requires caller to db.commit() - db = self._db - db.execute("DELETE FROM `nameplates` WHERE `app_id`=? AND `id`=?", - (self._app_id, row["id"])) - self._summarize_nameplate_and_store(row, delete_time, pruned=True) - # TODO: make a Nameplate object, keep track of when there's a - # websocket that's watching it, don't prune a nameplate that someone - # is watching, even if they started watching a long time ago - - def prune_nameplates(self, old): - db = self._db - for row in db.execute("SELECT * FROM `nameplates`" - " WHERE `updated` < ?", - (old,)).fetchall(): - self._prune_nameplate(row) - count = db.execute("SELECT COUNT(*) FROM `nameplates`").fetchone()[0] - return count - def open_mailbox(self, mailbox_id, side, when): assert isinstance(mailbox_id, type(u"")), type(mailbox_id) db = self._db @@ -398,8 +336,6 @@ class AppNamespace: db.commit() # XXX # mailbox.open() does a SELECT to find the old sides self._mailboxes[mailbox_id] = Mailbox(self, self._db, - self._blur_usage, - self._log_requests, self._app_id, mailbox_id) mailbox = self._mailboxes[mailbox_id] mailbox.open(side, when) @@ -416,25 +352,178 @@ class AppNamespace: # log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" % # (mailbox_id, len(self.get_claimed()), len(self._mailboxes))) - def prune_mailboxes(self, old): + def _summarize_mailbox_and_store(self, mailbox_id, row, + second_mood, delete_time, pruned): + db = self._db + rows = db.execute("SELECT DISTINCT(`side`) FROM `messages`" + " WHERE `app_id`=? AND `mailbox_id`=?", + (self._app_id, mailbox_id)).fetchall() + num_sides = len(rows) + u = self._summarize_mailbox(row, num_sides, second_mood, delete_time, + pruned) + db.execute("INSERT INTO `mailbox_usage`" + " (`app_id`," + " `started`, `total_time`, `waiting_time`, `result`)" + " VALUES (?, ?,?,?,?)", + (self._app_id, + u.started, u.total_time, u.waiting_time, u.result)) + + def _summarize_mailbox(self, row, num_sides, second_mood, delete_time, + pruned): + started = row["started"] + if self._blur_usage: + started = self._blur_usage * (started // self._blur_usage) + waiting_time = None + if row["second"]: + waiting_time = row["second"] - row["started"] + total_time = delete_time - row["started"] + + if num_sides == 0: + result = u"quiet" + elif num_sides == 1: + result = u"lonely" + else: + result = u"happy" + + moods = set([row["first_mood"], second_mood]) + if u"lonely" in moods: + result = u"lonely" + if u"errory" in moods: + result = u"errory" + if u"scary" in moods: + result = u"scary" + if pruned: + result = u"pruney" + if row["crowded"]: + result = u"crowded" + + return Usage(started=started, waiting_time=waiting_time, + total_time=total_time, result=result) + + def prune(self, now, old): # For now, pruning is logged even if log_requests is False, to debug # the pruning process, and since pruning is triggered by a timer # instead of by user action. It does reveal which mailboxes were # present when the pruning process began, though, so in the log run # it should do less logging. - log.msg(" channel prune begins") - # a channel is deleted when there are no listeners and there have - # been no messages added in CHANNEL_EXPIRATION_TIME seconds - mailboxes = set(self.get_claimed()) # these have messages - mailboxes.update(self._mailboxes) # these might have listeners - for mailbox_id in mailboxes: - log.msg(" channel prune checking %d" % mailbox_id) - channel = self.get_channel(mailbox_id) - if channel.is_idle(): - log.msg(" channel prune expiring %d" % mailbox_id) - channel.delete_and_summarize() # calls self.free_channel - log.msg(" channel prune done, %r left" % (self._mailboxes.keys(),)) - return bool(self._mailboxes) + log.msg(" prune begins (%s)" % self._app_id) + db = self._db + modified = False + # for all `mailboxes`: classify as new or old + OLD = 0; NEW = 1 + all_mailboxes = {} + all_mailbox_rows = {} + for row in db.execute("SELECT * FROM `mailboxes`" + " WHERE `app_id`=?", + (self._app_id,)).fetchall(): + mailbox_id = row["id"] + all_mailbox_rows[mailbox_id] = row + if row["started"] > old: + which = NEW + elif row["second"] and row["second"] > old: + which = NEW + else: + which = OLD + all_mailboxes[mailbox_id] = which + #log.msg(" 2: all_mailboxes", all_mailboxes, all_mailbox_rows) + + # for all mailbox ids used by `messages`: + # if there is no matching mailbox: delete the messages + # if there is at least one new message (select when>old limit 1): + # classify the mailbox as new + for row in db.execute("SELECT DISTINCT(`mailbox_id`)" + " FROM `messages`" + " WHERE `app_id`=?", + (self._app_id,)).fetchall(): + mailbox_id = row["mailbox_id"] + if mailbox_id not in all_mailboxes: + log.msg(" deleting orphan messages", mailbox_id) + db.execute("DELETE FROM `messages`" + " WHERE `app_id`=? AND `mailbox_id`=?", + (self._app_id, mailbox_id)) + modified = True + else: + new_msgs = db.execute("SELECT * FROM `messages`" + " WHERE `app_id`=? AND `mailbox_id`=?" + " AND `server_rx` > ?" + " LIMIT 1", + (self._app_id, mailbox_id, old) + ).fetchall() + if new_msgs: + #log.msg(" 3-: saved by new messages", new_msgs) + all_mailboxes[mailbox_id] = NEW + #log.msg(" 4: all_mailboxes", all_mailboxes) + + # for all mailbox objects with active listeners: + # classify the mailbox as new + for mailbox_id in self._mailboxes: + #log.msg(" -5: checking", mailbox_id, self._mailboxes[mailbox_id]) + if self._mailboxes[mailbox_id].has_listeners(): + all_mailboxes[mailbox_id] = NEW + #log.msg(" 5: all_mailboxes", all_mailboxes) + + # for all `nameplates`: + # classify as new or old + # if the linked mailbox exists: + # if it is new: + # classify nameplate as new + # if it is old: + # if the nameplate is new: + # classify mailbox as new + all_nameplates = {} + all_nameplate_rows = {} + for row in db.execute("SELECT * FROM `nameplates`" + " WHERE `app_id`=?", + (self._app_id,)).fetchall(): + nameplate_id = row["id"] + all_nameplate_rows[nameplate_id] = row + if row["updated"] > old: + which = NEW + else: + which = OLD + mailbox_id = row["mailbox_id"] + if mailbox_id in all_mailboxes: + if all_mailboxes[mailbox_id] == NEW: + which = NEW + else: + if which == NEW: + all_mailboxes[mailbox_id] = NEW + all_nameplates[nameplate_id] = which + #log.msg(" 6: all_nameplates", all_nameplates, all_nameplate_rows) + + # delete all old nameplates + # invariant check: if there is a linked mailbox, it is old + + for nameplate_id, which in all_nameplates.items(): + if which == OLD: + log.msg(" deleting nameplate", nameplate_id) + row = all_nameplate_rows[nameplate_id] + self._summarize_nameplate_and_store(row, now, pruned=True) + db.execute("DELETE FROM `nameplates`" + " WHERE `app_id`=? AND `id`=?", + (self._app_id, nameplate_id)) + modified = True + + # delete all messages for old mailboxes + # delete all old mailboxes + + for mailbox_id, which in all_mailboxes.items(): + if which == OLD: + log.msg(" deleting mailbox", mailbox_id) + self._summarize_mailbox_and_store(mailbox_id, + all_mailbox_rows[mailbox_id], + u"pruney", now, pruned=True) + db.execute("DELETE FROM `messages`" + " WHERE `app_id`=? AND `mailbox_id`=?", + (self._app_id, mailbox_id)) + db.execute("DELETE FROM `mailboxes`" + " WHERE `app_id`=? AND `id`=?", + (self._app_id, mailbox_id)) + modified = True + + if modified: + db.commit() + log.msg(" prune complete, modified:", modified) def _shutdown(self): for channel in self._mailboxes.values(): @@ -462,25 +551,35 @@ class Rendezvous(service.MultiService): if not app_id in self._apps: if self._log_requests: log.msg("spawning app_id %s" % (app_id,)) - self._apps[app_id] = AppNamespace(self._db, self._welcome, - self._blur_usage, - self._log_requests, app_id) + self._apps[app_id] = AppNamespace(self._db, + self._blur_usage, + self._log_requests, app_id) return self._apps[app_id] - def prune(self, old=None): + def get_all_apps(self): + apps = set() + for row in self._db.execute("SELECT DISTINCT `app_id`" + " FROM `nameplates`").fetchall(): + apps.add(row["app_id"]) + for row in self._db.execute("SELECT DISTINCT `app_id`" + " FROM `mailboxes`").fetchall(): + apps.add(row["app_id"]) + for row in self._db.execute("SELECT DISTINCT `app_id`" + " FROM `messages`").fetchall(): + apps.add(row["app_id"]) + return apps + + def prune(self, now=None, old=None): # As with AppNamespace.prune_old_mailboxes, we log for now. log.msg("beginning app prune") - if old is None: - old = time.time() - CHANNEL_EXPIRATION_TIME - c = self._db.execute("SELECT DISTINCT `app_id` FROM `messages`") - apps = set([row["app_id"] for row in c.fetchall()]) # these have messages - apps.update(self._apps) # these might have listeners - for app_id in apps: + now = now or time.time() + old = old or (now - CHANNEL_EXPIRATION_TIME) + for app_id in sorted(self.get_all_apps()): log.msg(" app prune checking %r" % (app_id,)) app = self.get_app(app_id) - still_active = app.prune_nameplates(old) + app.prune_mailboxes(old) - if not still_active: - log.msg("prune pops app %r" % (app_id,)) + app.prune(now, old) + if not app.is_active(): # meaning no websockets + log.msg(" pruning idle app", app_id) self._apps.pop(app_id) log.msg("app prune ends, %d remaining apps" % len(self._apps)) diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index 437e8bd..7e6238a 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -1,7 +1,9 @@ from __future__ import print_function import json, itertools from binascii import hexlify +import mock from twisted.trial import unittest +from twisted.python import log from twisted.internet import protocol, reactor, defer from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.endpoints import clientFromString, connectProtocol @@ -9,7 +11,8 @@ from autobahn.twisted import websocket from .. import __version__ from .common import ServerBase from ..server import rendezvous, transit_server -from ..server.rendezvous import Usage, SidedMessage +from ..server.rendezvous import Usage, SidedMessage, Mailbox +from ..server.database import get_db class Server(ServerBase, unittest.TestCase): def test_apps(self): @@ -281,6 +284,175 @@ class Server(ServerBase, unittest.TestCase): self.assertEqual(len(msgs), 5) self.assertEqual(msgs[-1]["body"], u"body") +class Prune(unittest.TestCase): + + def test_apps(self): + rv = rendezvous.Rendezvous(get_db(":memory:"), None, None) + app = rv.get_app(u"appid") + app.allocate_nameplate(u"side", 121) + app.prune = mock.Mock() + rv.prune(now=123, old=122) + self.assertEqual(app.prune.mock_calls, [mock.call(123, 122)]) + + def test_active(self): + rv = rendezvous.Rendezvous(get_db(":memory:"), None, None) + app = rv.get_app(u"appid1") + self.assertFalse(app.is_active()) + + mb = app.open_mailbox(u"mbid", u"side1", 0) + self.assertFalse(mb.is_active()) + self.assertFalse(app.is_active()) + + mb.add_listener(u"handle", None, None) + self.assertTrue(mb.is_active()) + self.assertTrue(app.is_active()) + + mb.remove_listener(u"handle") + self.assertFalse(mb.is_active()) + self.assertFalse(app.is_active()) + + def test_basic(self): + db = get_db(":memory:") + rv = rendezvous.Rendezvous(db, None, 3600) + + # timestamps <=50 are "old", >=51 are "new" + #OLD = "old"; NEW = "new" + #when = {OLD: 1, NEW: 60} + new_nameplates = set() + new_mailboxes = set() + new_messages = set() + + APPID = u"appid" + app = rv.get_app(APPID) + + # Exercise the first-vs-second newness tests. These nameplates have + # no mailbox. + app.claim_nameplate(u"np-1", u"side1", 1) + app.claim_nameplate(u"np-2", u"side1", 1) + app.claim_nameplate(u"np-2", u"side2", 2) + app.claim_nameplate(u"np-3", u"side1", 60) + new_nameplates.add(u"np-3") + app.claim_nameplate(u"np-4", u"side1", 1) + app.claim_nameplate(u"np-4", u"side2", 60) + new_nameplates.add(u"np-4") + app.claim_nameplate(u"np-5", u"side1", 60) + app.claim_nameplate(u"np-5", u"side2", 61) + new_nameplates.add(u"np-5") + + # same for mailboxes + app.open_mailbox(u"mb-11", u"side1", 1) + app.open_mailbox(u"mb-12", u"side1", 1) + app.open_mailbox(u"mb-12", u"side2", 2) + app.open_mailbox(u"mb-13", u"side1", 60) + new_mailboxes.add(u"mb-13") + app.open_mailbox(u"mb-14", u"side1", 1) + app.open_mailbox(u"mb-14", u"side2", 60) + new_mailboxes.add(u"mb-14") + app.open_mailbox(u"mb-15", u"side1", 60) + app.open_mailbox(u"mb-15", u"side2", 61) + new_mailboxes.add(u"mb-15") + + rv.prune(now=123, old=50) + + nameplates = set([row["id"] for row in + db.execute("SELECT * FROM `nameplates`").fetchall()]) + self.assertEqual(new_nameplates, nameplates) + mailboxes = set([row["id"] for row in + db.execute("SELECT * FROM `mailboxes`").fetchall()]) + self.assertEqual(new_mailboxes, mailboxes) + messages = set([row["msg_id"] for row in + db.execute("SELECT * FROM `messages`").fetchall()]) + self.assertEqual(new_messages, messages) + + def test_lots(self): + OLD = "old"; NEW = "new" + for nameplate in [None, OLD, NEW]: + for mailbox in [None, OLD, NEW]: + listeners = [False] + if mailbox is not None: + listeners = [False, True] + for has_listeners in listeners: + for messages in [None, OLD, NEW]: + self.one(nameplate, mailbox, has_listeners, messages) + + #def test_one(self): + # # to debug specific problems found by test_lots + # self.one(None, "old", True, None) + + def one(self, nameplate, mailbox, has_listeners, messages): + desc = ("nameplate=%s, mailbox=%s, has_listeners=%s," + " messages=%s" % + (nameplate, mailbox, has_listeners, messages)) + log.msg(desc) + + db = get_db(":memory:") + rv = rendezvous.Rendezvous(db, None, 3600) + APPID = u"appid" + app = rv.get_app(APPID) + + # timestamps <=50 are "old", >=51 are "new" + OLD = "old"; NEW = "new" + when = {OLD: 1, NEW: 60} + nameplate_survives = False + mailbox_survives = False + messages_survive = False + + mbid = u"mbid" + if nameplate is not None: + app.claim_nameplate(u"npid", u"side1", when[nameplate], + _test_mailbox_id=mbid) + if mailbox is not None: + mb = app.open_mailbox(mbid, u"side1", when[mailbox]) + else: + # We might want a Mailbox, because that's the easiest way to add + # a "messages" row, but we can't use app.open_mailbox() because + # that modifies both the "mailboxes" table and app._mailboxes, + # and sometimes we're testing what happens when there are + # messages but not a mailbox + mb = Mailbox(app, db, APPID, mbid) + # we need app._mailboxes to know about this, because that's + # where it looks to find listeners + app._mailboxes[mbid] = mb + + if messages is not None: + sm = SidedMessage(u"side1", u"phase", u"body", when[messages], + u"msgid") + mb.add_message(sm) + + if has_listeners: + mb.add_listener("handle", None, None) + + if mailbox is None and messages is not None: + # orphaned messages, even new ones, can't keep a nameplate alive + messages = None + messages_survive = False + + if (nameplate is NEW or mailbox is NEW + or has_listeners or messages is NEW): + if nameplate is not None: + nameplate_survives = True + if mailbox is not None: + mailbox_survives = True + if messages is not None: + messages_survive = True + + rv.prune(now=123, old=50) + + nameplates = set([row["id"] for row in + db.execute("SELECT * FROM `nameplates`").fetchall()]) + self.assertEqual(nameplate_survives, bool(nameplates), + ("nameplate", nameplate_survives, nameplates, desc)) + + mailboxes = set([row["id"] for row in + db.execute("SELECT * FROM `mailboxes`").fetchall()]) + self.assertEqual(mailbox_survives, bool(mailboxes), + ("mailbox", mailbox_survives, mailboxes, desc)) + + messages = set([row["msg_id"] for row in + db.execute("SELECT * FROM `messages`").fetchall()]) + self.assertEqual(messages_survive, bool(messages), + ("messages", messages_survive, messages, desc)) + def strip_message(msg): m2 = msg.copy() @@ -726,14 +898,15 @@ class WebSocketAPI(ServerBase, unittest.TestCase): class Summary(unittest.TestCase): def test_mailbox(self): - c = rendezvous.Mailbox(None, None, None, False, None, None) + app = rendezvous.AppNamespace(None, None, False, None) # starts at time 1, maybe gets second open at time 3, closes at 5 base_row = {u"started": 1, u"second": None, u"first_mood": None, u"crowded": False} def summ(num_sides, second_mood=None, pruned=False, **kwargs): row = base_row.copy() row.update(kwargs) - return c._summarize(row, num_sides, second_mood, 5, pruned) + return app._summarize_mailbox(row, num_sides, second_mood, 5, + pruned) self.assertEqual(summ(1), Usage(1, None, 4, u"lonely")) self.assertEqual(summ(1, u"lonely"), Usage(1, None, 4, u"lonely")) @@ -764,7 +937,7 @@ class Summary(unittest.TestCase): Usage(1, 2, 4, u"pruney")) def test_nameplate(self): - a = rendezvous.AppNamespace(None, None, None, False, None) + a = rendezvous.AppNamespace(None, None, False, None) # starts at time 1, maybe gets second open at time 3, closes at 5 base_row = {u"started": 1, u"second": None, u"crowded": False} def summ(num_sides, pruned=False, **kwargs):