diff --git a/MANIFEST.in b/MANIFEST.in index c42f3c6..dbecb87 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -2,12 +2,7 @@ include versioneer.py include src/wormhole/_version.py include LICENSE README.md NEWS.md recursive-include docs *.md *.rst *.dot -include docs/wormhole.1 docs/wormhole-server.8 +include docs/wormhole.1 include .coveragerc tox.ini snapcraft.yaml include misc/windows-build.cmd include misc/*.py misc/web/*.html misc/web/*.js misc/web/*.css -include misc/munin/wormhole_active -include misc/munin/wormhole_errors -include misc/munin/wormhole_event_rate -include misc/munin/wormhole_events -include misc/munin/wormhole_events_alltime diff --git a/docs/wormhole-server.8 b/docs/wormhole-server.8 deleted file mode 100644 index b0b7862..0000000 --- a/docs/wormhole-server.8 +++ /dev/null @@ -1,41 +0,0 @@ -.TH WORMHOLE-SERVER "8" "July 2016" -.SH NAME -wormhole-server \- Securely and simply transfer data between computers -.SH SYNOPSIS -.B wormhole-server -[\fI\,OPTIONS\/\fR] \fI\,COMMAND \/\fR[\fI\,ARGS\/\fR]... -.SH DESCRIPTION -.IP -Control a relay server (most users shouldn't need to worry about this and -can use the default server). -.SH OPTIONS -.TP -\fB\-\-help\fR -Show this message and exit. -.SS "Commands:" -.TP -count\-channels -Count active channels -.TP -count\-events -Count events -.TP -restart -Re\-start a relay server -.TP -start -Start a relay server -.TP -stop -Stop a relay server -.TP -tail\-usage -Follow the latest usage -.SH SEE ALSO -.BR wormhole (1) -.SH AUTHORS -Brian Warner -.PP -This manual was written by Jameson Rollins - for the Debian project (and may be used -by others). diff --git a/misc/munin/wormhole_active b/misc/munin/wormhole_active deleted file mode 100755 index 132ce86..0000000 --- a/misc/munin/wormhole_active +++ /dev/null @@ -1,41 +0,0 @@ -#! /usr/bin/env python - -""" -Use the following in /etc/munin/plugin-conf.d/wormhole : - -[wormhole_*] -env.serverdir /path/to/your/wormhole/server -""" - -import os, sys, time, json - -CONFIG = """\ -graph_title Magic-Wormhole Active Channels -graph_vlabel Channels -graph_category network -nameplates.label Nameplates -nameplates.draw LINE2 -nameplates.type GAUGE -mailboxes.label Mailboxes -mailboxes.draw LINE2 -mailboxes.type GAUGE -messages.label Messages -messages.draw LINE1 -messages.type GAUGE -""" - -if len(sys.argv) > 1 and sys.argv[1] == "config": - print CONFIG.rstrip() - sys.exit(0) - -serverdir = os.environ["serverdir"] -fn = os.path.join(serverdir, "stats.json") -with open(fn) as f: - data = json.load(f) -if time.time() > data["valid_until"]: - sys.exit(1) # expired - -ra = data["rendezvous"]["active"] -print "nameplates.value", ra["nameplates_total"] -print "mailboxes.value", ra["mailboxes_total"] -print "messages.value", ra["messages_total"] diff --git a/misc/munin/wormhole_errors b/misc/munin/wormhole_errors deleted file mode 100755 index d3f4a86..0000000 --- a/misc/munin/wormhole_errors +++ /dev/null @@ -1,43 +0,0 @@ -#! /usr/bin/env python - -""" -Use the following in /etc/munin/plugin-conf.d/wormhole : - -[wormhole_*] -env.serverdir /path/to/your/wormhole/server -""" - -import os, sys, time, json - -CONFIG = """\ -graph_title Magic-Wormhole Server Errors -graph_vlabel Events Since Reboot -graph_category network -nameplates.label Nameplate Errors (total) -nameplates.draw LINE1 -nameplates.type GAUGE -mailboxes.label Mailboxes (total) -mailboxes.draw LINE1 -mailboxes.type GAUGE -mailboxes_scary.label Mailboxes (scary) -mailboxes_scary.draw LINE1 -mailboxes_scary.type GAUGE -""" - -if len(sys.argv) > 1 and sys.argv[1] == "config": - print CONFIG.rstrip() - sys.exit(0) - -serverdir = os.environ["serverdir"] -fn = os.path.join(serverdir, "stats.json") -with open(fn) as f: - data = json.load(f) -if time.time() > data["valid_until"]: - sys.exit(1) # expired - -r = data["rendezvous"]["since_reboot"] -print "nameplates.value", (r["nameplates_total"] - - r["nameplate_moods"].get("happy", 0)) -print "mailboxes.value", (r["mailboxes_total"] - - r["mailbox_moods"].get("happy", 0)) -print "mailboxes_scary.value", r["mailbox_moods"].get("scary", 0) diff --git a/misc/munin/wormhole_event_rate b/misc/munin/wormhole_event_rate deleted file mode 100755 index 14401df..0000000 --- a/misc/munin/wormhole_event_rate +++ /dev/null @@ -1,50 +0,0 @@ -#! /usr/bin/env python - -""" -Use the following in /etc/munin/plugin-conf.d/wormhole : - -[wormhole_*] -env.serverdir /path/to/your/wormhole/server -""" - -import os, sys, time, json - -CONFIG = """\ -graph_title Magic-Wormhole Server Events -graph_vlabel Events per Hour -graph_category network -happy.label Happy -happy.draw LINE -happy.type DERIVE -happy.min 0 -happy.max 60 -happy.cdef happy,3600,* -incomplete.label Incomplete -incomplete.draw LINE -incomplete.type DERIVE -incomplete.min 0 -incomplete.max 60 -incomplete.cdef incomplete,3600,* -scary.label Scary -scary.draw LINE -scary.type DERIVE -scary.min 0 -scary.max 60 -scary.cdef scary,3600,* -""" - -if len(sys.argv) > 1 and sys.argv[1] == "config": - print CONFIG.rstrip() - sys.exit(0) - -serverdir = os.environ["serverdir"] -fn = os.path.join(serverdir, "stats.json") -with open(fn) as f: - data = json.load(f) -if time.time() > data["valid_until"]: - sys.exit(1) # expired - -atm = data["rendezvous"]["all_time"]["mailbox_moods"] -print "happy.value", atm.get("happy", 0) -print "incomplete.value", (atm.get("pruney", 0) + atm.get("lonely", 0)) -print "scary.value", atm.get("scary", 0) diff --git a/misc/munin/wormhole_events b/misc/munin/wormhole_events deleted file mode 100755 index 1d2c2cc..0000000 --- a/misc/munin/wormhole_events +++ /dev/null @@ -1,53 +0,0 @@ -#! /usr/bin/env python - -""" -Use the following in /etc/munin/plugin-conf.d/wormhole : - -[wormhole_*] -env.serverdir /path/to/your/wormhole/server -""" - -import os, sys, time, json - -CONFIG = """\ -graph_title Magic-Wormhole Mailbox Events (since reboot) -graph_vlabel Events Since Reboot -graph_category network -happy.label Happy -happy.draw LINE2 -happy.type GAUGE -total.label Total -total.draw LINE1 -total.type GAUGE -scary.label Scary -scary.draw LINE2 -scary.type GAUGE -pruney.label Pruney -pruney.draw LINE1 -pruney.type GAUGE -lonely.label Lonely -lonely.draw LINE2 -lonely.type GAUGE -errory.label Errory -errory.draw LINE1 -errory.type GAUGE -""" - -if len(sys.argv) > 1 and sys.argv[1] == "config": - print CONFIG.rstrip() - sys.exit(0) - -serverdir = os.environ["serverdir"] -fn = os.path.join(serverdir, "stats.json") -with open(fn) as f: - data = json.load(f) -if time.time() > data["valid_until"]: - sys.exit(1) # expired - -r = data["rendezvous"]["since_reboot"] -print "happy.value", r["mailbox_moods"].get("happy", 0) -print "total.value", r["mailboxes_total"] -print "scary.value", r["mailbox_moods"].get("scary", 0) -print "pruney.value", r["mailbox_moods"].get("pruney", 0) -print "lonely.value", r["mailbox_moods"].get("lonely", 0) -print "errory.value", r["mailbox_moods"].get("errory", 0) diff --git a/misc/munin/wormhole_events_alltime b/misc/munin/wormhole_events_alltime deleted file mode 100644 index fc8e38a..0000000 --- a/misc/munin/wormhole_events_alltime +++ /dev/null @@ -1,53 +0,0 @@ -#! /usr/bin/env python - -""" -Use the following in /etc/munin/plugin-conf.d/wormhole : - -[wormhole_*] -env.serverdir /path/to/your/wormhole/server -""" - -import os, sys, time, json - -CONFIG = """\ -graph_title Magic-Wormhole Mailbox Events (all time) -graph_vlabel Events Since DB Creation -graph_category network -happy.label Happy -happy.draw LINE2 -happy.type GAUGE -total.label Total -total.draw LINE1 -total.type GAUGE -scary.label Scary -scary.draw LINE2 -scary.type GAUGE -pruney.label Pruney -pruney.draw LINE1 -pruney.type GAUGE -lonely.label Lonely -lonely.draw LINE2 -lonely.type GAUGE -errory.label Errory -errory.draw LINE1 -errory.type GAUGE -""" - -if len(sys.argv) > 1 and sys.argv[1] == "config": - print CONFIG.rstrip() - sys.exit(0) - -serverdir = os.environ["serverdir"] -fn = os.path.join(serverdir, "stats.json") -with open(fn) as f: - data = json.load(f) -if time.time() > data["valid_until"]: - sys.exit(1) # expired - -r = data["rendezvous"]["all_time"] -print "happy.value", r["mailbox_moods"].get("happy", 0) -print "total.value", r["mailboxes_total"] -print "scary.value", r["mailbox_moods"].get("scary", 0) -print "pruney.value", r["mailbox_moods"].get("pruney", 0) -print "lonely.value", r["mailbox_moods"].get("lonely", 0) -print "errory.value", r["mailbox_moods"].get("errory", 0) diff --git a/setup.py b/setup.py index 72dd7a4..625e842 100644 --- a/setup.py +++ b/setup.py @@ -14,15 +14,12 @@ setup(name="magic-wormhole", package_dir={"": "src"}, packages=["wormhole", "wormhole.cli", - "wormhole.server", "wormhole.test", ], - package_data={"wormhole.server": ["db-schemas/*.sql"]}, entry_points={ "console_scripts": [ "wormhole = wormhole.cli.cli:wormhole", - "wormhole-server = wormhole.server.cli:server", ] }, install_requires=[ @@ -41,7 +38,8 @@ setup(name="magic-wormhole", extras_require={ ':sys_platform=="win32"': ["pypiwin32"], "dev": ["mock", "tox", "pyflakes", - "magic-wormhole-transit-relay==0.1.1"], + "magic-wormhole-transit-relay==0.1.1", + "magic-wormhole-mailbox-server==0.1.0"], }, test_suite="wormhole.test", cmdclass=commands, diff --git a/src/wormhole/server/__init__.py b/src/wormhole/server/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/wormhole/server/__main__.py b/src/wormhole/server/__main__.py deleted file mode 100644 index a30e0b4..0000000 --- a/src/wormhole/server/__main__.py +++ /dev/null @@ -1,8 +0,0 @@ -if __name__ != "__main__": - raise ImportError('this module should not be imported') - - -from wormhole.server import cli - - -cli.server() diff --git a/src/wormhole/server/cli.py b/src/wormhole/server/cli.py deleted file mode 100644 index e205413..0000000 --- a/src/wormhole/server/cli.py +++ /dev/null @@ -1,152 +0,0 @@ -from __future__ import print_function -import json -import click -from ..cli.cli import Config, _compose - -# can put this back in to get this command as "wormhole server" -# instead -#from ..cli.cli import wormhole -#@wormhole.group() -@click.group() -@click.pass_context -def server(ctx): # this is the setuptools entrypoint for bin/wormhole-server - """ - Control a relay server (most users shouldn't need to worry - about this and can use the default server). - """ - # just leaving this pointing to wormhole.cli.cli.Config for now, - # but if we want to keep wormhole-server as a separate command - # should probably have our own Config without all the options the - # server commands don't use - ctx.obj = Config() - -def _validate_websocket_protocol_options(ctx, param, value): - return list(_validate_websocket_protocol_option(option) for option in value) - -def _validate_websocket_protocol_option(option): - try: - key, value = option.split("=", 1) - except ValueError: - raise click.BadParameter("format options as OPTION=VALUE") - - try: - value = json.loads(value) - except: - raise click.BadParameter("could not parse JSON value for {}".format(key)) - - return (key, value) - -LaunchArgs = _compose( - click.option( - "--rendezvous", default="tcp:4000", metavar="tcp:PORT", - help="endpoint specification for the rendezvous port", - ), - click.option( - "--advertise-version", metavar="VERSION", - help="version to recommend to clients", - ), - click.option( - "--blur-usage", default=None, type=int, - metavar="SECONDS", - help="round logged access times to improve privacy", - ), - click.option( - "--no-daemon", "-n", is_flag=True, - help="Run in the foreground", - ), - click.option( - "--signal-error", is_flag=True, - help="force all clients to fail with a message", - ), - click.option( - "--allow-list/--disallow-list", default=True, - help="always/never send list of allocated nameplates", - ), - click.option( - "--relay-database-path", default="relay.sqlite", metavar="PATH", - help="location for the relay server state database", - ), - click.option( - "--stats-json-path", default="stats.json", metavar="PATH", - help="location to write the relay stats file", - ), - click.option( - "--websocket-protocol-option", multiple=True, metavar="OPTION=VALUE", - callback=_validate_websocket_protocol_options, - help="a websocket server protocol option to configure", - ), -) - - -@server.command() -@LaunchArgs -@click.pass_obj -def start(cfg, **kwargs): - """ - Start a relay server - """ - for name, value in kwargs.items(): - setattr(cfg, name, value) - from wormhole.server.cmd_server import start_server - start_server(cfg) - - -@server.command() -@LaunchArgs -@click.pass_obj -def restart(cfg, **kwargs): - """ - Re-start a relay server - """ - for name, value in kwargs.items(): - setattr(cfg, name, value) - from wormhole.server.cmd_server import restart_server - restart_server(cfg) - - -@server.command() -@click.pass_obj -def stop(cfg): - """ - Stop a relay server - """ - from wormhole.server.cmd_server import stop_server - stop_server(cfg) - - -@server.command(name="tail-usage") -@click.pass_obj -def tail_usage(cfg): - """ - Follow the latest usage - """ - from wormhole.server.cmd_usage import tail_usage - tail_usage(cfg) - - -@server.command(name='count-channels') -@click.option( - "--json", is_flag=True, -) -@click.pass_obj -def count_channels(cfg, json): - """ - Count active channels - """ - from wormhole.server.cmd_usage import count_channels - cfg.json = json - count_channels(cfg) - - -@server.command(name='count-events') -@click.option( - "--json", is_flag=True, -) -@click.pass_obj -def count_events(cfg, json): - """ - Count events - """ - from wormhole.server.cmd_usage import count_events - cfg.json = json - count_events(cfg) diff --git a/src/wormhole/server/cmd_server.py b/src/wormhole/server/cmd_server.py deleted file mode 100644 index f73295e..0000000 --- a/src/wormhole/server/cmd_server.py +++ /dev/null @@ -1,72 +0,0 @@ -from __future__ import print_function, unicode_literals -import os, time -from twisted.python import usage -from twisted.scripts import twistd - -class MyPlugin(object): - tapname = "xyznode" - - def __init__(self, args): - self.args = args - - def makeService(self, so): - # delay this import as late as possible, to allow twistd's code to - # accept --reactor= selection - from .server import RelayServer - return RelayServer( - str(self.args.rendezvous), - self.args.advertise_version, - self.args.relay_database_path, - self.args.blur_usage, - signal_error=self.args.signal_error, - stats_file=self.args.stats_json_path, - allow_list=self.args.allow_list, - ) - -class MyTwistdConfig(twistd.ServerOptions): - subCommands = [("XYZ", None, usage.Options, "node")] - -def start_server(args): - c = MyTwistdConfig() - #twistd_args = tuple(args.twistd_args) + ("XYZ",) - base_args = [] - if args.no_daemon: - base_args.append("--nodaemon") - twistd_args = base_args + ["XYZ"] - c.parseOptions(tuple(twistd_args)) - c.loadedPlugins = {"XYZ": MyPlugin(args)} - - print("starting wormhole relay server") - # this forks and never comes back. The parent calls os._exit(0) - twistd.runApp(c) - -def kill_server(): - try: - f = open("twistd.pid", "r") - except EnvironmentError: - print("Unable to find twistd.pid: is this really a server directory?") - print("oh well, ignoring 'stop'") - return - pid = int(f.read().strip()) - f.close() - os.kill(pid, 15) - print("server process %d sent SIGTERM" % pid) - return - -def stop_server(args): - kill_server() - -def restart_server(args): - kill_server() - time.sleep(0.1) - timeout = 0 - while os.path.exists("twistd.pid") and timeout < 10: - if timeout == 0: - print(" waiting for shutdown..") - timeout += 1 - time.sleep(1) - if os.path.exists("twistd.pid"): - print("error: unable to shut down old server") - return 1 - print(" old server shut down") - start_server(args) diff --git a/src/wormhole/server/cmd_usage.py b/src/wormhole/server/cmd_usage.py deleted file mode 100644 index 09aa494..0000000 --- a/src/wormhole/server/cmd_usage.py +++ /dev/null @@ -1,165 +0,0 @@ -from __future__ import print_function, unicode_literals -import os, time, json -import click -from humanize import naturalsize -from .database import get_db - -def abbrev(t): - if t is None: - return "-" - if t > 1.0: - return "%.3fs" % t - if t > 1e-3: - return "%.1fms" % (t*1e3) - return "%.1fus" % (t*1e6) - - -def print_event(event): - event_type, started, result, total_bytes, waiting_time, total_time = event - followthrough = None - if waiting_time and total_time: - followthrough = total_time - waiting_time - print("%17s: total=%7s wait=%7s ft=%7s size=%s (%s)" % - ("%s-%s" % (event_type, result), - abbrev(total_time), - abbrev(waiting_time), - abbrev(followthrough), - naturalsize(total_bytes), - time.ctime(started), - )) - -def show_usage(args): - print("closed for renovation") - return 0 - -def tail_usage(args): - if not os.path.exists("relay.sqlite"): - raise click.UsageError( - "cannot find relay.sqlite, please run from the server directory" - ) - db = get_db("relay.sqlite") - # we don't seem to have unique row IDs, so this is an inaccurate and - # inefficient hack - seen = set() - try: - while True: - old = time.time() - 2*60*60 - c = db.execute("SELECT * FROM `usage`" - " WHERE `started` > ?" - " ORDER BY `started` ASC", (old,)) - for row in c.fetchall(): - event = (row["type"], row["started"], row["result"], - row["total_bytes"], row["waiting_time"], - row["total_time"]) - if event not in seen: - print_event(event) - seen.add(event) - time.sleep(2) - except KeyboardInterrupt: - return 0 - -def count_channels(args): - if not os.path.exists("relay.sqlite"): - raise click.UsageError( - "cannot find relay.sqlite, please run from the server directory" - ) - db = get_db("relay.sqlite") - c_list = [] - c_dict = {} - def add(key, value): - c_list.append((key, value)) - c_dict[key] = value - OLD = time.time() - 10*60 - def q(query, values=()): - return list(db.execute(query, values).fetchone().values())[0] - add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplates`")) - - add("total nameplates", q("SELECT COUNT() FROM `nameplates`")) - add("waiting nameplates", q("SELECT COUNT() FROM `nameplates`" - " WHERE `second` is null")) - add("connected nameplates", q("SELECT COUNT() FROM `nameplates`" - " WHERE `second` is not null")) - add("stale nameplates", q("SELECT COUNT() FROM `nameplates`" - " where `updated` < ?", (OLD,))) - - add("total mailboxes", q("SELECT COUNT() FROM `mailboxes`")) - add("waiting mailboxes", q("SELECT COUNT() FROM `mailboxes`" - " WHERE `second` is null")) - add("connected mailboxes", q("SELECT COUNT() FROM `mailboxes`" - " WHERE `second` is not null")) - - stale_mailboxes = 0 - for mbox_row in db.execute("SELECT * FROM `mailboxes`").fetchall(): - newest = db.execute("SELECT `server_rx` FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?" - " ORDER BY `server_rx` DESC LIMIT 1", - (mbox_row["app_id"], mbox_row["id"])).fetchone() - if newest and newest[0] < OLD: - stale_mailboxes += 1 - add("stale mailboxes", stale_mailboxes) - - add("messages", q("SELECT COUNT() FROM `messages`")) - - if args.json: - print(json.dumps(c_dict)) - else: - for (key, value) in c_list: - print(key, value) - return 0 - -def count_events(args): - if not os.path.exists("relay.sqlite"): - raise click.UsageError( - "cannot find relay.sqlite, please run from the server directory" - ) - db = get_db("relay.sqlite") - c_list = [] - c_dict = {} - def add(key, value): - c_list.append((key, value)) - c_dict[key] = value - def q(query, values=()): - return list(db.execute(query, values).fetchone().values())[0] - - add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplate_usage`")) - - add("total nameplates", q("SELECT COUNT() FROM `nameplate_usage`")) - add("happy nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='happy'")) - add("lonely nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='lonely'")) - add("pruney nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='pruney'")) - add("crowded nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='crowded'")) - - add("total mailboxes", q("SELECT COUNT() FROM `mailbox_usage`")) - add("happy mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='happy'")) - add("scary mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='scary'")) - add("lonely mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='lonely'")) - add("errory mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='errory'")) - add("pruney mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='pruney'")) - add("crowded mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='crowded'")) - - add("total transit", q("SELECT COUNT() FROM `transit_usage`")) - add("happy transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='happy'")) - add("lonely transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='lonely'")) - add("errory transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='errory'")) - - add("transit bytes", q("SELECT SUM(`total_bytes`) FROM `transit_usage`")) - - if args.json: - print(json.dumps(c_dict)) - else: - for (key, value) in c_list: - print(key, value) - return 0 diff --git a/src/wormhole/server/database.py b/src/wormhole/server/database.py deleted file mode 100644 index eb188e1..0000000 --- a/src/wormhole/server/database.py +++ /dev/null @@ -1,126 +0,0 @@ -from __future__ import unicode_literals -import os -import sqlite3 -import tempfile -from pkg_resources import resource_string -from twisted.python import log - -class DBError(Exception): - pass - -def get_schema(version): - schema_bytes = resource_string("wormhole.server", - "db-schemas/v%d.sql" % version) - return schema_bytes.decode("utf-8") - -def get_upgrader(new_version): - schema_bytes = resource_string("wormhole.server", - "db-schemas/upgrade-to-v%d.sql" % new_version) - return schema_bytes.decode("utf-8") - -TARGET_VERSION = 3 - -def dict_factory(cursor, row): - d = {} - for idx, col in enumerate(cursor.description): - d[col[0]] = row[idx] - return d - -def _initialize_db_schema(db, target_version): - """Creates the application schema in the given database. - """ - log.msg("populating new database with schema v%s" % target_version) - schema = get_schema(target_version) - db.executescript(schema) - db.execute("INSERT INTO version (version) VALUES (?)", - (target_version,)) - db.commit() - -def _initialize_db_connection(db): - """Sets up the db connection object with a row factory and with necessary - foreign key settings. - """ - db.row_factory = dict_factory - db.execute("PRAGMA foreign_keys = ON") - problems = db.execute("PRAGMA foreign_key_check").fetchall() - if problems: - raise DBError("failed foreign key check: %s" % (problems,)) - -def _open_db_connection(dbfile): - """Open a new connection to the SQLite3 database at the given path. - """ - try: - db = sqlite3.connect(dbfile) - except (EnvironmentError, sqlite3.OperationalError) as e: - raise DBError("Unable to create/open db file %s: %s" % (dbfile, e)) - _initialize_db_connection(db) - return db - -def _get_temporary_dbfile(dbfile): - """Get a temporary filename near the given path. - """ - fd, name = tempfile.mkstemp( - prefix=os.path.basename(dbfile) + ".", - dir=os.path.dirname(dbfile) - ) - os.close(fd) - return name - -def _atomic_create_and_initialize_db(dbfile, target_version): - """Create and return a new database, initialized with the application - schema. - - If anything goes wrong, nothing is left at the ``dbfile`` path. - """ - temp_dbfile = _get_temporary_dbfile(dbfile) - db = _open_db_connection(temp_dbfile) - _initialize_db_schema(db, target_version) - db.close() - os.rename(temp_dbfile, dbfile) - return _open_db_connection(dbfile) - -def get_db(dbfile, target_version=TARGET_VERSION): - """Open or create the given db file. The parent directory must exist. - Returns the db connection object, or raises DBError. - """ - if dbfile == ":memory:": - db = _open_db_connection(dbfile) - _initialize_db_schema(db, target_version) - elif os.path.exists(dbfile): - db = _open_db_connection(dbfile) - else: - db = _atomic_create_and_initialize_db(dbfile, target_version) - - try: - version = db.execute("SELECT version FROM version").fetchone()["version"] - except sqlite3.DatabaseError as e: - # this indicates that the file is not a compatible database format. - # Perhaps it was created with an old version, or it might be junk. - raise DBError("db file is unusable: %s" % e) - - while version < target_version: - log.msg(" need to upgrade from %s to %s" % (version, target_version)) - try: - upgrader = get_upgrader(version+1) - except ValueError: # ResourceError?? - log.msg(" unable to upgrade %s to %s" % (version, version+1)) - raise DBError("Unable to upgrade %s to version %s, left at %s" - % (dbfile, version+1, version)) - log.msg(" executing upgrader v%s->v%s" % (version, version+1)) - db.executescript(upgrader) - db.commit() - version = version+1 - - if version != target_version: - raise DBError("Unable to handle db version %s" % version) - - return db - -def dump_db(db): - # to let _iterdump work, we need to restore the original row factory - orig = db.row_factory - try: - db.row_factory = sqlite3.Row - return "".join(db.iterdump()) - finally: - db.row_factory = orig diff --git a/src/wormhole/server/db-schemas/upgrade-to-v3.sql b/src/wormhole/server/db-schemas/upgrade-to-v3.sql deleted file mode 100644 index 69bb255..0000000 --- a/src/wormhole/server/db-schemas/upgrade-to-v3.sql +++ /dev/null @@ -1,68 +0,0 @@ -DROP TABLE `nameplates`; -DROP TABLE `messages`; -DROP TABLE `mailboxes`; - - --- Wormhole codes use a "nameplate": a short name which is only used to --- reference a specific (long-named) mailbox. The codes only use numeric --- nameplates, but the protocol and server allow can use arbitrary strings. -CREATE TABLE `nameplates` -( - `id` INTEGER PRIMARY KEY AUTOINCREMENT, - `app_id` VARCHAR, - `name` VARCHAR, - `mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`), - `request_id` VARCHAR -- from 'allocate' message, for future deduplication -); -CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`); -CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); -CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); - -CREATE TABLE `nameplate_sides` -( - `nameplates_id` REFERENCES `nameplates`(`id`), - `claimed` BOOLEAN, -- True after claim(), False after release() - `side` VARCHAR, - `added` INTEGER -- time when this side first claimed the nameplate -); - - --- Clients exchange messages through a "mailbox", which has a long (randomly --- unique) identifier and a queue of messages. --- `id` is randomly-generated and unique across all apps. -CREATE TABLE `mailboxes` -( - `app_id` VARCHAR, - `id` VARCHAR PRIMARY KEY, - `updated` INTEGER, -- time of last activity, used for pruning - `for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone -); -CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); - -CREATE TABLE `mailbox_sides` -( - `mailbox_id` REFERENCES `mailboxes`(`id`), - `opened` BOOLEAN, -- True after open(), False after close() - `side` VARCHAR, - `added` INTEGER, -- time when this side first opened the mailbox - `mood` VARCHAR -); - -CREATE TABLE `messages` -( - `app_id` VARCHAR, - `mailbox_id` VARCHAR, - `side` VARCHAR, - `phase` VARCHAR, -- numeric or string - `body` VARCHAR, - `server_rx` INTEGER, - `msg_id` VARCHAR -); -CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`); - -ALTER TABLE `mailbox_usage` ADD COLUMN `for_nameplate` BOOLEAN; -CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`); -CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`); - -DELETE FROM `version`; -INSERT INTO `version` (`version`) VALUES (3); diff --git a/src/wormhole/server/db-schemas/v2.sql b/src/wormhole/server/db-schemas/v2.sql deleted file mode 100644 index 3ff6e58..0000000 --- a/src/wormhole/server/db-schemas/v2.sql +++ /dev/null @@ -1,105 +0,0 @@ - --- note: anything which isn't an boolean, integer, or human-readable unicode --- string, (i.e. binary strings) will be stored as hex - -CREATE TABLE `version` -( - `version` INTEGER -- contains one row, set to 2 -); - - --- Wormhole codes use a "nameplate": a short identifier which is only used to --- reference a specific (long-named) mailbox. The codes only use numeric --- nameplates, but the protocol and server allow can use arbitrary strings. -CREATE TABLE `nameplates` -( - `app_id` VARCHAR, - `id` VARCHAR, - `mailbox_id` VARCHAR, -- really a foreign key - `side1` VARCHAR, -- side name, or NULL - `side2` VARCHAR, -- side name, or NULL - `request_id` VARCHAR, -- from 'allocate' message, for future deduplication - `crowded` BOOLEAN, -- at some point, three or more sides were involved - `updated` INTEGER, -- time of last activity, used for pruning - -- timing data - `started` INTEGER, -- time when nameplace was opened - `second` INTEGER -- time when second side opened -); -CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `id`); -CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`); -CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); -CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); - --- Clients exchange messages through a "mailbox", which has a long (randomly --- unique) identifier and a queue of messages. -CREATE TABLE `mailboxes` -( - `app_id` VARCHAR, - `id` VARCHAR, - `side1` VARCHAR, -- side name, or NULL - `side2` VARCHAR, -- side name, or NULL - `crowded` BOOLEAN, -- at some point, three or more sides were involved - `first_mood` VARCHAR, - -- timing data for the mailbox itself - `started` INTEGER, -- time when opened - `second` INTEGER -- time when second side opened -); -CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); - -CREATE TABLE `messages` -( - `app_id` VARCHAR, - `mailbox_id` VARCHAR, - `side` VARCHAR, - `phase` VARCHAR, -- numeric or string - `body` VARCHAR, - `server_rx` INTEGER, - `msg_id` VARCHAR -); -CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`); - -CREATE TABLE `nameplate_usage` -( - `app_id` VARCHAR, - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `total_time` INTEGER, -- seconds from open to last close/prune - `result` VARCHAR -- happy, lonely, pruney, crowded - -- nameplate moods: - -- "happy": two sides open and close - -- "lonely": one side opens and closes (no response from 2nd side) - -- "pruney": channels which get pruned for inactivity - -- "crowded": three or more sides were involved -); -CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`); - -CREATE TABLE `mailbox_usage` -( - `app_id` VARCHAR, - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `total_time` INTEGER, -- seconds from open to last close - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `result` VARCHAR -- happy, scary, lonely, errory, pruney - -- rendezvous moods: - -- "happy": both sides close with mood=happy - -- "scary": any side closes with mood=scary (bad MAC, probably wrong pw) - -- "lonely": any side closes with mood=lonely (no response from 2nd side) - -- "errory": any side closes with mood=errory (other errors) - -- "pruney": channels which get pruned for inactivity - -- "crowded": three or more sides were involved -); -CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`); - -CREATE TABLE `transit_usage` -( - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `total_time` INTEGER, -- seconds from open to last close - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `total_bytes` INTEGER, -- total bytes relayed (both directions) - `result` VARCHAR -- happy, scary, lonely, errory, pruney - -- transit moods: - -- "errory": one side gave the wrong handshake - -- "lonely": good handshake, but the other side never showed up - -- "happy": both sides gave correct handshake -); -CREATE INDEX `transit_usage_idx` ON `transit_usage` (`started`); diff --git a/src/wormhole/server/db-schemas/v3.sql b/src/wormhole/server/db-schemas/v3.sql deleted file mode 100644 index e447744..0000000 --- a/src/wormhole/server/db-schemas/v3.sql +++ /dev/null @@ -1,115 +0,0 @@ - --- note: anything which isn't an boolean, integer, or human-readable unicode --- string, (i.e. binary strings) will be stored as hex - -CREATE TABLE `version` -( - `version` INTEGER -- contains one row, set to 3 -); - - --- Wormhole codes use a "nameplate": a short name which is only used to --- reference a specific (long-named) mailbox. The codes only use numeric --- nameplates, but the protocol and server allow can use arbitrary strings. -CREATE TABLE `nameplates` -( - `id` INTEGER PRIMARY KEY AUTOINCREMENT, - `app_id` VARCHAR, - `name` VARCHAR, - `mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`), - `request_id` VARCHAR -- from 'allocate' message, for future deduplication -); -CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`); -CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); -CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); - -CREATE TABLE `nameplate_sides` -( - `nameplates_id` REFERENCES `nameplates`(`id`), - `claimed` BOOLEAN, -- True after claim(), False after release() - `side` VARCHAR, - `added` INTEGER -- time when this side first claimed the nameplate -); - - --- Clients exchange messages through a "mailbox", which has a long (randomly --- unique) identifier and a queue of messages. --- `id` is randomly-generated and unique across all apps. -CREATE TABLE `mailboxes` -( - `app_id` VARCHAR, - `id` VARCHAR PRIMARY KEY, - `updated` INTEGER, -- time of last activity, used for pruning - `for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone -); -CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); - -CREATE TABLE `mailbox_sides` -( - `mailbox_id` REFERENCES `mailboxes`(`id`), - `opened` BOOLEAN, -- True after open(), False after close() - `side` VARCHAR, - `added` INTEGER, -- time when this side first opened the mailbox - `mood` VARCHAR -); - -CREATE TABLE `messages` -( - `app_id` VARCHAR, - `mailbox_id` VARCHAR, - `side` VARCHAR, - `phase` VARCHAR, -- numeric or string - `body` VARCHAR, - `server_rx` INTEGER, - `msg_id` VARCHAR -); -CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`); - -CREATE TABLE `nameplate_usage` -( - `app_id` VARCHAR, - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `total_time` INTEGER, -- seconds from open to last close/prune - `result` VARCHAR -- happy, lonely, pruney, crowded - -- nameplate moods: - -- "happy": two sides open and close - -- "lonely": one side opens and closes (no response from 2nd side) - -- "pruney": channels which get pruned for inactivity - -- "crowded": three or more sides were involved -); -CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`); - -CREATE TABLE `mailbox_usage` -( - `app_id` VARCHAR, - `for_nameplate` BOOLEAN, -- allocated for a nameplate, not standalone - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `total_time` INTEGER, -- seconds from open to last close - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `result` VARCHAR -- happy, scary, lonely, errory, pruney - -- rendezvous moods: - -- "happy": both sides close with mood=happy - -- "scary": any side closes with mood=scary (bad MAC, probably wrong pw) - -- "lonely": any side closes with mood=lonely (no response from 2nd side) - -- "errory": any side closes with mood=errory (other errors) - -- "pruney": channels which get pruned for inactivity - -- "crowded": three or more sides were involved -); -CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`); -CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`); - -CREATE TABLE `transit_usage` -( - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `total_time` INTEGER, -- seconds from open to last close - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `total_bytes` INTEGER, -- total bytes relayed (both directions) - `result` VARCHAR -- happy, scary, lonely, errory, pruney - -- transit moods: - -- "errory": one side gave the wrong handshake - -- "lonely": good handshake, but the other side never showed up - -- "happy": both sides gave correct handshake -); -CREATE INDEX `transit_usage_idx` ON `transit_usage` (`started`); -CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`); diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py deleted file mode 100644 index 818c70c..0000000 --- a/src/wormhole/server/rendezvous.py +++ /dev/null @@ -1,659 +0,0 @@ -from __future__ import print_function, unicode_literals -import os, random, base64, collections -from collections import namedtuple -from twisted.python import log -from twisted.application import service - -def generate_mailbox_id(): - return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii") - -class CrowdedError(Exception): - pass -class ReclaimedError(Exception): - pass - -Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"]) -TransitUsage = namedtuple("TransitUsage", - ["started", "waiting_time", "total_time", - "total_bytes", "result"]) - -SidedMessage = namedtuple("SidedMessage", ["side", "phase", "body", - "server_rx", "msg_id"]) - -class Mailbox: - def __init__(self, app, db, app_id, mailbox_id): - self._app = app - self._db = db - self._app_id = app_id - self._mailbox_id = mailbox_id - self._listeners = {} # handle -> (send_f, stop_f) - # "handle" is a hashable object, for deregistration - # send_f() takes a JSONable object, stop_f() has no args - - def open(self, side, when): - # requires caller to db.commit() - assert isinstance(side, type("")), type(side) - db = self._db - - already = db.execute("SELECT * FROM `mailbox_sides`" - " WHERE `mailbox_id`=? AND `side`=?", - (self._mailbox_id, side)).fetchone() - if not already: - db.execute("INSERT INTO `mailbox_sides`" - " (`mailbox_id`, `opened`, `side`, `added`)" - " VALUES(?,?,?,?)", - (self._mailbox_id, True, side, when)) - # We accept re-opening a mailbox which a side previously closed, - # unlike claim_nameplate(), which forbids any side from re-claiming a - # nameplate which they previously released. (Nameplates forbid this - # because the act of claiming a nameplate for the first time causes a - # new mailbox to be created, which should only happen once). - # Mailboxes have their own distinct objects (to manage - # subscriptions), so closing one which was already closed requires - # making a new object, which works by calling open() just before - # close(). We really do want to support re-closing closed mailboxes, - # because this enables intermittently-connected clients, who remember - # sending a 'close' but aren't sure whether it was received or not, - # then get shut down. Those clients will wake up and re-send the - # 'close', until they receive the 'closed' ack message. - - self._touch(when) - db.commit() # XXX: reconcile the need for this with the comment above - - def _touch(self, when): - self._db.execute("UPDATE `mailboxes` SET `updated`=? WHERE `id`=?", - (when, self._mailbox_id)) - - def get_messages(self): - messages = [] - db = self._db - for row in db.execute("SELECT * FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?" - " ORDER BY `server_rx` ASC", - (self._app_id, self._mailbox_id)).fetchall(): - sm = SidedMessage(side=row["side"], phase=row["phase"], - body=row["body"], server_rx=row["server_rx"], - msg_id=row["msg_id"]) - messages.append(sm) - return messages - - def add_listener(self, handle, send_f, stop_f): - #log.msg("add_listener", self._mailbox_id, handle) - self._listeners[handle] = (send_f, stop_f) - #log.msg(" added", len(self._listeners)) - return self.get_messages() - - def remove_listener(self, handle): - #log.msg("remove_listener", self._mailbox_id, handle) - self._listeners.pop(handle, None) - #log.msg(" removed", len(self._listeners)) - - def has_listeners(self): - return bool(self._listeners) - - def broadcast_message(self, sm): - for (send_f, stop_f) in self._listeners.values(): - send_f(sm) - - def _add_message(self, sm): - self._db.execute("INSERT INTO `messages`" - " (`app_id`, `mailbox_id`, `side`, `phase`, `body`," - " `server_rx`, `msg_id`)" - " VALUES (?,?,?,?,?, ?,?)", - (self._app_id, self._mailbox_id, sm.side, - sm.phase, sm.body, sm.server_rx, sm.msg_id)) - self._touch(sm.server_rx) - self._db.commit() - - def add_message(self, sm): - assert isinstance(sm, SidedMessage) - self._add_message(sm) - self.broadcast_message(sm) - - def close(self, side, mood, when): - assert isinstance(side, type("")), type(side) - db = self._db - row = db.execute("SELECT * FROM `mailboxes`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, self._mailbox_id)).fetchone() - if not row: - return - for_nameplate = row["for_nameplate"] - - row = db.execute("SELECT * FROM `mailbox_sides`" - " WHERE `mailbox_id`=? AND `side`=?", - (self._mailbox_id, side)).fetchone() - if not row: - return - db.execute("UPDATE `mailbox_sides` SET `opened`=?, `mood`=?" - " WHERE `mailbox_id`=? AND `side`=?", - (False, mood, self._mailbox_id, side)) - db.commit() - - # are any sides still open? - side_rows = db.execute("SELECT * FROM `mailbox_sides`" - " WHERE `mailbox_id`=?", - (self._mailbox_id,)).fetchall() - if any([sr["opened"] for sr in side_rows]): - return - - # nope. delete and summarize - db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?", - (self._mailbox_id,)) - db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?", - (self._mailbox_id,)) - db.execute("DELETE FROM `mailboxes` WHERE `id`=?", (self._mailbox_id,)) - self._app._summarize_mailbox_and_store(for_nameplate, side_rows, - when, pruned=False) - db.commit() - # Shut down any listeners, just in case they're still lingering - # around. - for (send_f, stop_f) in self._listeners.values(): - stop_f() - self._listeners = {} - self._app.free_mailbox(self._mailbox_id) - - def _shutdown(self): - # used at test shutdown to accelerate client disconnects - for (send_f, stop_f) in self._listeners.values(): - stop_f() - self._listeners = {} - - -class AppNamespace(object): - - def __init__(self, db, blur_usage, log_requests, app_id, allow_list): - self._db = db - self._blur_usage = blur_usage - self._log_requests = log_requests - self._app_id = app_id - self._mailboxes = {} - self._nameplate_counts = collections.defaultdict(int) - self._mailbox_counts = collections.defaultdict(int) - self._allow_list = allow_list - - def get_nameplate_ids(self): - if not self._allow_list: - return [] - return self._get_nameplate_ids() - - def _get_nameplate_ids(self): - db = self._db - # TODO: filter this to numeric ids? - c = db.execute("SELECT DISTINCT `name` FROM `nameplates`" - " WHERE `app_id`=?", (self._app_id,)) - return set([row["name"] for row in c.fetchall()]) - - def _find_available_nameplate_id(self): - claimed = self._get_nameplate_ids() - for size in range(1,4): # stick to 1-999 for now - available = set() - for id_int in range(10**(size-1), 10**size): - id = "%d" % id_int - if id not in claimed: - available.add(id) - if available: - return random.choice(list(available)) - # ouch, 999 currently claimed. Try random ones for a while. - for tries in range(1000): - id_int = random.randrange(1000, 1000*1000) - id = "%d" % id_int - if id not in claimed: - return id - raise ValueError("unable to find a free nameplate-id") - - def allocate_nameplate(self, side, when): - nameplate_id = self._find_available_nameplate_id() - mailbox_id = self.claim_nameplate(nameplate_id, side, when) - del mailbox_id # ignored, they'll learn it from claim() - return nameplate_id - - def claim_nameplate(self, name, side, when): - # when we're done: - # * there will be one row for the nameplate - # * there will be one 'side' attached to it, with claimed=True - # * a mailbox id and mailbox row will be created - # * a mailbox 'side' will be attached, with opened=True - assert isinstance(name, type("")), type(name) - assert isinstance(side, type("")), type(side) - db = self._db - row = db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`=? AND `name`=?", - (self._app_id, name)).fetchone() - if not row: - if self._log_requests: - log.msg("creating nameplate#%s for app_id %s" % - (name, self._app_id)) - mailbox_id = generate_mailbox_id() - self._add_mailbox(mailbox_id, True, side, when) # ensure row exists - sql = ("INSERT INTO `nameplates`" - " (`app_id`, `name`, `mailbox_id`)" - " VALUES(?,?,?)") - npid = db.execute(sql, (self._app_id, name, mailbox_id) - ).lastrowid - else: - npid = row["id"] - mailbox_id = row["mailbox_id"] - - row = db.execute("SELECT * FROM `nameplate_sides`" - " WHERE `nameplates_id`=? AND `side`=?", - (npid, side)).fetchone() - if not row: - db.execute("INSERT INTO `nameplate_sides`" - " (`nameplates_id`, `claimed`, `side`, `added`)" - " VALUES(?,?,?,?)", - (npid, True, side, when)) - else: - if not row["claimed"]: - raise ReclaimedError("you cannot re-claim a nameplate that your side previously released") - # since that might cause a new mailbox to be allocated - db.commit() - - self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError - rows = db.execute("SELECT * FROM `nameplate_sides`" - " WHERE `nameplates_id`=?", (npid,)).fetchall() - if len(rows) > 2: - # this line will probably never get hit: any crowding is noticed - # on mailbox_sides first, inside open_mailbox() - raise CrowdedError("too many sides have claimed this nameplate") - return mailbox_id - - def release_nameplate(self, name, side, when): - # when we're done: - # * the 'claimed' flag will be cleared on the nameplate_sides row - # * if the nameplate is now unused (no claimed sides): - # * a usage record will be added - # * the nameplate row will be removed - # * the nameplate sides will be removed - assert isinstance(name, type("")), type(name) - assert isinstance(side, type("")), type(side) - db = self._db - np_row = db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`=? AND `name`=?", - (self._app_id, name)).fetchone() - if not np_row: - return - npid = np_row["id"] - row = db.execute("SELECT * FROM `nameplate_sides`" - " WHERE `nameplates_id`=? AND `side`=?", - (npid, side)).fetchone() - if not row: - return - db.execute("UPDATE `nameplate_sides` SET `claimed`=?" - " WHERE `nameplates_id`=? AND `side`=?", - (False, npid, side)) - db.commit() - - # now, are there any remaining claims? - side_rows = db.execute("SELECT * FROM `nameplate_sides`" - " WHERE `nameplates_id`=?", - (npid,)).fetchall() - claims = [1 for sr in side_rows if sr["claimed"]] - if claims: - return - # delete and summarize - db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?", - (npid,)) - db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,)) - self._summarize_nameplate_and_store(side_rows, when, pruned=False) - db.commit() - - def _summarize_nameplate_and_store(self, side_rows, delete_time, pruned): - # requires caller to db.commit() - u = self._summarize_nameplate_usage(side_rows, delete_time, pruned) - self._db.execute("INSERT INTO `nameplate_usage`" - " (`app_id`," - " `started`, `total_time`, `waiting_time`, `result`)" - " VALUES (?, ?,?,?,?)", - (self._app_id, - u.started, u.total_time, u.waiting_time, u.result)) - self._nameplate_counts[u.result] += 1 - - def _summarize_nameplate_usage(self, side_rows, delete_time, pruned): - times = sorted([row["added"] for row in side_rows]) - started = times[0] - if self._blur_usage: - started = self._blur_usage * (started // self._blur_usage) - waiting_time = None - if len(times) > 1: - waiting_time = times[1] - times[0] - total_time = delete_time - times[0] - result = "lonely" - if len(times) == 2: - result = "happy" - if pruned: - result = "pruney" - if len(times) > 2: - result = "crowded" - return Usage(started=started, waiting_time=waiting_time, - total_time=total_time, result=result) - - def _add_mailbox(self, mailbox_id, for_nameplate, side, when): - assert isinstance(mailbox_id, type("")), type(mailbox_id) - db = self._db - row = db.execute("SELECT * FROM `mailboxes`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, mailbox_id)).fetchone() - if not row: - self._db.execute("INSERT INTO `mailboxes`" - " (`app_id`, `id`, `for_nameplate`, `updated`)" - " VALUES(?,?,?,?)", - (self._app_id, mailbox_id, for_nameplate, when)) - # we don't need a commit here, because mailbox.open() only - # does SELECT FROM `mailbox_sides`, not from `mailboxes` - - def open_mailbox(self, mailbox_id, side, when): - assert isinstance(mailbox_id, type("")), type(mailbox_id) - self._add_mailbox(mailbox_id, False, side, when) # ensure row exists - db = self._db - if not mailbox_id in self._mailboxes: # ensure Mailbox object exists - if self._log_requests: - log.msg("spawning #%s for app_id %s" % (mailbox_id, - self._app_id)) - self._mailboxes[mailbox_id] = Mailbox(self, self._db, - self._app_id, mailbox_id) - mailbox = self._mailboxes[mailbox_id] - - # delegate to mailbox.open() to add a row to mailbox_sides, and - # update the mailbox.updated timestamp - mailbox.open(side, when) - db.commit() - rows = db.execute("SELECT * FROM `mailbox_sides`" - " WHERE `mailbox_id`=?", - (mailbox_id,)).fetchall() - if len(rows) > 2: - raise CrowdedError("too many sides have opened this mailbox") - return mailbox - - def free_mailbox(self, mailbox_id): - # called from Mailbox.delete_and_summarize(), which deletes any - # messages - - if mailbox_id in self._mailboxes: - self._mailboxes.pop(mailbox_id) - #if self._log_requests: - # log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" % - # (mailbox_id, len(self.get_claimed()), len(self._mailboxes))) - - def _summarize_mailbox_and_store(self, for_nameplate, side_rows, - delete_time, pruned): - db = self._db - u = self._summarize_mailbox(side_rows, delete_time, pruned) - db.execute("INSERT INTO `mailbox_usage`" - " (`app_id`, `for_nameplate`," - " `started`, `total_time`, `waiting_time`, `result`)" - " VALUES (?,?, ?,?,?,?)", - (self._app_id, for_nameplate, - u.started, u.total_time, u.waiting_time, u.result)) - self._mailbox_counts[u.result] += 1 - - def _summarize_mailbox(self, side_rows, delete_time, pruned): - times = sorted([row["added"] for row in side_rows]) - started = times[0] - if self._blur_usage: - started = self._blur_usage * (started // self._blur_usage) - waiting_time = None - if len(times) > 1: - waiting_time = times[1] - times[0] - total_time = delete_time - times[0] - - num_sides = len(times) - if num_sides == 0: - result = "quiet" - elif num_sides == 1: - result = "lonely" - else: - result = "happy" - - # "mood" is only recorded at close() - moods = [row["mood"] for row in side_rows if row.get("mood")] - if "lonely" in moods: - result = "lonely" - if "errory" in moods: - result = "errory" - if "scary" in moods: - result = "scary" - if pruned: - result = "pruney" - if num_sides > 2: - result = "crowded" - - return Usage(started=started, waiting_time=waiting_time, - total_time=total_time, result=result) - - def prune(self, now, old): - # The pruning check runs every 10 minutes, and "old" is defined to be - # 11 minutes ago (unit tests can use different values). The client is - # allowed to disconnect for up to 9 minutes without losing the - # channel (nameplate, mailbox, and messages). - - # Each time a client does something, the mailbox.updated field is - # updated with the current timestamp. If a client is subscribed to - # the mailbox when pruning check runs, the "updated" field is also - # updated. After that check, if the "updated" field is "old", the - # channel is deleted. - - # For now, pruning is logged even if log_requests is False, to debug - # the pruning process, and since pruning is triggered by a timer - # instead of by user action. It does reveal which mailboxes were - # present when the pruning process began, though, so in the log run - # it should do less logging. - log.msg(" prune begins (%s)" % self._app_id) - db = self._db - modified = False - - for mailbox in self._mailboxes.values(): - if mailbox.has_listeners(): - log.msg("touch %s because listeners" % mailbox._mailbox_id) - mailbox._touch(now) - db.commit() # make sure the updates are visible below - - new_mailboxes = set() - old_mailboxes = set() - for row in db.execute("SELECT * FROM `mailboxes` WHERE `app_id`=?", - (self._app_id,)).fetchall(): - mailbox_id = row["id"] - log.msg(" 1: age=%s, old=%s, %s" % - (now - row["updated"], now - old, mailbox_id)) - if row["updated"] > old: - new_mailboxes.add(mailbox_id) - else: - old_mailboxes.add(mailbox_id) - log.msg(" 2: mailboxes:", new_mailboxes, old_mailboxes) - - old_nameplates = set() - for row in db.execute("SELECT * FROM `nameplates` WHERE `app_id`=?", - (self._app_id,)).fetchall(): - npid = row["id"] - mailbox_id = row["mailbox_id"] - if mailbox_id in old_mailboxes: - old_nameplates.add(npid) - log.msg(" 3: old_nameplates dbids", old_nameplates) - - for npid in old_nameplates: - log.msg(" deleting nameplate with dbid", npid) - side_rows = db.execute("SELECT * FROM `nameplate_sides`" - " WHERE `nameplates_id`=?", - (npid,)).fetchall() - db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?", - (npid,)) - db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,)) - self._summarize_nameplate_and_store(side_rows, now, pruned=True) - modified = True - - # delete all messages for old mailboxes - # delete all old mailboxes - - for mailbox_id in old_mailboxes: - log.msg(" deleting mailbox", mailbox_id) - row = db.execute("SELECT * FROM `mailboxes`" - " WHERE `id`=?", (mailbox_id,)).fetchone() - for_nameplate = row["for_nameplate"] - side_rows = db.execute("SELECT * FROM `mailbox_sides`" - " WHERE `mailbox_id`=?", - (mailbox_id,)).fetchall() - db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?", - (mailbox_id,)) - db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?", - (mailbox_id,)) - db.execute("DELETE FROM `mailboxes` WHERE `id`=?", - (mailbox_id,)) - self._summarize_mailbox_and_store(for_nameplate, side_rows, - now, pruned=True) - modified = True - - if modified: - db.commit() - log.msg(" prune complete, modified:", modified) - - def get_counts(self): - return (self._nameplate_counts, self._mailbox_counts) - - def _shutdown(self): - for channel in self._mailboxes.values(): - channel._shutdown() - - -class Rendezvous(service.MultiService): - - def __init__(self, db, welcome, blur_usage, allow_list): - service.MultiService.__init__(self) - self._db = db - self._welcome = welcome - self._blur_usage = blur_usage - log_requests = blur_usage is None - self._log_requests = log_requests - self._allow_list = allow_list - self._apps = {} - - def get_welcome(self): - return self._welcome - def get_log_requests(self): - return self._log_requests - - def get_app(self, app_id): - assert isinstance(app_id, type("")) - if not app_id in self._apps: - if self._log_requests: - log.msg("spawning app_id %s" % (app_id,)) - self._apps[app_id] = AppNamespace( - self._db, - self._blur_usage, - self._log_requests, - app_id, - self._allow_list, - ) - return self._apps[app_id] - - def get_all_apps(self): - apps = set() - for row in self._db.execute("SELECT DISTINCT `app_id`" - " FROM `nameplates`").fetchall(): - apps.add(row["app_id"]) - for row in self._db.execute("SELECT DISTINCT `app_id`" - " FROM `mailboxes`").fetchall(): - apps.add(row["app_id"]) - for row in self._db.execute("SELECT DISTINCT `app_id`" - " FROM `messages`").fetchall(): - apps.add(row["app_id"]) - return apps - - def prune_all_apps(self, now, old): - # As with AppNamespace.prune_old_mailboxes, we log for now. - log.msg("beginning app prune") - for app_id in sorted(self.get_all_apps()): - log.msg(" app prune checking %r" % (app_id,)) - app = self.get_app(app_id) - app.prune(now, old) - log.msg("app prune ends, %d apps" % len(self._apps)) - - def get_stats(self): - stats = {} - - # current status: expected to be zero most of the time - c = stats["active"] = {} - c["apps"] = len(self.get_all_apps()) - def q(query, values=()): - row = self._db.execute(query, values).fetchone() - return list(row.values())[0] - c["nameplates_total"] = q("SELECT COUNT() FROM `nameplates`") - # TODO: nameplates with only one side (most of them) - # TODO: nameplates with two sides (very fleeting) - # TODO: nameplates with three or more sides (crowded, unlikely) - c["mailboxes_total"] = q("SELECT COUNT() FROM `mailboxes`") - # TODO: mailboxes with only one side (most of them) - # TODO: mailboxes with two sides (somewhat fleeting, in-transit) - # TODO: mailboxes with three or more sides (unlikely) - c["messages_total"] = q("SELECT COUNT() FROM `messages`") - - # usage since last reboot - nameplate_counts = collections.defaultdict(int) - mailbox_counts = collections.defaultdict(int) - for app in self._apps.values(): - nc, mc = app.get_counts() - for result, count in nc.items(): - nameplate_counts[result] += count - for result, count in mc.items(): - mailbox_counts[result] += count - urb = stats["since_reboot"] = {} - urb["nameplate_moods"] = {} - for result, count in nameplate_counts.items(): - urb["nameplate_moods"][result] = count - urb["nameplates_total"] = sum(nameplate_counts.values()) - urb["mailbox_moods"] = {} - for result, count in mailbox_counts.items(): - urb["mailbox_moods"][result] = count - urb["mailboxes_total"] = sum(mailbox_counts.values()) - - # historical usage (all-time) - u = stats["all_time"] = {} - un = u["nameplate_moods"] = {} - # TODO: there's probably a single SQL query for all this - un["happy"] = q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='happy'") - un["lonely"] = q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='lonely'") - un["pruney"] = q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='pruney'") - un["crowded"] = q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='crowded'") - u["nameplates_total"] = q("SELECT COUNT() FROM `nameplate_usage`") - um = u["mailbox_moods"] = {} - um["happy"] = q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='happy'") - um["scary"] = q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='scary'") - um["lonely"] = q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='lonely'") - um["quiet"] = q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='quiet'") - um["errory"] = q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='errory'") - um["pruney"] = q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='pruney'") - um["crowded"] = q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='crowded'") - u["mailboxes_total"] = q("SELECT COUNT() FROM `mailbox_usage`") - u["mailboxes_standalone"] = q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `for_nameplate`=0") - - # recent timings (last 100 operations) - # TODO: median/etc of nameplate.total_time - # TODO: median/etc of mailbox.waiting_time (should be the same) - # TODO: median/etc of mailbox.total_time - - # other - # TODO: mailboxes without nameplates (needs new DB schema) - - return stats - - def stopService(self): - # This forcibly boots any clients that are still connected, which - # helps with unit tests that use threads for both clients. One client - # hits an exception, which terminates the test (and .tearDown calls - # stopService on the relay), but the other client (in its thread) is - # still waiting for a message. By killing off all connections, that - # other client gets an error, and exits promptly. - for app in self._apps.values(): - app._shutdown() - return service.MultiService.stopService(self) diff --git a/src/wormhole/server/rendezvous_websocket.py b/src/wormhole/server/rendezvous_websocket.py deleted file mode 100644 index a5f8eb0..0000000 --- a/src/wormhole/server/rendezvous_websocket.py +++ /dev/null @@ -1,306 +0,0 @@ -from __future__ import unicode_literals -import time -from twisted.internet import reactor -from twisted.python import log -from autobahn.twisted import websocket -from .rendezvous import CrowdedError, ReclaimedError, SidedMessage -from ..util import dict_to_bytes, bytes_to_dict - -# The WebSocket allows the client to send "commands" to the server, and the -# server to send "responses" to the client. Note that commands and responses -# are not necessarily one-to-one. All commands provoke an "ack" response -# (with a copy of the original message) for timing, testing, and -# synchronization purposes. All commands and responses are JSON-encoded. - -# Each WebSocket connection is bound to one "appid" and one "side", which are -# set by the "bind" command (which must be the first command on the -# connection), and must be set before any other command will be accepted. - -# Each connection can be bound to a single "mailbox" (a two-sided -# store-and-forward queue, identified by the "mailbox id": a long, randomly -# unique string identifier) by using the "open" command. This protects the -# mailbox from idle closure, enables the "add" command (to put new messages -# in the queue), and triggers delivery of past and future messages via the -# "message" response. The "close" command removes the binding (but note that -# it does not enable the subsequent binding of a second mailbox). When the -# last side closes a mailbox, its contents are deleted. - -# Additionally, the connection can be bound a single "nameplate", which is -# short identifier that makes up the first component of a wormhole code. Each -# nameplate points to a single long-id "mailbox". The "allocate" message -# determines the shortest available numeric nameplate, reserves it, and -# returns the nameplate id. "list" returns a list of all numeric nameplates -# which currently have only one side active (i.e. they are waiting for a -# partner). The "claim" message reserves an arbitrary nameplate id (perhaps -# the receiver of a wormhole connection typed in a code they got from the -# sender, or perhaps the two sides agreed upon a code offline and are both -# typing it in), and the "release" message releases it. When every side that -# has claimed the nameplate has also released it, the nameplate is -# deallocated (but they will probably keep the underlying mailbox open). - -# "claim" and "release" may only be called once per connection, however calls -# across connections (assuming a consistent "side") are idempotent. [connect, -# claim, disconnect, connect, claim] is legal, but not useful, as is a -# "release" for a nameplate that nobody is currently claiming. - -# "open" and "close" may only be called once per connection. They are -# basically idempotent, however "open" doubles as a subscribe action. So -# [connect, open, disconnect, connect, open] is legal *and* useful (without -# the second "open", the second connection would not be subscribed to hear -# about new messages). - -# Inbound (client to server) commands are marked as "->" below. Unrecognized -# inbound keys will be ignored. Outbound (server to client) responses use -# "<-". There is no guaranteed correlation between requests and responses. In -# this list, "A -> B" means that some time after A is received, at least one -# message of type B will be sent out (probably). - -# All responses include a "server_tx" key, which is a float (seconds since -# epoch) with the server clock just before the outbound response was written -# to the socket. - -# connection -> welcome -# <- {type: "welcome", welcome: {}} # .welcome keys are all optional: -# current_cli_version: out-of-date clients display a warning -# motd: all clients display message, then continue normally -# error: all clients display mesage, then terminate with error -# -> {type: "bind", appid:, side:} -# -# -> {type: "list"} -> nameplates -# <- {type: "nameplates", nameplates: [{id: str,..},..]} -# -> {type: "allocate"} -> nameplate, mailbox -# <- {type: "allocated", nameplate: str} -# -> {type: "claim", nameplate: str} -> mailbox -# <- {type: "claimed", mailbox: str} -# -> {type: "release"} -# .nameplate is optional, but must match previous claim() -# <- {type: "released"} -# -# -> {type: "open", mailbox: str} -> message -# sends old messages now, and subscribes to deliver future messages -# <- {type: "message", side:, phase:, body:, msg_id:}} # body is hex -# -> {type: "add", phase: str, body: hex} # will send echo in a "message" -# -# -> {type: "close", mood: str} -> closed -# .mailbox is optional, but must match previous open() -# <- {type: "closed"} -# -# <- {type: "error", error: str, orig: {}} # in response to malformed msgs - -# for tests that need to know when a message has been processed: -# -> {type: "ping", ping: int} -> pong (does not require bind/claim) -# <- {type: "pong", pong: int} - -class Error(Exception): - def __init__(self, explain): - self._explain = explain - -class WebSocketRendezvous(websocket.WebSocketServerProtocol): - def __init__(self): - websocket.WebSocketServerProtocol.__init__(self) - self._app = None - self._side = None - self._did_allocate = False # only one allocate() per websocket - self._listening = False - self._did_claim = False - self._nameplate_id = None - self._did_release = False - self._did_open = False - self._mailbox = None - self._mailbox_id = None - self._did_close = False - - def onConnect(self, request): - rv = self.factory.rendezvous - if rv.get_log_requests(): - log.msg("ws client connecting: %s" % (request.peer,)) - self._reactor = self.factory.reactor - - def onOpen(self): - rv = self.factory.rendezvous - self.send("welcome", welcome=rv.get_welcome()) - - def onMessage(self, payload, isBinary): - server_rx = time.time() - msg = bytes_to_dict(payload) - try: - if "type" not in msg: - raise Error("missing 'type'") - self.send("ack", id=msg.get("id")) - - mtype = msg["type"] - if mtype == "ping": - return self.handle_ping(msg) - if mtype == "bind": - return self.handle_bind(msg) - - if not self._app: - raise Error("must bind first") - if mtype == "list": - return self.handle_list() - if mtype == "allocate": - return self.handle_allocate(server_rx) - if mtype == "claim": - return self.handle_claim(msg, server_rx) - if mtype == "release": - return self.handle_release(msg, server_rx) - - if mtype == "open": - return self.handle_open(msg, server_rx) - if mtype == "add": - return self.handle_add(msg, server_rx) - if mtype == "close": - return self.handle_close(msg, server_rx) - - raise Error("unknown type") - except Error as e: - self.send("error", error=e._explain, orig=msg) - - def handle_ping(self, msg): - if "ping" not in msg: - raise Error("ping requires 'ping'") - self.send("pong", pong=msg["ping"]) - - def handle_bind(self, msg): - if self._app or self._side: - raise Error("already bound") - if "appid" not in msg: - raise Error("bind requires 'appid'") - if "side" not in msg: - raise Error("bind requires 'side'") - self._app = self.factory.rendezvous.get_app(msg["appid"]) - self._side = msg["side"] - - - def handle_list(self): - nameplate_ids = sorted(self._app.get_nameplate_ids()) - # provide room to add nameplate attributes later (like which wordlist - # is used for each, maybe how many words) - nameplates = [{"id": nid} for nid in nameplate_ids] - self.send("nameplates", nameplates=nameplates) - - def handle_allocate(self, server_rx): - if self._did_allocate: - raise Error("you already allocated one, don't be greedy") - nameplate_id = self._app.allocate_nameplate(self._side, server_rx) - assert isinstance(nameplate_id, type("")) - self._did_allocate = True - self.send("allocated", nameplate=nameplate_id) - - def handle_claim(self, msg, server_rx): - if "nameplate" not in msg: - raise Error("claim requires 'nameplate'") - if self._did_claim: - raise Error("only one claim per connection") - self._did_claim = True - nameplate_id = msg["nameplate"] - assert isinstance(nameplate_id, type("")), type(nameplate_id) - self._nameplate_id = nameplate_id - try: - mailbox_id = self._app.claim_nameplate(nameplate_id, self._side, - server_rx) - except CrowdedError: - raise Error("crowded") - except ReclaimedError: - raise Error("reclaimed") - self.send("claimed", mailbox=mailbox_id) - - def handle_release(self, msg, server_rx): - if self._did_release: - raise Error("only one release per connection") - if "nameplate" in msg: - if self._nameplate_id is not None: - if msg["nameplate"] != self._nameplate_id: - raise Error("release and claim must use same nameplate") - nameplate_id = msg["nameplate"] - else: - if self._nameplate_id is None: - raise Error("release without nameplate must follow claim") - nameplate_id = self._nameplate_id - assert nameplate_id is not None - self._did_release = True - self._app.release_nameplate(nameplate_id, self._side, server_rx) - self.send("released") - - - def handle_open(self, msg, server_rx): - if self._mailbox: - raise Error("only one open per connection") - if "mailbox" not in msg: - raise Error("open requires 'mailbox'") - mailbox_id = msg["mailbox"] - assert isinstance(mailbox_id, type("")) - self._mailbox_id = mailbox_id - try: - self._mailbox = self._app.open_mailbox(mailbox_id, self._side, - server_rx) - except CrowdedError: - raise Error("crowded") - def _send(sm): - self.send("message", side=sm.side, phase=sm.phase, - body=sm.body, server_rx=sm.server_rx, id=sm.msg_id) - def _stop(): - pass - self._listening = True - for old_sm in self._mailbox.add_listener(self, _send, _stop): - _send(old_sm) - - def handle_add(self, msg, server_rx): - if not self._mailbox: - raise Error("must open mailbox before adding") - if "phase" not in msg: - raise Error("missing 'phase'") - if "body" not in msg: - raise Error("missing 'body'") - msg_id = msg.get("id") # optional - sm = SidedMessage(side=self._side, phase=msg["phase"], - body=msg["body"], server_rx=server_rx, - msg_id=msg_id) - self._mailbox.add_message(sm) - - def handle_close(self, msg, server_rx): - if self._did_close: - raise Error("only one close per connection") - if "mailbox" in msg: - if self._mailbox_id is not None: - if msg["mailbox"] != self._mailbox_id: - raise Error("open and close must use same mailbox") - mailbox_id = msg["mailbox"] - else: - if self._mailbox_id is None: - raise Error("close without mailbox must follow open") - mailbox_id = self._mailbox_id - if not self._mailbox: - try: - self._mailbox = self._app.open_mailbox(mailbox_id, self._side, - server_rx) - except CrowdedError: - raise Error("crowded") - if self._listening: - self._mailbox.remove_listener(self) - self._listening = False - self._did_close = True - self._mailbox.close(self._side, msg.get("mood"), server_rx) - self._mailbox = None - self.send("closed") - - def send(self, mtype, **kwargs): - kwargs["type"] = mtype - kwargs["server_tx"] = time.time() - payload = dict_to_bytes(kwargs) - self.sendMessage(payload, False) - - def onClose(self, wasClean, code, reason): - #log.msg("onClose", self, self._mailbox, self._listening) - if self._mailbox and self._listening: - self._mailbox.remove_listener(self) - - -class WebSocketRendezvousFactory(websocket.WebSocketServerFactory): - protocol = WebSocketRendezvous - - def __init__(self, url, rendezvous): - websocket.WebSocketServerFactory.__init__(self, url) - self.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600) - self.rendezvous = rendezvous - self.reactor = reactor # for tests to control diff --git a/src/wormhole/server/server.py b/src/wormhole/server/server.py deleted file mode 100644 index 94b2b98..0000000 --- a/src/wormhole/server/server.py +++ /dev/null @@ -1,168 +0,0 @@ -# NO unicode_literals or static.Data() will break, because it demands -# a str on Python 2 -from __future__ import print_function -import os, time, json -try: - # 'resource' is unix-only - from resource import getrlimit, setrlimit, RLIMIT_NOFILE -except ImportError: # pragma: nocover - getrlimit, setrlimit, RLIMIT_NOFILE = None, None, None # pragma: nocover -from twisted.python import log -from twisted.internet import reactor, endpoints -from twisted.application import service, internet -from twisted.web import server, static -from twisted.web.resource import Resource -from autobahn.twisted.resource import WebSocketResource -from .database import get_db -from .rendezvous import Rendezvous -from .rendezvous_websocket import WebSocketRendezvousFactory - -SECONDS = 1.0 -MINUTE = 60*SECONDS - -CHANNEL_EXPIRATION_TIME = 11*MINUTE -EXPIRATION_CHECK_PERIOD = 10*MINUTE - -class Root(Resource): - # child_FOO is a nevow thing, not a twisted.web.resource thing - def __init__(self): - Resource.__init__(self) - self.putChild(b"", static.Data(b"Wormhole Relay\n", "text/plain")) - -class PrivacyEnhancedSite(server.Site): - logRequests = True - def log(self, request): - if self.logRequests: - return server.Site.log(self, request) - -class RelayServer(service.MultiService): - - def __init__(self, rendezvous_web_port, - advertise_version, db_url=":memory:", blur_usage=None, - signal_error=None, stats_file=None, allow_list=True, - websocket_protocol_options=()): - service.MultiService.__init__(self) - self._blur_usage = blur_usage - self._allow_list = allow_list - self._db_url = db_url - - db = get_db(db_url) - welcome = { - # adding .motd will cause all clients to display the message, - # then keep running normally - #"motd": "Welcome to the public relay.\nPlease enjoy this service.", - - # adding .error will cause all clients to fail, with this message - #"error": "This server has been disabled, see URL for details.", - } - - if advertise_version: - # The primary (python CLI) implementation will emit a message if - # its version does not match this key. If/when we have - # distributions which include older version, but we still expect - # them to be compatible, stop sending this key. - welcome["current_cli_version"] = advertise_version - if signal_error: - welcome["error"] = signal_error - - self._rendezvous = Rendezvous(db, welcome, blur_usage, self._allow_list) - self._rendezvous.setServiceParent(self) # for the pruning timer - - root = Root() - wsrf = WebSocketRendezvousFactory(None, self._rendezvous) - _set_options(websocket_protocol_options, wsrf) - root.putChild(b"v1", WebSocketResource(wsrf)) - - site = PrivacyEnhancedSite(root) - if blur_usage: - site.logRequests = False - - r = endpoints.serverFromString(reactor, rendezvous_web_port) - rendezvous_web_service = internet.StreamServerEndpointService(r, site) - rendezvous_web_service.setServiceParent(self) - - self._stats_file = stats_file - if self._stats_file and os.path.exists(self._stats_file): - os.unlink(self._stats_file) - # this will be regenerated immediately, but if something goes - # wrong in dump_stats(), it's better to have a missing file than - # a stale one - t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.timer) - t.setServiceParent(self) - - # make some things accessible for tests - self._db = db - self._root = root - self._rendezvous_web_service = rendezvous_web_service - self._rendezvous_websocket = wsrf - - def increase_rlimits(self): - if getrlimit is None: - log.msg("unable to import 'resource', leaving rlimit alone") - return - soft, hard = getrlimit(RLIMIT_NOFILE) - if soft >= 10000: - log.msg("RLIMIT_NOFILE.soft was %d, leaving it alone" % soft) - return - # OS-X defaults to soft=7168, and reports a huge number for 'hard', - # but won't accept anything more than soft=10240, so we can't just - # set soft=hard. Linux returns (1024, 1048576) and is fine with - # soft=hard. Cygwin is reported to return (256,-1) and accepts up to - # soft=3200. So we try multiple values until something works. - for newlimit in [hard, 10000, 3200, 1024]: - log.msg("changing RLIMIT_NOFILE from (%s,%s) to (%s,%s)" % - (soft, hard, newlimit, hard)) - try: - setrlimit(RLIMIT_NOFILE, (newlimit, hard)) - log.msg("setrlimit successful") - return - except ValueError as e: - log.msg("error during setrlimit: %s" % e) - continue - except: - log.msg("other error during setrlimit, leaving it alone") - log.err() - return - log.msg("unable to change rlimit, leaving it alone") - - def startService(self): - service.MultiService.startService(self) - self.increase_rlimits() - log.msg("websocket listening on /wormhole-relay/ws") - log.msg("Wormhole relay server (Rendezvous) running") - if self._blur_usage: - log.msg("blurring access times to %d seconds" % self._blur_usage) - log.msg("not logging HTTP requests") - else: - log.msg("not blurring access times") - if not self._allow_list: - log.msg("listing of allocated nameplates disallowed") - - def timer(self): - now = time.time() - old = now - CHANNEL_EXPIRATION_TIME - self._rendezvous.prune_all_apps(now, old) - self.dump_stats(now, validity=EXPIRATION_CHECK_PERIOD+60) - - def dump_stats(self, now, validity): - if not self._stats_file: - return - tmpfn = self._stats_file + ".tmp" - - data = {} - data["created"] = now - data["valid_until"] = now + validity - - start = time.time() - data["rendezvous"] = self._rendezvous.get_stats() - log.msg("get_stats took:", time.time() - start) - - with open(tmpfn, "wb") as f: - # json.dump(f) has str-vs-unicode issues on py2-vs-py3 - f.write(json.dumps(data, indent=1).encode("utf-8")) - f.write(b"\n") - os.rename(tmpfn, self._stats_file) - - -def _set_options(options, factory): - factory.setProtocolOptions(**dict(options)) diff --git a/src/wormhole/test/common.py b/src/wormhole/test/common.py index c1c4fe9..2521b38 100644 --- a/src/wormhole/test/common.py +++ b/src/wormhole/test/common.py @@ -6,27 +6,64 @@ from click.testing import CliRunner import mock from ..cli import cli from ..transit import allocate_tcp_port -from ..server.server import RelayServer +from wormhole_mailbox_server.server import make_server +from wormhole_mailbox_server.web import make_web_server +from wormhole_mailbox_server.database import create_channel_db, create_usage_db from wormhole_transit_relay.transit_server import Transit -class ServerBase: - def setUp(self): - self._setup_relay(None) +class MyInternetService(service.Service, object): + # like StreamServerEndpointService, but you can retrieve the port + def __init__(self, endpoint, factory): + self.endpoint = endpoint + self.factory = factory + self._port_d = defer.Deferred() + self._lp = None + def startService(self): + super(MyInternetService, self).startService() + d = self.endpoint.listen(self.factory) + def good(lp): + self._lp = lp + self._port_d.callback(lp.getHost().port) + def bad(f): + log.err(f) + self._port_d.errback(f) + d.addCallbacks(good, bad) + + @defer.inlineCallbacks + def stopService(self): + if self._lp: + yield self._lp.stopListening() + + def getPort(self): # only call once! + return self._port_d + +class ServerBase: + @defer.inlineCallbacks + def setUp(self): + yield self._setup_relay(None) + + @defer.inlineCallbacks def _setup_relay(self, error, advertise_version=None): self.sp = service.MultiService() self.sp.startService() - self.relayport = allocate_tcp_port() # need to talk to twisted team about only using unicode in # endpoints.serverFromString - s = RelayServer("tcp:%d:interface=127.0.0.1" % self.relayport, - advertise_version=advertise_version, - signal_error=error) + db = create_channel_db(":memory:") + self._usage_db = create_usage_db(":memory:") + self._rendezvous = make_server(db, + advertise_version=advertise_version, + signal_error=error, + usage_db=self._usage_db) + ep = endpoints.TCP4ServerEndpoint(reactor, 0, interface="127.0.0.1") + site = make_web_server(self._rendezvous, log_requests=False) + #self._lp = yield ep.listen(site) + s = MyInternetService(ep, site) s.setServiceParent(self.sp) + self.rdv_ws_port = yield s.getPort() self._relay_server = s - self._rendezvous = s._rendezvous - self.relayurl = u"ws://127.0.0.1:%d/v1" % self.relayport - self.rdv_ws_port = self.relayport + #self._rendezvous = s._rendezvous + self.relayurl = u"ws://127.0.0.1:%d/v1" % self.rdv_ws_port # ws://127.0.0.1:%d/wormhole-relay/ws self.transitport = allocate_tcp_port() @@ -38,6 +75,7 @@ class ServerBase: internet.StreamServerEndpointService(ep, f).setServiceParent(self.sp) self.transit = u"tcp:127.0.0.1:%d" % self.transitport + @defer.inlineCallbacks def tearDown(self): # Unit tests that spawn a (blocking) client in a thread might still # have threads running at this point, if one is stuck waiting for a @@ -49,34 +87,27 @@ class ServerBase: # XXX FIXME there's something in _noclobber test that's not # waiting for a close, I think -- was pretty relieably getting # unclean-reactor, but adding a slight pause here stops it... - from twisted.internet import reactor tp = reactor.getThreadPool() if not tp.working: - d = defer.succeed(None) - d.addCallback(lambda _: self.sp.stopService()) - d.addCallback(lambda _: task.deferLater(reactor, 0.1, lambda: None)) - return d - return self.sp.stopService() + yield self.sp.stopService() + yield task.deferLater(reactor, 0.1, lambda: None) + defer.returnValue(None) # disconnect all callers d = defer.maybeDeferred(self.sp.stopService) - wait_d = defer.Deferred() # wait a second, then check to see if it worked - reactor.callLater(1.0, wait_d.callback, None) - def _later(res): - if len(tp.working): - log.msg("wormhole.test.common.ServerBase.tearDown:" - " I was unable to convince all threads to exit.") - tp.dumpStats() - print("tearDown warning: threads are still active") - print("This test will probably hang until one of the" - " clients gives up of their own accord.") - else: - log.msg("wormhole.test.common.ServerBase.tearDown:" - " I convinced all threads to exit.") - return d - wait_d.addCallback(_later) - return wait_d + yield task.deferLater(reactor, 1.0, lambda: None) + if len(tp.working): + log.msg("wormhole.test.common.ServerBase.tearDown:" + " I was unable to convince all threads to exit.") + tp.dumpStats() + print("tearDown warning: threads are still active") + print("This test will probably hang until one of the" + " clients gives up of their own accord.") + else: + log.msg("wormhole.test.common.ServerBase.tearDown:" + " I convinced all threads to exit.") + yield d def config(*argv): r = CliRunner() diff --git a/src/wormhole/test/test_cli.py b/src/wormhole/test/test_cli.py index 2ff080e..5cc8b60 100644 --- a/src/wormhole/test/test_cli.py +++ b/src/wormhole/test/test_cli.py @@ -17,8 +17,6 @@ from ..cli import cmd_send, cmd_receive, welcome, cli from ..errors import (TransferError, WrongPasswordError, WelcomeError, UnsendableFileError, ServerConnectionError) from .._interfaces import ITorManager -from wormhole.server.cmd_server import MyPlugin -from wormhole.server.cli import server def build_offer(args): @@ -564,7 +562,7 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase): yield gatherResults([send_d, receive_d], True) if fake_tor: - expected_endpoints = [("127.0.0.1", self.relayport)] + expected_endpoints = [("127.0.0.1", self.rdv_ws_port)] if mode in ("file", "directory"): expected_endpoints.append(("127.0.0.1", self.transitport)) tx_timing = mtx_tm.call_args[1]["timing"] @@ -667,9 +665,6 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase): self.failUnlessEqual(modes[i], stat.S_IMODE(os.stat(fn).st_mode)) - # check server stats - self._rendezvous.get_stats() - def test_text(self): return self._do_test() def test_text_subprocess(self): @@ -849,9 +844,6 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase): with open(fn, "r") as f: self.failUnlessEqual(f.read(), PRESERVE) - # check server stats - self._rendezvous.get_stats() - def test_fail_file_noclobber(self): return self._do_test_fail("file", "noclobber") def test_fail_directory_noclobber(self): @@ -915,12 +907,10 @@ class ZeroMode(ServerBase, unittest.TestCase): self.assertEqual(receive_stdout, message+NL) self.assertEqual(receive_stderr, "") - # check server stats - self._rendezvous.get_stats() - class NotWelcome(ServerBase, unittest.TestCase): + @inlineCallbacks def setUp(self): - self._setup_relay(error="please upgrade XYZ") + yield self._setup_relay(error="please upgrade XYZ") self.cfg = cfg = config("send") cfg.hide_progress = True cfg.listen = False @@ -949,7 +939,7 @@ class NotWelcome(ServerBase, unittest.TestCase): class NoServer(ServerBase, unittest.TestCase): @inlineCallbacks def setUp(self): - self._setup_relay(None) + yield self._setup_relay(None) yield self._relay_server.disownServiceParent() @inlineCallbacks @@ -1093,8 +1083,9 @@ class ExtractFile(unittest.TestCase): self.assertIn("malicious zipfile", str(e)) class AppID(ServerBase, unittest.TestCase): + @inlineCallbacks def setUp(self): - d = super(AppID, self).setUp() + yield super(AppID, self).setUp() self.cfg = cfg = config("send") # common options for all tests in this suite cfg.hide_progress = True @@ -1102,7 +1093,6 @@ class AppID(ServerBase, unittest.TestCase): cfg.transit_helper = "" cfg.stdout = io.StringIO() cfg.stderr = io.StringIO() - return d @inlineCallbacks def test_override(self): @@ -1117,9 +1107,9 @@ class AppID(ServerBase, unittest.TestCase): yield send_d yield receive_d - used = self._rendezvous._db.execute("SELECT DISTINCT `app_id`" - " FROM `nameplate_usage`" - ).fetchall() + used = self._usage_db.execute("SELECT DISTINCT `app_id`" + " FROM `nameplates`" + ).fetchall() self.assertEqual(len(used), 1, used) self.assertEqual(used[0]["app_id"], u"appid2") @@ -1260,97 +1250,3 @@ class Help(unittest.TestCase): result = CliRunner().invoke(cli.wormhole, ["--help"]) self._check_top_level_help(result.output) self.assertEqual(result.exit_code, 0) - -class FakeConfig(object): - no_daemon = True - blur_usage = True - advertise_version = u"fake.version.1" - transit = str('tcp:4321') - rendezvous = str('tcp:1234') - signal_error = True - allow_list = False - relay_database_path = "relay.sqlite" - stats_json_path = "stats.json" - - -class Server(unittest.TestCase): - - def setUp(self): - self.runner = CliRunner() - - @mock.patch('wormhole.server.cmd_server.twistd') - def test_server_disallow_list(self, fake_twistd): - result = self.runner.invoke(server, ['start', '--no-daemon', '--disallow-list']) - self.assertEqual(0, result.exit_code) - - def test_server_plugin(self): - cfg = FakeConfig() - plugin = MyPlugin(cfg) - relay = plugin.makeService(None) - self.assertEqual(False, relay._allow_list) - - @mock.patch("wormhole.server.cmd_server.start_server") - def test_start_no_args(self, fake_start_server): - result = self.runner.invoke(server, ['start']) - self.assertEqual(0, result.exit_code) - cfg = fake_start_server.mock_calls[0][1][0] - MyPlugin(cfg).makeService(None) - - @mock.patch("wormhole.server.cmd_server.restart_server") - def test_restart_no_args(self, fake_start_reserver): - result = self.runner.invoke(server, ['restart']) - self.assertEqual(0, result.exit_code) - cfg = fake_start_reserver.mock_calls[0][1][0] - MyPlugin(cfg).makeService(None) - - def test_state_locations(self): - cfg = FakeConfig() - plugin = MyPlugin(cfg) - relay = plugin.makeService(None) - self.assertEqual('relay.sqlite', relay._db_url) - self.assertEqual('stats.json', relay._stats_file) - - @mock.patch("wormhole.server.cmd_server.start_server") - def test_websocket_protocol_options(self, fake_start_server): - result = self.runner.invoke( - server, [ - 'start', - '--websocket-protocol-option=a=3', - '--websocket-protocol-option=b=true', - '--websocket-protocol-option=c=3.5', - '--websocket-protocol-option=d=["foo","bar"]', - '--websocket-protocol-option', 'e=["foof","barf"]', - ]) - self.assertEqual(0, result.exit_code) - cfg = fake_start_server.mock_calls[0][1][0] - self.assertEqual( - cfg.websocket_protocol_option, - [("a", 3), ("b", True), ("c", 3.5), ("d", ['foo', 'bar']), - ("e", ['foof', 'barf']), - ], - ) - - def test_broken_websocket_protocol_options(self): - result = self.runner.invoke( - server, [ - 'start', - '--websocket-protocol-option=a', - ]) - self.assertNotEqual(0, result.exit_code) - self.assertIn( - 'Error: Invalid value for "--websocket-protocol-option": ' - 'format options as OPTION=VALUE', - result.output, - ) - - result = self.runner.invoke( - server, [ - 'start', - '--websocket-protocol-option=a=foo', - ]) - self.assertNotEqual(0, result.exit_code) - self.assertIn( - 'Error: Invalid value for "--websocket-protocol-option": ' - 'could not parse JSON value for a', - result.output, - ) diff --git a/src/wormhole/test/test_database.py b/src/wormhole/test/test_database.py deleted file mode 100644 index 4ebc2cb..0000000 --- a/src/wormhole/test/test_database.py +++ /dev/null @@ -1,61 +0,0 @@ -from __future__ import print_function, unicode_literals -import os -from twisted.python import filepath -from twisted.trial import unittest -from ..server import database -from ..server.database import get_db, TARGET_VERSION, dump_db - -class DB(unittest.TestCase): - def test_create_default(self): - db_url = ":memory:" - db = get_db(db_url) - rows = db.execute("SELECT * FROM version").fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0]["version"], TARGET_VERSION) - - def test_failed_create_allows_subsequent_create(self): - patch = self.patch(database, "get_schema", lambda version: b"this is a broken schema") - dbfile = filepath.FilePath(self.mktemp()) - self.assertRaises(Exception, lambda: get_db(dbfile.path)) - patch.restore() - get_db(dbfile.path) - - def test_upgrade(self): - basedir = self.mktemp() - os.mkdir(basedir) - fn = os.path.join(basedir, "upgrade.db") - self.assertNotEqual(TARGET_VERSION, 2) - - # create an old-version DB in a file - db = get_db(fn, 2) - rows = db.execute("SELECT * FROM version").fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0]["version"], 2) - del db - - # then upgrade the file to the latest version - dbA = get_db(fn, TARGET_VERSION) - rows = dbA.execute("SELECT * FROM version").fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0]["version"], TARGET_VERSION) - dbA_text = dump_db(dbA) - del dbA - - # make sure the upgrades got committed to disk - dbB = get_db(fn, TARGET_VERSION) - dbB_text = dump_db(dbB) - del dbB - self.assertEqual(dbA_text, dbB_text) - - # The upgraded schema should be equivalent to that of a new DB. - # However a text dump will differ because ALTER TABLE always appends - # the new column to the end of a table, whereas our schema puts it - # somewhere in the middle (wherever it fits naturally). Also ALTER - # TABLE doesn't include comments. - if False: - latest_db = get_db(":memory:", TARGET_VERSION) - latest_text = dump_db(latest_db) - with open("up.sql","w") as f: f.write(dbA_text) - with open("new.sql","w") as f: f.write(latest_text) - # check with "diff -u _trial_temp/up.sql _trial_temp/new.sql" - self.assertEqual(dbA_text, latest_text) diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py deleted file mode 100644 index 6cc202f..0000000 --- a/src/wormhole/test/test_server.py +++ /dev/null @@ -1,1424 +0,0 @@ -from __future__ import print_function, unicode_literals -import os, json, itertools, time -import mock -from twisted.trial import unittest -from twisted.python import log -from twisted.internet import reactor, defer, endpoints -from twisted.internet.defer import inlineCallbacks, returnValue -from autobahn.twisted import websocket -from .common import ServerBase -from ..server import server, rendezvous -from ..server.rendezvous import Usage, SidedMessage -from ..server.database import get_db - -def easy_relay( - rendezvous_web_port=str("tcp:0"), - advertise_version=None, - **kwargs -): - return server.RelayServer( - rendezvous_web_port, - advertise_version, - **kwargs - ) - -class RLimits(unittest.TestCase): - def test_rlimit(self): - def patch_s(name, *args, **kwargs): - return mock.patch("wormhole.server.server." + name, *args, **kwargs) - # We never start this, so the endpoints can be fake. - # serverFromString() requires bytes on py2 and str on py3, so this - # is easier than just passing "tcp:0" - ep = endpoints.TCP4ServerEndpoint(None, 0) - with patch_s("endpoints.serverFromString", return_value=ep): - s = server.RelayServer("fake", None) - fakelog = [] - def checklog(*expected): - self.assertEqual(fakelog, list(expected)) - fakelog[:] = [] - NF = "NOFILE" - mock_NF = patch_s("RLIMIT_NOFILE", NF) - - with patch_s("log.msg", fakelog.append): - with patch_s("getrlimit", None): - s.increase_rlimits() - checklog("unable to import 'resource', leaving rlimit alone") - - with mock_NF: - with patch_s("getrlimit", return_value=(20000, 30000)) as gr: - s.increase_rlimits() - self.assertEqual(gr.mock_calls, [mock.call(NF)]) - checklog("RLIMIT_NOFILE.soft was 20000, leaving it alone") - - with patch_s("getrlimit", return_value=(10, 30000)) as gr: - with patch_s("setrlimit", side_effect=TypeError("other")): - with patch_s("log.err") as err: - s.increase_rlimits() - self.assertEqual(err.mock_calls, [mock.call()]) - checklog("changing RLIMIT_NOFILE from (10,30000) to (30000,30000)", - "other error during setrlimit, leaving it alone") - - for maxlimit in [40000, 20000, 9000, 2000, 1000]: - def setrlimit(which, newlimit): - if newlimit[0] > maxlimit: - raise ValueError("nope") - return None - calls = [] - expected = [] - for tries in [30000, 10000, 3200, 1024]: - calls.append(mock.call(NF, (tries, 30000))) - expected.append("changing RLIMIT_NOFILE from (10,30000) to (%d,30000)" % tries) - if tries > maxlimit: - expected.append("error during setrlimit: nope") - else: - expected.append("setrlimit successful") - break - else: - expected.append("unable to change rlimit, leaving it alone") - - with patch_s("setrlimit", side_effect=setrlimit) as sr: - s.increase_rlimits() - self.assertEqual(sr.mock_calls, calls) - checklog(*expected) - -class _Util: - def _nameplate(self, app, name): - np_row = app._db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`='appid' AND `name`=?", - (name,)).fetchone() - if not np_row: - return None, None - npid = np_row["id"] - side_rows = app._db.execute("SELECT * FROM `nameplate_sides`" - " WHERE `nameplates_id`=?", - (npid,)).fetchall() - return np_row, side_rows - - def _mailbox(self, app, mailbox_id): - mb_row = app._db.execute("SELECT * FROM `mailboxes`" - " WHERE `app_id`='appid' AND `id`=?", - (mailbox_id,)).fetchone() - if not mb_row: - return None, None - side_rows = app._db.execute("SELECT * FROM `mailbox_sides`" - " WHERE `mailbox_id`=?", - (mailbox_id,)).fetchall() - return mb_row, side_rows - - def _messages(self, app): - c = app._db.execute("SELECT * FROM `messages`" - " WHERE `app_id`='appid' AND `mailbox_id`='mid'") - return c.fetchall() - -class Server(_Util, ServerBase, unittest.TestCase): - def test_apps(self): - app1 = self._rendezvous.get_app("appid1") - self.assertIdentical(app1, self._rendezvous.get_app("appid1")) - app2 = self._rendezvous.get_app("appid2") - self.assertNotIdentical(app1, app2) - - def test_nameplate_allocation(self): - app = self._rendezvous.get_app("appid") - nids = set() - # this takes a second, and claims all the short-numbered nameplates - def add(): - nameplate_id = app.allocate_nameplate("side1", 0) - self.assertEqual(type(nameplate_id), type("")) - nid = int(nameplate_id) - nids.add(nid) - for i in range(9): add() - self.assertNotIn(0, nids) - self.assertEqual(set(range(1,10)), nids) - - for i in range(100-10): add() - self.assertEqual(len(nids), 99) - self.assertEqual(set(range(1,100)), nids) - - for i in range(1000-100): add() - self.assertEqual(len(nids), 999) - self.assertEqual(set(range(1,1000)), nids) - - add() - self.assertEqual(len(nids), 1000) - biggest = max(nids) - self.assert_(1000 <= biggest < 1000000, biggest) - - def test_nameplate(self): - app = self._rendezvous.get_app("appid") - name = app.allocate_nameplate("side1", 0) - self.assertEqual(type(name), type("")) - nid = int(name) - self.assert_(0 < nid < 10, nid) - self.assertEqual(app.get_nameplate_ids(), set([name])) - # allocate also does a claim - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side1") - self.assertEqual(side_rows[0]["added"], 0) - - # duplicate claims by the same side are combined - mailbox_id = app.claim_nameplate(name, "side1", 1) - self.assertEqual(type(mailbox_id), type("")) - self.assertEqual(mailbox_id, np_row["mailbox_id"]) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["added"], 0) - self.assertEqual(mailbox_id, np_row["mailbox_id"]) - - # and they don't updated the 'added' time - mailbox_id2 = app.claim_nameplate(name, "side1", 2) - self.assertEqual(mailbox_id, mailbox_id2) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["added"], 0) - - # claim by the second side is new - mailbox_id3 = app.claim_nameplate(name, "side2", 3) - self.assertEqual(mailbox_id, mailbox_id3) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 2) - self.assertEqual(sorted([row["side"] for row in side_rows]), - sorted(["side1", "side2"])) - self.assertIn(("side2", 3), - [(row["side"], row["added"]) for row in side_rows]) - - # a third claim marks the nameplate as "crowded", and adds a third - # claim (which must be released later), but leaves the two existing - # claims alone - self.assertRaises(rendezvous.CrowdedError, - app.claim_nameplate, name, "side3", 4) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 3) - - # releasing a non-existent nameplate is ignored - app.release_nameplate(name+"not", "side4", 0) - - # releasing a side that never claimed the nameplate is ignored - app.release_nameplate(name, "side4", 0) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 3) - - # releasing one side leaves the second claim - app.release_nameplate(name, "side1", 5) - np_row, side_rows = self._nameplate(app, name) - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertIn(("side1", False), claims) - self.assertIn(("side2", True), claims) - self.assertIn(("side3", True), claims) - - # releasing one side multiple times is ignored - app.release_nameplate(name, "side1", 5) - np_row, side_rows = self._nameplate(app, name) - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertIn(("side1", False), claims) - self.assertIn(("side2", True), claims) - self.assertIn(("side3", True), claims) - - # release the second side - app.release_nameplate(name, "side2", 6) - np_row, side_rows = self._nameplate(app, name) - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertIn(("side1", False), claims) - self.assertIn(("side2", False), claims) - self.assertIn(("side3", True), claims) - - # releasing the third side frees the nameplate, and adds usage - app.release_nameplate(name, "side3", 7) - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(np_row, None) - usage = app._db.execute("SELECT * FROM `nameplate_usage`").fetchone() - self.assertEqual(usage["app_id"], "appid") - self.assertEqual(usage["started"], 0) - self.assertEqual(usage["waiting_time"], 3) - self.assertEqual(usage["total_time"], 7) - self.assertEqual(usage["result"], "crowded") - - - def test_mailbox(self): - app = self._rendezvous.get_app("appid") - mailbox_id = "mid" - m1 = app.open_mailbox(mailbox_id, "side1", 0) - - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side1") - self.assertEqual(side_rows[0]["added"], 0) - - # opening the same mailbox twice, by the same side, gets the same - # object, and does not update the "added" timestamp - self.assertIdentical(m1, app.open_mailbox(mailbox_id, "side1", 1)) - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side1") - self.assertEqual(side_rows[0]["added"], 0) - - # opening a second side gets the same object, and adds a new claim - self.assertIdentical(m1, app.open_mailbox(mailbox_id, "side2", 2)) - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 2) - adds = [(row["side"], row["added"]) for row in side_rows] - self.assertIn(("side1", 0), adds) - self.assertIn(("side2", 2), adds) - - # a third open marks it as crowded - self.assertRaises(rendezvous.CrowdedError, - app.open_mailbox, mailbox_id, "side3", 3) - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 3) - m1.close("side3", "company", 4) - - # closing a side that never claimed the mailbox is ignored - m1.close("side4", "mood", 4) - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(len(side_rows), 3) - - # closing one side leaves the second claim - m1.close("side1", "mood", 5) - mb_row, side_rows = self._mailbox(app, mailbox_id) - sides = [(row["side"], row["opened"], row["mood"]) for row in side_rows] - self.assertIn(("side1", False, "mood"), sides) - self.assertIn(("side2", True, None), sides) - self.assertIn(("side3", False, "company"), sides) - - # closing one side multiple times is ignored - m1.close("side1", "mood", 6) - mb_row, side_rows = self._mailbox(app, mailbox_id) - sides = [(row["side"], row["opened"], row["mood"]) for row in side_rows] - self.assertIn(("side1", False, "mood"), sides) - self.assertIn(("side2", True, None), sides) - self.assertIn(("side3", False, "company"), sides) - - l1 = []; stop1 = []; stop1_f = lambda: stop1.append(True) - m1.add_listener("handle1", l1.append, stop1_f) - - # closing the second side frees the mailbox, and adds usage - m1.close("side2", "mood", 7) - self.assertEqual(stop1, [True]) - - mb_row, side_rows = self._mailbox(app, mailbox_id) - self.assertEqual(mb_row, None) - usage = app._db.execute("SELECT * FROM `mailbox_usage`").fetchone() - self.assertEqual(usage["app_id"], "appid") - self.assertEqual(usage["started"], 0) - self.assertEqual(usage["waiting_time"], 2) - self.assertEqual(usage["total_time"], 7) - self.assertEqual(usage["result"], "crowded") - - def test_messages(self): - app = self._rendezvous.get_app("appid") - mailbox_id = "mid" - m1 = app.open_mailbox(mailbox_id, "side1", 0) - m1.add_message(SidedMessage(side="side1", phase="phase", - body="body", server_rx=1, - msg_id="msgid")) - msgs = self._messages(app) - self.assertEqual(len(msgs), 1) - self.assertEqual(msgs[0]["body"], "body") - - l1 = []; stop1 = []; stop1_f = lambda: stop1.append(True) - l2 = []; stop2 = []; stop2_f = lambda: stop2.append(True) - old = m1.add_listener("handle1", l1.append, stop1_f) - self.assertEqual(len(old), 1) - self.assertEqual(old[0].side, "side1") - self.assertEqual(old[0].body, "body") - - m1.add_message(SidedMessage(side="side1", phase="phase2", - body="body2", server_rx=1, - msg_id="msgid")) - self.assertEqual(len(l1), 1) - self.assertEqual(l1[0].body, "body2") - old = m1.add_listener("handle2", l2.append, stop2_f) - self.assertEqual(len(old), 2) - - m1.add_message(SidedMessage(side="side1", phase="phase3", - body="body3", server_rx=1, - msg_id="msgid")) - self.assertEqual(len(l1), 2) - self.assertEqual(l1[-1].body, "body3") - self.assertEqual(len(l2), 1) - self.assertEqual(l2[-1].body, "body3") - - m1.remove_listener("handle1") - - m1.add_message(SidedMessage(side="side1", phase="phase4", - body="body4", server_rx=1, - msg_id="msgid")) - self.assertEqual(len(l1), 2) - self.assertEqual(l1[-1].body, "body3") - self.assertEqual(len(l2), 2) - self.assertEqual(l2[-1].body, "body4") - - m1._shutdown() - self.assertEqual(stop1, []) - self.assertEqual(stop2, [True]) - - # message adds are not idempotent: clients filter duplicates - m1.add_message(SidedMessage(side="side1", phase="phase", - body="body", server_rx=1, - msg_id="msgid")) - msgs = self._messages(app) - self.assertEqual(len(msgs), 5) - self.assertEqual(msgs[-1]["body"], "body") - -class Prune(unittest.TestCase): - - def _get_mailbox_updated(self, app, mbox_id): - row = app._db.execute("SELECT * FROM `mailboxes` WHERE" - " `app_id`=? AND `id`=?", - (app._app_id, mbox_id)).fetchone() - return row["updated"] - - def test_update(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, None, True) - app = rv.get_app("appid") - mbox_id = "mbox1" - app.open_mailbox(mbox_id, "side1", 1) - self.assertEqual(self._get_mailbox_updated(app, mbox_id), 1) - - mb = app.open_mailbox(mbox_id, "side2", 2) - self.assertEqual(self._get_mailbox_updated(app, mbox_id), 2) - - sm = SidedMessage("side1", "phase", "body", 3, "msgid") - mb.add_message(sm) - self.assertEqual(self._get_mailbox_updated(app, mbox_id), 3) - - def test_apps(self): - rv = rendezvous.Rendezvous(get_db(":memory:"), None, None, True) - app = rv.get_app("appid") - app.allocate_nameplate("side", 121) - app.prune = mock.Mock() - rv.prune_all_apps(now=123, old=122) - self.assertEqual(app.prune.mock_calls, [mock.call(123, 122)]) - - def test_nameplates(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, 3600, True) - - # timestamps <=50 are "old", >=51 are "new" - #OLD = "old"; NEW = "new" - #when = {OLD: 1, NEW: 60} - new_nameplates = set() - - APPID = "appid" - app = rv.get_app(APPID) - - # Exercise the first-vs-second newness tests - app.claim_nameplate("np-1", "side1", 1) - app.claim_nameplate("np-2", "side1", 1) - app.claim_nameplate("np-2", "side2", 2) - app.claim_nameplate("np-3", "side1", 60) - new_nameplates.add("np-3") - app.claim_nameplate("np-4", "side1", 1) - app.claim_nameplate("np-4", "side2", 60) - new_nameplates.add("np-4") - app.claim_nameplate("np-5", "side1", 60) - app.claim_nameplate("np-5", "side2", 61) - new_nameplates.add("np-5") - - rv.prune_all_apps(now=123, old=50) - - nameplates = set([row["name"] for row in - db.execute("SELECT * FROM `nameplates`").fetchall()]) - self.assertEqual(new_nameplates, nameplates) - mailboxes = set([row["id"] for row in - db.execute("SELECT * FROM `mailboxes`").fetchall()]) - self.assertEqual(len(new_nameplates), len(mailboxes)) - - def test_mailboxes(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, 3600, True) - - # timestamps <=50 are "old", >=51 are "new" - #OLD = "old"; NEW = "new" - #when = {OLD: 1, NEW: 60} - new_mailboxes = set() - - APPID = "appid" - app = rv.get_app(APPID) - - # Exercise the first-vs-second newness tests - app.open_mailbox("mb-11", "side1", 1) - app.open_mailbox("mb-12", "side1", 1) - app.open_mailbox("mb-12", "side2", 2) - app.open_mailbox("mb-13", "side1", 60) - new_mailboxes.add("mb-13") - app.open_mailbox("mb-14", "side1", 1) - app.open_mailbox("mb-14", "side2", 60) - new_mailboxes.add("mb-14") - app.open_mailbox("mb-15", "side1", 60) - app.open_mailbox("mb-15", "side2", 61) - new_mailboxes.add("mb-15") - - rv.prune_all_apps(now=123, old=50) - - mailboxes = set([row["id"] for row in - db.execute("SELECT * FROM `mailboxes`").fetchall()]) - self.assertEqual(new_mailboxes, mailboxes) - - def test_lots(self): - OLD = "old"; NEW = "new" - for nameplate in [False, True]: - for mailbox in [OLD, NEW]: - for has_listeners in [False, True]: - self.one(nameplate, mailbox, has_listeners) - - def test_one(self): - # to debug specific problems found by test_lots - self.one(None, "new", False) - - def one(self, nameplate, mailbox, has_listeners): - desc = ("nameplate=%s, mailbox=%s, has_listeners=%s" % - (nameplate, mailbox, has_listeners)) - log.msg(desc) - - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, 3600, True) - APPID = "appid" - app = rv.get_app(APPID) - - # timestamps <=50 are "old", >=51 are "new" - OLD = "old"; NEW = "new" - when = {OLD: 1, NEW: 60} - nameplate_survives = False - mailbox_survives = False - - mbid = "mbid" - if nameplate: - mbid = app.claim_nameplate("npid", "side1", when[mailbox]) - mb = app.open_mailbox(mbid, "side1", when[mailbox]) - - # the pruning algorithm doesn't care about the age of messages, - # because mailbox.updated is always updated each time we add a - # message - sm = SidedMessage("side1", "phase", "body", when[mailbox], "msgid") - mb.add_message(sm) - - if has_listeners: - mb.add_listener("handle", None, None) - - if (mailbox == NEW or has_listeners): - if nameplate: - nameplate_survives = True - mailbox_survives = True - messages_survive = mailbox_survives - - rv.prune_all_apps(now=123, old=50) - - nameplates = set([row["name"] for row in - db.execute("SELECT * FROM `nameplates`").fetchall()]) - self.assertEqual(nameplate_survives, bool(nameplates), - ("nameplate", nameplate_survives, nameplates, desc)) - - mailboxes = set([row["id"] for row in - db.execute("SELECT * FROM `mailboxes`").fetchall()]) - self.assertEqual(mailbox_survives, bool(mailboxes), - ("mailbox", mailbox_survives, mailboxes, desc)) - - messages = set([row["msg_id"] for row in - db.execute("SELECT * FROM `messages`").fetchall()]) - self.assertEqual(messages_survive, bool(messages), - ("messages", messages_survive, messages, desc)) - - -def strip_message(msg): - m2 = msg.copy() - m2.pop("id", None) - m2.pop("server_rx", None) - return m2 - -def strip_messages(messages): - return [strip_message(m) for m in messages] - -class WSClient(websocket.WebSocketClientProtocol): - def __init__(self): - websocket.WebSocketClientProtocol.__init__(self) - self.events = [] - self.errors = [] - self.d = None - self.ping_counter = itertools.count(0) - def onOpen(self): - self.factory.d.callback(self) - def onMessage(self, payload, isBinary): - assert not isBinary - event = json.loads(payload.decode("utf-8")) - if event["type"] == "error": - self.errors.append(event) - if self.d: - assert not self.events - d,self.d = self.d,None - d.callback(event) - return - self.events.append(event) - - def close(self): - self.d = defer.Deferred() - self.transport.loseConnection() - return self.d - def onClose(self, wasClean, code, reason): - if self.d: - self.d.callback((wasClean, code, reason)) - - def next_event(self): - assert not self.d - if self.events: - event = self.events.pop(0) - return defer.succeed(event) - self.d = defer.Deferred() - return self.d - - @inlineCallbacks - def next_non_ack(self): - while True: - m = yield self.next_event() - if isinstance(m, tuple): - print("unexpected onClose", m) - raise AssertionError("unexpected onClose") - if m["type"] != "ack": - returnValue(m) - - def strip_acks(self): - self.events = [e for e in self.events if e["type"] != "ack"] - - def send(self, mtype, **kwargs): - kwargs["type"] = mtype - payload = json.dumps(kwargs).encode("utf-8") - self.sendMessage(payload, False) - - def send_notype(self, **kwargs): - payload = json.dumps(kwargs).encode("utf-8") - self.sendMessage(payload, False) - - @inlineCallbacks - def sync(self): - ping = next(self.ping_counter) - self.send("ping", ping=ping) - # queue all messages until the pong, then put them back - old_events = [] - while True: - ev = yield self.next_event() - if ev["type"] == "pong" and ev["pong"] == ping: - self.events = old_events + self.events - returnValue(None) - old_events.append(ev) - -class WSFactory(websocket.WebSocketClientFactory): - protocol = WSClient - -class WSClientSync(unittest.TestCase): - # make sure my 'sync' method actually works - - @inlineCallbacks - def test_sync(self): - sent = [] - c = WSClient() - def _send(mtype, **kwargs): - sent.append( (mtype, kwargs) ) - c.send = _send - def add(mtype, **kwargs): - kwargs["type"] = mtype - c.onMessage(json.dumps(kwargs).encode("utf-8"), False) - # no queued messages - d = c.sync() - self.assertEqual(sent, [("ping", {"ping": 0})]) - self.assertNoResult(d) - add("pong", pong=0) - yield d - self.assertEqual(c.events, []) - - # one,two,ping,pong - add("one") - add("two", two=2) - d = c.sync() - add("pong", pong=1) - yield d - m = yield c.next_non_ack() - self.assertEqual(m["type"], "one") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "two") - self.assertEqual(c.events, []) - - # one,ping,two,pong - add("one") - d = c.sync() - add("two", two=2) - add("pong", pong=2) - yield d - m = yield c.next_non_ack() - self.assertEqual(m["type"], "one") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "two") - self.assertEqual(c.events, []) - - # ping,one,two,pong - d = c.sync() - add("one") - add("two", two=2) - add("pong", pong=3) - yield d - m = yield c.next_non_ack() - self.assertEqual(m["type"], "one") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "two") - self.assertEqual(c.events, []) - - - -class WebSocketAPI(_Util, ServerBase, unittest.TestCase): - def setUp(self): - self._clients = [] - self._setup_relay(None, advertise_version="advertised.version") - - def tearDown(self): - for c in self._clients: - c.transport.loseConnection() - return ServerBase.tearDown(self) - - @inlineCallbacks - def make_client(self): - f = WSFactory(self.relayurl) - f.d = defer.Deferred() - reactor.connectTCP("127.0.0.1", self.rdv_ws_port, f) - c = yield f.d - self._clients.append(c) - returnValue(c) - - def check_welcome(self, data): - self.failUnlessIn("welcome", data) - self.failUnlessEqual(data["welcome"], - {"current_cli_version": "advertised.version"}) - - @inlineCallbacks - def test_welcome(self): - c1 = yield self.make_client() - msg = yield c1.next_non_ack() - self.check_welcome(msg) - self.assertEqual(self._rendezvous._apps, {}) - - @inlineCallbacks - def test_bind(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - - c1.send("bind", appid="appid") # missing side= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "bind requires 'side'") - - c1.send("bind", side="side") # missing appid= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "bind requires 'appid'") - - c1.send("bind", appid="appid", side="side") - yield c1.sync() - self.assertEqual(list(self._rendezvous._apps.keys()), ["appid"]) - - c1.send("bind", appid="appid", side="side") # duplicate - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "already bound") - - c1.send_notype(other="misc") # missing 'type' - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "missing 'type'") - - c1.send("___unknown") # unknown type - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "unknown type") - - c1.send("ping") # missing 'ping' - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "ping requires 'ping'") - - @inlineCallbacks - def test_list(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - - c1.send("list") # too early, must bind first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "must bind first") - - c1.send("bind", appid="appid", side="side") - c1.send("list") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "nameplates") - self.assertEqual(m["nameplates"], []) - - app = self._rendezvous.get_app("appid") - nameplate_id1 = app.allocate_nameplate("side", 0) - app.claim_nameplate("np2", "side", 0) - - c1.send("list") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "nameplates") - nids = set() - for n in m["nameplates"]: - self.assertEqual(type(n), dict) - self.assertEqual(list(n.keys()), ["id"]) - nids.add(n["id"]) - self.assertEqual(nids, set([nameplate_id1, "np2"])) - - @inlineCallbacks - def test_allocate(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - - c1.send("allocate") # too early, must bind first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "must bind first") - - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - c1.send("allocate") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "allocated") - name = m["nameplate"] - - nids = app.get_nameplate_ids() - self.assertEqual(len(nids), 1) - self.assertEqual(name, list(nids)[0]) - - c1.send("allocate") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], - "you already allocated one, don't be greedy") - - c1.send("claim", nameplate=name) # allocate+claim is ok - yield c1.sync() - np_row, side_rows = self._nameplate(app, name) - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side") - - @inlineCallbacks - def test_claim(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c1.send("claim") # missing nameplate= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "claim requires 'nameplate'") - - c1.send("claim", nameplate="np1") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "claimed") - mailbox_id = m["mailbox"] - self.assertEqual(type(mailbox_id), type("")) - - c1.send("claim", nameplate="np1") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error", err) - self.assertEqual(err["error"], "only one claim per connection") - - nids = app.get_nameplate_ids() - self.assertEqual(len(nids), 1) - self.assertEqual("np1", list(nids)[0]) - np_row, side_rows = self._nameplate(app, "np1") - self.assertEqual(len(side_rows), 1) - self.assertEqual(side_rows[0]["side"], "side") - - # claiming a nameplate assigns a random mailbox id and creates the - # mailbox row - mailboxes = app._db.execute("SELECT * FROM `mailboxes`" - " WHERE `app_id`='appid'").fetchall() - self.assertEqual(len(mailboxes), 1) - - @inlineCallbacks - def test_claim_crowded(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - app.claim_nameplate("np1", "side1", 0) - app.claim_nameplate("np1", "side2", 0) - - # the third claim will signal crowding - c1.send("claim", nameplate="np1") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "crowded") - - @inlineCallbacks - def test_release(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - app.claim_nameplate("np1", "side2", 0) - - c1.send("release") # didn't do claim first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], - "release without nameplate must follow claim") - - c1.send("claim", nameplate="np1") - yield c1.next_non_ack() - - c1.send("release") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "released", m) - - np_row, side_rows = self._nameplate(app, "np1") - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertIn(("side", False), claims) - self.assertIn(("side2", True), claims) - - c1.send("release") # no longer claimed - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "only one release per connection") - - @inlineCallbacks - def test_release_named(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("claim", nameplate="np1") - yield c1.next_non_ack() - - c1.send("release", nameplate="np1") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "released", m) - - @inlineCallbacks - def test_release_named_ignored(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("release", nameplate="np1") # didn't do claim first, ignored - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "released", m) - - @inlineCallbacks - def test_release_named_mismatch(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("claim", nameplate="np1") - yield c1.next_non_ack() - - c1.send("release", nameplate="np2") # mismatching nameplate - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], - "release and claim must use same nameplate") - - @inlineCallbacks - def test_open(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c1.send("open") # missing mailbox= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "open requires 'mailbox'") - - mb1 = app.open_mailbox("mb1", "side2", 0) - mb1.add_message(SidedMessage(side="side2", phase="phase", - body="body", server_rx=0, - msg_id="msgid")) - - c1.send("open", mailbox="mb1") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body") - self.assertTrue(mb1.has_listeners()) - - mb1.add_message(SidedMessage(side="side2", phase="phase2", - body="body2", server_rx=0, - msg_id="msgid")) - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body2") - - c1.send("open", mailbox="mb1") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "only one open per connection") - - @inlineCallbacks - def test_open_crowded(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - mbid = app.claim_nameplate("np1", "side1", 0) - app.claim_nameplate("np1", "side2", 0) - - # the third open will signal crowding - c1.send("open", mailbox=mbid) - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "crowded") - - @inlineCallbacks - def test_add(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - mb1 = app.open_mailbox("mb1", "side2", 0) - l1 = []; stop1 = []; stop1_f = lambda: stop1.append(True) - mb1.add_listener("handle1", l1.append, stop1_f) - - c1.send("add") # didn't open first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "must open mailbox before adding") - - c1.send("open", mailbox="mb1") - - c1.send("add", body="body") # missing phase= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "missing 'phase'") - - c1.send("add", phase="phase") # missing body= - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "missing 'body'") - - c1.send("add", phase="phase", body="body") - m = yield c1.next_non_ack() # echoed back - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body") - - self.assertEqual(len(l1), 1) - self.assertEqual(l1[0].body, "body") - - @inlineCallbacks - def test_close(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c1.send("close", mood="mood") # must open first - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "close without mailbox must follow open") - - c1.send("open", mailbox="mb1") - yield c1.sync() - mb1 = app._mailboxes["mb1"] - self.assertTrue(mb1.has_listeners()) - - c1.send("close", mood="mood") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "closed") - self.assertFalse(mb1.has_listeners()) - - c1.send("close", mood="mood") # already closed - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error", m) - self.assertEqual(err["error"], "only one close per connection") - - @inlineCallbacks - def test_close_named(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("open", mailbox="mb1") - yield c1.sync() - - c1.send("close", mailbox="mb1", mood="mood") - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "closed") - - @inlineCallbacks - def test_close_named_ignored(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("close", mailbox="mb1", mood="mood") # no open first, ignored - m = yield c1.next_non_ack() - self.assertEqual(m["type"], "closed") - - @inlineCallbacks - def test_close_named_mismatch(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - - c1.send("open", mailbox="mb1") - yield c1.sync() - - c1.send("close", mailbox="mb2", mood="mood") - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "open and close must use same mailbox") - - @inlineCallbacks - def test_close_crowded(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - mbid = app.claim_nameplate("np1", "side1", 0) - app.claim_nameplate("np1", "side2", 0) - - # a close that allocates a third side will signal crowding - c1.send("close", mailbox=mbid) - err = yield c1.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "crowded") - - - @inlineCallbacks - def test_disconnect(self): - c1 = yield self.make_client() - yield c1.next_non_ack() - c1.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c1.send("open", mailbox="mb1") - yield c1.sync() - mb1 = app._mailboxes["mb1"] - self.assertTrue(mb1.has_listeners()) - - yield c1.close() - # wait for the server to notice the socket has closed - started = time.time() - while mb1.has_listeners() and (time.time()-started < 5.0): - d = defer.Deferred() - reactor.callLater(0.01, d.callback, None) - yield d - self.assertFalse(mb1.has_listeners()) - - @inlineCallbacks - def test_interrupted_client_nameplate(self): - # a client's interactions with the server might be split over - # multiple sequential WebSocket connections, e.g. when the server is - # bounced and the client reconnects, or vice versa - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - c.send("claim", nameplate="np1") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "claimed") - mailbox_id = m["mailbox"] - self.assertEqual(type(mailbox_id), type("")) - np_row, side_rows = self._nameplate(app, "np1") - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertEqual(claims, [("side", True)]) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - c.send("claim", nameplate="np1") # idempotent - m = yield c.next_non_ack() - self.assertEqual(m["type"], "claimed") - self.assertEqual(m["mailbox"], mailbox_id) # mailbox id is stable - np_row, side_rows = self._nameplate(app, "np1") - claims = [(row["side"], row["claimed"]) for row in side_rows] - self.assertEqual(claims, [("side", True)]) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - # we haven't done a claim with this particular connection, but we can - # still send a release as long as we include the nameplate - c.send("release", nameplate="np1") # release-without-claim - m = yield c.next_non_ack() - self.assertEqual(m["type"], "released") - np_row, side_rows = self._nameplate(app, "np1") - self.assertEqual(np_row, None) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - # and the release is idempotent, when done on separate connections - c.send("release", nameplate="np1") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "released") - np_row, side_rows = self._nameplate(app, "np1") - self.assertEqual(np_row, None) - c.close() - yield c.d - - - @inlineCallbacks - def test_interrupted_client_nameplate_reclaimed(self): - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - - # a new claim on a previously-closed nameplate is forbidden. We make - # a new nameplate here and manually open a second claim on it, so the - # nameplate stays alive long enough for the code check to happen. - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - c.send("claim", nameplate="np2") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "claimed") - app.claim_nameplate("np2", "side2", 0) - c.send("release", nameplate="np2") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "released") - np_row, side_rows = self._nameplate(app, "np2") - claims = sorted([(row["side"], row["claimed"]) for row in side_rows]) - self.assertEqual(claims, [("side", 0), ("side2", 1)]) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - c.send("claim", nameplate="np2") # new claim is forbidden - err = yield c.next_non_ack() - self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "reclaimed") - - np_row, side_rows = self._nameplate(app, "np2") - claims = sorted([(row["side"], row["claimed"]) for row in side_rows]) - self.assertEqual(claims, [("side", 0), ("side2", 1)]) - c.close() - yield c.d - - @inlineCallbacks - def test_interrupted_client_mailbox(self): - # a client's interactions with the server might be split over - # multiple sequential WebSocket connections, e.g. when the server is - # bounced and the client reconnects, or vice versa - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - app = self._rendezvous.get_app("appid") - mb1 = app.open_mailbox("mb1", "side2", 0) - mb1.add_message(SidedMessage(side="side2", phase="phase", - body="body", server_rx=0, - msg_id="msgid")) - - c.send("open", mailbox="mb1") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body") - self.assertTrue(mb1.has_listeners()) - c.close() - yield c.d - - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - # open should be idempotent - c.send("open", mailbox="mb1") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "message") - self.assertEqual(m["body"], "body") - mb_row, side_rows = self._mailbox(app, "mb1") - openeds = [(row["side"], row["opened"]) for row in side_rows] - self.assertIn(("side", 1), openeds) # TODO: why 1, and not True? - - # close on the same connection as open is ok - c.send("close", mailbox="mb1", mood="mood") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "closed", m) - mb_row, side_rows = self._mailbox(app, "mb1") - openeds = [(row["side"], row["opened"]) for row in side_rows] - self.assertIn(("side", 0), openeds) - c.close() - yield c.d - - # close (on a separate connection) is idempotent - c = yield self.make_client() - yield c.next_non_ack() - c.send("bind", appid="appid", side="side") - c.send("close", mailbox="mb1", mood="mood") - m = yield c.next_non_ack() - self.assertEqual(m["type"], "closed", m) - mb_row, side_rows = self._mailbox(app, "mb1") - openeds = [(row["side"], row["opened"]) for row in side_rows] - self.assertIn(("side", 0), openeds) - c.close() - yield c.d - - -class Summary(unittest.TestCase): - def test_mailbox(self): - app = rendezvous.AppNamespace(None, None, False, None, True) - # starts at time 1, maybe gets second open at time 3, closes at 5 - def s(rows, pruned=False): - return app._summarize_mailbox(rows, 5, pruned) - - rows = [dict(added=1)] - self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) - rows = [dict(added=1, mood="lonely")] - self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) - rows = [dict(added=1, mood="errory")] - self.assertEqual(s(rows), Usage(1, None, 4, "errory")) - rows = [dict(added=1, mood=None)] - self.assertEqual(s(rows, pruned=True), Usage(1, None, 4, "pruney")) - rows = [dict(added=1, mood="happy")] - self.assertEqual(s(rows, pruned=True), Usage(1, None, 4, "pruney")) - - rows = [dict(added=1, mood="happy"), dict(added=3, mood="happy")] - self.assertEqual(s(rows), Usage(1, 2, 4, "happy")) - - rows = [dict(added=1, mood="errory"), dict(added=3, mood="happy")] - self.assertEqual(s(rows), Usage(1, 2, 4, "errory")) - - rows = [dict(added=1, mood="happy"), dict(added=3, mood="errory")] - self.assertEqual(s(rows), Usage(1, 2, 4, "errory")) - - rows = [dict(added=1, mood="scary"), dict(added=3, mood="happy")] - self.assertEqual(s(rows), Usage(1, 2, 4, "scary")) - - rows = [dict(added=1, mood="scary"), dict(added=3, mood="errory")] - self.assertEqual(s(rows), Usage(1, 2, 4, "scary")) - - rows = [dict(added=1, mood="happy"), dict(added=3, mood=None)] - self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) - rows = [dict(added=1, mood="happy"), dict(added=3, mood="happy")] - self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) - - rows = [dict(added=1), dict(added=3), dict(added=4)] - self.assertEqual(s(rows), Usage(1, 2, 4, "crowded")) - - rows = [dict(added=1), dict(added=3), dict(added=4)] - self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "crowded")) - - def test_nameplate(self): - a = rendezvous.AppNamespace(None, None, False, None, True) - # starts at time 1, maybe gets second open at time 3, closes at 5 - def s(rows, pruned=False): - return a._summarize_nameplate_usage(rows, 5, pruned) - - rows = [dict(added=1)] - self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) - rows = [dict(added=1), dict(added=3)] - self.assertEqual(s(rows), Usage(1, 2, 4, "happy")) - - rows = [dict(added=1), dict(added=3)] - self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) - - rows = [dict(added=1), dict(added=3), dict(added=4)] - self.assertEqual(s(rows), Usage(1, 2, 4, "crowded")) - - def test_nameplate_disallowed(self): - db = get_db(":memory:") - a = rendezvous.AppNamespace(db, None, False, "some_app_id", False) - a.allocate_nameplate("side1", "123") - self.assertEqual([], a.get_nameplate_ids()) - - def test_nameplate_allowed(self): - db = get_db(":memory:") - a = rendezvous.AppNamespace(db, None, False, "some_app_id", True) - np = a.allocate_nameplate("side1", "321") - self.assertEqual(set([np]), a.get_nameplate_ids()) - - def test_blur(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, 3600, True) - APPID = "appid" - app = rv.get_app(APPID) - app.claim_nameplate("npid", "side1", 10) # start time is 10 - rv.prune_all_apps(now=123, old=50) - # start time should be rounded to top of the hour (blur_usage=3600) - row = db.execute("SELECT * FROM `nameplate_usage`").fetchone() - self.assertEqual(row["started"], 0) - - app = rv.get_app(APPID) - app.open_mailbox("mbid", "side1", 20) # start time is 20 - rv.prune_all_apps(now=123, old=50) - row = db.execute("SELECT * FROM `mailbox_usage`").fetchone() - self.assertEqual(row["started"], 0) - - def test_no_blur(self): - db = get_db(":memory:") - rv = rendezvous.Rendezvous(db, None, None, True) - APPID = "appid" - app = rv.get_app(APPID) - app.claim_nameplate("npid", "side1", 10) # start time is 10 - rv.prune_all_apps(now=123, old=50) - row = db.execute("SELECT * FROM `nameplate_usage`").fetchone() - self.assertEqual(row["started"], 10) - - db.execute("DELETE FROM `mailbox_usage`") - db.commit() - app = rv.get_app(APPID) - app.open_mailbox("mbid", "side1", 20) # start time is 20 - rv.prune_all_apps(now=123, old=50) - row = db.execute("SELECT * FROM `mailbox_usage`").fetchone() - self.assertEqual(row["started"], 20) - -class DumpStats(unittest.TestCase): - def test_nostats(self): - rs = easy_relay() - # with no ._stats_file, this should do nothing - rs.dump_stats(1, 1) - - def test_empty(self): - basedir = self.mktemp() - os.mkdir(basedir) - fn = os.path.join(basedir, "stats.json") - rs = easy_relay(stats_file=fn) - now = 1234 - validity = 500 - rs.dump_stats(now, validity) - with open(fn, "rb") as f: - data_bytes = f.read() - data = json.loads(data_bytes.decode("utf-8")) - self.assertEqual(data["created"], now) - self.assertEqual(data["valid_until"], now+validity) - self.assertEqual(data["rendezvous"]["all_time"]["mailboxes_total"], 0) - - -class Startup(unittest.TestCase): - - @mock.patch('wormhole.server.server.log') - def test_empty(self, fake_log): - rs = easy_relay(allow_list=False) - rs.startService() - try: - logs = '\n'.join([call[1][0] for call in fake_log.mock_calls]) - self.assertTrue( - 'listing of allocated nameplates disallowed' in logs - ) - finally: - rs.stopService() - - -class WebSocketProtocolOptions(unittest.TestCase): - @mock.patch('wormhole.server.server.WebSocketRendezvousFactory') - def test_set(self, fake_factory): - easy_relay( - websocket_protocol_options=[ - ("foo", "bar"), - ] - ) - self.assertEqual( - mock.call().setProtocolOptions(foo="bar"), - fake_factory.mock_calls[1], - )