diff --git a/src/wormhole/test/common.py b/src/wormhole/test/common.py index 7cdfbcc..3b4d73b 100644 --- a/src/wormhole/test/common.py +++ b/src/wormhole/test/common.py @@ -7,6 +7,7 @@ import mock from ..cli import cli from ..transit import allocate_tcp_port from wormhole_mailbox_server.server import make_server +from wormhole_mailbox_server.web import make_web_server from wormhole_mailbox_server.database import create_channel_db from wormhole_transit_relay.transit_server import Transit @@ -23,9 +24,9 @@ class ServerBase: self._rendezvous = make_server(db, advertise_version=advertise_version, signal_error=error) - ep = endpoints.TCP4ServerEndpoint(reactor 0, interface="127.0.01") - site = make_web_server(self._rendezvous) - s = StreamServerEndpointService(ep, site) + ep = endpoints.TCP4ServerEndpoint(reactor, 0, interface="127.0.01") + site = make_web_server(self._rendezvous, log_requests=False) + s = internet.StreamServerEndpointService(ep, site) s.setServiceParent(self.sp) self.rdv_ws_port = s.__lp.getHost().port self._relay_server = s diff --git a/src/wormhole/test/test_cli.py b/src/wormhole/test/test_cli.py index 2ff080e..644b71b 100644 --- a/src/wormhole/test/test_cli.py +++ b/src/wormhole/test/test_cli.py @@ -17,8 +17,6 @@ from ..cli import cmd_send, cmd_receive, welcome, cli from ..errors import (TransferError, WrongPasswordError, WelcomeError, UnsendableFileError, ServerConnectionError) from .._interfaces import ITorManager -from wormhole.server.cmd_server import MyPlugin -from wormhole.server.cli import server def build_offer(args): @@ -1260,97 +1258,3 @@ class Help(unittest.TestCase): result = CliRunner().invoke(cli.wormhole, ["--help"]) self._check_top_level_help(result.output) self.assertEqual(result.exit_code, 0) - -class FakeConfig(object): - no_daemon = True - blur_usage = True - advertise_version = u"fake.version.1" - transit = str('tcp:4321') - rendezvous = str('tcp:1234') - signal_error = True - allow_list = False - relay_database_path = "relay.sqlite" - stats_json_path = "stats.json" - - -class Server(unittest.TestCase): - - def setUp(self): - self.runner = CliRunner() - - @mock.patch('wormhole.server.cmd_server.twistd') - def test_server_disallow_list(self, fake_twistd): - result = self.runner.invoke(server, ['start', '--no-daemon', '--disallow-list']) - self.assertEqual(0, result.exit_code) - - def test_server_plugin(self): - cfg = FakeConfig() - plugin = MyPlugin(cfg) - relay = plugin.makeService(None) - self.assertEqual(False, relay._allow_list) - - @mock.patch("wormhole.server.cmd_server.start_server") - def test_start_no_args(self, fake_start_server): - result = self.runner.invoke(server, ['start']) - self.assertEqual(0, result.exit_code) - cfg = fake_start_server.mock_calls[0][1][0] - MyPlugin(cfg).makeService(None) - - @mock.patch("wormhole.server.cmd_server.restart_server") - def test_restart_no_args(self, fake_start_reserver): - result = self.runner.invoke(server, ['restart']) - self.assertEqual(0, result.exit_code) - cfg = fake_start_reserver.mock_calls[0][1][0] - MyPlugin(cfg).makeService(None) - - def test_state_locations(self): - cfg = FakeConfig() - plugin = MyPlugin(cfg) - relay = plugin.makeService(None) - self.assertEqual('relay.sqlite', relay._db_url) - self.assertEqual('stats.json', relay._stats_file) - - @mock.patch("wormhole.server.cmd_server.start_server") - def test_websocket_protocol_options(self, fake_start_server): - result = self.runner.invoke( - server, [ - 'start', - '--websocket-protocol-option=a=3', - '--websocket-protocol-option=b=true', - '--websocket-protocol-option=c=3.5', - '--websocket-protocol-option=d=["foo","bar"]', - '--websocket-protocol-option', 'e=["foof","barf"]', - ]) - self.assertEqual(0, result.exit_code) - cfg = fake_start_server.mock_calls[0][1][0] - self.assertEqual( - cfg.websocket_protocol_option, - [("a", 3), ("b", True), ("c", 3.5), ("d", ['foo', 'bar']), - ("e", ['foof', 'barf']), - ], - ) - - def test_broken_websocket_protocol_options(self): - result = self.runner.invoke( - server, [ - 'start', - '--websocket-protocol-option=a', - ]) - self.assertNotEqual(0, result.exit_code) - self.assertIn( - 'Error: Invalid value for "--websocket-protocol-option": ' - 'format options as OPTION=VALUE', - result.output, - ) - - result = self.runner.invoke( - server, [ - 'start', - '--websocket-protocol-option=a=foo', - ]) - self.assertNotEqual(0, result.exit_code) - self.assertIn( - 'Error: Invalid value for "--websocket-protocol-option": ' - 'could not parse JSON value for a', - result.output, - ) diff --git a/src/wormhole/test/test_database.py b/src/wormhole/test/test_database.py deleted file mode 100644 index 4ebc2cb..0000000 --- a/src/wormhole/test/test_database.py +++ /dev/null @@ -1,61 +0,0 @@ -from __future__ import print_function, unicode_literals -import os -from twisted.python import filepath -from twisted.trial import unittest -from ..server import database -from ..server.database import get_db, TARGET_VERSION, dump_db - -class DB(unittest.TestCase): - def test_create_default(self): - db_url = ":memory:" - db = get_db(db_url) - rows = db.execute("SELECT * FROM version").fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0]["version"], TARGET_VERSION) - - def test_failed_create_allows_subsequent_create(self): - patch = self.patch(database, "get_schema", lambda version: b"this is a broken schema") - dbfile = filepath.FilePath(self.mktemp()) - self.assertRaises(Exception, lambda: get_db(dbfile.path)) - patch.restore() - get_db(dbfile.path) - - def test_upgrade(self): - basedir = self.mktemp() - os.mkdir(basedir) - fn = os.path.join(basedir, "upgrade.db") - self.assertNotEqual(TARGET_VERSION, 2) - - # create an old-version DB in a file - db = get_db(fn, 2) - rows = db.execute("SELECT * FROM version").fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0]["version"], 2) - del db - - # then upgrade the file to the latest version - dbA = get_db(fn, TARGET_VERSION) - rows = dbA.execute("SELECT * FROM version").fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0]["version"], TARGET_VERSION) - dbA_text = dump_db(dbA) - del dbA - - # make sure the upgrades got committed to disk - dbB = get_db(fn, TARGET_VERSION) - dbB_text = dump_db(dbB) - del dbB - self.assertEqual(dbA_text, dbB_text) - - # The upgraded schema should be equivalent to that of a new DB. - # However a text dump will differ because ALTER TABLE always appends - # the new column to the end of a table, whereas our schema puts it - # somewhere in the middle (wherever it fits naturally). Also ALTER - # TABLE doesn't include comments. - if False: - latest_db = get_db(":memory:", TARGET_VERSION) - latest_text = dump_db(latest_db) - with open("up.sql","w") as f: f.write(dbA_text) - with open("new.sql","w") as f: f.write(latest_text) - # check with "diff -u _trial_temp/up.sql _trial_temp/new.sql" - self.assertEqual(dbA_text, latest_text) diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py deleted file mode 100644 index 6cc202f..0000000 --- a/src/wormhole/test/test_server.py +++ /dev/null @@ -1,1424 +0,0 @@ -from __future__ import print_function, unicode_literals -import os, json, itertools, time -import mock -from twisted.trial import unittest -from twisted.python import log -from twisted.internet import reactor, defer, endpoints -from twisted.internet.defer import inlineCallbacks, returnValue -from autobahn.twisted import websocket -from .common import ServerBase -from ..server import server, rendezvous -from ..server.rendezvous import Usage, SidedMessage -from ..server.database import get_db - -def easy_relay( - rendezvous_web_port=str("tcp:0"), - advertise_version=None, - **kwargs -): - return server.RelayServer( - rendezvous_web_port, - advertise_version, - **kwargs - ) - -class RLimits(unittest.TestCase): - def test_rlimit(self): - def patch_s(name, *args, **kwargs): - return mock.patch("wormhole.server.server." + name, *args, **kwargs) - # We never start this, so the endpoints can be fake. - # serverFromString() requires bytes on py2 and str on py3, so this - # is easier than just passing "tcp:0" - ep = endpoints.TCP4ServerEndpoint(None, 0) - with patch_s("endpoints.serverFromString", return_value=ep): - s = server.RelayServer("fake", None) - fakelog = [] - def checklog(*expected): - self.assertEqual(fakelog, list(expected)) - fakelog[:] = [] - NF = "NOFILE" - mock_NF = patch_s("RLIMIT_NOFILE", NF) - - with patch_s("log.msg", fakelog.append): - with patch_s("getrlimit", None): - s.increase_rlimits() - checklog("unable to import 'resource', leaving rlimit alone") - - with mock_NF: - with patch_s("getrlimit", return_value=(20000, 30000)) as gr: - s.increase_rlimits() - self.assertEqual(gr.mock_calls, [mock.call(NF)]) - checklog("RLIMIT_NOFILE.soft was 20000, leaving it alone") - - with patch_s("getrlimit", return_value=(10, 30000)) as gr: - with patch_s("setrlimit", side_effect=TypeError("other")): - with patch_s("log.err") as err: - s.increase_rlimits() - self.assertEqual(err.mock_calls, [mock.call()]) - checklog("changing RLIMIT_NOFILE from (10,30000) to (30000,30000)", - "other error during setrlimit, leaving it alone") - - for maxlimit in [40000, 20000, 9000, 2000, 1000]: - def setrlimit(which, newlimit): - if newlimit[0] > maxlimit: - raise ValueError("nope") - return None - calls = [] - expected = [] - for tries in [30000, 10000, 3200, 1024]: - calls.append(mock.call(NF, (tries, 30000))) - expected.append("changing RLIMIT_NOFILE from (10,30000) to (%d,30000)" % tries) - if tries > maxlimit: - expected.append("error during setrlimit: nope") - else: - expected.append("setrlimit successful") - break - else: - expected.append("unable to change rlimit, leaving it alone") - - with patch_s("setrlimit", side_effect=setrlimit) as sr: - s.increase_rlimits() - self.assertEqual(sr.mock_calls, calls) - checklog(*expected) - -class _Util: - def _nameplate(self, app, name): - np_row = app._db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`='appid' AND `name`=?", - (name,)).fetchone() - if not np_row: - return None, None - npid = np_row["id"] - side_rows = app._db.execute("SELECT * FROM `nameplate_sides`" - " WHERE `nameplates_id`=?", - (npid,)).fetchall() - return np_row, side_rows - - def _mailbox(self, app, mailbox_id): - mb_row = app._db.execute("SELECT * FROM `mailboxes`" - " WHERE `app_id`='appid' AND `id`=?", - (mailbox_id,)).fetchone() - if not mb_row: - return None, None - side_rows = app._db.execute("SELECT * FROM `mailbox_sides`" - " WHERE `mailbox_id`=?", - (mailbox_id,)).fetchall() - return mb_row, side_rows - - def _messages(self, app): - c = app._db.execute("SELECT * FROM `messages`" - " WHERE `app_id`='appid' AND `mailbox_id`='mid'") - return c.fetchall() - -class Server(_Util, ServerBase, unittest.TestCase): - def test_apps(self): - app1 = self._rendezvous.get_app("appid1") - self.assertIdentical(app1, self._rendezvous.get_app("appid1")) - app2 = self._rendezvous.get_app("appid2") - self.assertNotIdentical(app1, app2) - - def test_nameplate_allocation(self): - app = self._rendezvous.get_app("appid") - nids = set() - # this takes a second, and claims all the short-numbered nameplates - def add(): - nameplate_id = app.allocate_nameplate("side1", 0) - self.assertEqual(type(nameplate_id), type("")) - nid = int(nameplate_id) - nids.add(nid) - for i in range(9): add() - self.assertNotIn(0, nids) - self.assertEqual(set(range(1,10)), nids) - - for i in range(100-10): add() - self.assertEqual(len(nids), 99) - self.assertEqual(set(range(1,100)), nids) - - for i in range(1000-100): add() - self.assertEqual(len(nids), 999) - self.assertEqual(set(range(1,1000)), nids) - - add() - self.assertEqual(len(nids), 1000) - biggest = max(nids) - self.assert_(1000 <= biggest < 1000000, biggest) - - def test_nameplate(self): - app = self._rendezvous.get_app("appid") - name = app.allocate_nameplate("side1", 0) - self.assertEqual(type(name), type("")) - nid = int(name) - self.assert_(0 < nid < 10, nid) - self.assertEqual(app.get_nameplate_ids(), set([name])) - # allocate also does a claim - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side1") - self.assertEqual(side_rows[0]["added"], 0) - - # duplicate claims by the same side are combined - mailbox_id = app.claim_nameplate(name, "side1", 1) - self.assertEqual(type(mailbox_id), type("")) - self.assertEqual(mailbox_id, np_row["mailbox_id"]) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["added"], 0) - self.assertEqual(mailbox_id, np_row["mailbox_id"]) - - # and they don't updated the 'added' time - mailbox_id2 = app.claim_nameplate(name, "side1", 2) - self.assertEqual(mailbox_id, mailbox_id2) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["added"], 0) - - # claim by the second side is new - mailbox_id3 = app.claim_nameplate(name, "side2", 3) - self.assertEqual(mailbox_id, mailbox_id3) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 2) - self.assertEqual(sorted([row["side"] for row in side_rows]), - sorted(["side1", "side2"])) - self.assertIn(("side2", 3), - [(row["side"], row["added"]) for row in side_rows]) - - # a third claim marks the nameplate as "crowded", and adds a third - # claim (which must be released later), but leaves the two existing - # claims alone - self.assertRaises(rendezvous.CrowdedError, - app.claim_nameplate, name, "side3", 4) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 3) - - # releasing a non-existent nameplate is ignored - app.release_nameplate(name+"not", "side4", 0) - - # releasing a side that never claimed the nameplate is ignored - app.release_nameplate(name, "side4", 0) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 3) - - # releasing one side leaves the second claim - app.release_nameplate(name, "side1", 5) - np_row, side_rows = self._nameplate(app, name) - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertIn(("side1", False), claims) - self.assertIn(("side2", True), claims) - self.assertIn(("side3", True), claims) - - # releasing one side multiple times is ignored - app.release_nameplate(name, "side1", 5) - np_row, side_rows = self._nameplate(app, name) - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertIn(("side1", False), claims) - self.assertIn(("side2", True), claims) - self.assertIn(("side3", True), claims) - - # release the second side - app.release_nameplate(name, "side2", 6) - np_row, side_rows = self._nameplate(app, name) - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertIn(("side1", False), claims) - self.assertIn(("side2", False), claims) - self.assertIn(("side3", True), claims) - - # releasing the third side frees the nameplate, and adds usage - app.release_nameplate(name, "side3", 7) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(np_row, None) - usage = app._db.execute("SELECT * FROM `nameplate_usage`").fetchone() - self.assertEqual(usage["app_id"], "appid") - self.assertEqual(usage["started"], 0) - self.assertEqual(usage["waiting_time"], 3) - self.assertEqual(usage["total_time"], 7) - self.assertEqual(usage["result"], "crowded") - - - def test_mailbox(self): - app = self._rendezvous.get_app("appid") - mailbox_id = "mid" - m1 = app.open_mailbox(mailbox_id, "side1", 0) - - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side1") - self.assertEqual(side_rows[0]["added"], 0) - - # opening the same mailbox twice, by the same side, gets the same - # object, and does not update the "added" timestamp - self.assertIdentical(m1, app.open_mailbox(mailbox_id, "side1", 1)) - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side1") - self.assertEqual(side_rows[0]["added"], 0) - - # opening a second side gets the same object, and adds a new claim - self.assertIdentical(m1, app.open_mailbox(mailbox_id, "side2", 2)) - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 2) - adds = [(row["side"], row["added"]) for row in side_rows] - self.assertIn(("side1", 0), adds) - self.assertIn(("side2", 2), adds) - - # a third open marks it as crowded - self.assertRaises(rendezvous.CrowdedError, - app.open_mailbox, mailbox_id, "side3", 3) - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 3) - m1.close("side3", "company", 4) - - # closing a side that never claimed the mailbox is ignored - m1.close("side4", "mood", 4) - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 3) - - # closing one side leaves the second claim - m1.close("side1", "mood", 5) - mb_row, side_rows = self._mailbox(app, mailbox_id) - sides = [(row["side"], row["opened"], row["mood"]) for row in side_rows] - self.assertIn(("side1", False, "mood"), sides) - self.assertIn(("side2", True, None), sides) - self.assertIn(("side3", False, "company"), sides) - - # closing one side multiple times is ignored - m1.close("side1", "mood", 6) - mb_row, side_rows = self._mailbox(app, mailbox_id) - sides = [(row["side"], row["opened"], row["mood"]) for row in side_rows] - self.assertIn(("side1", False, "mood"), sides) - self.assertIn(("side2", True, None), sides) - self.assertIn(("side3", False, "company"), sides) - - l1 = []; stop1 = []; stop1_f = lambda: stop1.append(True) - m1.add_listener("handle1", l1.append, stop1_f) - - # closing the second side frees the mailbox, and adds usage - m1.close("side2", "mood", 7) - self.assertEqual(stop1, [True]) - - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(mb_row, None) - usage = app._db.execute("SELECT * FROM `mailbox_usage`").fetchone() - self.assertEqual(usage["app_id"], "appid") - self.assertEqual(usage["started"], 0) - self.assertEqual(usage["waiting_time"], 2) - self.assertEqual(usage["total_time"], 7) - self.assertEqual(usage["result"], "crowded") - - def test_messages(self): - app = self._rendezvous.get_app("appid") - mailbox_id = "mid" - m1 = app.open_mailbox(mailbox_id, "side1", 0) - m1.add_message(SidedMessage(side="side1", phase="phase", - body="body", server_rx=1, - msg_id="msgid")) - msgs = self._messages(app) - self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0]["body"], "body") - - l1 = []; stop1 = []; stop1_f = lambda: stop1.append(True) - l2 = []; stop2 = []; stop2_f = lambda: stop2.append(True) - old = m1.add_listener("handle1", l1.append, stop1_f) - self.assertEqual(len(old), 1) - self.assertEqual(old[0].side, "side1") - self.assertEqual(old[0].body, "body") - - m1.add_message(SidedMessage(side="side1", phase="phase2", - body="body2", server_rx=1, - msg_id="msgid")) - self.assertEqual(len(l1), 1) - self.assertEqual(l1[0].body, "body2") - old = m1.add_listener("handle2", l2.append, stop2_f) - self.assertEqual(len(old), 2) - - m1.add_message(SidedMessage(side="side1", phase="phase3", - body="body3", server_rx=1, - msg_id="msgid")) - self.assertEqual(len(l1), 2) - self.assertEqual(l1[-1].body, "body3") - self.assertEqual(len(l2), 1) - self.assertEqual(l2[-1].body, "body3") - - m1.remove_listener("handle1") - - m1.add_message(SidedMessage(side="side1", phase="phase4", - body="body4", server_rx=1, - msg_id="msgid")) - self.assertEqual(len(l1), 2) - self.assertEqual(l1[-1].body, "body3") - self.assertEqual(len(l2), 2) - self.assertEqual(l2[-1].body, "body4") - - m1._shutdown() - self.assertEqual(stop1, []) - self.assertEqual(stop2, [True]) - - # message adds are not idempotent: clients filter duplicates - m1.add_message(SidedMessage(side="side1", phase="phase", - body="body", server_rx=1, - msg_id="msgid")) - msgs = self._messages(app) - self.assertEqual(len(msgs), 5) - self.assertEqual(msgs[-1]["body"], "body") - -class Prune(unittest.TestCase): - - def _get_mailbox_updated(self, app, mbox_id): - row = app._db.execute("SELECT * FROM `mailboxes` WHERE" - " `app_id`=? AND `id`=?", - (app._app_id, mbox_id)).fetchone() - return row["updated"] - - def test_update(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, None, True) - app = rv.get_app("appid") - mbox_id = "mbox1" - app.open_mailbox(mbox_id, "side1", 1) - self.assertEqual(self._get_mailbox_updated(app, mbox_id), 1) - - mb = app.open_mailbox(mbox_id, "side2", 2) - self.assertEqual(self._get_mailbox_updated(app, mbox_id), 2) - - sm = SidedMessage("side1", "phase", "body", 3, "msgid") - mb.add_message(sm) - self.assertEqual(self._get_mailbox_updated(app, mbox_id), 3) - - def test_apps(self): - rv = rendezvous.Rendezvous(get_db(":memory:"), None, None, True) - app = rv.get_app("appid") - app.allocate_nameplate("side", 121) - app.prune = mock.Mock() - rv.prune_all_apps(now=123, old=122) - self.assertEqual(app.prune.mock_calls, [mock.call(123, 122)]) - - def test_nameplates(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, 3600, True) - - # timestamps <=50 are "old", >=51 are "new" - #OLD = "old"; NEW = "new" - #when = {OLD: 1, NEW: 60} - new_nameplates = set() - - APPID = "appid" - app = rv.get_app(APPID) - - # Exercise the first-vs-second newness tests - app.claim_nameplate("np-1", "side1", 1) - app.claim_nameplate("np-2", "side1", 1) - app.claim_nameplate("np-2", "side2", 2) - app.claim_nameplate("np-3", "side1", 60) - new_nameplates.add("np-3") - app.claim_nameplate("np-4", "side1", 1) - app.claim_nameplate("np-4", "side2", 60) - new_nameplates.add("np-4") - app.claim_nameplate("np-5", "side1", 60) - app.claim_nameplate("np-5", "side2", 61) - new_nameplates.add("np-5") - - rv.prune_all_apps(now=123, old=50) - - nameplates = set([row["name"] for row in - db.execute("SELECT * FROM `nameplates`").fetchall()]) - self.assertEqual(new_nameplates, nameplates) - mailboxes = set([row["id"] for row in - db.execute("SELECT * FROM `mailboxes`").fetchall()]) - self.assertEqual(len(new_nameplates), len(mailboxes)) - - def test_mailboxes(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, 3600, True) - - # timestamps <=50 are "old", >=51 are "new" - #OLD = "old"; NEW = "new" - #when = {OLD: 1, NEW: 60} - new_mailboxes = set() - - APPID = "appid" - app = rv.get_app(APPID) - - # Exercise the first-vs-second newness tests - app.open_mailbox("mb-11", "side1", 1) - app.open_mailbox("mb-12", "side1", 1) - app.open_mailbox("mb-12", "side2", 2) - app.open_mailbox("mb-13", "side1", 60) - new_mailboxes.add("mb-13") - app.open_mailbox("mb-14", "side1", 1) - app.open_mailbox("mb-14", "side2", 60) - new_mailboxes.add("mb-14") - app.open_mailbox("mb-15", "side1", 60) - app.open_mailbox("mb-15", "side2", 61) - new_mailboxes.add("mb-15") - - rv.prune_all_apps(now=123, old=50) - - mailboxes = set([row["id"] for row in - db.execute("SELECT * FROM `mailboxes`").fetchall()]) - self.assertEqual(new_mailboxes, mailboxes) - - def test_lots(self): - OLD = "old"; NEW = "new" - for nameplate in [False, True]: - for mailbox in [OLD, NEW]: - for has_listeners in [False, True]: - self.one(nameplate, mailbox, has_listeners) - - def test_one(self): - # to debug specific problems found by test_lots - self.one(None, "new", False) - - def one(self, nameplate, mailbox, has_listeners): - desc = ("nameplate=%s, mailbox=%s, has_listeners=%s" % - (nameplate, mailbox, has_listeners)) - log.msg(desc) - - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, 3600, True) - APPID = "appid" - app = rv.get_app(APPID) - - # timestamps <=50 are "old", >=51 are "new" - OLD = "old"; NEW = "new" - when = {OLD: 1, NEW: 60} - nameplate_survives = False - mailbox_survives = False - - mbid = "mbid" - if nameplate: - mbid = app.claim_nameplate("npid", "side1", when[mailbox]) - mb = app.open_mailbox(mbid, "side1", when[mailbox]) - - # the pruning algorithm doesn't care about the age of messages, - # because mailbox.updated is always updated each time we add a - # message - sm = SidedMessage("side1", "phase", "body", when[mailbox], "msgid") - mb.add_message(sm) - - if has_listeners: - mb.add_listener("handle", None, None) - - if (mailbox == NEW or has_listeners): - if nameplate: - nameplate_survives = True - mailbox_survives = True - messages_survive = mailbox_survives - - rv.prune_all_apps(now=123, old=50) - - nameplates = set([row["name"] for row in - db.execute("SELECT * FROM `nameplates`").fetchall()]) - self.assertEqual(nameplate_survives, bool(nameplates), - ("nameplate", nameplate_survives, nameplates, desc)) - - mailboxes = set([row["id"] for row in - db.execute("SELECT * FROM `mailboxes`").fetchall()]) - self.assertEqual(mailbox_survives, bool(mailboxes), - ("mailbox", mailbox_survives, mailboxes, desc)) - - messages = set([row["msg_id"] for row in - db.execute("SELECT * FROM `messages`").fetchall()]) - self.assertEqual(messages_survive, bool(messages), - ("messages", messages_survive, messages, desc)) - - -def strip_message(msg): - m2 = msg.copy() - m2.pop("id", None) - m2.pop("server_rx", None) - return m2 - -def strip_messages(messages): - return [strip_message(m) for m in messages] - -class WSClient(websocket.WebSocketClientProtocol): - def __init__(self): - websocket.WebSocketClientProtocol.__init__(self) - self.events = [] - self.errors = [] - self.d = None - self.ping_counter = itertools.count(0) - def onOpen(self): - self.factory.d.callback(self) - def onMessage(self, payload, isBinary): - assert not isBinary - event = json.loads(payload.decode("utf-8")) - if event["type"] == "error": - self.errors.append(event) - if self.d: - assert not self.events - d,self.d = self.d,None - d.callback(event) - return - self.events.append(event) - - def close(self): - self.d = defer.Deferred() - self.transport.loseConnection() - return self.d - def onClose(self, wasClean, code, reason): - if self.d: - self.d.callback((wasClean, code, reason)) - - def next_event(self): - assert not self.d - if self.events: - event = self.events.pop(0) - return defer.succeed(event) - self.d = defer.Deferred() - return self.d - - @inlineCallbacks - def next_non_ack(self): - while True: - m = yield self.next_event() - if isinstance(m, tuple): - print("unexpected onClose", m) - raise AssertionError("unexpected onClose") - if m["type"] != "ack": - returnValue(m) - - def strip_acks(self): - self.events = [e for e in self.events if e["type"] != "ack"] - - def send(self, mtype, **kwargs): - kwargs["type"] = mtype - payload = json.dumps(kwargs).encode("utf-8") - self.sendMessage(payload, False) - - def send_notype(self, **kwargs): - payload = json.dumps(kwargs).encode("utf-8") - self.sendMessage(payload, False) - - @inlineCallbacks - def sync(self): - ping = next(self.ping_counter) - self.send("ping", ping=ping) - # queue all messages until the pong, then put them back - old_events = [] - while True: - ev = yield self.next_event() - if ev["type"] == "pong" and ev["pong"] == ping: - self.events = old_events + self.events - returnValue(None) - old_events.append(ev) - -class WSFactory(websocket.WebSocketClientFactory): - protocol = WSClient - -class WSClientSync(unittest.TestCase): - # make sure my 'sync' method actually works - - @inlineCallbacks - def test_sync(self): - sent = [] - c = WSClient() - def _send(mtype, **kwargs): - sent.append( (mtype, kwargs) ) - c.send = _send - def add(mtype, **kwargs): - kwargs["type"] = mtype - c.onMessage(json.dumps(kwargs).encode("utf-8"), False) - # no queued messages - d = c.sync() - self.assertEqual(sent, [("ping", {"ping": 0})]) - self.assertNoResult(d) - add("pong", pong=0) - yield d - self.assertEqual(c.events, []) - - # one,two,ping,pong - add("one") - add("two", two=2) - d = c.sync() - add("pong", pong=1) - yield d - m = yield c.next_non_ack() - self.assertEqual(m["type"], "one") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "two") - self.assertEqual(c.events, []) - - # one,ping,two,pong - add("one") - d = c.sync() - add("two", two=2) - add("pong", pong=2) - yield d - m = yield c.next_non_ack() - self.assertEqual(m["type"], "one") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "two") - self.assertEqual(c.events, []) - - # ping,one,two,pong - d = c.sync() - add("one") - add("two", two=2) - add("pong", pong=3) - yield d - m = yield c.next_non_ack() - self.assertEqual(m["type"], "one") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "two") - self.assertEqual(c.events, []) - - - -class WebSocketAPI(_Util, ServerBase, unittest.TestCase): - def setUp(self): - self._clients = [] - self._setup_relay(None, advertise_version="advertised.version") - - def tearDown(self): - for c in self._clients: - c.transport.loseConnection() - return ServerBase.tearDown(self) - - @inlineCallbacks - def make_client(self): - f = WSFactory(self.relayurl) - f.d = defer.Deferred() - reactor.connectTCP("127.0.0.1", self.rdv_ws_port, f) - c = yield f.d - self._clients.append(c) - returnValue(c) - - def check_welcome(self, data): - self.failUnlessIn("welcome", data) - self.failUnlessEqual(data["welcome"], - {"current_cli_version": "advertised.version"}) - - @inlineCallbacks - def test_welcome(self): - c1 = yield self.make_client() - msg = yield c1.next_non_ack() - self.check_welcome(msg) - self.assertEqual(self._rendezvous._apps, {}) - - @inlineCallbacks - def test_bind(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - - c1.send("bind", appid="appid") # missing side= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "bind requires 'side'") - - c1.send("bind", side="side") # missing appid= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "bind requires 'appid'") - - c1.send("bind", appid="appid", side="side") - yield c1.sync() - self.assertEqual(list(self._rendezvous._apps.keys()), ["appid"]) - - c1.send("bind", appid="appid", side="side") # duplicate - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "already bound") - - c1.send_notype(other="misc") # missing 'type' - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "missing 'type'") - - c1.send("___unknown") # unknown type - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "unknown type") - - c1.send("ping") # missing 'ping' - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "ping requires 'ping'") - - @inlineCallbacks - def test_list(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - - c1.send("list") # too early, must bind first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "must bind first") - - c1.send("bind", appid="appid", side="side") - c1.send("list") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "nameplates") - self.assertEqual(m["nameplates"], []) - - app = self._rendezvous.get_app("appid") - nameplate_id1 = app.allocate_nameplate("side", 0) - app.claim_nameplate("np2", "side", 0) - - c1.send("list") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "nameplates") - nids = set() - for n in m["nameplates"]: - self.assertEqual(type(n), dict) - self.assertEqual(list(n.keys()), ["id"]) - nids.add(n["id"]) - self.assertEqual(nids, set([nameplate_id1, "np2"])) - - @inlineCallbacks - def test_allocate(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - - c1.send("allocate") # too early, must bind first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "must bind first") - - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - c1.send("allocate") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "allocated") - name = m["nameplate"] - - nids = app.get_nameplate_ids() - self.assertEqual(len(nids), 1) - self.assertEqual(name, list(nids)[0]) - - c1.send("allocate") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], - "you already allocated one, don't be greedy") - - c1.send("claim", nameplate=name) # allocate+claim is ok - yield c1.sync() - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side") - - @inlineCallbacks - def test_claim(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c1.send("claim") # missing nameplate= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "claim requires 'nameplate'") - - c1.send("claim", nameplate="np1") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "claimed") - mailbox_id = m["mailbox"] - self.assertEqual(type(mailbox_id), type("")) - - c1.send("claim", nameplate="np1") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error", err) - self.assertEqual(err["error"], "only one claim per connection") - - nids = app.get_nameplate_ids() - self.assertEqual(len(nids), 1) - self.assertEqual("np1", list(nids)[0]) - np_row, side_rows = self._nameplate(app, "np1") - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side") - - # claiming a nameplate assigns a random mailbox id and creates the - # mailbox row - mailboxes = app._db.execute("SELECT * FROM `mailboxes`" - " WHERE `app_id`='appid'").fetchall() - self.assertEqual(len(mailboxes), 1) - - @inlineCallbacks - def test_claim_crowded(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - app.claim_nameplate("np1", "side1", 0) - app.claim_nameplate("np1", "side2", 0) - - # the third claim will signal crowding - c1.send("claim", nameplate="np1") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "crowded") - - @inlineCallbacks - def test_release(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - app.claim_nameplate("np1", "side2", 0) - - c1.send("release") # didn't do claim first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], - "release without nameplate must follow claim") - - c1.send("claim", nameplate="np1") - yield c1.next_non_ack() - - c1.send("release") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "released", m) - - np_row, side_rows = self._nameplate(app, "np1") - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertIn(("side", False), claims) - self.assertIn(("side2", True), claims) - - c1.send("release") # no longer claimed - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "only one release per connection") - - @inlineCallbacks - def test_release_named(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("claim", nameplate="np1") - yield c1.next_non_ack() - - c1.send("release", nameplate="np1") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "released", m) - - @inlineCallbacks - def test_release_named_ignored(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("release", nameplate="np1") # didn't do claim first, ignored - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "released", m) - - @inlineCallbacks - def test_release_named_mismatch(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("claim", nameplate="np1") - yield c1.next_non_ack() - - c1.send("release", nameplate="np2") # mismatching nameplate - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], - "release and claim must use same nameplate") - - @inlineCallbacks - def test_open(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c1.send("open") # missing mailbox= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "open requires 'mailbox'") - - mb1 = app.open_mailbox("mb1", "side2", 0) - mb1.add_message(SidedMessage(side="side2", phase="phase", - body="body", server_rx=0, - msg_id="msgid")) - - c1.send("open", mailbox="mb1") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body") - self.assertTrue(mb1.has_listeners()) - - mb1.add_message(SidedMessage(side="side2", phase="phase2", - body="body2", server_rx=0, - msg_id="msgid")) - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body2") - - c1.send("open", mailbox="mb1") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "only one open per connection") - - @inlineCallbacks - def test_open_crowded(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - mbid = app.claim_nameplate("np1", "side1", 0) - app.claim_nameplate("np1", "side2", 0) - - # the third open will signal crowding - c1.send("open", mailbox=mbid) - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "crowded") - - @inlineCallbacks - def test_add(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - mb1 = app.open_mailbox("mb1", "side2", 0) - l1 = []; stop1 = []; stop1_f = lambda: stop1.append(True) - mb1.add_listener("handle1", l1.append, stop1_f) - - c1.send("add") # didn't open first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "must open mailbox before adding") - - c1.send("open", mailbox="mb1") - - c1.send("add", body="body") # missing phase= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "missing 'phase'") - - c1.send("add", phase="phase") # missing body= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "missing 'body'") - - c1.send("add", phase="phase", body="body") - m = yield c1.next_non_ack() # echoed back - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body") - - self.assertEqual(len(l1), 1) - self.assertEqual(l1[0].body, "body") - - @inlineCallbacks - def test_close(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c1.send("close", mood="mood") # must open first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "close without mailbox must follow open") - - c1.send("open", mailbox="mb1") - yield c1.sync() - mb1 = app._mailboxes["mb1"] - self.assertTrue(mb1.has_listeners()) - - c1.send("close", mood="mood") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "closed") - self.assertFalse(mb1.has_listeners()) - - c1.send("close", mood="mood") # already closed - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error", m) - self.assertEqual(err["error"], "only one close per connection") - - @inlineCallbacks - def test_close_named(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("open", mailbox="mb1") - yield c1.sync() - - c1.send("close", mailbox="mb1", mood="mood") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "closed") - - @inlineCallbacks - def test_close_named_ignored(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("close", mailbox="mb1", mood="mood") # no open first, ignored - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "closed") - - @inlineCallbacks - def test_close_named_mismatch(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("open", mailbox="mb1") - yield c1.sync() - - c1.send("close", mailbox="mb2", mood="mood") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "open and close must use same mailbox") - - @inlineCallbacks - def test_close_crowded(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - mbid = app.claim_nameplate("np1", "side1", 0) - app.claim_nameplate("np1", "side2", 0) - - # a close that allocates a third side will signal crowding - c1.send("close", mailbox=mbid) - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "crowded") - - - @inlineCallbacks - def test_disconnect(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c1.send("open", mailbox="mb1") - yield c1.sync() - mb1 = app._mailboxes["mb1"] - self.assertTrue(mb1.has_listeners()) - - yield c1.close() - # wait for the server to notice the socket has closed - started = time.time() - while mb1.has_listeners() and (time.time()-started < 5.0): - d = defer.Deferred() - reactor.callLater(0.01, d.callback, None) - yield d - self.assertFalse(mb1.has_listeners()) - - @inlineCallbacks - def test_interrupted_client_nameplate(self): - # a client's interactions with the server might be split over - # multiple sequential WebSocket connections, e.g. when the server is - # bounced and the client reconnects, or vice versa - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c.send("claim", nameplate="np1") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "claimed") - mailbox_id = m["mailbox"] - self.assertEqual(type(mailbox_id), type("")) - np_row, side_rows = self._nameplate(app, "np1") - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertEqual(claims, [("side", True)]) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - c.send("claim", nameplate="np1") # idempotent - m = yield c.next_non_ack() - self.assertEqual(m["type"], "claimed") - self.assertEqual(m["mailbox"], mailbox_id) # mailbox id is stable - np_row, side_rows = self._nameplate(app, "np1") - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertEqual(claims, [("side", True)]) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - # we haven't done a claim with this particular connection, but we can - # still send a release as long as we include the nameplate - c.send("release", nameplate="np1") # release-without-claim - m = yield c.next_non_ack() - self.assertEqual(m["type"], "released") - np_row, side_rows = self._nameplate(app, "np1") - self.assertEqual(np_row, None) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - # and the release is idempotent, when done on separate connections - c.send("release", nameplate="np1") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "released") - np_row, side_rows = self._nameplate(app, "np1") - self.assertEqual(np_row, None) - c.close() - yield c.d - - - @inlineCallbacks - def test_interrupted_client_nameplate_reclaimed(self): - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - # a new claim on a previously-closed nameplate is forbidden. We make - # a new nameplate here and manually open a second claim on it, so the - # nameplate stays alive long enough for the code check to happen. - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - c.send("claim", nameplate="np2") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "claimed") - app.claim_nameplate("np2", "side2", 0) - c.send("release", nameplate="np2") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "released") - np_row, side_rows = self._nameplate(app, "np2") - claims = sorted([(row["side"], row["claimed"]) for row in side_rows]) - self.assertEqual(claims, [("side", 0), ("side2", 1)]) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - c.send("claim", nameplate="np2") # new claim is forbidden - err = yield c.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "reclaimed") - - np_row, side_rows = self._nameplate(app, "np2") - claims = sorted([(row["side"], row["claimed"]) for row in side_rows]) - self.assertEqual(claims, [("side", 0), ("side2", 1)]) - c.close() - yield c.d - - @inlineCallbacks - def test_interrupted_client_mailbox(self): - # a client's interactions with the server might be split over - # multiple sequential WebSocket connections, e.g. when the server is - # bounced and the client reconnects, or vice versa - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - mb1 = app.open_mailbox("mb1", "side2", 0) - mb1.add_message(SidedMessage(side="side2", phase="phase", - body="body", server_rx=0, - msg_id="msgid")) - - c.send("open", mailbox="mb1") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body") - self.assertTrue(mb1.has_listeners()) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - # open should be idempotent - c.send("open", mailbox="mb1") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body") - mb_row, side_rows = self._mailbox(app, "mb1") - openeds = [(row["side"], row["opened"]) for row in side_rows] - self.assertIn(("side", 1), openeds) # TODO: why 1, and not True? - - # close on the same connection as open is ok - c.send("close", mailbox="mb1", mood="mood") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "closed", m) - mb_row, side_rows = self._mailbox(app, "mb1") - openeds = [(row["side"], row["opened"]) for row in side_rows] - self.assertIn(("side", 0), openeds) - c.close() - yield c.d - - # close (on a separate connection) is idempotent - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - c.send("close", mailbox="mb1", mood="mood") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "closed", m) - mb_row, side_rows = self._mailbox(app, "mb1") - openeds = [(row["side"], row["opened"]) for row in side_rows] - self.assertIn(("side", 0), openeds) - c.close() - yield c.d - - -class Summary(unittest.TestCase): - def test_mailbox(self): - app = rendezvous.AppNamespace(None, None, False, None, True) - # starts at time 1, maybe gets second open at time 3, closes at 5 - def s(rows, pruned=False): - return app._summarize_mailbox(rows, 5, pruned) - - rows = [dict(added=1)] - self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) - rows = [dict(added=1, mood="lonely")] - self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) - rows = [dict(added=1, mood="errory")] - self.assertEqual(s(rows), Usage(1, None, 4, "errory")) - rows = [dict(added=1, mood=None)] - self.assertEqual(s(rows, pruned=True), Usage(1, None, 4, "pruney")) - rows = [dict(added=1, mood="happy")] - self.assertEqual(s(rows, pruned=True), Usage(1, None, 4, "pruney")) - - rows = [dict(added=1, mood="happy"), dict(added=3, mood="happy")] - self.assertEqual(s(rows), Usage(1, 2, 4, "happy")) - - rows = [dict(added=1, mood="errory"), dict(added=3, mood="happy")] - self.assertEqual(s(rows), Usage(1, 2, 4, "errory")) - - rows = [dict(added=1, mood="happy"), dict(added=3, mood="errory")] - self.assertEqual(s(rows), Usage(1, 2, 4, "errory")) - - rows = [dict(added=1, mood="scary"), dict(added=3, mood="happy")] - self.assertEqual(s(rows), Usage(1, 2, 4, "scary")) - - rows = [dict(added=1, mood="scary"), dict(added=3, mood="errory")] - self.assertEqual(s(rows), Usage(1, 2, 4, "scary")) - - rows = [dict(added=1, mood="happy"), dict(added=3, mood=None)] - self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) - rows = [dict(added=1, mood="happy"), dict(added=3, mood="happy")] - self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) - - rows = [dict(added=1), dict(added=3), dict(added=4)] - self.assertEqual(s(rows), Usage(1, 2, 4, "crowded")) - - rows = [dict(added=1), dict(added=3), dict(added=4)] - self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "crowded")) - - def test_nameplate(self): - a = rendezvous.AppNamespace(None, None, False, None, True) - # starts at time 1, maybe gets second open at time 3, closes at 5 - def s(rows, pruned=False): - return a._summarize_nameplate_usage(rows, 5, pruned) - - rows = [dict(added=1)] - self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) - rows = [dict(added=1), dict(added=3)] - self.assertEqual(s(rows), Usage(1, 2, 4, "happy")) - - rows = [dict(added=1), dict(added=3)] - self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) - - rows = [dict(added=1), dict(added=3), dict(added=4)] - self.assertEqual(s(rows), Usage(1, 2, 4, "crowded")) - - def test_nameplate_disallowed(self): - db = get_db(":memory:") - a = rendezvous.AppNamespace(db, None, False, "some_app_id", False) - a.allocate_nameplate("side1", "123") - self.assertEqual([], a.get_nameplate_ids()) - - def test_nameplate_allowed(self): - db = get_db(":memory:") - a = rendezvous.AppNamespace(db, None, False, "some_app_id", True) - np = a.allocate_nameplate("side1", "321") - self.assertEqual(set([np]), a.get_nameplate_ids()) - - def test_blur(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, 3600, True) - APPID = "appid" - app = rv.get_app(APPID) - app.claim_nameplate("npid", "side1", 10) # start time is 10 - rv.prune_all_apps(now=123, old=50) - # start time should be rounded to top of the hour (blur_usage=3600) - row = db.execute("SELECT * FROM `nameplate_usage`").fetchone() - self.assertEqual(row["started"], 0) - - app = rv.get_app(APPID) - app.open_mailbox("mbid", "side1", 20) # start time is 20 - rv.prune_all_apps(now=123, old=50) - row = db.execute("SELECT * FROM `mailbox_usage`").fetchone() - self.assertEqual(row["started"], 0) - - def test_no_blur(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, None, True) - APPID = "appid" - app = rv.get_app(APPID) - app.claim_nameplate("npid", "side1", 10) # start time is 10 - rv.prune_all_apps(now=123, old=50) - row = db.execute("SELECT * FROM `nameplate_usage`").fetchone() - self.assertEqual(row["started"], 10) - - db.execute("DELETE FROM `mailbox_usage`") - db.commit() - app = rv.get_app(APPID) - app.open_mailbox("mbid", "side1", 20) # start time is 20 - rv.prune_all_apps(now=123, old=50) - row = db.execute("SELECT * FROM `mailbox_usage`").fetchone() - self.assertEqual(row["started"], 20) - -class DumpStats(unittest.TestCase): - def test_nostats(self): - rs = easy_relay() - # with no ._stats_file, this should do nothing - rs.dump_stats(1, 1) - - def test_empty(self): - basedir = self.mktemp() - os.mkdir(basedir) - fn = os.path.join(basedir, "stats.json") - rs = easy_relay(stats_file=fn) - now = 1234 - validity = 500 - rs.dump_stats(now, validity) - with open(fn, "rb") as f: - data_bytes = f.read() - data = json.loads(data_bytes.decode("utf-8")) - self.assertEqual(data["created"], now) - self.assertEqual(data["valid_until"], now+validity) - self.assertEqual(data["rendezvous"]["all_time"]["mailboxes_total"], 0) - - -class Startup(unittest.TestCase): - - @mock.patch('wormhole.server.server.log') - def test_empty(self, fake_log): - rs = easy_relay(allow_list=False) - rs.startService() - try: - logs = '\n'.join([call[1][0] for call in fake_log.mock_calls]) - self.assertTrue( - 'listing of allocated nameplates disallowed' in logs - ) - finally: - rs.stopService() - - -class WebSocketProtocolOptions(unittest.TestCase): - @mock.patch('wormhole.server.server.WebSocketRendezvousFactory') - def test_set(self, fake_factory): - easy_relay( - websocket_protocol_options=[ - ("foo", "bar"), - ] - ) - self.assertEqual( - mock.call().setProtocolOptions(foo="bar"), - fake_factory.mock_calls[1], - )