diff --git a/src/wormhole/server/db-schemas/v3.sql b/src/wormhole/server/db-schemas/v3.sql index c28f00a..5ee66c0 100644 --- a/src/wormhole/server/db-schemas/v3.sql +++ b/src/wormhole/server/db-schemas/v3.sql @@ -13,23 +13,27 @@ CREATE TABLE `version` -- nameplates, but the protocol and server allow can use arbitrary strings. CREATE TABLE `nameplates` ( + `id` INTEGER PRIMARY KEY AUTOINCREMENT, `app_id` VARCHAR, `name` VARCHAR, `mailbox_id` VARCHAR, -- really a foreign key - `side1` VARCHAR, -- side name, or NULL - `side2` VARCHAR, -- side name, or NULL `request_id` VARCHAR, -- from 'allocate' message, for future deduplication - `crowded` BOOLEAN, -- at some point, three or more sides were involved - `updated` INTEGER, -- time of last activity, used for pruning - -- timing data - `started` INTEGER, -- time when nameplace was opened - `second` INTEGER -- time when second side opened + `updated` INTEGER -- time of last activity, used for pruning ); CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`); CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`); CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); +CREATE TABLE `nameplate_sides` +( + `nameplates_id` REFERENCES `nameplates`(`id`), + `claimed` BOOLEAN, -- True after claim(), False after release() + `side` VARCHAR, + `added` INTEGER -- time when this side first claimed the nameplate +); + + -- Clients exchange messages through a "mailbox", which has a long (randomly -- unique) identifier and a queue of messages. CREATE TABLE `mailboxes` diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 3b964ff..5df1d33 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -219,84 +219,96 @@ class AppNamespace: del mailbox_id # ignored, they'll learn it from claim() return nameplate_id - def claim_nameplate(self, nameplate_id, side, when, _test_mailbox_id=None): + def claim_nameplate(self, name, side, when, _test_mailbox_id=None): # when we're done: # * there will be one row for the nameplate - # * side1 or side2 will be populated - # * started or second will be populated - # * a mailbox id will be created, but not a mailbox row - # (ids are randomly unique, so we can defer creation until 'open') - assert isinstance(nameplate_id, type("")), type(nameplate_id) + # * there will be one 'side' attached to it, with claimed=True + # * a mailbox id will be created, but not a mailbox row + # (ids are randomly unique, so we can defer creation until 'open') + assert isinstance(name, type("")), type(name) assert isinstance(side, type("")), type(side) db = self._db row = db.execute("SELECT * FROM `nameplates`" " WHERE `app_id`=? AND `name`=?", - (self._app_id, nameplate_id)).fetchone() - if row: - mailbox_id = row["mailbox_id"] - try: - sr = add_side(row, side) - except CrowdedError: - db.execute("UPDATE `nameplates` SET `crowded`=?" - " WHERE `app_id`=? AND `name`=?", - (True, self._app_id, nameplate_id)) - db.commit() - raise - if sr.changed: - db.execute("UPDATE `nameplates` SET" - " `side1`=?, `side2`=?, `updated`=?, `second`=?" - " WHERE `app_id`=? AND `name`=?", - (sr.side1, sr.side2, when, when, - self._app_id, nameplate_id)) - else: + (self._app_id, name)).fetchone() + if not row: if self._log_requests: log.msg("creating nameplate#%s for app_id %s" % - (nameplate_id, self._app_id)) + (name, self._app_id)) if _test_mailbox_id is not None: # for unit tests mailbox_id = _test_mailbox_id else: mailbox_id = generate_mailbox_id() - db.execute("INSERT INTO `nameplates`" - " (`app_id`, `name`, `mailbox_id`, `side1`, `crowded`," - " `updated`, `started`)" - " VALUES(?,?,?,?,?, ?,?)", - (self._app_id, nameplate_id, mailbox_id, side, False, - when, when)) + sql = ("INSERT INTO `nameplates`" + " (`app_id`, `name`, `mailbox_id`, `updated`)" + " VALUES(?,?,?,?)") + npid = db.execute(sql, + (self._app_id, name, mailbox_id, when) + ).lastrowid + else: + npid = row["id"] + mailbox_id = row["mailbox_id"] + + row = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=? AND `side`=?", + (npid, side)).fetchone() + if not row: + db.execute("INSERT INTO `nameplate_sides`" + " (`nameplates_id`, `claimed`, `side`, `added`)" + " VALUES(?,?,?,?)", + (npid, True, side, when)) + db.execute("UPDATE `nameplates` SET `updated`=? WHERE `id`=?", + (when, npid)) db.commit() + rows = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", (npid,)).fetchall() + if len(rows) > 2: + raise CrowdedError("too many sides have claimed this nameplate") return mailbox_id - def release_nameplate(self, nameplate_id, side, when): + def release_nameplate(self, name, side, when): # when we're done: - # * in the nameplate row, side1 or side2 will be removed - # * if the nameplate is now unused: + # * the 'claimed' flag will be cleared on the nameplate_sides row + # * if the nameplate is now unused (no claimed sides): # * mailbox.nameplate_closed will be populated # * the nameplate row will be removed - assert isinstance(nameplate_id, type("")), type(nameplate_id) + # * the nameplate sides will be removed + assert isinstance(name, type("")), type(name) assert isinstance(side, type("")), type(side) db = self._db - row = db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`=? AND `name`=?", - (self._app_id, nameplate_id)).fetchone() + np_row = db.execute("SELECT * FROM `nameplates`" + " WHERE `app_id`=? AND `name`=?", + (self._app_id, name)).fetchone() + if not np_row: + return + npid = np_row["id"] + row = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=? AND `side`=?", + (npid, side)).fetchone() if not row: return - sr = remove_side(row, side) - if sr.empty: - db.execute("DELETE FROM `nameplates`" - " WHERE `app_id`=? AND `name`=?", - (self._app_id, nameplate_id)) - self._summarize_nameplate_and_store(row, when, pruned=False) - db.commit() - elif sr.changed: - db.execute("UPDATE `nameplates`" - " SET `side1`=?, `side2`=?, `updated`=?" - " WHERE `app_id`=? AND `name`=?", - (sr.side1, sr.side2, when, - self._app_id, nameplate_id)) - db.commit() + db.execute("UPDATE `nameplate_sides` SET `claimed`=?" + " WHERE `nameplates_id`=? AND `side`=?", + (False, npid, side)) + db.commit() - def _summarize_nameplate_and_store(self, row, delete_time, pruned): + # now, are there any remaining claims? + side_rows = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", + (npid,)).fetchall() + claims = [1 for sr in side_rows if sr["claimed"]] + if claims: + return + # delete and summarize + db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?", + (npid,)) + db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,)) + self._summarize_nameplate_and_store(side_rows, when, pruned=False) + db.commit() + + def _summarize_nameplate_and_store(self, side_rows, delete_time, pruned): # requires caller to db.commit() - u = self._summarize_nameplate_usage(row, delete_time, pruned) + u = self._summarize_nameplate_usage(side_rows, delete_time, pruned) self._db.execute("INSERT INTO `nameplate_usage`" " (`app_id`," " `started`, `total_time`, `waiting_time`, `result`)" @@ -304,20 +316,21 @@ class AppNamespace: (self._app_id, u.started, u.total_time, u.waiting_time, u.result)) - def _summarize_nameplate_usage(self, row, delete_time, pruned): - started = row["started"] + def _summarize_nameplate_usage(self, side_rows, delete_time, pruned): + times = sorted([row["added"] for row in side_rows]) + started = times[0] if self._blur_usage: started = self._blur_usage * (started // self._blur_usage) waiting_time = None - if row["second"]: - waiting_time = row["second"] - row["started"] - total_time = delete_time - row["started"] + if len(times) > 1: + waiting_time = times[1] - times[0] + total_time = delete_time - times[0] result = "lonely" - if row["second"]: + if len(times) == 2: result = "happy" if pruned: result = "pruney" - if row["crowded"]: + if len(times) > 2: result = "crowded" return Usage(started=started, waiting_time=waiting_time, total_time=total_time, result=result) @@ -470,13 +483,11 @@ class AppNamespace: # if it is old: # if the nameplate is new: # classify mailbox as new - all_nameplates = {} - all_nameplate_rows = {} + old_nameplate_ids = [] for row in db.execute("SELECT * FROM `nameplates`" " WHERE `app_id`=?", (self._app_id,)).fetchall(): - nameplate_id = row["name"] - all_nameplate_rows[nameplate_id] = row + npid = row["id"] if row["updated"] > old: which = NEW else: @@ -488,21 +499,23 @@ class AppNamespace: else: if which == NEW: all_mailboxes[mailbox_id] = NEW - all_nameplates[nameplate_id] = which - #log.msg(" 6: all_nameplates", all_nameplates, all_nameplate_rows) + if which == OLD: + old_nameplate_ids.append(npid) + #log.msg(" 6: old_nameplate_ids", old_nameplate_ids) # delete all old nameplates # invariant check: if there is a linked mailbox, it is old - for nameplate_id, which in all_nameplates.items(): - if which == OLD: - log.msg(" deleting nameplate", nameplate_id) - row = all_nameplate_rows[nameplate_id] - self._summarize_nameplate_and_store(row, now, pruned=True) - db.execute("DELETE FROM `nameplates`" - " WHERE `app_id`=? AND `name`=?", - (self._app_id, nameplate_id)) - modified = True + for npid in old_nameplate_ids: + log.msg(" deleting nameplate", npid) + side_rows = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", + (npid,)).fetchall() + db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?", + (npid,)) + db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,)) + self._summarize_nameplate_and_store(side_rows, now, pruned=True) + modified = True # delete all messages for old mailboxes # delete all old mailboxes diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index c62cf27..0d6b286 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -48,90 +48,106 @@ class Server(ServerBase, unittest.TestCase): biggest = max(nids) self.assert_(1000 <= biggest < 1000000, biggest) - def _nameplate(self, app, nameplate_id): - return app._db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`='appid' AND `name`=?", - (nameplate_id,)).fetchone() + def _nameplate(self, app, name): + np_row = app._db.execute("SELECT * FROM `nameplates`" + " WHERE `app_id`='appid' AND `name`=?", + (name,)).fetchone() + if not np_row: + return None, None + npid = np_row["id"] + side_rows = app._db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", + (npid,)).fetchall() + return np_row, side_rows def test_nameplate(self): app = self._rendezvous.get_app("appid") - nameplate_id = app.allocate_nameplate("side1", 0) - self.assertEqual(type(nameplate_id), type("")) - nid = int(nameplate_id) + name = app.allocate_nameplate("side1", 0) + self.assertEqual(type(name), type("")) + nid = int(name) self.assert_(0 < nid < 10, nid) - self.assertEqual(app.get_nameplate_ids(), set([nameplate_id])) + self.assertEqual(app.get_nameplate_ids(), set([name])) # allocate also does a claim - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], None) - self.assertEqual(row["crowded"], False) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], None) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["side"], "side1") + self.assertEqual(side_rows[0]["added"], 0) - mailbox_id = app.claim_nameplate(nameplate_id, "side1", 1) - self.assertEqual(type(mailbox_id), type("")) # duplicate claims by the same side are combined - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], None) + mailbox_id = app.claim_nameplate(name, "side1", 1) + self.assertEqual(type(mailbox_id), type("")) + self.assertEqual(mailbox_id, np_row["mailbox_id"]) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["added"], 0) + self.assertEqual(mailbox_id, np_row["mailbox_id"]) - mailbox_id2 = app.claim_nameplate(nameplate_id, "side1", 2) + # and they don't updated the 'added' time + mailbox_id2 = app.claim_nameplate(name, "side1", 2) self.assertEqual(mailbox_id, mailbox_id2) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], None) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], None) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["added"], 0) # claim by the second side is new - mailbox_id3 = app.claim_nameplate(nameplate_id, "side2", 3) + mailbox_id3 = app.claim_nameplate(name, "side2", 3) self.assertEqual(mailbox_id, mailbox_id3) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], "side2") - self.assertEqual(row["crowded"], False) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], 3) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 2) + self.assertEqual(sorted([row["side"] for row in side_rows]), + sorted(["side1", "side2"])) + self.assertIn(("side2", 3), + [(row["side"], row["added"]) for row in side_rows]) - # a third claim marks the nameplate as "crowded", but leaves the two - # existing claims alone + # a third claim marks the nameplate as "crowded", and adds a third + # claim (which must be released later), but leaves the two existing + # claims alone self.assertRaises(rendezvous.CrowdedError, - app.claim_nameplate, nameplate_id, "side3", 0) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], "side2") - self.assertEqual(row["crowded"], True) + app.claim_nameplate, name, "side3", 4) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 3) # releasing a non-existent nameplate is ignored - app.release_nameplate(nameplate_id+"not", "side4", 0) + app.release_nameplate(name+"not", "side4", 0) # releasing a side that never claimed the nameplate is ignored - app.release_nameplate(nameplate_id, "side4", 0) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], "side2") + app.release_nameplate(name, "side4", 0) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 3) # releasing one side leaves the second claim - app.release_nameplate(nameplate_id, "side1", 5) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side2") - self.assertEqual(row["side2"], None) + app.release_nameplate(name, "side1", 5) + np_row, side_rows = self._nameplate(app, name) + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertIn(("side1", False), claims) + self.assertIn(("side2", True), claims) + self.assertIn(("side3", True), claims) # releasing one side multiple times is ignored - app.release_nameplate(nameplate_id, "side1", 5) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side2") - self.assertEqual(row["side2"], None) + app.release_nameplate(name, "side1", 5) + np_row, side_rows = self._nameplate(app, name) + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertIn(("side1", False), claims) + self.assertIn(("side2", True), claims) + self.assertIn(("side3", True), claims) - # releasing the second side frees the nameplate, and adds usage - app.release_nameplate(nameplate_id, "side2", 6) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row, None) + # release the second side + app.release_nameplate(name, "side2", 6) + np_row, side_rows = self._nameplate(app, name) + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertIn(("side1", False), claims) + self.assertIn(("side2", False), claims) + self.assertIn(("side3", True), claims) + + # releasing the third side frees the nameplate, and adds usage + app.release_nameplate(name, "side3", 7) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(np_row, None) usage = app._db.execute("SELECT * FROM `nameplate_usage`").fetchone() self.assertEqual(usage["app_id"], "appid") self.assertEqual(usage["started"], 0) self.assertEqual(usage["waiting_time"], 3) - self.assertEqual(usage["total_time"], 6) + self.assertEqual(usage["total_time"], 7) self.assertEqual(usage["result"], "crowded") @@ -695,6 +711,18 @@ class WebSocketAPI(ServerBase, unittest.TestCase): nids.add(n["id"]) self.assertEqual(nids, set([nameplate_id1, "np2"])) + def _nameplate(self, app, name): + np_row = app._db.execute("SELECT * FROM `nameplates`" + " WHERE `app_id`='appid' AND `name`=?", + (name,)).fetchone() + if not np_row: + return None, None + npid = np_row["id"] + side_rows = app._db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", + (npid,)).fetchall() + return np_row, side_rows + @inlineCallbacks def test_allocate(self): c1 = yield self.make_client() @@ -710,11 +738,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase): c1.send("allocate") m = yield c1.next_non_ack() self.assertEqual(m["type"], "allocated") - nameplate_id = m["nameplate"] + name = m["nameplate"] nids = app.get_nameplate_ids() self.assertEqual(len(nids), 1) - self.assertEqual(nameplate_id, list(nids)[0]) + self.assertEqual(name, list(nids)[0]) c1.send("allocate") err = yield c1.next_non_ack() @@ -722,13 +750,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase): self.assertEqual(err["error"], "you already allocated one, don't be greedy") - c1.send("claim", nameplate=nameplate_id) # allocate+claim is ok + c1.send("claim", nameplate=name) # allocate+claim is ok yield c1.sync() - row = app._db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`='appid' AND `name`=?", - (nameplate_id,)).fetchone() - self.assertEqual(row["side1"], "side") - self.assertEqual(row["side2"], None) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["side"], "side") @inlineCallbacks def test_claim(self): @@ -751,6 +777,9 @@ class WebSocketAPI(ServerBase, unittest.TestCase): nids = app.get_nameplate_ids() self.assertEqual(len(nids), 1) self.assertEqual("np1", list(nids)[0]) + np_row, side_rows = self._nameplate(app, "np1") + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["side"], "side") # claiming a nameplate will assign a random mailbox id, but won't # create the mailbox itself @@ -796,10 +825,10 @@ class WebSocketAPI(ServerBase, unittest.TestCase): m = yield c1.next_non_ack() self.assertEqual(m["type"], "released") - row = app._db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`='appid' AND `name`='np1'").fetchone() - self.assertEqual(row["side1"], "side2") - self.assertEqual(row["side2"], None) + np_row, side_rows = self._nameplate(app, "np1") + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertIn(("side", False), claims) + self.assertIn(("side2", True), claims) c1.send("release") # no longer claimed err = yield c1.next_non_ack() @@ -941,19 +970,20 @@ class Summary(unittest.TestCase): def test_nameplate(self): a = rendezvous.AppNamespace(None, None, False, None) # starts at time 1, maybe gets second open at time 3, closes at 5 - base_row = {"started": 1, "second": None, "crowded": False} - def summ(num_sides, pruned=False, **kwargs): - row = base_row.copy() - row.update(kwargs) - return a._summarize_nameplate_usage(row, 5, pruned) + def s(rows, pruned=False): + return a._summarize_nameplate_usage(rows, 5, pruned) - self.assertEqual(summ(1), Usage(1, None, 4, "lonely")) - self.assertEqual(summ(1, crowded=True), Usage(1, None, 4, "crowded")) + rows = [dict(added=1)] + self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) + rows = [dict(added=1), dict(added=3)] + self.assertEqual(s(rows), Usage(1, 2, 4, "happy")) - self.assertEqual(summ(2, second=3), Usage(1, 2, 4, "happy")) + rows = [dict(added=1), dict(added=3)] + self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) + + rows = [dict(added=1), dict(added=3), dict(added=4)] + self.assertEqual(s(rows), Usage(1, 2, 4, "crowded")) - self.assertEqual(summ(2, second=3, pruned=True), - Usage(1, 2, 4, "pruney")) def test_blur(self): db = get_db(":memory:")