diff --git a/src/wormhole/server/db-schemas/v2.sql b/src/wormhole/server/db-schemas/v2.sql index 83751ce..b436897 100644 --- a/src/wormhole/server/db-schemas/v2.sql +++ b/src/wormhole/server/db-schemas/v2.sql @@ -34,8 +34,8 @@ CREATE TABLE `mailboxes` ( `app_id` VARCHAR, `id` VARCHAR, - `side1` VARCHAR -- side name, or NULL - `side2` VARCHAR -- side name, or NULL + `side1` VARCHAR, -- side name, or NULL + `side2` VARCHAR, -- side name, or NULL `crowded` BOOLEAN, -- at some point, three or more sides were involved `first_mood` VARCHAR, -- timing data for the mailbox itself diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 76a8fed..f2b0829 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -31,6 +31,7 @@ class CrowdedError(Exception): def add_side(row, new_side): old_sides = [s for s in [row["side1"], row["side2"]] if s] + assert old_sides if new_side in old_sides: return Unchanged if len(old_sides) == 2: @@ -98,7 +99,7 @@ class Mailbox: if row["phase"] in (CLAIM, RELEASE): continue messages.append({"phase": row["phase"], "body": row["body"], - "server_rx": row["server_rx"], "id": row["msgid"]}) + "server_rx": row["server_rx"], "id": row["msg_id"]}) return messages def add_listener(self, handle, send_f, stop_f): @@ -117,7 +118,7 @@ class Mailbox: db = self._db db.execute("INSERT INTO `messages`" " (`app_id`, `mailbox_id`, `side`, `phase`, `body`," - " `server_rx`, `msgid`)" + " `server_rx`, `msg_id`)" " VALUES (?,?,?,?,?, ?,?)", (self._app_id, self._mailbox_id, side, phase, body, server_rx, msgid)) @@ -138,8 +139,8 @@ class Mailbox: return sr = remove_side(row, side) if sr.empty: - rows = db.execute("SELECT DISTINCT(side) FROM `messages`" - " WHERE `app_id`=? AND `id`=?", + rows = db.execute("SELECT DISTINCT(`side`) FROM `messages`" + " WHERE `app_id`=? AND `mailbox_id`=?", (self._app_id, self._mailbox_id)).fetchall() num_sides = len(rows) self._summarize_and_store(row, num_sides, mood, when, pruned=False) @@ -188,9 +189,9 @@ class Mailbox: waiting_time = row["second"] - row["started"] total_time = delete_time - row["started"] - if len(num_sides) == 0: + if num_sides == 0: result = u"quiet" - elif len(num_sides) == 1: + elif num_sides == 1: result = u"lonely" else: result = u"happy" @@ -266,7 +267,7 @@ class AppNamespace: def allocate_nameplate(self, side, when): nameplate_id = self._find_available_nameplate_id() - mailbox_id = self.claim_nameplate(self, nameplate_id, side, when) + mailbox_id = self.claim_nameplate(nameplate_id, side, when) del mailbox_id # ignored, they'll learn it from claim() return nameplate_id @@ -311,10 +312,10 @@ class AppNamespace: (nameplate_id, self._app_id)) mailbox_id = generate_mailbox_id() db.execute("INSERT INTO `nameplates`" - " (`app_id`, `id`, `mailbox_id`, `side1`," + " (`app_id`, `id`, `mailbox_id`, `side1`, `crowded`," " `updated`, `started`)" - " VALUES(?,?,?,?, ?,?)", - (self._app_id, nameplate_id, mailbox_id, side, + " VALUES(?,?,?,?,?, ?,?)", + (self._app_id, nameplate_id, mailbox_id, side, False, when, when)) db.commit() return mailbox_id @@ -367,10 +368,10 @@ class AppNamespace: waiting_time = row["second"] - row["started"] total_time = delete_time - row["started"] result = u"lonely" - if pruned: - result = u"pruney" if row["second"]: result = u"happy" + if pruned: + result = u"pruney" if row["crowded"]: result = u"crowded" return Usage(started=started, waiting_time=waiting_time, @@ -403,15 +404,17 @@ class AppNamespace: log.msg("spawning #%s for app_id %s" % (mailbox_id, self._app_id)) db.execute("INSERT INTO `mailboxes`" - " (`app_id`, `id`, `started`)" - " VALUES(?,?,?)", - (self._app_id, mailbox_id, when)) + " (`app_id`, `id`, `side1`, `crowded`, `started`)" + " VALUES(?,?,?,?,?)", + (self._app_id, mailbox_id, side, False, when)) + db.commit() # XXX + # mailbox.open() does a SELECT to find the old sides self._mailboxes[mailbox_id] = Mailbox(self, self._db, self._blur_usage, self._log_requests, self._app_id, mailbox_id) mailbox = self._mailboxes[mailbox_id] - mailbox.open(side) + mailbox.open(side, when) db.commit() return mailbox @@ -421,9 +424,9 @@ class AppNamespace: if mailbox_id in self._mailboxes: self._mailboxes.pop(mailbox_id) - if self._log_requests: - log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" % - (mailbox_id, len(self.get_claimed()), len(self._mailboxes))) + #if self._log_requests: + # log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" % + # (mailbox_id, len(self.get_claimed()), len(self._mailboxes))) def prune_mailboxes(self, old): # For now, pruning is logged even if log_requests is False, to debug diff --git a/src/wormhole/server/transit_server.py b/src/wormhole/server/transit_server.py index 2412c89..7c55b44 100644 --- a/src/wormhole/server/transit_server.py +++ b/src/wormhole/server/transit_server.py @@ -186,12 +186,12 @@ class Transit(protocol.ServerFactory, service.MultiService): if self._blur_usage: started = self._blur_usage * (started // self._blur_usage) total_bytes = blur_size(total_bytes) - self._db.execute("INSERT INTO `usage`" - " (`type`, `started`, `result`, `total_bytes`," - " `total_time`, `waiting_time`)" - " VALUES (?,?,?,?, ?,?)", - (u"transit", started, result, total_bytes, - total_time, waiting_time)) + self._db.execute("INSERT INTO `transit_usage`" + " (`started`, `total_time`, `waiting_time`," + " `total_bytes`, `result`)" + " VALUES (?,?,?, ?,?)", + (started, total_time, waiting_time, + total_bytes, result)) self._db.commit() def transitFinished(self, p, token, description): diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index e98f4d1..1497e7d 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -10,6 +10,7 @@ from autobahn.twisted import websocket from .. import __version__ from .common import ServerBase from ..server import rendezvous, transit_server +from ..server.rendezvous import Usage class Reachable(ServerBase, unittest.TestCase): @@ -35,6 +36,218 @@ class Reachable(ServerBase, unittest.TestCase): d.addCallback(_got) return d +class Server(ServerBase, unittest.TestCase): + def test_apps(self): + app1 = self._rendezvous.get_app(u"appid1") + self.assertIdentical(app1, self._rendezvous.get_app(u"appid1")) + app2 = self._rendezvous.get_app(u"appid2") + self.assertNotIdentical(app1, app2) + + def test_nameplate_allocation(self): + app = self._rendezvous.get_app(u"appid") + nids = set() + # this takes a second, and claims all the short-numbered nameplates + def add(): + nameplate_id = app.allocate_nameplate(u"side1", 0) + self.assertEqual(type(nameplate_id), type(u"")) + nid = int(nameplate_id) + nids.add(nid) + for i in range(9): add() + self.assertNotIn(0, nids) + self.assertEqual(set(range(1,10)), nids) + + for i in range(100-10): add() + self.assertEqual(len(nids), 99) + self.assertEqual(set(range(1,100)), nids) + + for i in range(1000-100): add() + self.assertEqual(len(nids), 999) + self.assertEqual(set(range(1,1000)), nids) + + add() + self.assertEqual(len(nids), 1000) + biggest = max(nids) + self.assert_(1000 <= biggest < 1000000, biggest) + + def _nameplate(self, app, nameplate_id): + return app._db.execute("SELECT * FROM `nameplates`" + " WHERE `app_id`='appid' AND `id`=?", + (nameplate_id,)).fetchone() + + def test_nameplate(self): + app = self._rendezvous.get_app(u"appid") + nameplate_id = app.allocate_nameplate(u"side1", 0) + self.assertEqual(type(nameplate_id), type(u"")) + nid = int(nameplate_id) + self.assert_(0 < nid < 10, nid) + self.assertEqual(app.get_nameplate_ids(), set([nameplate_id])) + # allocate also does a claim + row = self._nameplate(app, nameplate_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], None) + self.assertEqual(row["crowded"], False) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], None) + + mailbox_id = app.claim_nameplate(nameplate_id, u"side1", 1) + self.assertEqual(type(mailbox_id), type(u"")) + # duplicate claims by the same side are combined + row = self._nameplate(app, nameplate_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], None) + + mailbox_id2 = app.claim_nameplate(nameplate_id, u"side1", 2) + self.assertEqual(mailbox_id, mailbox_id2) + row = self._nameplate(app, nameplate_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], None) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], None) + + # claim by the second side is new + mailbox_id3 = app.claim_nameplate(nameplate_id, u"side2", 3) + self.assertEqual(mailbox_id, mailbox_id3) + row = self._nameplate(app, nameplate_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], u"side2") + self.assertEqual(row["crowded"], False) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], 3) + + # a third claim marks the nameplate as "crowded", but leaves the two + # existing claims alone + self.assertRaises(rendezvous.CrowdedError, + app.claim_nameplate, nameplate_id, u"side3", 0) + row = self._nameplate(app, nameplate_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], u"side2") + self.assertEqual(row["crowded"], True) + + # releasing a non-existent nameplate is ignored + app.release_nameplate(nameplate_id+u"not", u"side4", 0) + + # releasing a side that never claimed the nameplate is ignored + app.release_nameplate(nameplate_id, u"side4", 0) + row = self._nameplate(app, nameplate_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], u"side2") + + # releasing one side leaves the second claim + app.release_nameplate(nameplate_id, u"side1", 5) + row = self._nameplate(app, nameplate_id) + self.assertEqual(row["side1"], u"side2") + self.assertEqual(row["side2"], None) + + # releasing one side multiple times is ignored + app.release_nameplate(nameplate_id, u"side1", 5) + row = self._nameplate(app, nameplate_id) + self.assertEqual(row["side1"], u"side2") + self.assertEqual(row["side2"], None) + + # releasing the second side frees the nameplate, and adds usage + app.release_nameplate(nameplate_id, u"side2", 6) + row = self._nameplate(app, nameplate_id) + self.assertEqual(row, None) + usage = app._db.execute("SELECT * FROM `nameplate_usage`").fetchone() + self.assertEqual(usage["app_id"], u"appid") + self.assertEqual(usage["started"], 0) + self.assertEqual(usage["waiting_time"], 3) + self.assertEqual(usage["total_time"], 6) + self.assertEqual(usage["result"], u"crowded") + + + def _mailbox(self, app, mailbox_id): + return app._db.execute("SELECT * FROM `mailboxes`" + " WHERE `app_id`='appid' AND `id`=?", + (mailbox_id,)).fetchone() + + def test_mailbox(self): + app = self._rendezvous.get_app(u"appid") + mailbox_id = u"mid" + m1 = app.open_mailbox(mailbox_id, u"side1", 0) + + row = self._mailbox(app, mailbox_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], None) + self.assertEqual(row["crowded"], False) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], None) + + # opening the same mailbox twice, by the same side, gets the same + # object + self.assertIdentical(m1, app.open_mailbox(mailbox_id, u"side1", 1)) + row = self._mailbox(app, mailbox_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], None) + self.assertEqual(row["crowded"], False) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], None) + + # opening a second side gets the same object, and adds a new claim + self.assertIdentical(m1, app.open_mailbox(mailbox_id, u"side2", 2)) + row = self._mailbox(app, mailbox_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], u"side2") + self.assertEqual(row["crowded"], False) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], 2) + + # a third open marks it as crowded + self.assertRaises(rendezvous.CrowdedError, + app.open_mailbox, mailbox_id, u"side3", 3) + row = self._mailbox(app, mailbox_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], u"side2") + self.assertEqual(row["crowded"], True) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], 2) + + # closing a side that never claimed the mailbox is ignored + m1.close(u"side4", u"mood", 4) + row = self._mailbox(app, mailbox_id) + self.assertEqual(row["side1"], u"side1") + self.assertEqual(row["side2"], u"side2") + self.assertEqual(row["crowded"], True) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], 2) + + # closing one side leaves the second claim + m1.close(u"side1", u"mood", 5) + row = self._mailbox(app, mailbox_id) + self.assertEqual(row["side1"], u"side2") + self.assertEqual(row["side2"], None) + self.assertEqual(row["crowded"], True) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], 2) + + # closing one side multiple is ignored + m1.close(u"side1", u"mood", 6) + row = self._mailbox(app, mailbox_id) + self.assertEqual(row["side1"], u"side2") + self.assertEqual(row["side2"], None) + self.assertEqual(row["crowded"], True) + self.assertEqual(row["started"], 0) + self.assertEqual(row["second"], 2) + + # closing the second side frees the mailbox, and adds usage + m1.close(u"side2", u"mood", 7) + row = self._mailbox(app, mailbox_id) + self.assertEqual(row, None) + usage = app._db.execute("SELECT * FROM `mailbox_usage`").fetchone() + self.assertEqual(usage["app_id"], u"appid") + self.assertEqual(usage["started"], 0) + self.assertEqual(usage["waiting_time"], 2) + self.assertEqual(usage["total_time"], 7) + self.assertEqual(usage["result"], u"crowded") + + def test_messages(self): + app = self._rendezvous.get_app(u"appid") + mailbox_id = u"mid" + m1 = app.open_mailbox(mailbox_id, u"side1", 0) + m1.add_message(u"side1", u"phase", u"body", 1, u"msgid") + # XXX more + + def strip_message(msg): m2 = msg.copy() m2.pop("id", None) @@ -465,73 +678,60 @@ class WebSocketAPI(ServerBase, unittest.TestCase): class Summary(unittest.TestCase): - def test_summarize(self): - c = rendezvous.Channel(None, None, None, False, None, None) - A = rendezvous.CLAIM - D = rendezvous.RELEASE + def test_mailbox(self): + c = rendezvous.Mailbox(None, None, None, False, None, None) + # starts at time 1, maybe gets second open at time 3, closes at 5 + base_row = {u"started": 1, u"second": None, + u"first_mood": None, u"crowded": False} + def summ(num_sides, second_mood=None, pruned=False, **kwargs): + row = base_row.copy() + row.update(kwargs) + return c._summarize(row, num_sides, second_mood, 5, pruned) - messages = [{"server_rx": 1, "side": "a", "phase": A}] - self.failUnlessEqual(c._summarize(messages, 2), - (1, "lonely", 1, None)) + self.assertEqual(summ(1), Usage(1, None, 4, u"lonely")) + self.assertEqual(summ(1, u"lonely"), Usage(1, None, 4, u"lonely")) + self.assertEqual(summ(1, u"errory"), Usage(1, None, 4, u"errory")) + self.assertEqual(summ(1, crowded=True), Usage(1, None, 4, u"crowded")) - messages = [{"server_rx": 1, "side": "a", "phase": A}, - {"server_rx": 2, "side": "a", "phase": D, "body": "lonely"}, - ] - self.failUnlessEqual(c._summarize(messages, 3), - (1, "lonely", 2, None)) + self.assertEqual(summ(2, first_mood=u"happy", + second=3, second_mood=u"happy"), + Usage(1, 2, 4, u"happy")) - messages = [{"server_rx": 1, "side": "a", "phase": A}, - {"server_rx": 2, "side": "b", "phase": A}, - {"server_rx": 3, "side": "c", "phase": A}, - ] - self.failUnlessEqual(c._summarize(messages, 4), - (1, "crowded", 3, None)) + self.assertEqual(summ(2, first_mood=u"errory", + second=3, second_mood=u"happy"), + Usage(1, 2, 4, u"errory")) - base = [{"server_rx": 1, "side": "a", "phase": A}, - {"server_rx": 2, "side": "a", "phase": "pake", "body": "msg1"}, - {"server_rx": 10, "side": "b", "phase": "pake", "body": "msg2"}, - {"server_rx": 11, "side": "b", "phase": "data", "body": "msg3"}, - {"server_rx": 20, "side": "a", "phase": "data", "body": "msg4"}, - ] - def make_moods(A_mood, B_mood): - return base + [ - {"server_rx": 21, "side": "a", "phase": D, "body": A_mood}, - {"server_rx": 30, "side": "b", "phase": D, "body": B_mood}, - ] + self.assertEqual(summ(2, first_mood=u"happy", + second=3, second_mood=u"errory"), + Usage(1, 2, 4, u"errory")) - self.failUnlessEqual(c._summarize(make_moods("happy", "happy"), 41), - (1, "happy", 40, 9)) + self.assertEqual(summ(2, first_mood=u"scary", + second=3, second_mood=u"happy"), + Usage(1, 2, 4, u"scary")) - self.failUnlessEqual(c._summarize(make_moods("scary", "happy"), 41), - (1, "scary", 40, 9)) - self.failUnlessEqual(c._summarize(make_moods("happy", "scary"), 41), - (1, "scary", 40, 9)) + self.assertEqual(summ(2, first_mood=u"scary", + second=3, second_mood=u"errory"), + Usage(1, 2, 4, u"scary")) - self.failUnlessEqual(c._summarize(make_moods("lonely", "happy"), 41), - (1, "lonely", 40, 9)) - self.failUnlessEqual(c._summarize(make_moods("happy", "lonely"), 41), - (1, "lonely", 40, 9)) + self.assertEqual(summ(2, first_mood=u"happy", second=3, pruned=True), + Usage(1, 2, 4, u"pruney")) - self.failUnlessEqual(c._summarize(make_moods("errory", "happy"), 41), - (1, "errory", 40, 9)) - self.failUnlessEqual(c._summarize(make_moods("happy", "errory"), 41), - (1, "errory", 40, 9)) + def test_nameplate(self): + a = rendezvous.AppNamespace(None, None, None, False, None) + # starts at time 1, maybe gets second open at time 3, closes at 5 + base_row = {u"started": 1, u"second": None, u"crowded": False} + def summ(num_sides, pruned=False, **kwargs): + row = base_row.copy() + row.update(kwargs) + return a._summarize_nameplate_usage(row, 5, pruned) - # scary trumps other moods - self.failUnlessEqual(c._summarize(make_moods("scary", "lonely"), 41), - (1, "scary", 40, 9)) - self.failUnlessEqual(c._summarize(make_moods("scary", "errory"), 41), - (1, "scary", 40, 9)) + self.assertEqual(summ(1), Usage(1, None, 4, u"lonely")) + self.assertEqual(summ(1, crowded=True), Usage(1, None, 4, u"crowded")) - # older clients don't send a mood - self.failUnlessEqual(c._summarize(make_moods(None, None), 41), - (1, "quiet", 40, 9)) - self.failUnlessEqual(c._summarize(make_moods(None, "happy"), 41), - (1, "quiet", 40, 9)) - self.failUnlessEqual(c._summarize(make_moods(None, "happy"), 41), - (1, "quiet", 40, 9)) - self.failUnlessEqual(c._summarize(make_moods(None, "scary"), 41), - (1, "scary", 40, 9)) + self.assertEqual(summ(2, second=3), Usage(1, 2, 4, u"happy")) + + self.assertEqual(summ(2, second=3, pruned=True), + Usage(1, 2, 4, u"pruney")) class Accumulator(protocol.Protocol): def __init__(self):