diff --git a/src/wormhole/server/db-schemas/v3.sql b/src/wormhole/server/db-schemas/v3.sql index 31fbaf4..09afd41 100644 --- a/src/wormhole/server/db-schemas/v3.sql +++ b/src/wormhole/server/db-schemas/v3.sql @@ -16,7 +16,7 @@ CREATE TABLE `nameplates` `id` INTEGER PRIMARY KEY AUTOINCREMENT, `app_id` VARCHAR, `name` VARCHAR, - `mailbox_id` VARCHAR, -- really a foreign key + `mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`), `request_id` VARCHAR, -- from 'allocate' message, for future deduplication `updated` INTEGER -- time of last activity, used for pruning ); diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 45b78f4..4a35043 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -54,12 +54,6 @@ class Mailbox: (when, self._mailbox_id)) db.commit() # XXX: reconcile the need for this with the comment above - rows = db.execute("SELECT * FROM `mailbox_sides`" - " WHERE `mailbox_id`=?", - (self._mailbox_id,)).fetchall() - if len(rows) > 2: - raise CrowdedError("too many sides have opened this mailbox") - def get_messages(self): messages = [] db = self._db @@ -200,12 +194,12 @@ class AppNamespace: del mailbox_id # ignored, they'll learn it from claim() return nameplate_id - def claim_nameplate(self, name, side, when, _test_mailbox_id=None): + def claim_nameplate(self, name, side, when): # when we're done: # * there will be one row for the nameplate # * there will be one 'side' attached to it, with claimed=True - # * a mailbox id will be created, but not a mailbox row - # (ids are randomly unique, so we can defer creation until 'open') + # * a mailbox id and mailbox row will be created + # * a mailbox 'side' will be attached, with opened=True assert isinstance(name, type("")), type(name) assert isinstance(side, type("")), type(side) db = self._db @@ -216,15 +210,12 @@ class AppNamespace: if self._log_requests: log.msg("creating nameplate#%s for app_id %s" % (name, self._app_id)) - if _test_mailbox_id is not None: # for unit tests - mailbox_id = _test_mailbox_id - else: - mailbox_id = generate_mailbox_id() + mailbox_id = generate_mailbox_id() + self._add_mailbox(mailbox_id, side, when) # ensure row exists sql = ("INSERT INTO `nameplates`" " (`app_id`, `name`, `mailbox_id`, `updated`)" " VALUES(?,?,?,?)") - npid = db.execute(sql, - (self._app_id, name, mailbox_id, when) + npid = db.execute(sql, (self._app_id, name, mailbox_id, when) ).lastrowid else: npid = row["id"] @@ -242,9 +233,12 @@ class AppNamespace: (when, npid)) db.commit() + self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError rows = db.execute("SELECT * FROM `nameplate_sides`" " WHERE `nameplates_id`=?", (npid,)).fetchall() if len(rows) > 2: + # this line will probably never get hit: any crowding is noticed + # on mailbox_sides first, inside open_mailbox() raise CrowdedError("too many sides have claimed this nameplate") return mailbox_id @@ -317,24 +311,41 @@ class AppNamespace: return Usage(started=started, waiting_time=waiting_time, total_time=total_time, result=result) - def open_mailbox(self, mailbox_id, side, when): + def _add_mailbox(self, mailbox_id, side, when): assert isinstance(mailbox_id, type("")), type(mailbox_id) db = self._db - if not mailbox_id in self._mailboxes: + row = db.execute("SELECT * FROM `mailboxes`" + " WHERE `app_id`=? AND `id`=?", + (self._app_id, mailbox_id)).fetchone() + if not row: + self._db.execute("INSERT INTO `mailboxes`" + " (`app_id`, `id`, `updated`)" + " VALUES(?,?,?)", + (self._app_id, mailbox_id, when)) + # we don't need a commit here, because mailbox.open() only + # does SELECT FROM `mailbox_sides`, not from `mailboxes` + + def open_mailbox(self, mailbox_id, side, when): + assert isinstance(mailbox_id, type("")), type(mailbox_id) + self._add_mailbox(mailbox_id, side, when) # ensure row exists + db = self._db + if not mailbox_id in self._mailboxes: # ensure Mailbox object exists if self._log_requests: log.msg("spawning #%s for app_id %s" % (mailbox_id, self._app_id)) - db.execute("INSERT INTO `mailboxes`" - " (`app_id`, `id`, `updated`)" - " VALUES(?,?,?)", - (self._app_id, mailbox_id, when)) - # we don't need a commit here, because mailbox.open() only - # does SELECT FROM `mailbox_sides`, not from `mailboxes` self._mailboxes[mailbox_id] = Mailbox(self, self._db, self._app_id, mailbox_id) mailbox = self._mailboxes[mailbox_id] + + # delegate to mailbox.open() to add a row to mailbox_sides, and + # update the mailbox.updated timestamp mailbox.open(side, when) db.commit() + rows = db.execute("SELECT * FROM `mailbox_sides`" + " WHERE `mailbox_id`=?", + (mailbox_id,)).fetchall() + if len(rows) > 2: + raise CrowdedError("too many sides have opened this mailbox") return mailbox def free_mailbox(self, mailbox_id): diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index f5ac71b..a0ef90f 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -320,7 +320,7 @@ class Prune(unittest.TestCase): self.assertFalse(mb.is_active()) self.assertFalse(app.is_active()) - def test_basic(self): + def test_nameplates(self): db = get_db(":memory:") rv = rendezvous.Rendezvous(db, None, 3600) @@ -328,14 +328,11 @@ class Prune(unittest.TestCase): #OLD = "old"; NEW = "new" #when = {OLD: 1, NEW: 60} new_nameplates = set() - new_mailboxes = set() - new_messages = set() APPID = "appid" app = rv.get_app(APPID) - # Exercise the first-vs-second newness tests. These nameplates have - # no mailbox. + # Exercise the first-vs-second newness tests app.claim_nameplate("np-1", "side1", 1) app.claim_nameplate("np-2", "side1", 1) app.claim_nameplate("np-2", "side2", 2) @@ -348,7 +345,28 @@ class Prune(unittest.TestCase): app.claim_nameplate("np-5", "side2", 61) new_nameplates.add("np-5") - # same for mailboxes + rv.prune(now=123, old=50) + + nameplates = set([row["name"] for row in + db.execute("SELECT * FROM `nameplates`").fetchall()]) + self.assertEqual(new_nameplates, nameplates) + mailboxes = set([row["id"] for row in + db.execute("SELECT * FROM `mailboxes`").fetchall()]) + self.assertEqual(len(new_nameplates), len(mailboxes)) + + def test_mailboxes(self): + db = get_db(":memory:") + rv = rendezvous.Rendezvous(db, None, 3600) + + # timestamps <=50 are "old", >=51 are "new" + #OLD = "old"; NEW = "new" + #when = {OLD: 1, NEW: 60} + new_mailboxes = set() + + APPID = "appid" + app = rv.get_app(APPID) + + # Exercise the first-vs-second newness tests app.open_mailbox("mb-11", "side1", 1) app.open_mailbox("mb-12", "side1", 1) app.open_mailbox("mb-12", "side2", 2) @@ -363,24 +381,15 @@ class Prune(unittest.TestCase): rv.prune(now=123, old=50) - nameplates = set([row["name"] for row in - db.execute("SELECT * FROM `nameplates`").fetchall()]) - self.assertEqual(new_nameplates, nameplates) mailboxes = set([row["id"] for row in db.execute("SELECT * FROM `mailboxes`").fetchall()]) self.assertEqual(new_mailboxes, mailboxes) - messages = set([row["msg_id"] for row in - db.execute("SELECT * FROM `messages`").fetchall()]) - self.assertEqual(new_messages, messages) def test_lots(self): OLD = "old"; NEW = "new" for nameplate in [None, OLD, NEW]: for mailbox in [OLD, NEW]: - listeners = [False] - if mailbox is not None: - listeners = [False, True] - for has_listeners in listeners: + for has_listeners in [False, True]: self.one(nameplate, mailbox, has_listeners) def test_one(self): @@ -405,8 +414,7 @@ class Prune(unittest.TestCase): mbid = "mbid" if nameplate is not None: - app.claim_nameplate("npid", "side1", when[nameplate], - _test_mailbox_id=mbid) + mbid = app.claim_nameplate("npid", "side1", when[nameplate]) mb = app.open_mailbox(mbid, "side1", when[mailbox]) # the pruning algorithm doesn't care about the age of messages, @@ -752,11 +760,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase): self.assertEqual(len(side_rows), 1) self.assertEqual(side_rows[0]["side"], "side") - # claiming a nameplate will assign a random mailbox id, but won't - # create the mailbox itself + # claiming a nameplate assigns a random mailbox id and creates the + # mailbox row mailboxes = app._db.execute("SELECT * FROM `mailboxes`" " WHERE `app_id`='appid'").fetchall() - self.assertEqual(len(mailboxes), 0) + self.assertEqual(len(mailboxes), 1) @inlineCallbacks def test_claim_crowded(self): @@ -987,6 +995,8 @@ class Summary(unittest.TestCase): row = db.execute("SELECT * FROM `nameplate_usage`").fetchone() self.assertEqual(row["started"], 10) + db.execute("DELETE FROM `mailbox_usage`") + db.commit() app = rv.get_app(APPID) app.open_mailbox("mbid", "side1", 20) # start time is 20 rv.prune(now=123, old=50)