diff --git a/src/wormhole_transit_relay/cli.py b/src/wormhole_transit_relay/cli.py deleted file mode 100644 index 9596dbd..0000000 --- a/src/wormhole_transit_relay/cli.py +++ /dev/null @@ -1,156 +0,0 @@ -from __future__ import print_function -import json -import click -from ..cli.cli import Config, _compose - -# can put this back in to get this command as "wormhole server" -# instead -#from ..cli.cli import wormhole -#@wormhole.group() -@click.group() -@click.pass_context -def server(ctx): # this is the setuptools entrypoint for bin/wormhole-server - """ - Control a relay server (most users shouldn't need to worry - about this and can use the default server). - """ - # just leaving this pointing to wormhole.cli.cli.Config for now, - # but if we want to keep wormhole-server as a separate command - # should probably have our own Config without all the options the - # server commands don't use - ctx.obj = Config() - -def _validate_websocket_protocol_options(ctx, param, value): - return list(_validate_websocket_protocol_option(option) for option in value) - -def _validate_websocket_protocol_option(option): - try: - key, value = option.split("=", 1) - except ValueError: - raise click.BadParameter("format options as OPTION=VALUE") - - try: - value = json.loads(value) - except: - raise click.BadParameter("could not parse JSON value for {}".format(key)) - - return (key, value) - -LaunchArgs = _compose( - click.option( - "--rendezvous", default="tcp:4000", metavar="tcp:PORT", - help="endpoint specification for the rendezvous port", - ), - click.option( - "--transit", default="tcp:4001", metavar="tcp:PORT", - help="endpoint specification for the transit-relay port", - ), - click.option( - "--advertise-version", metavar="VERSION", - help="version to recommend to clients", - ), - click.option( - "--blur-usage", default=None, type=int, - metavar="SECONDS", - help="round logged access times to improve privacy", - ), - click.option( - "--no-daemon", "-n", is_flag=True, - help="Run in the foreground", - ), - click.option( - "--signal-error", is_flag=True, - help="force all clients to fail with a message", - ), - click.option( - "--allow-list/--disallow-list", default=True, - help="always/never send list of allocated nameplates", - ), - click.option( - "--relay-database-path", default="relay.sqlite", metavar="PATH", - help="location for the relay server state database", - ), - click.option( - "--stats-json-path", default="stats.json", metavar="PATH", - help="location to write the relay stats file", - ), - click.option( - "--websocket-protocol-option", multiple=True, metavar="OPTION=VALUE", - callback=_validate_websocket_protocol_options, - help="a websocket server protocol option to configure", - ), -) - - -@server.command() -@LaunchArgs -@click.pass_obj -def start(cfg, **kwargs): - """ - Start a relay server - """ - for name, value in kwargs.items(): - setattr(cfg, name, value) - from wormhole.server.cmd_server import start_server - start_server(cfg) - - -@server.command() -@LaunchArgs -@click.pass_obj -def restart(cfg, **kwargs): - """ - Re-start a relay server - """ - for name, value in kwargs.items(): - setattr(cfg, name, value) - from wormhole.server.cmd_server import restart_server - restart_server(cfg) - - -@server.command() -@click.pass_obj -def stop(cfg): - """ - Stop a relay server - """ - from wormhole.server.cmd_server import stop_server - stop_server(cfg) - - -@server.command(name="tail-usage") -@click.pass_obj -def tail_usage(cfg): - """ - Follow the latest usage - """ - from wormhole.server.cmd_usage import tail_usage - tail_usage(cfg) - - -@server.command(name='count-channels') -@click.option( - "--json", is_flag=True, -) -@click.pass_obj -def count_channels(cfg, json): - """ - Count active channels - """ - from wormhole.server.cmd_usage import count_channels - cfg.json = json - count_channels(cfg) - - -@server.command(name='count-events') -@click.option( - "--json", is_flag=True, -) -@click.pass_obj -def count_events(cfg, json): - """ - Count events - """ - from wormhole.server.cmd_usage import count_events - cfg.json = json - count_events(cfg) diff --git a/src/wormhole_transit_relay/cmd_server.py b/src/wormhole_transit_relay/cmd_server.py deleted file mode 100644 index 2bbd0a3..0000000 --- a/src/wormhole_transit_relay/cmd_server.py +++ /dev/null @@ -1,73 +0,0 @@ -from __future__ import print_function, unicode_literals -import os, time -from twisted.python import usage -from twisted.scripts import twistd - -class MyPlugin(object): - tapname = "xyznode" - - def __init__(self, args): - self.args = args - - def makeService(self, so): - # delay this import as late as possible, to allow twistd's code to - # accept --reactor= selection - from .server import RelayServer - return RelayServer( - str(self.args.rendezvous), - str(self.args.transit), - self.args.advertise_version, - self.args.relay_database_path, - self.args.blur_usage, - signal_error=self.args.signal_error, - stats_file=self.args.stats_json_path, - allow_list=self.args.allow_list, - ) - -class MyTwistdConfig(twistd.ServerOptions): - subCommands = [("XYZ", None, usage.Options, "node")] - -def start_server(args): - c = MyTwistdConfig() - #twistd_args = tuple(args.twistd_args) + ("XYZ",) - base_args = [] - if args.no_daemon: - base_args.append("--nodaemon") - twistd_args = base_args + ["XYZ"] - c.parseOptions(tuple(twistd_args)) - c.loadedPlugins = {"XYZ": MyPlugin(args)} - - print("starting wormhole relay server") - # this forks and never comes back. The parent calls os._exit(0) - twistd.runApp(c) - -def kill_server(): - try: - f = open("twistd.pid", "r") - except EnvironmentError: - print("Unable to find twistd.pid: is this really a server directory?") - print("oh well, ignoring 'stop'") - return - pid = int(f.read().strip()) - f.close() - os.kill(pid, 15) - print("server process %d sent SIGTERM" % pid) - return - -def stop_server(args): - kill_server() - -def restart_server(args): - kill_server() - time.sleep(0.1) - timeout = 0 - while os.path.exists("twistd.pid") and timeout < 10: - if timeout == 0: - print(" waiting for shutdown..") - timeout += 1 - time.sleep(1) - if os.path.exists("twistd.pid"): - print("error: unable to shut down old server") - return 1 - print(" old server shut down") - start_server(args) diff --git a/src/wormhole_transit_relay/cmd_usage.py b/src/wormhole_transit_relay/cmd_usage.py deleted file mode 100644 index 227a220..0000000 --- a/src/wormhole_transit_relay/cmd_usage.py +++ /dev/null @@ -1,226 +0,0 @@ -from __future__ import print_function, unicode_literals -import os, time, json -from collections import defaultdict -import click -from humanize import naturalsize -from .database import get_db - -def abbrev(t): - if t is None: - return "-" - if t > 1.0: - return "%.3fs" % t - if t > 1e-3: - return "%.1fms" % (t*1e3) - return "%.1fus" % (t*1e6) - - -def print_event(event): - event_type, started, result, total_bytes, waiting_time, total_time = event - followthrough = None - if waiting_time and total_time: - followthrough = total_time - waiting_time - print("%17s: total=%7s wait=%7s ft=%7s size=%s (%s)" % - ("%s-%s" % (event_type, result), - abbrev(total_time), - abbrev(waiting_time), - abbrev(followthrough), - naturalsize(total_bytes), - time.ctime(started), - )) - -def show_usage(args): - print("closed for renovation") - return 0 - if not os.path.exists("relay.sqlite"): - raise click.UsageError( - "cannot find relay.sqlite, please run from the server directory" - ) - oldest = None - newest = None - rendezvous_counters = defaultdict(int) - transit_counters = defaultdict(int) - total_transit_bytes = 0 - db = get_db("relay.sqlite") - c = db.execute("SELECT * FROM `usage`" - " ORDER BY `started` ASC LIMIT ?", - (args.n,)) - for row in c.fetchall(): - if row["type"] == "rendezvous": - counters = rendezvous_counters - elif row["type"] == "transit": - counters = transit_counters - total_transit_bytes += row["total_bytes"] - else: - continue - counters["total"] += 1 - counters[row["result"]] += 1 - if oldest is None or row["started"] < oldest: - oldest = row["started"] - if newest is None or row["started"] > newest: - newest = row["started"] - event = (row["type"], row["started"], row["result"], - row["total_bytes"], row["waiting_time"], row["total_time"]) - print_event(event) - if rendezvous_counters["total"] or transit_counters["total"]: - print("---") - print("(most recent started %s ago)" % abbrev(time.time() - newest)) - if rendezvous_counters["total"]: - print("rendezvous events:") - counters = rendezvous_counters - elapsed = time.time() - oldest - total = counters["total"] - print(" %d events in %s (%.2f per hour)" % (total, abbrev(elapsed), - (3600 * total / elapsed))) - print("", ", ".join(["%s=%d (%d%%)" % - (k, counters[k], (100.0 * counters[k] / total)) - for k in sorted(counters) - if k != "total"])) - if transit_counters["total"]: - print("transit events:") - counters = transit_counters - elapsed = time.time() - oldest - total = counters["total"] - print(" %d events in %s (%.2f per hour)" % (total, abbrev(elapsed), - (3600 * total / elapsed))) - rate = total_transit_bytes / elapsed - print(" %s total bytes, %sps" % (naturalsize(total_transit_bytes), - naturalsize(rate))) - print("", ", ".join(["%s=%d (%d%%)" % - (k, counters[k], (100.0 * counters[k] / total)) - for k in sorted(counters) - if k != "total"])) - return 0 - -def tail_usage(args): - if not os.path.exists("relay.sqlite"): - raise click.UsageError( - "cannot find relay.sqlite, please run from the server directory" - ) - db = get_db("relay.sqlite") - # we don't seem to have unique row IDs, so this is an inaccurate and - # inefficient hack - seen = set() - try: - while True: - old = time.time() - 2*60*60 - c = db.execute("SELECT * FROM `usage`" - " WHERE `started` > ?" - " ORDER BY `started` ASC", (old,)) - for row in c.fetchall(): - event = (row["type"], row["started"], row["result"], - row["total_bytes"], row["waiting_time"], - row["total_time"]) - if event not in seen: - print_event(event) - seen.add(event) - time.sleep(2) - except KeyboardInterrupt: - return 0 - return 0 - -def count_channels(args): - if not os.path.exists("relay.sqlite"): - raise click.UsageError( - "cannot find relay.sqlite, please run from the server directory" - ) - db = get_db("relay.sqlite") - c_list = [] - c_dict = {} - def add(key, value): - c_list.append((key, value)) - c_dict[key] = value - OLD = time.time() - 10*60 - def q(query, values=()): - return list(db.execute(query, values).fetchone().values())[0] - add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplates`")) - - add("total nameplates", q("SELECT COUNT() FROM `nameplates`")) - add("waiting nameplates", q("SELECT COUNT() FROM `nameplates`" - " WHERE `second` is null")) - add("connected nameplates", q("SELECT COUNT() FROM `nameplates`" - " WHERE `second` is not null")) - add("stale nameplates", q("SELECT COUNT() FROM `nameplates`" - " where `updated` < ?", (OLD,))) - - add("total mailboxes", q("SELECT COUNT() FROM `mailboxes`")) - add("waiting mailboxes", q("SELECT COUNT() FROM `mailboxes`" - " WHERE `second` is null")) - add("connected mailboxes", q("SELECT COUNT() FROM `mailboxes`" - " WHERE `second` is not null")) - - stale_mailboxes = 0 - for mbox_row in db.execute("SELECT * FROM `mailboxes`").fetchall(): - newest = db.execute("SELECT `server_rx` FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?" - " ORDER BY `server_rx` DESC LIMIT 1", - (mbox_row["app_id"], mbox_row["id"])).fetchone() - if newest and newest[0] < OLD: - stale_mailboxes += 1 - add("stale mailboxes", stale_mailboxes) - - add("messages", q("SELECT COUNT() FROM `messages`")) - - if args.json: - print(json.dumps(c_dict)) - else: - for (key, value) in c_list: - print(key, value) - return 0 - -def count_events(args): - if not os.path.exists("relay.sqlite"): - raise click.UsageError( - "cannot find relay.sqlite, please run from the server directory" - ) - db = get_db("relay.sqlite") - c_list = [] - c_dict = {} - def add(key, value): - c_list.append((key, value)) - c_dict[key] = value - def q(query, values=()): - return list(db.execute(query, values).fetchone().values())[0] - - add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplate_usage`")) - - add("total nameplates", q("SELECT COUNT() FROM `nameplate_usage`")) - add("happy nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='happy'")) - add("lonely nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='lonely'")) - add("pruney nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='pruney'")) - add("crowded nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='crowded'")) - - add("total mailboxes", q("SELECT COUNT() FROM `mailbox_usage`")) - add("happy mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='happy'")) - add("scary mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='scary'")) - add("lonely mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='lonely'")) - add("errory mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='errory'")) - add("pruney mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='pruney'")) - add("crowded mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='crowded'")) - - add("total transit", q("SELECT COUNT() FROM `transit_usage`")) - add("happy transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='happy'")) - add("lonely transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='lonely'")) - add("errory transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='errory'")) - - add("transit bytes", q("SELECT SUM(`total_bytes`) FROM `transit_usage`")) - - if args.json: - print(json.dumps(c_dict)) - else: - for (key, value) in c_list: - print(key, value) - return 0 diff --git a/src/wormhole_transit_relay/database.py b/src/wormhole_transit_relay/database.py deleted file mode 100644 index eb188e1..0000000 --- a/src/wormhole_transit_relay/database.py +++ /dev/null @@ -1,126 +0,0 @@ -from __future__ import unicode_literals -import os -import sqlite3 -import tempfile -from pkg_resources import resource_string -from twisted.python import log - -class DBError(Exception): - pass - -def get_schema(version): - schema_bytes = resource_string("wormhole.server", - "db-schemas/v%d.sql" % version) - return schema_bytes.decode("utf-8") - -def get_upgrader(new_version): - schema_bytes = resource_string("wormhole.server", - "db-schemas/upgrade-to-v%d.sql" % new_version) - return schema_bytes.decode("utf-8") - -TARGET_VERSION = 3 - -def dict_factory(cursor, row): - d = {} - for idx, col in enumerate(cursor.description): - d[col[0]] = row[idx] - return d - -def _initialize_db_schema(db, target_version): - """Creates the application schema in the given database. - """ - log.msg("populating new database with schema v%s" % target_version) - schema = get_schema(target_version) - db.executescript(schema) - db.execute("INSERT INTO version (version) VALUES (?)", - (target_version,)) - db.commit() - -def _initialize_db_connection(db): - """Sets up the db connection object with a row factory and with necessary - foreign key settings. - """ - db.row_factory = dict_factory - db.execute("PRAGMA foreign_keys = ON") - problems = db.execute("PRAGMA foreign_key_check").fetchall() - if problems: - raise DBError("failed foreign key check: %s" % (problems,)) - -def _open_db_connection(dbfile): - """Open a new connection to the SQLite3 database at the given path. - """ - try: - db = sqlite3.connect(dbfile) - except (EnvironmentError, sqlite3.OperationalError) as e: - raise DBError("Unable to create/open db file %s: %s" % (dbfile, e)) - _initialize_db_connection(db) - return db - -def _get_temporary_dbfile(dbfile): - """Get a temporary filename near the given path. - """ - fd, name = tempfile.mkstemp( - prefix=os.path.basename(dbfile) + ".", - dir=os.path.dirname(dbfile) - ) - os.close(fd) - return name - -def _atomic_create_and_initialize_db(dbfile, target_version): - """Create and return a new database, initialized with the application - schema. - - If anything goes wrong, nothing is left at the ``dbfile`` path. - """ - temp_dbfile = _get_temporary_dbfile(dbfile) - db = _open_db_connection(temp_dbfile) - _initialize_db_schema(db, target_version) - db.close() - os.rename(temp_dbfile, dbfile) - return _open_db_connection(dbfile) - -def get_db(dbfile, target_version=TARGET_VERSION): - """Open or create the given db file. The parent directory must exist. - Returns the db connection object, or raises DBError. - """ - if dbfile == ":memory:": - db = _open_db_connection(dbfile) - _initialize_db_schema(db, target_version) - elif os.path.exists(dbfile): - db = _open_db_connection(dbfile) - else: - db = _atomic_create_and_initialize_db(dbfile, target_version) - - try: - version = db.execute("SELECT version FROM version").fetchone()["version"] - except sqlite3.DatabaseError as e: - # this indicates that the file is not a compatible database format. - # Perhaps it was created with an old version, or it might be junk. - raise DBError("db file is unusable: %s" % e) - - while version < target_version: - log.msg(" need to upgrade from %s to %s" % (version, target_version)) - try: - upgrader = get_upgrader(version+1) - except ValueError: # ResourceError?? - log.msg(" unable to upgrade %s to %s" % (version, version+1)) - raise DBError("Unable to upgrade %s to version %s, left at %s" - % (dbfile, version+1, version)) - log.msg(" executing upgrader v%s->v%s" % (version, version+1)) - db.executescript(upgrader) - db.commit() - version = version+1 - - if version != target_version: - raise DBError("Unable to handle db version %s" % version) - - return db - -def dump_db(db): - # to let _iterdump work, we need to restore the original row factory - orig = db.row_factory - try: - db.row_factory = sqlite3.Row - return "".join(db.iterdump()) - finally: - db.row_factory = orig diff --git a/src/wormhole_transit_relay/db-schemas/upgrade-to-v3.sql b/src/wormhole_transit_relay/db-schemas/upgrade-to-v3.sql deleted file mode 100644 index 69bb255..0000000 --- a/src/wormhole_transit_relay/db-schemas/upgrade-to-v3.sql +++ /dev/null @@ -1,68 +0,0 @@ -DROP TABLE `nameplates`; -DROP TABLE `messages`; -DROP TABLE `mailboxes`; - - --- Wormhole codes use a "nameplate": a short name which is only used to --- reference a specific (long-named) mailbox. The codes only use numeric --- nameplates, but the protocol and server allow can use arbitrary strings. -CREATE TABLE `nameplates` -( - `id` INTEGER PRIMARY KEY AUTOINCREMENT, - `app_id` VARCHAR, - `name` VARCHAR, - `mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`), - `request_id` VARCHAR -- from 'allocate' message, for future deduplication -); -CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`); -CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); -CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); - -CREATE TABLE `nameplate_sides` -( - `nameplates_id` REFERENCES `nameplates`(`id`), - `claimed` BOOLEAN, -- True after claim(), False after release() - `side` VARCHAR, - `added` INTEGER -- time when this side first claimed the nameplate -); - - --- Clients exchange messages through a "mailbox", which has a long (randomly --- unique) identifier and a queue of messages. --- `id` is randomly-generated and unique across all apps. -CREATE TABLE `mailboxes` -( - `app_id` VARCHAR, - `id` VARCHAR PRIMARY KEY, - `updated` INTEGER, -- time of last activity, used for pruning - `for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone -); -CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); - -CREATE TABLE `mailbox_sides` -( - `mailbox_id` REFERENCES `mailboxes`(`id`), - `opened` BOOLEAN, -- True after open(), False after close() - `side` VARCHAR, - `added` INTEGER, -- time when this side first opened the mailbox - `mood` VARCHAR -); - -CREATE TABLE `messages` -( - `app_id` VARCHAR, - `mailbox_id` VARCHAR, - `side` VARCHAR, - `phase` VARCHAR, -- numeric or string - `body` VARCHAR, - `server_rx` INTEGER, - `msg_id` VARCHAR -); -CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`); - -ALTER TABLE `mailbox_usage` ADD COLUMN `for_nameplate` BOOLEAN; -CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`); -CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`); - -DELETE FROM `version`; -INSERT INTO `version` (`version`) VALUES (3); diff --git a/src/wormhole_transit_relay/db-schemas/v2.sql b/src/wormhole_transit_relay/db-schemas/v2.sql deleted file mode 100644 index 3ff6e58..0000000 --- a/src/wormhole_transit_relay/db-schemas/v2.sql +++ /dev/null @@ -1,105 +0,0 @@ - --- note: anything which isn't an boolean, integer, or human-readable unicode --- string, (i.e. binary strings) will be stored as hex - -CREATE TABLE `version` -( - `version` INTEGER -- contains one row, set to 2 -); - - --- Wormhole codes use a "nameplate": a short identifier which is only used to --- reference a specific (long-named) mailbox. The codes only use numeric --- nameplates, but the protocol and server allow can use arbitrary strings. -CREATE TABLE `nameplates` -( - `app_id` VARCHAR, - `id` VARCHAR, - `mailbox_id` VARCHAR, -- really a foreign key - `side1` VARCHAR, -- side name, or NULL - `side2` VARCHAR, -- side name, or NULL - `request_id` VARCHAR, -- from 'allocate' message, for future deduplication - `crowded` BOOLEAN, -- at some point, three or more sides were involved - `updated` INTEGER, -- time of last activity, used for pruning - -- timing data - `started` INTEGER, -- time when nameplace was opened - `second` INTEGER -- time when second side opened -); -CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `id`); -CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`); -CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); -CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); - --- Clients exchange messages through a "mailbox", which has a long (randomly --- unique) identifier and a queue of messages. -CREATE TABLE `mailboxes` -( - `app_id` VARCHAR, - `id` VARCHAR, - `side1` VARCHAR, -- side name, or NULL - `side2` VARCHAR, -- side name, or NULL - `crowded` BOOLEAN, -- at some point, three or more sides were involved - `first_mood` VARCHAR, - -- timing data for the mailbox itself - `started` INTEGER, -- time when opened - `second` INTEGER -- time when second side opened -); -CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); - -CREATE TABLE `messages` -( - `app_id` VARCHAR, - `mailbox_id` VARCHAR, - `side` VARCHAR, - `phase` VARCHAR, -- numeric or string - `body` VARCHAR, - `server_rx` INTEGER, - `msg_id` VARCHAR -); -CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`); - -CREATE TABLE `nameplate_usage` -( - `app_id` VARCHAR, - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `total_time` INTEGER, -- seconds from open to last close/prune - `result` VARCHAR -- happy, lonely, pruney, crowded - -- nameplate moods: - -- "happy": two sides open and close - -- "lonely": one side opens and closes (no response from 2nd side) - -- "pruney": channels which get pruned for inactivity - -- "crowded": three or more sides were involved -); -CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`); - -CREATE TABLE `mailbox_usage` -( - `app_id` VARCHAR, - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `total_time` INTEGER, -- seconds from open to last close - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `result` VARCHAR -- happy, scary, lonely, errory, pruney - -- rendezvous moods: - -- "happy": both sides close with mood=happy - -- "scary": any side closes with mood=scary (bad MAC, probably wrong pw) - -- "lonely": any side closes with mood=lonely (no response from 2nd side) - -- "errory": any side closes with mood=errory (other errors) - -- "pruney": channels which get pruned for inactivity - -- "crowded": three or more sides were involved -); -CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`); - -CREATE TABLE `transit_usage` -( - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `total_time` INTEGER, -- seconds from open to last close - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `total_bytes` INTEGER, -- total bytes relayed (both directions) - `result` VARCHAR -- happy, scary, lonely, errory, pruney - -- transit moods: - -- "errory": one side gave the wrong handshake - -- "lonely": good handshake, but the other side never showed up - -- "happy": both sides gave correct handshake -); -CREATE INDEX `transit_usage_idx` ON `transit_usage` (`started`); diff --git a/src/wormhole_transit_relay/db-schemas/v3.sql b/src/wormhole_transit_relay/db-schemas/v3.sql deleted file mode 100644 index e447744..0000000 --- a/src/wormhole_transit_relay/db-schemas/v3.sql +++ /dev/null @@ -1,115 +0,0 @@ - --- note: anything which isn't an boolean, integer, or human-readable unicode --- string, (i.e. binary strings) will be stored as hex - -CREATE TABLE `version` -( - `version` INTEGER -- contains one row, set to 3 -); - - --- Wormhole codes use a "nameplate": a short name which is only used to --- reference a specific (long-named) mailbox. The codes only use numeric --- nameplates, but the protocol and server allow can use arbitrary strings. -CREATE TABLE `nameplates` -( - `id` INTEGER PRIMARY KEY AUTOINCREMENT, - `app_id` VARCHAR, - `name` VARCHAR, - `mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`), - `request_id` VARCHAR -- from 'allocate' message, for future deduplication -); -CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`); -CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); -CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); - -CREATE TABLE `nameplate_sides` -( - `nameplates_id` REFERENCES `nameplates`(`id`), - `claimed` BOOLEAN, -- True after claim(), False after release() - `side` VARCHAR, - `added` INTEGER -- time when this side first claimed the nameplate -); - - --- Clients exchange messages through a "mailbox", which has a long (randomly --- unique) identifier and a queue of messages. --- `id` is randomly-generated and unique across all apps. -CREATE TABLE `mailboxes` -( - `app_id` VARCHAR, - `id` VARCHAR PRIMARY KEY, - `updated` INTEGER, -- time of last activity, used for pruning - `for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone -); -CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); - -CREATE TABLE `mailbox_sides` -( - `mailbox_id` REFERENCES `mailboxes`(`id`), - `opened` BOOLEAN, -- True after open(), False after close() - `side` VARCHAR, - `added` INTEGER, -- time when this side first opened the mailbox - `mood` VARCHAR -); - -CREATE TABLE `messages` -( - `app_id` VARCHAR, - `mailbox_id` VARCHAR, - `side` VARCHAR, - `phase` VARCHAR, -- numeric or string - `body` VARCHAR, - `server_rx` INTEGER, - `msg_id` VARCHAR -); -CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`); - -CREATE TABLE `nameplate_usage` -( - `app_id` VARCHAR, - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `total_time` INTEGER, -- seconds from open to last close/prune - `result` VARCHAR -- happy, lonely, pruney, crowded - -- nameplate moods: - -- "happy": two sides open and close - -- "lonely": one side opens and closes (no response from 2nd side) - -- "pruney": channels which get pruned for inactivity - -- "crowded": three or more sides were involved -); -CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`); - -CREATE TABLE `mailbox_usage` -( - `app_id` VARCHAR, - `for_nameplate` BOOLEAN, -- allocated for a nameplate, not standalone - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `total_time` INTEGER, -- seconds from open to last close - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `result` VARCHAR -- happy, scary, lonely, errory, pruney - -- rendezvous moods: - -- "happy": both sides close with mood=happy - -- "scary": any side closes with mood=scary (bad MAC, probably wrong pw) - -- "lonely": any side closes with mood=lonely (no response from 2nd side) - -- "errory": any side closes with mood=errory (other errors) - -- "pruney": channels which get pruned for inactivity - -- "crowded": three or more sides were involved -); -CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`); -CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`); - -CREATE TABLE `transit_usage` -( - `started` INTEGER, -- seconds since epoch, rounded to "blur time" - `total_time` INTEGER, -- seconds from open to last close - `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None - `total_bytes` INTEGER, -- total bytes relayed (both directions) - `result` VARCHAR -- happy, scary, lonely, errory, pruney - -- transit moods: - -- "errory": one side gave the wrong handshake - -- "lonely": good handshake, but the other side never showed up - -- "happy": both sides gave correct handshake -); -CREATE INDEX `transit_usage_idx` ON `transit_usage` (`started`); -CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`); diff --git a/src/wormhole_transit_relay/server.py b/src/wormhole_transit_relay/server.py deleted file mode 100644 index 0174eca..0000000 --- a/src/wormhole_transit_relay/server.py +++ /dev/null @@ -1,181 +0,0 @@ -# NO unicode_literals or static.Data() will break, because it demands -# a str on Python 2 -from __future__ import print_function -import os, time, json -try: - # 'resource' is unix-only - from resource import getrlimit, setrlimit, RLIMIT_NOFILE -except ImportError: # pragma: nocover - getrlimit, setrlimit, RLIMIT_NOFILE = None, None, None # pragma: nocover -from twisted.python import log -from twisted.internet import reactor, endpoints -from twisted.application import service, internet -from twisted.web import server, static -from twisted.web.resource import Resource -from autobahn.twisted.resource import WebSocketResource -from .database import get_db -from .rendezvous import Rendezvous -from .rendezvous_websocket import WebSocketRendezvousFactory -from .transit_server import Transit - -SECONDS = 1.0 -MINUTE = 60*SECONDS - -CHANNEL_EXPIRATION_TIME = 11*MINUTE -EXPIRATION_CHECK_PERIOD = 10*MINUTE - -class Root(Resource): - # child_FOO is a nevow thing, not a twisted.web.resource thing - def __init__(self): - Resource.__init__(self) - self.putChild(b"", static.Data(b"Wormhole Relay\n", "text/plain")) - -class PrivacyEnhancedSite(server.Site): - logRequests = True - def log(self, request): - if self.logRequests: - return server.Site.log(self, request) - -class RelayServer(service.MultiService): - - def __init__(self, rendezvous_web_port, transit_port, - advertise_version, db_url=":memory:", blur_usage=None, - signal_error=None, stats_file=None, allow_list=True, - websocket_protocol_options=()): - service.MultiService.__init__(self) - self._blur_usage = blur_usage - self._allow_list = allow_list - self._db_url = db_url - - db = get_db(db_url) - welcome = { - # adding .motd will cause all clients to display the message, - # then keep running normally - #"motd": "Welcome to the public relay.\nPlease enjoy this service.", - - # adding .error will cause all clients to fail, with this message - #"error": "This server has been disabled, see URL for details.", - } - - if advertise_version: - # The primary (python CLI) implementation will emit a message if - # its version does not match this key. If/when we have - # distributions which include older version, but we still expect - # them to be compatible, stop sending this key. - welcome["current_cli_version"] = advertise_version - if signal_error: - welcome["error"] = signal_error - - self._rendezvous = Rendezvous(db, welcome, blur_usage, self._allow_list) - self._rendezvous.setServiceParent(self) # for the pruning timer - - root = Root() - wsrf = WebSocketRendezvousFactory(None, self._rendezvous) - _set_options(websocket_protocol_options, wsrf) - root.putChild(b"v1", WebSocketResource(wsrf)) - - site = PrivacyEnhancedSite(root) - if blur_usage: - site.logRequests = False - - r = endpoints.serverFromString(reactor, rendezvous_web_port) - rendezvous_web_service = internet.StreamServerEndpointService(r, site) - rendezvous_web_service.setServiceParent(self) - - if transit_port: - transit = Transit(db, blur_usage) - transit.setServiceParent(self) # for the timer - t = endpoints.serverFromString(reactor, transit_port) - transit_service = internet.StreamServerEndpointService(t, transit) - transit_service.setServiceParent(self) - - self._stats_file = stats_file - if self._stats_file and os.path.exists(self._stats_file): - os.unlink(self._stats_file) - # this will be regenerated immediately, but if something goes - # wrong in dump_stats(), it's better to have a missing file than - # a stale one - t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.timer) - t.setServiceParent(self) - - # make some things accessible for tests - self._db = db - self._root = root - self._rendezvous_web_service = rendezvous_web_service - self._rendezvous_websocket = wsrf - self._transit = None - if transit_port: - self._transit = transit - self._transit_service = transit_service - - def increase_rlimits(self): - if getrlimit is None: - log.msg("unable to import 'resource', leaving rlimit alone") - return - soft, hard = getrlimit(RLIMIT_NOFILE) - if soft >= 10000: - log.msg("RLIMIT_NOFILE.soft was %d, leaving it alone" % soft) - return - # OS-X defaults to soft=7168, and reports a huge number for 'hard', - # but won't accept anything more than soft=10240, so we can't just - # set soft=hard. Linux returns (1024, 1048576) and is fine with - # soft=hard. Cygwin is reported to return (256,-1) and accepts up to - # soft=3200. So we try multiple values until something works. - for newlimit in [hard, 10000, 3200, 1024]: - log.msg("changing RLIMIT_NOFILE from (%s,%s) to (%s,%s)" % - (soft, hard, newlimit, hard)) - try: - setrlimit(RLIMIT_NOFILE, (newlimit, hard)) - log.msg("setrlimit successful") - return - except ValueError as e: - log.msg("error during setrlimit: %s" % e) - continue - except: - log.msg("other error during setrlimit, leaving it alone") - log.err() - return - log.msg("unable to change rlimit, leaving it alone") - - def startService(self): - service.MultiService.startService(self) - self.increase_rlimits() - log.msg("websocket listening on /wormhole-relay/ws") - log.msg("Wormhole relay server (Rendezvous and Transit) running") - if self._blur_usage: - log.msg("blurring access times to %d seconds" % self._blur_usage) - log.msg("not logging HTTP requests or Transit connections") - else: - log.msg("not blurring access times") - if not self._allow_list: - log.msg("listing of allocated nameplates disallowed") - - def timer(self): - now = time.time() - old = now - CHANNEL_EXPIRATION_TIME - self._rendezvous.prune_all_apps(now, old) - self.dump_stats(now, validity=EXPIRATION_CHECK_PERIOD+60) - - def dump_stats(self, now, validity): - if not self._stats_file: - return - tmpfn = self._stats_file + ".tmp" - - data = {} - data["created"] = now - data["valid_until"] = now + validity - - start = time.time() - data["rendezvous"] = self._rendezvous.get_stats() - data["transit"] = self._transit.get_stats() - log.msg("get_stats took:", time.time() - start) - - with open(tmpfn, "wb") as f: - # json.dump(f) has str-vs-unicode issues on py2-vs-py3 - f.write(json.dumps(data, indent=1).encode("utf-8")) - f.write(b"\n") - os.rename(tmpfn, self._stats_file) - - -def _set_options(options, factory): - factory.setProtocolOptions(**dict(options)) diff --git a/src/wormhole_transit_relay/test/test_database.py b/src/wormhole_transit_relay/test/test_database.py deleted file mode 100644 index 4ebc2cb..0000000 --- a/src/wormhole_transit_relay/test/test_database.py +++ /dev/null @@ -1,61 +0,0 @@ -from __future__ import print_function, unicode_literals -import os -from twisted.python import filepath -from twisted.trial import unittest -from ..server import database -from ..server.database import get_db, TARGET_VERSION, dump_db - -class DB(unittest.TestCase): - def test_create_default(self): - db_url = ":memory:" - db = get_db(db_url) - rows = db.execute("SELECT * FROM version").fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0]["version"], TARGET_VERSION) - - def test_failed_create_allows_subsequent_create(self): - patch = self.patch(database, "get_schema", lambda version: b"this is a broken schema") - dbfile = filepath.FilePath(self.mktemp()) - self.assertRaises(Exception, lambda: get_db(dbfile.path)) - patch.restore() - get_db(dbfile.path) - - def test_upgrade(self): - basedir = self.mktemp() - os.mkdir(basedir) - fn = os.path.join(basedir, "upgrade.db") - self.assertNotEqual(TARGET_VERSION, 2) - - # create an old-version DB in a file - db = get_db(fn, 2) - rows = db.execute("SELECT * FROM version").fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0]["version"], 2) - del db - - # then upgrade the file to the latest version - dbA = get_db(fn, TARGET_VERSION) - rows = dbA.execute("SELECT * FROM version").fetchall() - self.assertEqual(len(rows), 1) - self.assertEqual(rows[0]["version"], TARGET_VERSION) - dbA_text = dump_db(dbA) - del dbA - - # make sure the upgrades got committed to disk - dbB = get_db(fn, TARGET_VERSION) - dbB_text = dump_db(dbB) - del dbB - self.assertEqual(dbA_text, dbB_text) - - # The upgraded schema should be equivalent to that of a new DB. - # However a text dump will differ because ALTER TABLE always appends - # the new column to the end of a table, whereas our schema puts it - # somewhere in the middle (wherever it fits naturally). Also ALTER - # TABLE doesn't include comments. - if False: - latest_db = get_db(":memory:", TARGET_VERSION) - latest_text = dump_db(latest_db) - with open("up.sql","w") as f: f.write(dbA_text) - with open("new.sql","w") as f: f.write(latest_text) - # check with "diff -u _trial_temp/up.sql _trial_temp/new.sql" - self.assertEqual(dbA_text, latest_text)