Merge branch 'remove-rendezvous'

Remove the mailbox/rendezvous server code (and tests), since that now lives
in the magic-wormhole-mailbox-server repository. Also remove the
munin (monitoring) plugins and the man page.

We declare magic-wormhole-mailbox-server as a dependency for the "[dev]"
extra (i.e. only for tests). A few tests needed updating to handle the new
structure, but this was mostly confined to test/common.py .

refs #240
This commit is contained in:
Brian Warner 2018-02-21 15:00:31 -08:00
commit 7e22949b70
24 changed files with 76 additions and 3866 deletions

View File

@ -2,12 +2,7 @@ include versioneer.py
include src/wormhole/_version.py include src/wormhole/_version.py
include LICENSE README.md NEWS.md include LICENSE README.md NEWS.md
recursive-include docs *.md *.rst *.dot recursive-include docs *.md *.rst *.dot
include docs/wormhole.1 docs/wormhole-server.8 include docs/wormhole.1
include .coveragerc tox.ini snapcraft.yaml include .coveragerc tox.ini snapcraft.yaml
include misc/windows-build.cmd include misc/windows-build.cmd
include misc/*.py misc/web/*.html misc/web/*.js misc/web/*.css include misc/*.py misc/web/*.html misc/web/*.js misc/web/*.css
include misc/munin/wormhole_active
include misc/munin/wormhole_errors
include misc/munin/wormhole_event_rate
include misc/munin/wormhole_events
include misc/munin/wormhole_events_alltime

View File

@ -1,41 +0,0 @@
.TH WORMHOLE-SERVER "8" "July 2016"
.SH NAME
wormhole-server \- Securely and simply transfer data between computers
.SH SYNOPSIS
.B wormhole-server
[\fI\,OPTIONS\/\fR] \fI\,COMMAND \/\fR[\fI\,ARGS\/\fR]...
.SH DESCRIPTION
.IP
Control a relay server (most users shouldn't need to worry about this and
can use the default server).
.SH OPTIONS
.TP
\fB\-\-help\fR
Show this message and exit.
.SS "Commands:"
.TP
count\-channels
Count active channels
.TP
count\-events
Count events
.TP
restart
Re\-start a relay server
.TP
start
Start a relay server
.TP
stop
Stop a relay server
.TP
tail\-usage
Follow the latest usage
.SH SEE ALSO
.BR wormhole (1)
.SH AUTHORS
Brian Warner <warner-magic-wormhole@lothar.com>
.PP
This manual was written by Jameson Rollins
<jrollins@finestructure.net> for the Debian project (and may be used
by others).

View File

@ -1,41 +0,0 @@
#! /usr/bin/env python
"""
Use the following in /etc/munin/plugin-conf.d/wormhole :
[wormhole_*]
env.serverdir /path/to/your/wormhole/server
"""
import os, sys, time, json
CONFIG = """\
graph_title Magic-Wormhole Active Channels
graph_vlabel Channels
graph_category network
nameplates.label Nameplates
nameplates.draw LINE2
nameplates.type GAUGE
mailboxes.label Mailboxes
mailboxes.draw LINE2
mailboxes.type GAUGE
messages.label Messages
messages.draw LINE1
messages.type GAUGE
"""
if len(sys.argv) > 1 and sys.argv[1] == "config":
print CONFIG.rstrip()
sys.exit(0)
serverdir = os.environ["serverdir"]
fn = os.path.join(serverdir, "stats.json")
with open(fn) as f:
data = json.load(f)
if time.time() > data["valid_until"]:
sys.exit(1) # expired
ra = data["rendezvous"]["active"]
print "nameplates.value", ra["nameplates_total"]
print "mailboxes.value", ra["mailboxes_total"]
print "messages.value", ra["messages_total"]

View File

@ -1,43 +0,0 @@
#! /usr/bin/env python
"""
Use the following in /etc/munin/plugin-conf.d/wormhole :
[wormhole_*]
env.serverdir /path/to/your/wormhole/server
"""
import os, sys, time, json
CONFIG = """\
graph_title Magic-Wormhole Server Errors
graph_vlabel Events Since Reboot
graph_category network
nameplates.label Nameplate Errors (total)
nameplates.draw LINE1
nameplates.type GAUGE
mailboxes.label Mailboxes (total)
mailboxes.draw LINE1
mailboxes.type GAUGE
mailboxes_scary.label Mailboxes (scary)
mailboxes_scary.draw LINE1
mailboxes_scary.type GAUGE
"""
if len(sys.argv) > 1 and sys.argv[1] == "config":
print CONFIG.rstrip()
sys.exit(0)
serverdir = os.environ["serverdir"]
fn = os.path.join(serverdir, "stats.json")
with open(fn) as f:
data = json.load(f)
if time.time() > data["valid_until"]:
sys.exit(1) # expired
r = data["rendezvous"]["since_reboot"]
print "nameplates.value", (r["nameplates_total"]
- r["nameplate_moods"].get("happy", 0))
print "mailboxes.value", (r["mailboxes_total"]
- r["mailbox_moods"].get("happy", 0))
print "mailboxes_scary.value", r["mailbox_moods"].get("scary", 0)

View File

@ -1,50 +0,0 @@
#! /usr/bin/env python
"""
Use the following in /etc/munin/plugin-conf.d/wormhole :
[wormhole_*]
env.serverdir /path/to/your/wormhole/server
"""
import os, sys, time, json
CONFIG = """\
graph_title Magic-Wormhole Server Events
graph_vlabel Events per Hour
graph_category network
happy.label Happy
happy.draw LINE
happy.type DERIVE
happy.min 0
happy.max 60
happy.cdef happy,3600,*
incomplete.label Incomplete
incomplete.draw LINE
incomplete.type DERIVE
incomplete.min 0
incomplete.max 60
incomplete.cdef incomplete,3600,*
scary.label Scary
scary.draw LINE
scary.type DERIVE
scary.min 0
scary.max 60
scary.cdef scary,3600,*
"""
if len(sys.argv) > 1 and sys.argv[1] == "config":
print CONFIG.rstrip()
sys.exit(0)
serverdir = os.environ["serverdir"]
fn = os.path.join(serverdir, "stats.json")
with open(fn) as f:
data = json.load(f)
if time.time() > data["valid_until"]:
sys.exit(1) # expired
atm = data["rendezvous"]["all_time"]["mailbox_moods"]
print "happy.value", atm.get("happy", 0)
print "incomplete.value", (atm.get("pruney", 0) + atm.get("lonely", 0))
print "scary.value", atm.get("scary", 0)

View File

@ -1,53 +0,0 @@
#! /usr/bin/env python
"""
Use the following in /etc/munin/plugin-conf.d/wormhole :
[wormhole_*]
env.serverdir /path/to/your/wormhole/server
"""
import os, sys, time, json
CONFIG = """\
graph_title Magic-Wormhole Mailbox Events (since reboot)
graph_vlabel Events Since Reboot
graph_category network
happy.label Happy
happy.draw LINE2
happy.type GAUGE
total.label Total
total.draw LINE1
total.type GAUGE
scary.label Scary
scary.draw LINE2
scary.type GAUGE
pruney.label Pruney
pruney.draw LINE1
pruney.type GAUGE
lonely.label Lonely
lonely.draw LINE2
lonely.type GAUGE
errory.label Errory
errory.draw LINE1
errory.type GAUGE
"""
if len(sys.argv) > 1 and sys.argv[1] == "config":
print CONFIG.rstrip()
sys.exit(0)
serverdir = os.environ["serverdir"]
fn = os.path.join(serverdir, "stats.json")
with open(fn) as f:
data = json.load(f)
if time.time() > data["valid_until"]:
sys.exit(1) # expired
r = data["rendezvous"]["since_reboot"]
print "happy.value", r["mailbox_moods"].get("happy", 0)
print "total.value", r["mailboxes_total"]
print "scary.value", r["mailbox_moods"].get("scary", 0)
print "pruney.value", r["mailbox_moods"].get("pruney", 0)
print "lonely.value", r["mailbox_moods"].get("lonely", 0)
print "errory.value", r["mailbox_moods"].get("errory", 0)

View File

@ -1,53 +0,0 @@
#! /usr/bin/env python
"""
Use the following in /etc/munin/plugin-conf.d/wormhole :
[wormhole_*]
env.serverdir /path/to/your/wormhole/server
"""
import os, sys, time, json
CONFIG = """\
graph_title Magic-Wormhole Mailbox Events (all time)
graph_vlabel Events Since DB Creation
graph_category network
happy.label Happy
happy.draw LINE2
happy.type GAUGE
total.label Total
total.draw LINE1
total.type GAUGE
scary.label Scary
scary.draw LINE2
scary.type GAUGE
pruney.label Pruney
pruney.draw LINE1
pruney.type GAUGE
lonely.label Lonely
lonely.draw LINE2
lonely.type GAUGE
errory.label Errory
errory.draw LINE1
errory.type GAUGE
"""
if len(sys.argv) > 1 and sys.argv[1] == "config":
print CONFIG.rstrip()
sys.exit(0)
serverdir = os.environ["serverdir"]
fn = os.path.join(serverdir, "stats.json")
with open(fn) as f:
data = json.load(f)
if time.time() > data["valid_until"]:
sys.exit(1) # expired
r = data["rendezvous"]["all_time"]
print "happy.value", r["mailbox_moods"].get("happy", 0)
print "total.value", r["mailboxes_total"]
print "scary.value", r["mailbox_moods"].get("scary", 0)
print "pruney.value", r["mailbox_moods"].get("pruney", 0)
print "lonely.value", r["mailbox_moods"].get("lonely", 0)
print "errory.value", r["mailbox_moods"].get("errory", 0)

View File

@ -14,15 +14,12 @@ setup(name="magic-wormhole",
package_dir={"": "src"}, package_dir={"": "src"},
packages=["wormhole", packages=["wormhole",
"wormhole.cli", "wormhole.cli",
"wormhole.server",
"wormhole.test", "wormhole.test",
], ],
package_data={"wormhole.server": ["db-schemas/*.sql"]},
entry_points={ entry_points={
"console_scripts": "console_scripts":
[ [
"wormhole = wormhole.cli.cli:wormhole", "wormhole = wormhole.cli.cli:wormhole",
"wormhole-server = wormhole.server.cli:server",
] ]
}, },
install_requires=[ install_requires=[
@ -41,7 +38,8 @@ setup(name="magic-wormhole",
extras_require={ extras_require={
':sys_platform=="win32"': ["pypiwin32"], ':sys_platform=="win32"': ["pypiwin32"],
"dev": ["mock", "tox", "pyflakes", "dev": ["mock", "tox", "pyflakes",
"magic-wormhole-transit-relay==0.1.1"], "magic-wormhole-transit-relay==0.1.1",
"magic-wormhole-mailbox-server==0.1.0"],
}, },
test_suite="wormhole.test", test_suite="wormhole.test",
cmdclass=commands, cmdclass=commands,

View File

@ -1,8 +0,0 @@
if __name__ != "__main__":
raise ImportError('this module should not be imported')
from wormhole.server import cli
cli.server()

View File

@ -1,152 +0,0 @@
from __future__ import print_function
import json
import click
from ..cli.cli import Config, _compose
# can put this back in to get this command as "wormhole server"
# instead
#from ..cli.cli import wormhole
#@wormhole.group()
@click.group()
@click.pass_context
def server(ctx): # this is the setuptools entrypoint for bin/wormhole-server
"""
Control a relay server (most users shouldn't need to worry
about this and can use the default server).
"""
# just leaving this pointing to wormhole.cli.cli.Config for now,
# but if we want to keep wormhole-server as a separate command
# should probably have our own Config without all the options the
# server commands don't use
ctx.obj = Config()
def _validate_websocket_protocol_options(ctx, param, value):
return list(_validate_websocket_protocol_option(option) for option in value)
def _validate_websocket_protocol_option(option):
try:
key, value = option.split("=", 1)
except ValueError:
raise click.BadParameter("format options as OPTION=VALUE")
try:
value = json.loads(value)
except:
raise click.BadParameter("could not parse JSON value for {}".format(key))
return (key, value)
LaunchArgs = _compose(
click.option(
"--rendezvous", default="tcp:4000", metavar="tcp:PORT",
help="endpoint specification for the rendezvous port",
),
click.option(
"--advertise-version", metavar="VERSION",
help="version to recommend to clients",
),
click.option(
"--blur-usage", default=None, type=int,
metavar="SECONDS",
help="round logged access times to improve privacy",
),
click.option(
"--no-daemon", "-n", is_flag=True,
help="Run in the foreground",
),
click.option(
"--signal-error", is_flag=True,
help="force all clients to fail with a message",
),
click.option(
"--allow-list/--disallow-list", default=True,
help="always/never send list of allocated nameplates",
),
click.option(
"--relay-database-path", default="relay.sqlite", metavar="PATH",
help="location for the relay server state database",
),
click.option(
"--stats-json-path", default="stats.json", metavar="PATH",
help="location to write the relay stats file",
),
click.option(
"--websocket-protocol-option", multiple=True, metavar="OPTION=VALUE",
callback=_validate_websocket_protocol_options,
help="a websocket server protocol option to configure",
),
)
@server.command()
@LaunchArgs
@click.pass_obj
def start(cfg, **kwargs):
"""
Start a relay server
"""
for name, value in kwargs.items():
setattr(cfg, name, value)
from wormhole.server.cmd_server import start_server
start_server(cfg)
@server.command()
@LaunchArgs
@click.pass_obj
def restart(cfg, **kwargs):
"""
Re-start a relay server
"""
for name, value in kwargs.items():
setattr(cfg, name, value)
from wormhole.server.cmd_server import restart_server
restart_server(cfg)
@server.command()
@click.pass_obj
def stop(cfg):
"""
Stop a relay server
"""
from wormhole.server.cmd_server import stop_server
stop_server(cfg)
@server.command(name="tail-usage")
@click.pass_obj
def tail_usage(cfg):
"""
Follow the latest usage
"""
from wormhole.server.cmd_usage import tail_usage
tail_usage(cfg)
@server.command(name='count-channels')
@click.option(
"--json", is_flag=True,
)
@click.pass_obj
def count_channels(cfg, json):
"""
Count active channels
"""
from wormhole.server.cmd_usage import count_channels
cfg.json = json
count_channels(cfg)
@server.command(name='count-events')
@click.option(
"--json", is_flag=True,
)
@click.pass_obj
def count_events(cfg, json):
"""
Count events
"""
from wormhole.server.cmd_usage import count_events
cfg.json = json
count_events(cfg)

View File

@ -1,72 +0,0 @@
from __future__ import print_function, unicode_literals
import os, time
from twisted.python import usage
from twisted.scripts import twistd
class MyPlugin(object):
tapname = "xyznode"
def __init__(self, args):
self.args = args
def makeService(self, so):
# delay this import as late as possible, to allow twistd's code to
# accept --reactor= selection
from .server import RelayServer
return RelayServer(
str(self.args.rendezvous),
self.args.advertise_version,
self.args.relay_database_path,
self.args.blur_usage,
signal_error=self.args.signal_error,
stats_file=self.args.stats_json_path,
allow_list=self.args.allow_list,
)
class MyTwistdConfig(twistd.ServerOptions):
subCommands = [("XYZ", None, usage.Options, "node")]
def start_server(args):
c = MyTwistdConfig()
#twistd_args = tuple(args.twistd_args) + ("XYZ",)
base_args = []
if args.no_daemon:
base_args.append("--nodaemon")
twistd_args = base_args + ["XYZ"]
c.parseOptions(tuple(twistd_args))
c.loadedPlugins = {"XYZ": MyPlugin(args)}
print("starting wormhole relay server")
# this forks and never comes back. The parent calls os._exit(0)
twistd.runApp(c)
def kill_server():
try:
f = open("twistd.pid", "r")
except EnvironmentError:
print("Unable to find twistd.pid: is this really a server directory?")
print("oh well, ignoring 'stop'")
return
pid = int(f.read().strip())
f.close()
os.kill(pid, 15)
print("server process %d sent SIGTERM" % pid)
return
def stop_server(args):
kill_server()
def restart_server(args):
kill_server()
time.sleep(0.1)
timeout = 0
while os.path.exists("twistd.pid") and timeout < 10:
if timeout == 0:
print(" waiting for shutdown..")
timeout += 1
time.sleep(1)
if os.path.exists("twistd.pid"):
print("error: unable to shut down old server")
return 1
print(" old server shut down")
start_server(args)

View File

@ -1,165 +0,0 @@
from __future__ import print_function, unicode_literals
import os, time, json
import click
from humanize import naturalsize
from .database import get_db
def abbrev(t):
if t is None:
return "-"
if t > 1.0:
return "%.3fs" % t
if t > 1e-3:
return "%.1fms" % (t*1e3)
return "%.1fus" % (t*1e6)
def print_event(event):
event_type, started, result, total_bytes, waiting_time, total_time = event
followthrough = None
if waiting_time and total_time:
followthrough = total_time - waiting_time
print("%17s: total=%7s wait=%7s ft=%7s size=%s (%s)" %
("%s-%s" % (event_type, result),
abbrev(total_time),
abbrev(waiting_time),
abbrev(followthrough),
naturalsize(total_bytes),
time.ctime(started),
))
def show_usage(args):
print("closed for renovation")
return 0
def tail_usage(args):
if not os.path.exists("relay.sqlite"):
raise click.UsageError(
"cannot find relay.sqlite, please run from the server directory"
)
db = get_db("relay.sqlite")
# we don't seem to have unique row IDs, so this is an inaccurate and
# inefficient hack
seen = set()
try:
while True:
old = time.time() - 2*60*60
c = db.execute("SELECT * FROM `usage`"
" WHERE `started` > ?"
" ORDER BY `started` ASC", (old,))
for row in c.fetchall():
event = (row["type"], row["started"], row["result"],
row["total_bytes"], row["waiting_time"],
row["total_time"])
if event not in seen:
print_event(event)
seen.add(event)
time.sleep(2)
except KeyboardInterrupt:
return 0
def count_channels(args):
if not os.path.exists("relay.sqlite"):
raise click.UsageError(
"cannot find relay.sqlite, please run from the server directory"
)
db = get_db("relay.sqlite")
c_list = []
c_dict = {}
def add(key, value):
c_list.append((key, value))
c_dict[key] = value
OLD = time.time() - 10*60
def q(query, values=()):
return list(db.execute(query, values).fetchone().values())[0]
add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplates`"))
add("total nameplates", q("SELECT COUNT() FROM `nameplates`"))
add("waiting nameplates", q("SELECT COUNT() FROM `nameplates`"
" WHERE `second` is null"))
add("connected nameplates", q("SELECT COUNT() FROM `nameplates`"
" WHERE `second` is not null"))
add("stale nameplates", q("SELECT COUNT() FROM `nameplates`"
" where `updated` < ?", (OLD,)))
add("total mailboxes", q("SELECT COUNT() FROM `mailboxes`"))
add("waiting mailboxes", q("SELECT COUNT() FROM `mailboxes`"
" WHERE `second` is null"))
add("connected mailboxes", q("SELECT COUNT() FROM `mailboxes`"
" WHERE `second` is not null"))
stale_mailboxes = 0
for mbox_row in db.execute("SELECT * FROM `mailboxes`").fetchall():
newest = db.execute("SELECT `server_rx` FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?"
" ORDER BY `server_rx` DESC LIMIT 1",
(mbox_row["app_id"], mbox_row["id"])).fetchone()
if newest and newest[0] < OLD:
stale_mailboxes += 1
add("stale mailboxes", stale_mailboxes)
add("messages", q("SELECT COUNT() FROM `messages`"))
if args.json:
print(json.dumps(c_dict))
else:
for (key, value) in c_list:
print(key, value)
return 0
def count_events(args):
if not os.path.exists("relay.sqlite"):
raise click.UsageError(
"cannot find relay.sqlite, please run from the server directory"
)
db = get_db("relay.sqlite")
c_list = []
c_dict = {}
def add(key, value):
c_list.append((key, value))
c_dict[key] = value
def q(query, values=()):
return list(db.execute(query, values).fetchone().values())[0]
add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplate_usage`"))
add("total nameplates", q("SELECT COUNT() FROM `nameplate_usage`"))
add("happy nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='happy'"))
add("lonely nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='lonely'"))
add("pruney nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='pruney'"))
add("crowded nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='crowded'"))
add("total mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"))
add("happy mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='happy'"))
add("scary mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='scary'"))
add("lonely mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='lonely'"))
add("errory mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='errory'"))
add("pruney mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='pruney'"))
add("crowded mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='crowded'"))
add("total transit", q("SELECT COUNT() FROM `transit_usage`"))
add("happy transit", q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='happy'"))
add("lonely transit", q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='lonely'"))
add("errory transit", q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='errory'"))
add("transit bytes", q("SELECT SUM(`total_bytes`) FROM `transit_usage`"))
if args.json:
print(json.dumps(c_dict))
else:
for (key, value) in c_list:
print(key, value)
return 0

View File

@ -1,126 +0,0 @@
from __future__ import unicode_literals
import os
import sqlite3
import tempfile
from pkg_resources import resource_string
from twisted.python import log
class DBError(Exception):
pass
def get_schema(version):
schema_bytes = resource_string("wormhole.server",
"db-schemas/v%d.sql" % version)
return schema_bytes.decode("utf-8")
def get_upgrader(new_version):
schema_bytes = resource_string("wormhole.server",
"db-schemas/upgrade-to-v%d.sql" % new_version)
return schema_bytes.decode("utf-8")
TARGET_VERSION = 3
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
def _initialize_db_schema(db, target_version):
"""Creates the application schema in the given database.
"""
log.msg("populating new database with schema v%s" % target_version)
schema = get_schema(target_version)
db.executescript(schema)
db.execute("INSERT INTO version (version) VALUES (?)",
(target_version,))
db.commit()
def _initialize_db_connection(db):
"""Sets up the db connection object with a row factory and with necessary
foreign key settings.
"""
db.row_factory = dict_factory
db.execute("PRAGMA foreign_keys = ON")
problems = db.execute("PRAGMA foreign_key_check").fetchall()
if problems:
raise DBError("failed foreign key check: %s" % (problems,))
def _open_db_connection(dbfile):
"""Open a new connection to the SQLite3 database at the given path.
"""
try:
db = sqlite3.connect(dbfile)
except (EnvironmentError, sqlite3.OperationalError) as e:
raise DBError("Unable to create/open db file %s: %s" % (dbfile, e))
_initialize_db_connection(db)
return db
def _get_temporary_dbfile(dbfile):
"""Get a temporary filename near the given path.
"""
fd, name = tempfile.mkstemp(
prefix=os.path.basename(dbfile) + ".",
dir=os.path.dirname(dbfile)
)
os.close(fd)
return name
def _atomic_create_and_initialize_db(dbfile, target_version):
"""Create and return a new database, initialized with the application
schema.
If anything goes wrong, nothing is left at the ``dbfile`` path.
"""
temp_dbfile = _get_temporary_dbfile(dbfile)
db = _open_db_connection(temp_dbfile)
_initialize_db_schema(db, target_version)
db.close()
os.rename(temp_dbfile, dbfile)
return _open_db_connection(dbfile)
def get_db(dbfile, target_version=TARGET_VERSION):
"""Open or create the given db file. The parent directory must exist.
Returns the db connection object, or raises DBError.
"""
if dbfile == ":memory:":
db = _open_db_connection(dbfile)
_initialize_db_schema(db, target_version)
elif os.path.exists(dbfile):
db = _open_db_connection(dbfile)
else:
db = _atomic_create_and_initialize_db(dbfile, target_version)
try:
version = db.execute("SELECT version FROM version").fetchone()["version"]
except sqlite3.DatabaseError as e:
# this indicates that the file is not a compatible database format.
# Perhaps it was created with an old version, or it might be junk.
raise DBError("db file is unusable: %s" % e)
while version < target_version:
log.msg(" need to upgrade from %s to %s" % (version, target_version))
try:
upgrader = get_upgrader(version+1)
except ValueError: # ResourceError??
log.msg(" unable to upgrade %s to %s" % (version, version+1))
raise DBError("Unable to upgrade %s to version %s, left at %s"
% (dbfile, version+1, version))
log.msg(" executing upgrader v%s->v%s" % (version, version+1))
db.executescript(upgrader)
db.commit()
version = version+1
if version != target_version:
raise DBError("Unable to handle db version %s" % version)
return db
def dump_db(db):
# to let _iterdump work, we need to restore the original row factory
orig = db.row_factory
try:
db.row_factory = sqlite3.Row
return "".join(db.iterdump())
finally:
db.row_factory = orig

View File

@ -1,68 +0,0 @@
DROP TABLE `nameplates`;
DROP TABLE `messages`;
DROP TABLE `mailboxes`;
-- Wormhole codes use a "nameplate": a short name which is only used to
-- reference a specific (long-named) mailbox. The codes only use numeric
-- nameplates, but the protocol and server allow can use arbitrary strings.
CREATE TABLE `nameplates`
(
`id` INTEGER PRIMARY KEY AUTOINCREMENT,
`app_id` VARCHAR,
`name` VARCHAR,
`mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`),
`request_id` VARCHAR -- from 'allocate' message, for future deduplication
);
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`);
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
CREATE TABLE `nameplate_sides`
(
`nameplates_id` REFERENCES `nameplates`(`id`),
`claimed` BOOLEAN, -- True after claim(), False after release()
`side` VARCHAR,
`added` INTEGER -- time when this side first claimed the nameplate
);
-- Clients exchange messages through a "mailbox", which has a long (randomly
-- unique) identifier and a queue of messages.
-- `id` is randomly-generated and unique across all apps.
CREATE TABLE `mailboxes`
(
`app_id` VARCHAR,
`id` VARCHAR PRIMARY KEY,
`updated` INTEGER, -- time of last activity, used for pruning
`for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone
);
CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`);
CREATE TABLE `mailbox_sides`
(
`mailbox_id` REFERENCES `mailboxes`(`id`),
`opened` BOOLEAN, -- True after open(), False after close()
`side` VARCHAR,
`added` INTEGER, -- time when this side first opened the mailbox
`mood` VARCHAR
);
CREATE TABLE `messages`
(
`app_id` VARCHAR,
`mailbox_id` VARCHAR,
`side` VARCHAR,
`phase` VARCHAR, -- numeric or string
`body` VARCHAR,
`server_rx` INTEGER,
`msg_id` VARCHAR
);
CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`);
ALTER TABLE `mailbox_usage` ADD COLUMN `for_nameplate` BOOLEAN;
CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`);
CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`);
DELETE FROM `version`;
INSERT INTO `version` (`version`) VALUES (3);

View File

@ -1,105 +0,0 @@
-- note: anything which isn't an boolean, integer, or human-readable unicode
-- string, (i.e. binary strings) will be stored as hex
CREATE TABLE `version`
(
`version` INTEGER -- contains one row, set to 2
);
-- Wormhole codes use a "nameplate": a short identifier which is only used to
-- reference a specific (long-named) mailbox. The codes only use numeric
-- nameplates, but the protocol and server allow can use arbitrary strings.
CREATE TABLE `nameplates`
(
`app_id` VARCHAR,
`id` VARCHAR,
`mailbox_id` VARCHAR, -- really a foreign key
`side1` VARCHAR, -- side name, or NULL
`side2` VARCHAR, -- side name, or NULL
`request_id` VARCHAR, -- from 'allocate' message, for future deduplication
`crowded` BOOLEAN, -- at some point, three or more sides were involved
`updated` INTEGER, -- time of last activity, used for pruning
-- timing data
`started` INTEGER, -- time when nameplace was opened
`second` INTEGER -- time when second side opened
);
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `id`);
CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`);
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
-- Clients exchange messages through a "mailbox", which has a long (randomly
-- unique) identifier and a queue of messages.
CREATE TABLE `mailboxes`
(
`app_id` VARCHAR,
`id` VARCHAR,
`side1` VARCHAR, -- side name, or NULL
`side2` VARCHAR, -- side name, or NULL
`crowded` BOOLEAN, -- at some point, three or more sides were involved
`first_mood` VARCHAR,
-- timing data for the mailbox itself
`started` INTEGER, -- time when opened
`second` INTEGER -- time when second side opened
);
CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`);
CREATE TABLE `messages`
(
`app_id` VARCHAR,
`mailbox_id` VARCHAR,
`side` VARCHAR,
`phase` VARCHAR, -- numeric or string
`body` VARCHAR,
`server_rx` INTEGER,
`msg_id` VARCHAR
);
CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`);
CREATE TABLE `nameplate_usage`
(
`app_id` VARCHAR,
`started` INTEGER, -- seconds since epoch, rounded to "blur time"
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`total_time` INTEGER, -- seconds from open to last close/prune
`result` VARCHAR -- happy, lonely, pruney, crowded
-- nameplate moods:
-- "happy": two sides open and close
-- "lonely": one side opens and closes (no response from 2nd side)
-- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved
);
CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`);
CREATE TABLE `mailbox_usage`
(
`app_id` VARCHAR,
`started` INTEGER, -- seconds since epoch, rounded to "blur time"
`total_time` INTEGER, -- seconds from open to last close
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`result` VARCHAR -- happy, scary, lonely, errory, pruney
-- rendezvous moods:
-- "happy": both sides close with mood=happy
-- "scary": any side closes with mood=scary (bad MAC, probably wrong pw)
-- "lonely": any side closes with mood=lonely (no response from 2nd side)
-- "errory": any side closes with mood=errory (other errors)
-- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved
);
CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`);
CREATE TABLE `transit_usage`
(
`started` INTEGER, -- seconds since epoch, rounded to "blur time"
`total_time` INTEGER, -- seconds from open to last close
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`total_bytes` INTEGER, -- total bytes relayed (both directions)
`result` VARCHAR -- happy, scary, lonely, errory, pruney
-- transit moods:
-- "errory": one side gave the wrong handshake
-- "lonely": good handshake, but the other side never showed up
-- "happy": both sides gave correct handshake
);
CREATE INDEX `transit_usage_idx` ON `transit_usage` (`started`);

View File

@ -1,115 +0,0 @@
-- note: anything which isn't an boolean, integer, or human-readable unicode
-- string, (i.e. binary strings) will be stored as hex
CREATE TABLE `version`
(
`version` INTEGER -- contains one row, set to 3
);
-- Wormhole codes use a "nameplate": a short name which is only used to
-- reference a specific (long-named) mailbox. The codes only use numeric
-- nameplates, but the protocol and server allow can use arbitrary strings.
CREATE TABLE `nameplates`
(
`id` INTEGER PRIMARY KEY AUTOINCREMENT,
`app_id` VARCHAR,
`name` VARCHAR,
`mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`),
`request_id` VARCHAR -- from 'allocate' message, for future deduplication
);
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`);
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
CREATE TABLE `nameplate_sides`
(
`nameplates_id` REFERENCES `nameplates`(`id`),
`claimed` BOOLEAN, -- True after claim(), False after release()
`side` VARCHAR,
`added` INTEGER -- time when this side first claimed the nameplate
);
-- Clients exchange messages through a "mailbox", which has a long (randomly
-- unique) identifier and a queue of messages.
-- `id` is randomly-generated and unique across all apps.
CREATE TABLE `mailboxes`
(
`app_id` VARCHAR,
`id` VARCHAR PRIMARY KEY,
`updated` INTEGER, -- time of last activity, used for pruning
`for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone
);
CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`);
CREATE TABLE `mailbox_sides`
(
`mailbox_id` REFERENCES `mailboxes`(`id`),
`opened` BOOLEAN, -- True after open(), False after close()
`side` VARCHAR,
`added` INTEGER, -- time when this side first opened the mailbox
`mood` VARCHAR
);
CREATE TABLE `messages`
(
`app_id` VARCHAR,
`mailbox_id` VARCHAR,
`side` VARCHAR,
`phase` VARCHAR, -- numeric or string
`body` VARCHAR,
`server_rx` INTEGER,
`msg_id` VARCHAR
);
CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`);
CREATE TABLE `nameplate_usage`
(
`app_id` VARCHAR,
`started` INTEGER, -- seconds since epoch, rounded to "blur time"
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`total_time` INTEGER, -- seconds from open to last close/prune
`result` VARCHAR -- happy, lonely, pruney, crowded
-- nameplate moods:
-- "happy": two sides open and close
-- "lonely": one side opens and closes (no response from 2nd side)
-- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved
);
CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`);
CREATE TABLE `mailbox_usage`
(
`app_id` VARCHAR,
`for_nameplate` BOOLEAN, -- allocated for a nameplate, not standalone
`started` INTEGER, -- seconds since epoch, rounded to "blur time"
`total_time` INTEGER, -- seconds from open to last close
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`result` VARCHAR -- happy, scary, lonely, errory, pruney
-- rendezvous moods:
-- "happy": both sides close with mood=happy
-- "scary": any side closes with mood=scary (bad MAC, probably wrong pw)
-- "lonely": any side closes with mood=lonely (no response from 2nd side)
-- "errory": any side closes with mood=errory (other errors)
-- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved
);
CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`);
CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`);
CREATE TABLE `transit_usage`
(
`started` INTEGER, -- seconds since epoch, rounded to "blur time"
`total_time` INTEGER, -- seconds from open to last close
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`total_bytes` INTEGER, -- total bytes relayed (both directions)
`result` VARCHAR -- happy, scary, lonely, errory, pruney
-- transit moods:
-- "errory": one side gave the wrong handshake
-- "lonely": good handshake, but the other side never showed up
-- "happy": both sides gave correct handshake
);
CREATE INDEX `transit_usage_idx` ON `transit_usage` (`started`);
CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`);

View File

@ -1,659 +0,0 @@
from __future__ import print_function, unicode_literals
import os, random, base64, collections
from collections import namedtuple
from twisted.python import log
from twisted.application import service
def generate_mailbox_id():
return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii")
class CrowdedError(Exception):
pass
class ReclaimedError(Exception):
pass
Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"])
TransitUsage = namedtuple("TransitUsage",
["started", "waiting_time", "total_time",
"total_bytes", "result"])
SidedMessage = namedtuple("SidedMessage", ["side", "phase", "body",
"server_rx", "msg_id"])
class Mailbox:
def __init__(self, app, db, app_id, mailbox_id):
self._app = app
self._db = db
self._app_id = app_id
self._mailbox_id = mailbox_id
self._listeners = {} # handle -> (send_f, stop_f)
# "handle" is a hashable object, for deregistration
# send_f() takes a JSONable object, stop_f() has no args
def open(self, side, when):
# requires caller to db.commit()
assert isinstance(side, type("")), type(side)
db = self._db
already = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=? AND `side`=?",
(self._mailbox_id, side)).fetchone()
if not already:
db.execute("INSERT INTO `mailbox_sides`"
" (`mailbox_id`, `opened`, `side`, `added`)"
" VALUES(?,?,?,?)",
(self._mailbox_id, True, side, when))
# We accept re-opening a mailbox which a side previously closed,
# unlike claim_nameplate(), which forbids any side from re-claiming a
# nameplate which they previously released. (Nameplates forbid this
# because the act of claiming a nameplate for the first time causes a
# new mailbox to be created, which should only happen once).
# Mailboxes have their own distinct objects (to manage
# subscriptions), so closing one which was already closed requires
# making a new object, which works by calling open() just before
# close(). We really do want to support re-closing closed mailboxes,
# because this enables intermittently-connected clients, who remember
# sending a 'close' but aren't sure whether it was received or not,
# then get shut down. Those clients will wake up and re-send the
# 'close', until they receive the 'closed' ack message.
self._touch(when)
db.commit() # XXX: reconcile the need for this with the comment above
def _touch(self, when):
self._db.execute("UPDATE `mailboxes` SET `updated`=? WHERE `id`=?",
(when, self._mailbox_id))
def get_messages(self):
messages = []
db = self._db
for row in db.execute("SELECT * FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?"
" ORDER BY `server_rx` ASC",
(self._app_id, self._mailbox_id)).fetchall():
sm = SidedMessage(side=row["side"], phase=row["phase"],
body=row["body"], server_rx=row["server_rx"],
msg_id=row["msg_id"])
messages.append(sm)
return messages
def add_listener(self, handle, send_f, stop_f):
#log.msg("add_listener", self._mailbox_id, handle)
self._listeners[handle] = (send_f, stop_f)
#log.msg(" added", len(self._listeners))
return self.get_messages()
def remove_listener(self, handle):
#log.msg("remove_listener", self._mailbox_id, handle)
self._listeners.pop(handle, None)
#log.msg(" removed", len(self._listeners))
def has_listeners(self):
return bool(self._listeners)
def broadcast_message(self, sm):
for (send_f, stop_f) in self._listeners.values():
send_f(sm)
def _add_message(self, sm):
self._db.execute("INSERT INTO `messages`"
" (`app_id`, `mailbox_id`, `side`, `phase`, `body`,"
" `server_rx`, `msg_id`)"
" VALUES (?,?,?,?,?, ?,?)",
(self._app_id, self._mailbox_id, sm.side,
sm.phase, sm.body, sm.server_rx, sm.msg_id))
self._touch(sm.server_rx)
self._db.commit()
def add_message(self, sm):
assert isinstance(sm, SidedMessage)
self._add_message(sm)
self.broadcast_message(sm)
def close(self, side, mood, when):
assert isinstance(side, type("")), type(side)
db = self._db
row = db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, self._mailbox_id)).fetchone()
if not row:
return
for_nameplate = row["for_nameplate"]
row = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=? AND `side`=?",
(self._mailbox_id, side)).fetchone()
if not row:
return
db.execute("UPDATE `mailbox_sides` SET `opened`=?, `mood`=?"
" WHERE `mailbox_id`=? AND `side`=?",
(False, mood, self._mailbox_id, side))
db.commit()
# are any sides still open?
side_rows = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=?",
(self._mailbox_id,)).fetchall()
if any([sr["opened"] for sr in side_rows]):
return
# nope. delete and summarize
db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?",
(self._mailbox_id,))
db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?",
(self._mailbox_id,))
db.execute("DELETE FROM `mailboxes` WHERE `id`=?", (self._mailbox_id,))
self._app._summarize_mailbox_and_store(for_nameplate, side_rows,
when, pruned=False)
db.commit()
# Shut down any listeners, just in case they're still lingering
# around.
for (send_f, stop_f) in self._listeners.values():
stop_f()
self._listeners = {}
self._app.free_mailbox(self._mailbox_id)
def _shutdown(self):
# used at test shutdown to accelerate client disconnects
for (send_f, stop_f) in self._listeners.values():
stop_f()
self._listeners = {}
class AppNamespace(object):
def __init__(self, db, blur_usage, log_requests, app_id, allow_list):
self._db = db
self._blur_usage = blur_usage
self._log_requests = log_requests
self._app_id = app_id
self._mailboxes = {}
self._nameplate_counts = collections.defaultdict(int)
self._mailbox_counts = collections.defaultdict(int)
self._allow_list = allow_list
def get_nameplate_ids(self):
if not self._allow_list:
return []
return self._get_nameplate_ids()
def _get_nameplate_ids(self):
db = self._db
# TODO: filter this to numeric ids?
c = db.execute("SELECT DISTINCT `name` FROM `nameplates`"
" WHERE `app_id`=?", (self._app_id,))
return set([row["name"] for row in c.fetchall()])
def _find_available_nameplate_id(self):
claimed = self._get_nameplate_ids()
for size in range(1,4): # stick to 1-999 for now
available = set()
for id_int in range(10**(size-1), 10**size):
id = "%d" % id_int
if id not in claimed:
available.add(id)
if available:
return random.choice(list(available))
# ouch, 999 currently claimed. Try random ones for a while.
for tries in range(1000):
id_int = random.randrange(1000, 1000*1000)
id = "%d" % id_int
if id not in claimed:
return id
raise ValueError("unable to find a free nameplate-id")
def allocate_nameplate(self, side, when):
nameplate_id = self._find_available_nameplate_id()
mailbox_id = self.claim_nameplate(nameplate_id, side, when)
del mailbox_id # ignored, they'll learn it from claim()
return nameplate_id
def claim_nameplate(self, name, side, when):
# when we're done:
# * there will be one row for the nameplate
# * there will be one 'side' attached to it, with claimed=True
# * a mailbox id and mailbox row will be created
# * a mailbox 'side' will be attached, with opened=True
assert isinstance(name, type("")), type(name)
assert isinstance(side, type("")), type(side)
db = self._db
row = db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=? AND `name`=?",
(self._app_id, name)).fetchone()
if not row:
if self._log_requests:
log.msg("creating nameplate#%s for app_id %s" %
(name, self._app_id))
mailbox_id = generate_mailbox_id()
self._add_mailbox(mailbox_id, True, side, when) # ensure row exists
sql = ("INSERT INTO `nameplates`"
" (`app_id`, `name`, `mailbox_id`)"
" VALUES(?,?,?)")
npid = db.execute(sql, (self._app_id, name, mailbox_id)
).lastrowid
else:
npid = row["id"]
mailbox_id = row["mailbox_id"]
row = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=? AND `side`=?",
(npid, side)).fetchone()
if not row:
db.execute("INSERT INTO `nameplate_sides`"
" (`nameplates_id`, `claimed`, `side`, `added`)"
" VALUES(?,?,?,?)",
(npid, True, side, when))
else:
if not row["claimed"]:
raise ReclaimedError("you cannot re-claim a nameplate that your side previously released")
# since that might cause a new mailbox to be allocated
db.commit()
self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError
rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?", (npid,)).fetchall()
if len(rows) > 2:
# this line will probably never get hit: any crowding is noticed
# on mailbox_sides first, inside open_mailbox()
raise CrowdedError("too many sides have claimed this nameplate")
return mailbox_id
def release_nameplate(self, name, side, when):
# when we're done:
# * the 'claimed' flag will be cleared on the nameplate_sides row
# * if the nameplate is now unused (no claimed sides):
# * a usage record will be added
# * the nameplate row will be removed
# * the nameplate sides will be removed
assert isinstance(name, type("")), type(name)
assert isinstance(side, type("")), type(side)
db = self._db
np_row = db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=? AND `name`=?",
(self._app_id, name)).fetchone()
if not np_row:
return
npid = np_row["id"]
row = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=? AND `side`=?",
(npid, side)).fetchone()
if not row:
return
db.execute("UPDATE `nameplate_sides` SET `claimed`=?"
" WHERE `nameplates_id`=? AND `side`=?",
(False, npid, side))
db.commit()
# now, are there any remaining claims?
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
claims = [1 for sr in side_rows if sr["claimed"]]
if claims:
return
# delete and summarize
db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?",
(npid,))
db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,))
self._summarize_nameplate_and_store(side_rows, when, pruned=False)
db.commit()
def _summarize_nameplate_and_store(self, side_rows, delete_time, pruned):
# requires caller to db.commit()
u = self._summarize_nameplate_usage(side_rows, delete_time, pruned)
self._db.execute("INSERT INTO `nameplate_usage`"
" (`app_id`,"
" `started`, `total_time`, `waiting_time`, `result`)"
" VALUES (?, ?,?,?,?)",
(self._app_id,
u.started, u.total_time, u.waiting_time, u.result))
self._nameplate_counts[u.result] += 1
def _summarize_nameplate_usage(self, side_rows, delete_time, pruned):
times = sorted([row["added"] for row in side_rows])
started = times[0]
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
waiting_time = None
if len(times) > 1:
waiting_time = times[1] - times[0]
total_time = delete_time - times[0]
result = "lonely"
if len(times) == 2:
result = "happy"
if pruned:
result = "pruney"
if len(times) > 2:
result = "crowded"
return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result)
def _add_mailbox(self, mailbox_id, for_nameplate, side, when):
assert isinstance(mailbox_id, type("")), type(mailbox_id)
db = self._db
row = db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, mailbox_id)).fetchone()
if not row:
self._db.execute("INSERT INTO `mailboxes`"
" (`app_id`, `id`, `for_nameplate`, `updated`)"
" VALUES(?,?,?,?)",
(self._app_id, mailbox_id, for_nameplate, when))
# we don't need a commit here, because mailbox.open() only
# does SELECT FROM `mailbox_sides`, not from `mailboxes`
def open_mailbox(self, mailbox_id, side, when):
assert isinstance(mailbox_id, type("")), type(mailbox_id)
self._add_mailbox(mailbox_id, False, side, when) # ensure row exists
db = self._db
if not mailbox_id in self._mailboxes: # ensure Mailbox object exists
if self._log_requests:
log.msg("spawning #%s for app_id %s" % (mailbox_id,
self._app_id))
self._mailboxes[mailbox_id] = Mailbox(self, self._db,
self._app_id, mailbox_id)
mailbox = self._mailboxes[mailbox_id]
# delegate to mailbox.open() to add a row to mailbox_sides, and
# update the mailbox.updated timestamp
mailbox.open(side, when)
db.commit()
rows = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=?",
(mailbox_id,)).fetchall()
if len(rows) > 2:
raise CrowdedError("too many sides have opened this mailbox")
return mailbox
def free_mailbox(self, mailbox_id):
# called from Mailbox.delete_and_summarize(), which deletes any
# messages
if mailbox_id in self._mailboxes:
self._mailboxes.pop(mailbox_id)
#if self._log_requests:
# log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" %
# (mailbox_id, len(self.get_claimed()), len(self._mailboxes)))
def _summarize_mailbox_and_store(self, for_nameplate, side_rows,
delete_time, pruned):
db = self._db
u = self._summarize_mailbox(side_rows, delete_time, pruned)
db.execute("INSERT INTO `mailbox_usage`"
" (`app_id`, `for_nameplate`,"
" `started`, `total_time`, `waiting_time`, `result`)"
" VALUES (?,?, ?,?,?,?)",
(self._app_id, for_nameplate,
u.started, u.total_time, u.waiting_time, u.result))
self._mailbox_counts[u.result] += 1
def _summarize_mailbox(self, side_rows, delete_time, pruned):
times = sorted([row["added"] for row in side_rows])
started = times[0]
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
waiting_time = None
if len(times) > 1:
waiting_time = times[1] - times[0]
total_time = delete_time - times[0]
num_sides = len(times)
if num_sides == 0:
result = "quiet"
elif num_sides == 1:
result = "lonely"
else:
result = "happy"
# "mood" is only recorded at close()
moods = [row["mood"] for row in side_rows if row.get("mood")]
if "lonely" in moods:
result = "lonely"
if "errory" in moods:
result = "errory"
if "scary" in moods:
result = "scary"
if pruned:
result = "pruney"
if num_sides > 2:
result = "crowded"
return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result)
def prune(self, now, old):
# The pruning check runs every 10 minutes, and "old" is defined to be
# 11 minutes ago (unit tests can use different values). The client is
# allowed to disconnect for up to 9 minutes without losing the
# channel (nameplate, mailbox, and messages).
# Each time a client does something, the mailbox.updated field is
# updated with the current timestamp. If a client is subscribed to
# the mailbox when pruning check runs, the "updated" field is also
# updated. After that check, if the "updated" field is "old", the
# channel is deleted.
# For now, pruning is logged even if log_requests is False, to debug
# the pruning process, and since pruning is triggered by a timer
# instead of by user action. It does reveal which mailboxes were
# present when the pruning process began, though, so in the log run
# it should do less logging.
log.msg(" prune begins (%s)" % self._app_id)
db = self._db
modified = False
for mailbox in self._mailboxes.values():
if mailbox.has_listeners():
log.msg("touch %s because listeners" % mailbox._mailbox_id)
mailbox._touch(now)
db.commit() # make sure the updates are visible below
new_mailboxes = set()
old_mailboxes = set()
for row in db.execute("SELECT * FROM `mailboxes` WHERE `app_id`=?",
(self._app_id,)).fetchall():
mailbox_id = row["id"]
log.msg(" 1: age=%s, old=%s, %s" %
(now - row["updated"], now - old, mailbox_id))
if row["updated"] > old:
new_mailboxes.add(mailbox_id)
else:
old_mailboxes.add(mailbox_id)
log.msg(" 2: mailboxes:", new_mailboxes, old_mailboxes)
old_nameplates = set()
for row in db.execute("SELECT * FROM `nameplates` WHERE `app_id`=?",
(self._app_id,)).fetchall():
npid = row["id"]
mailbox_id = row["mailbox_id"]
if mailbox_id in old_mailboxes:
old_nameplates.add(npid)
log.msg(" 3: old_nameplates dbids", old_nameplates)
for npid in old_nameplates:
log.msg(" deleting nameplate with dbid", npid)
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?",
(npid,))
db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,))
self._summarize_nameplate_and_store(side_rows, now, pruned=True)
modified = True
# delete all messages for old mailboxes
# delete all old mailboxes
for mailbox_id in old_mailboxes:
log.msg(" deleting mailbox", mailbox_id)
row = db.execute("SELECT * FROM `mailboxes`"
" WHERE `id`=?", (mailbox_id,)).fetchone()
for_nameplate = row["for_nameplate"]
side_rows = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=?",
(mailbox_id,)).fetchall()
db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?",
(mailbox_id,))
db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?",
(mailbox_id,))
db.execute("DELETE FROM `mailboxes` WHERE `id`=?",
(mailbox_id,))
self._summarize_mailbox_and_store(for_nameplate, side_rows,
now, pruned=True)
modified = True
if modified:
db.commit()
log.msg(" prune complete, modified:", modified)
def get_counts(self):
return (self._nameplate_counts, self._mailbox_counts)
def _shutdown(self):
for channel in self._mailboxes.values():
channel._shutdown()
class Rendezvous(service.MultiService):
def __init__(self, db, welcome, blur_usage, allow_list):
service.MultiService.__init__(self)
self._db = db
self._welcome = welcome
self._blur_usage = blur_usage
log_requests = blur_usage is None
self._log_requests = log_requests
self._allow_list = allow_list
self._apps = {}
def get_welcome(self):
return self._welcome
def get_log_requests(self):
return self._log_requests
def get_app(self, app_id):
assert isinstance(app_id, type(""))
if not app_id in self._apps:
if self._log_requests:
log.msg("spawning app_id %s" % (app_id,))
self._apps[app_id] = AppNamespace(
self._db,
self._blur_usage,
self._log_requests,
app_id,
self._allow_list,
)
return self._apps[app_id]
def get_all_apps(self):
apps = set()
for row in self._db.execute("SELECT DISTINCT `app_id`"
" FROM `nameplates`").fetchall():
apps.add(row["app_id"])
for row in self._db.execute("SELECT DISTINCT `app_id`"
" FROM `mailboxes`").fetchall():
apps.add(row["app_id"])
for row in self._db.execute("SELECT DISTINCT `app_id`"
" FROM `messages`").fetchall():
apps.add(row["app_id"])
return apps
def prune_all_apps(self, now, old):
# As with AppNamespace.prune_old_mailboxes, we log for now.
log.msg("beginning app prune")
for app_id in sorted(self.get_all_apps()):
log.msg(" app prune checking %r" % (app_id,))
app = self.get_app(app_id)
app.prune(now, old)
log.msg("app prune ends, %d apps" % len(self._apps))
def get_stats(self):
stats = {}
# current status: expected to be zero most of the time
c = stats["active"] = {}
c["apps"] = len(self.get_all_apps())
def q(query, values=()):
row = self._db.execute(query, values).fetchone()
return list(row.values())[0]
c["nameplates_total"] = q("SELECT COUNT() FROM `nameplates`")
# TODO: nameplates with only one side (most of them)
# TODO: nameplates with two sides (very fleeting)
# TODO: nameplates with three or more sides (crowded, unlikely)
c["mailboxes_total"] = q("SELECT COUNT() FROM `mailboxes`")
# TODO: mailboxes with only one side (most of them)
# TODO: mailboxes with two sides (somewhat fleeting, in-transit)
# TODO: mailboxes with three or more sides (unlikely)
c["messages_total"] = q("SELECT COUNT() FROM `messages`")
# usage since last reboot
nameplate_counts = collections.defaultdict(int)
mailbox_counts = collections.defaultdict(int)
for app in self._apps.values():
nc, mc = app.get_counts()
for result, count in nc.items():
nameplate_counts[result] += count
for result, count in mc.items():
mailbox_counts[result] += count
urb = stats["since_reboot"] = {}
urb["nameplate_moods"] = {}
for result, count in nameplate_counts.items():
urb["nameplate_moods"][result] = count
urb["nameplates_total"] = sum(nameplate_counts.values())
urb["mailbox_moods"] = {}
for result, count in mailbox_counts.items():
urb["mailbox_moods"][result] = count
urb["mailboxes_total"] = sum(mailbox_counts.values())
# historical usage (all-time)
u = stats["all_time"] = {}
un = u["nameplate_moods"] = {}
# TODO: there's probably a single SQL query for all this
un["happy"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='happy'")
un["lonely"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='lonely'")
un["pruney"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='pruney'")
un["crowded"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='crowded'")
u["nameplates_total"] = q("SELECT COUNT() FROM `nameplate_usage`")
um = u["mailbox_moods"] = {}
um["happy"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='happy'")
um["scary"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='scary'")
um["lonely"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='lonely'")
um["quiet"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='quiet'")
um["errory"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='errory'")
um["pruney"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='pruney'")
um["crowded"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='crowded'")
u["mailboxes_total"] = q("SELECT COUNT() FROM `mailbox_usage`")
u["mailboxes_standalone"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `for_nameplate`=0")
# recent timings (last 100 operations)
# TODO: median/etc of nameplate.total_time
# TODO: median/etc of mailbox.waiting_time (should be the same)
# TODO: median/etc of mailbox.total_time
# other
# TODO: mailboxes without nameplates (needs new DB schema)
return stats
def stopService(self):
# This forcibly boots any clients that are still connected, which
# helps with unit tests that use threads for both clients. One client
# hits an exception, which terminates the test (and .tearDown calls
# stopService on the relay), but the other client (in its thread) is
# still waiting for a message. By killing off all connections, that
# other client gets an error, and exits promptly.
for app in self._apps.values():
app._shutdown()
return service.MultiService.stopService(self)

View File

@ -1,306 +0,0 @@
from __future__ import unicode_literals
import time
from twisted.internet import reactor
from twisted.python import log
from autobahn.twisted import websocket
from .rendezvous import CrowdedError, ReclaimedError, SidedMessage
from ..util import dict_to_bytes, bytes_to_dict
# The WebSocket allows the client to send "commands" to the server, and the
# server to send "responses" to the client. Note that commands and responses
# are not necessarily one-to-one. All commands provoke an "ack" response
# (with a copy of the original message) for timing, testing, and
# synchronization purposes. All commands and responses are JSON-encoded.
# Each WebSocket connection is bound to one "appid" and one "side", which are
# set by the "bind" command (which must be the first command on the
# connection), and must be set before any other command will be accepted.
# Each connection can be bound to a single "mailbox" (a two-sided
# store-and-forward queue, identified by the "mailbox id": a long, randomly
# unique string identifier) by using the "open" command. This protects the
# mailbox from idle closure, enables the "add" command (to put new messages
# in the queue), and triggers delivery of past and future messages via the
# "message" response. The "close" command removes the binding (but note that
# it does not enable the subsequent binding of a second mailbox). When the
# last side closes a mailbox, its contents are deleted.
# Additionally, the connection can be bound a single "nameplate", which is
# short identifier that makes up the first component of a wormhole code. Each
# nameplate points to a single long-id "mailbox". The "allocate" message
# determines the shortest available numeric nameplate, reserves it, and
# returns the nameplate id. "list" returns a list of all numeric nameplates
# which currently have only one side active (i.e. they are waiting for a
# partner). The "claim" message reserves an arbitrary nameplate id (perhaps
# the receiver of a wormhole connection typed in a code they got from the
# sender, or perhaps the two sides agreed upon a code offline and are both
# typing it in), and the "release" message releases it. When every side that
# has claimed the nameplate has also released it, the nameplate is
# deallocated (but they will probably keep the underlying mailbox open).
# "claim" and "release" may only be called once per connection, however calls
# across connections (assuming a consistent "side") are idempotent. [connect,
# claim, disconnect, connect, claim] is legal, but not useful, as is a
# "release" for a nameplate that nobody is currently claiming.
# "open" and "close" may only be called once per connection. They are
# basically idempotent, however "open" doubles as a subscribe action. So
# [connect, open, disconnect, connect, open] is legal *and* useful (without
# the second "open", the second connection would not be subscribed to hear
# about new messages).
# Inbound (client to server) commands are marked as "->" below. Unrecognized
# inbound keys will be ignored. Outbound (server to client) responses use
# "<-". There is no guaranteed correlation between requests and responses. In
# this list, "A -> B" means that some time after A is received, at least one
# message of type B will be sent out (probably).
# All responses include a "server_tx" key, which is a float (seconds since
# epoch) with the server clock just before the outbound response was written
# to the socket.
# connection -> welcome
# <- {type: "welcome", welcome: {}} # .welcome keys are all optional:
# current_cli_version: out-of-date clients display a warning
# motd: all clients display message, then continue normally
# error: all clients display mesage, then terminate with error
# -> {type: "bind", appid:, side:}
#
# -> {type: "list"} -> nameplates
# <- {type: "nameplates", nameplates: [{id: str,..},..]}
# -> {type: "allocate"} -> nameplate, mailbox
# <- {type: "allocated", nameplate: str}
# -> {type: "claim", nameplate: str} -> mailbox
# <- {type: "claimed", mailbox: str}
# -> {type: "release"}
# .nameplate is optional, but must match previous claim()
# <- {type: "released"}
#
# -> {type: "open", mailbox: str} -> message
# sends old messages now, and subscribes to deliver future messages
# <- {type: "message", side:, phase:, body:, msg_id:}} # body is hex
# -> {type: "add", phase: str, body: hex} # will send echo in a "message"
#
# -> {type: "close", mood: str} -> closed
# .mailbox is optional, but must match previous open()
# <- {type: "closed"}
#
# <- {type: "error", error: str, orig: {}} # in response to malformed msgs
# for tests that need to know when a message has been processed:
# -> {type: "ping", ping: int} -> pong (does not require bind/claim)
# <- {type: "pong", pong: int}
class Error(Exception):
def __init__(self, explain):
self._explain = explain
class WebSocketRendezvous(websocket.WebSocketServerProtocol):
def __init__(self):
websocket.WebSocketServerProtocol.__init__(self)
self._app = None
self._side = None
self._did_allocate = False # only one allocate() per websocket
self._listening = False
self._did_claim = False
self._nameplate_id = None
self._did_release = False
self._did_open = False
self._mailbox = None
self._mailbox_id = None
self._did_close = False
def onConnect(self, request):
rv = self.factory.rendezvous
if rv.get_log_requests():
log.msg("ws client connecting: %s" % (request.peer,))
self._reactor = self.factory.reactor
def onOpen(self):
rv = self.factory.rendezvous
self.send("welcome", welcome=rv.get_welcome())
def onMessage(self, payload, isBinary):
server_rx = time.time()
msg = bytes_to_dict(payload)
try:
if "type" not in msg:
raise Error("missing 'type'")
self.send("ack", id=msg.get("id"))
mtype = msg["type"]
if mtype == "ping":
return self.handle_ping(msg)
if mtype == "bind":
return self.handle_bind(msg)
if not self._app:
raise Error("must bind first")
if mtype == "list":
return self.handle_list()
if mtype == "allocate":
return self.handle_allocate(server_rx)
if mtype == "claim":
return self.handle_claim(msg, server_rx)
if mtype == "release":
return self.handle_release(msg, server_rx)
if mtype == "open":
return self.handle_open(msg, server_rx)
if mtype == "add":
return self.handle_add(msg, server_rx)
if mtype == "close":
return self.handle_close(msg, server_rx)
raise Error("unknown type")
except Error as e:
self.send("error", error=e._explain, orig=msg)
def handle_ping(self, msg):
if "ping" not in msg:
raise Error("ping requires 'ping'")
self.send("pong", pong=msg["ping"])
def handle_bind(self, msg):
if self._app or self._side:
raise Error("already bound")
if "appid" not in msg:
raise Error("bind requires 'appid'")
if "side" not in msg:
raise Error("bind requires 'side'")
self._app = self.factory.rendezvous.get_app(msg["appid"])
self._side = msg["side"]
def handle_list(self):
nameplate_ids = sorted(self._app.get_nameplate_ids())
# provide room to add nameplate attributes later (like which wordlist
# is used for each, maybe how many words)
nameplates = [{"id": nid} for nid in nameplate_ids]
self.send("nameplates", nameplates=nameplates)
def handle_allocate(self, server_rx):
if self._did_allocate:
raise Error("you already allocated one, don't be greedy")
nameplate_id = self._app.allocate_nameplate(self._side, server_rx)
assert isinstance(nameplate_id, type(""))
self._did_allocate = True
self.send("allocated", nameplate=nameplate_id)
def handle_claim(self, msg, server_rx):
if "nameplate" not in msg:
raise Error("claim requires 'nameplate'")
if self._did_claim:
raise Error("only one claim per connection")
self._did_claim = True
nameplate_id = msg["nameplate"]
assert isinstance(nameplate_id, type("")), type(nameplate_id)
self._nameplate_id = nameplate_id
try:
mailbox_id = self._app.claim_nameplate(nameplate_id, self._side,
server_rx)
except CrowdedError:
raise Error("crowded")
except ReclaimedError:
raise Error("reclaimed")
self.send("claimed", mailbox=mailbox_id)
def handle_release(self, msg, server_rx):
if self._did_release:
raise Error("only one release per connection")
if "nameplate" in msg:
if self._nameplate_id is not None:
if msg["nameplate"] != self._nameplate_id:
raise Error("release and claim must use same nameplate")
nameplate_id = msg["nameplate"]
else:
if self._nameplate_id is None:
raise Error("release without nameplate must follow claim")
nameplate_id = self._nameplate_id
assert nameplate_id is not None
self._did_release = True
self._app.release_nameplate(nameplate_id, self._side, server_rx)
self.send("released")
def handle_open(self, msg, server_rx):
if self._mailbox:
raise Error("only one open per connection")
if "mailbox" not in msg:
raise Error("open requires 'mailbox'")
mailbox_id = msg["mailbox"]
assert isinstance(mailbox_id, type(""))
self._mailbox_id = mailbox_id
try:
self._mailbox = self._app.open_mailbox(mailbox_id, self._side,
server_rx)
except CrowdedError:
raise Error("crowded")
def _send(sm):
self.send("message", side=sm.side, phase=sm.phase,
body=sm.body, server_rx=sm.server_rx, id=sm.msg_id)
def _stop():
pass
self._listening = True
for old_sm in self._mailbox.add_listener(self, _send, _stop):
_send(old_sm)
def handle_add(self, msg, server_rx):
if not self._mailbox:
raise Error("must open mailbox before adding")
if "phase" not in msg:
raise Error("missing 'phase'")
if "body" not in msg:
raise Error("missing 'body'")
msg_id = msg.get("id") # optional
sm = SidedMessage(side=self._side, phase=msg["phase"],
body=msg["body"], server_rx=server_rx,
msg_id=msg_id)
self._mailbox.add_message(sm)
def handle_close(self, msg, server_rx):
if self._did_close:
raise Error("only one close per connection")
if "mailbox" in msg:
if self._mailbox_id is not None:
if msg["mailbox"] != self._mailbox_id:
raise Error("open and close must use same mailbox")
mailbox_id = msg["mailbox"]
else:
if self._mailbox_id is None:
raise Error("close without mailbox must follow open")
mailbox_id = self._mailbox_id
if not self._mailbox:
try:
self._mailbox = self._app.open_mailbox(mailbox_id, self._side,
server_rx)
except CrowdedError:
raise Error("crowded")
if self._listening:
self._mailbox.remove_listener(self)
self._listening = False
self._did_close = True
self._mailbox.close(self._side, msg.get("mood"), server_rx)
self._mailbox = None
self.send("closed")
def send(self, mtype, **kwargs):
kwargs["type"] = mtype
kwargs["server_tx"] = time.time()
payload = dict_to_bytes(kwargs)
self.sendMessage(payload, False)
def onClose(self, wasClean, code, reason):
#log.msg("onClose", self, self._mailbox, self._listening)
if self._mailbox and self._listening:
self._mailbox.remove_listener(self)
class WebSocketRendezvousFactory(websocket.WebSocketServerFactory):
protocol = WebSocketRendezvous
def __init__(self, url, rendezvous):
websocket.WebSocketServerFactory.__init__(self, url)
self.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600)
self.rendezvous = rendezvous
self.reactor = reactor # for tests to control

View File

@ -1,168 +0,0 @@
# NO unicode_literals or static.Data() will break, because it demands
# a str on Python 2
from __future__ import print_function
import os, time, json
try:
# 'resource' is unix-only
from resource import getrlimit, setrlimit, RLIMIT_NOFILE
except ImportError: # pragma: nocover
getrlimit, setrlimit, RLIMIT_NOFILE = None, None, None # pragma: nocover
from twisted.python import log
from twisted.internet import reactor, endpoints
from twisted.application import service, internet
from twisted.web import server, static
from twisted.web.resource import Resource
from autobahn.twisted.resource import WebSocketResource
from .database import get_db
from .rendezvous import Rendezvous
from .rendezvous_websocket import WebSocketRendezvousFactory
SECONDS = 1.0
MINUTE = 60*SECONDS
CHANNEL_EXPIRATION_TIME = 11*MINUTE
EXPIRATION_CHECK_PERIOD = 10*MINUTE
class Root(Resource):
# child_FOO is a nevow thing, not a twisted.web.resource thing
def __init__(self):
Resource.__init__(self)
self.putChild(b"", static.Data(b"Wormhole Relay\n", "text/plain"))
class PrivacyEnhancedSite(server.Site):
logRequests = True
def log(self, request):
if self.logRequests:
return server.Site.log(self, request)
class RelayServer(service.MultiService):
def __init__(self, rendezvous_web_port,
advertise_version, db_url=":memory:", blur_usage=None,
signal_error=None, stats_file=None, allow_list=True,
websocket_protocol_options=()):
service.MultiService.__init__(self)
self._blur_usage = blur_usage
self._allow_list = allow_list
self._db_url = db_url
db = get_db(db_url)
welcome = {
# adding .motd will cause all clients to display the message,
# then keep running normally
#"motd": "Welcome to the public relay.\nPlease enjoy this service.",
# adding .error will cause all clients to fail, with this message
#"error": "This server has been disabled, see URL for details.",
}
if advertise_version:
# The primary (python CLI) implementation will emit a message if
# its version does not match this key. If/when we have
# distributions which include older version, but we still expect
# them to be compatible, stop sending this key.
welcome["current_cli_version"] = advertise_version
if signal_error:
welcome["error"] = signal_error
self._rendezvous = Rendezvous(db, welcome, blur_usage, self._allow_list)
self._rendezvous.setServiceParent(self) # for the pruning timer
root = Root()
wsrf = WebSocketRendezvousFactory(None, self._rendezvous)
_set_options(websocket_protocol_options, wsrf)
root.putChild(b"v1", WebSocketResource(wsrf))
site = PrivacyEnhancedSite(root)
if blur_usage:
site.logRequests = False
r = endpoints.serverFromString(reactor, rendezvous_web_port)
rendezvous_web_service = internet.StreamServerEndpointService(r, site)
rendezvous_web_service.setServiceParent(self)
self._stats_file = stats_file
if self._stats_file and os.path.exists(self._stats_file):
os.unlink(self._stats_file)
# this will be regenerated immediately, but if something goes
# wrong in dump_stats(), it's better to have a missing file than
# a stale one
t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.timer)
t.setServiceParent(self)
# make some things accessible for tests
self._db = db
self._root = root
self._rendezvous_web_service = rendezvous_web_service
self._rendezvous_websocket = wsrf
def increase_rlimits(self):
if getrlimit is None:
log.msg("unable to import 'resource', leaving rlimit alone")
return
soft, hard = getrlimit(RLIMIT_NOFILE)
if soft >= 10000:
log.msg("RLIMIT_NOFILE.soft was %d, leaving it alone" % soft)
return
# OS-X defaults to soft=7168, and reports a huge number for 'hard',
# but won't accept anything more than soft=10240, so we can't just
# set soft=hard. Linux returns (1024, 1048576) and is fine with
# soft=hard. Cygwin is reported to return (256,-1) and accepts up to
# soft=3200. So we try multiple values until something works.
for newlimit in [hard, 10000, 3200, 1024]:
log.msg("changing RLIMIT_NOFILE from (%s,%s) to (%s,%s)" %
(soft, hard, newlimit, hard))
try:
setrlimit(RLIMIT_NOFILE, (newlimit, hard))
log.msg("setrlimit successful")
return
except ValueError as e:
log.msg("error during setrlimit: %s" % e)
continue
except:
log.msg("other error during setrlimit, leaving it alone")
log.err()
return
log.msg("unable to change rlimit, leaving it alone")
def startService(self):
service.MultiService.startService(self)
self.increase_rlimits()
log.msg("websocket listening on /wormhole-relay/ws")
log.msg("Wormhole relay server (Rendezvous) running")
if self._blur_usage:
log.msg("blurring access times to %d seconds" % self._blur_usage)
log.msg("not logging HTTP requests")
else:
log.msg("not blurring access times")
if not self._allow_list:
log.msg("listing of allocated nameplates disallowed")
def timer(self):
now = time.time()
old = now - CHANNEL_EXPIRATION_TIME
self._rendezvous.prune_all_apps(now, old)
self.dump_stats(now, validity=EXPIRATION_CHECK_PERIOD+60)
def dump_stats(self, now, validity):
if not self._stats_file:
return
tmpfn = self._stats_file + ".tmp"
data = {}
data["created"] = now
data["valid_until"] = now + validity
start = time.time()
data["rendezvous"] = self._rendezvous.get_stats()
log.msg("get_stats took:", time.time() - start)
with open(tmpfn, "wb") as f:
# json.dump(f) has str-vs-unicode issues on py2-vs-py3
f.write(json.dumps(data, indent=1).encode("utf-8"))
f.write(b"\n")
os.rename(tmpfn, self._stats_file)
def _set_options(options, factory):
factory.setProtocolOptions(**dict(options))

View File

@ -6,27 +6,64 @@ from click.testing import CliRunner
import mock import mock
from ..cli import cli from ..cli import cli
from ..transit import allocate_tcp_port from ..transit import allocate_tcp_port
from ..server.server import RelayServer from wormhole_mailbox_server.server import make_server
from wormhole_mailbox_server.web import make_web_server
from wormhole_mailbox_server.database import create_channel_db, create_usage_db
from wormhole_transit_relay.transit_server import Transit from wormhole_transit_relay.transit_server import Transit
class ServerBase: class MyInternetService(service.Service, object):
def setUp(self): # like StreamServerEndpointService, but you can retrieve the port
self._setup_relay(None) def __init__(self, endpoint, factory):
self.endpoint = endpoint
self.factory = factory
self._port_d = defer.Deferred()
self._lp = None
def startService(self):
super(MyInternetService, self).startService()
d = self.endpoint.listen(self.factory)
def good(lp):
self._lp = lp
self._port_d.callback(lp.getHost().port)
def bad(f):
log.err(f)
self._port_d.errback(f)
d.addCallbacks(good, bad)
@defer.inlineCallbacks
def stopService(self):
if self._lp:
yield self._lp.stopListening()
def getPort(self): # only call once!
return self._port_d
class ServerBase:
@defer.inlineCallbacks
def setUp(self):
yield self._setup_relay(None)
@defer.inlineCallbacks
def _setup_relay(self, error, advertise_version=None): def _setup_relay(self, error, advertise_version=None):
self.sp = service.MultiService() self.sp = service.MultiService()
self.sp.startService() self.sp.startService()
self.relayport = allocate_tcp_port()
# need to talk to twisted team about only using unicode in # need to talk to twisted team about only using unicode in
# endpoints.serverFromString # endpoints.serverFromString
s = RelayServer("tcp:%d:interface=127.0.0.1" % self.relayport, db = create_channel_db(":memory:")
self._usage_db = create_usage_db(":memory:")
self._rendezvous = make_server(db,
advertise_version=advertise_version, advertise_version=advertise_version,
signal_error=error) signal_error=error,
usage_db=self._usage_db)
ep = endpoints.TCP4ServerEndpoint(reactor, 0, interface="127.0.0.1")
site = make_web_server(self._rendezvous, log_requests=False)
#self._lp = yield ep.listen(site)
s = MyInternetService(ep, site)
s.setServiceParent(self.sp) s.setServiceParent(self.sp)
self.rdv_ws_port = yield s.getPort()
self._relay_server = s self._relay_server = s
self._rendezvous = s._rendezvous #self._rendezvous = s._rendezvous
self.relayurl = u"ws://127.0.0.1:%d/v1" % self.relayport self.relayurl = u"ws://127.0.0.1:%d/v1" % self.rdv_ws_port
self.rdv_ws_port = self.relayport
# ws://127.0.0.1:%d/wormhole-relay/ws # ws://127.0.0.1:%d/wormhole-relay/ws
self.transitport = allocate_tcp_port() self.transitport = allocate_tcp_port()
@ -38,6 +75,7 @@ class ServerBase:
internet.StreamServerEndpointService(ep, f).setServiceParent(self.sp) internet.StreamServerEndpointService(ep, f).setServiceParent(self.sp)
self.transit = u"tcp:127.0.0.1:%d" % self.transitport self.transit = u"tcp:127.0.0.1:%d" % self.transitport
@defer.inlineCallbacks
def tearDown(self): def tearDown(self):
# Unit tests that spawn a (blocking) client in a thread might still # Unit tests that spawn a (blocking) client in a thread might still
# have threads running at this point, if one is stuck waiting for a # have threads running at this point, if one is stuck waiting for a
@ -49,21 +87,16 @@ class ServerBase:
# XXX FIXME there's something in _noclobber test that's not # XXX FIXME there's something in _noclobber test that's not
# waiting for a close, I think -- was pretty relieably getting # waiting for a close, I think -- was pretty relieably getting
# unclean-reactor, but adding a slight pause here stops it... # unclean-reactor, but adding a slight pause here stops it...
from twisted.internet import reactor
tp = reactor.getThreadPool() tp = reactor.getThreadPool()
if not tp.working: if not tp.working:
d = defer.succeed(None) yield self.sp.stopService()
d.addCallback(lambda _: self.sp.stopService()) yield task.deferLater(reactor, 0.1, lambda: None)
d.addCallback(lambda _: task.deferLater(reactor, 0.1, lambda: None)) defer.returnValue(None)
return d
return self.sp.stopService()
# disconnect all callers # disconnect all callers
d = defer.maybeDeferred(self.sp.stopService) d = defer.maybeDeferred(self.sp.stopService)
wait_d = defer.Deferred()
# wait a second, then check to see if it worked # wait a second, then check to see if it worked
reactor.callLater(1.0, wait_d.callback, None) yield task.deferLater(reactor, 1.0, lambda: None)
def _later(res):
if len(tp.working): if len(tp.working):
log.msg("wormhole.test.common.ServerBase.tearDown:" log.msg("wormhole.test.common.ServerBase.tearDown:"
" I was unable to convince all threads to exit.") " I was unable to convince all threads to exit.")
@ -74,9 +107,7 @@ class ServerBase:
else: else:
log.msg("wormhole.test.common.ServerBase.tearDown:" log.msg("wormhole.test.common.ServerBase.tearDown:"
" I convinced all threads to exit.") " I convinced all threads to exit.")
return d yield d
wait_d.addCallback(_later)
return wait_d
def config(*argv): def config(*argv):
r = CliRunner() r = CliRunner()

View File

@ -17,8 +17,6 @@ from ..cli import cmd_send, cmd_receive, welcome, cli
from ..errors import (TransferError, WrongPasswordError, WelcomeError, from ..errors import (TransferError, WrongPasswordError, WelcomeError,
UnsendableFileError, ServerConnectionError) UnsendableFileError, ServerConnectionError)
from .._interfaces import ITorManager from .._interfaces import ITorManager
from wormhole.server.cmd_server import MyPlugin
from wormhole.server.cli import server
def build_offer(args): def build_offer(args):
@ -564,7 +562,7 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase):
yield gatherResults([send_d, receive_d], True) yield gatherResults([send_d, receive_d], True)
if fake_tor: if fake_tor:
expected_endpoints = [("127.0.0.1", self.relayport)] expected_endpoints = [("127.0.0.1", self.rdv_ws_port)]
if mode in ("file", "directory"): if mode in ("file", "directory"):
expected_endpoints.append(("127.0.0.1", self.transitport)) expected_endpoints.append(("127.0.0.1", self.transitport))
tx_timing = mtx_tm.call_args[1]["timing"] tx_timing = mtx_tm.call_args[1]["timing"]
@ -667,9 +665,6 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase):
self.failUnlessEqual(modes[i], self.failUnlessEqual(modes[i],
stat.S_IMODE(os.stat(fn).st_mode)) stat.S_IMODE(os.stat(fn).st_mode))
# check server stats
self._rendezvous.get_stats()
def test_text(self): def test_text(self):
return self._do_test() return self._do_test()
def test_text_subprocess(self): def test_text_subprocess(self):
@ -849,9 +844,6 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase):
with open(fn, "r") as f: with open(fn, "r") as f:
self.failUnlessEqual(f.read(), PRESERVE) self.failUnlessEqual(f.read(), PRESERVE)
# check server stats
self._rendezvous.get_stats()
def test_fail_file_noclobber(self): def test_fail_file_noclobber(self):
return self._do_test_fail("file", "noclobber") return self._do_test_fail("file", "noclobber")
def test_fail_directory_noclobber(self): def test_fail_directory_noclobber(self):
@ -915,12 +907,10 @@ class ZeroMode(ServerBase, unittest.TestCase):
self.assertEqual(receive_stdout, message+NL) self.assertEqual(receive_stdout, message+NL)
self.assertEqual(receive_stderr, "") self.assertEqual(receive_stderr, "")
# check server stats
self._rendezvous.get_stats()
class NotWelcome(ServerBase, unittest.TestCase): class NotWelcome(ServerBase, unittest.TestCase):
@inlineCallbacks
def setUp(self): def setUp(self):
self._setup_relay(error="please upgrade XYZ") yield self._setup_relay(error="please upgrade XYZ")
self.cfg = cfg = config("send") self.cfg = cfg = config("send")
cfg.hide_progress = True cfg.hide_progress = True
cfg.listen = False cfg.listen = False
@ -949,7 +939,7 @@ class NotWelcome(ServerBase, unittest.TestCase):
class NoServer(ServerBase, unittest.TestCase): class NoServer(ServerBase, unittest.TestCase):
@inlineCallbacks @inlineCallbacks
def setUp(self): def setUp(self):
self._setup_relay(None) yield self._setup_relay(None)
yield self._relay_server.disownServiceParent() yield self._relay_server.disownServiceParent()
@inlineCallbacks @inlineCallbacks
@ -1093,8 +1083,9 @@ class ExtractFile(unittest.TestCase):
self.assertIn("malicious zipfile", str(e)) self.assertIn("malicious zipfile", str(e))
class AppID(ServerBase, unittest.TestCase): class AppID(ServerBase, unittest.TestCase):
@inlineCallbacks
def setUp(self): def setUp(self):
d = super(AppID, self).setUp() yield super(AppID, self).setUp()
self.cfg = cfg = config("send") self.cfg = cfg = config("send")
# common options for all tests in this suite # common options for all tests in this suite
cfg.hide_progress = True cfg.hide_progress = True
@ -1102,7 +1093,6 @@ class AppID(ServerBase, unittest.TestCase):
cfg.transit_helper = "" cfg.transit_helper = ""
cfg.stdout = io.StringIO() cfg.stdout = io.StringIO()
cfg.stderr = io.StringIO() cfg.stderr = io.StringIO()
return d
@inlineCallbacks @inlineCallbacks
def test_override(self): def test_override(self):
@ -1117,8 +1107,8 @@ class AppID(ServerBase, unittest.TestCase):
yield send_d yield send_d
yield receive_d yield receive_d
used = self._rendezvous._db.execute("SELECT DISTINCT `app_id`" used = self._usage_db.execute("SELECT DISTINCT `app_id`"
" FROM `nameplate_usage`" " FROM `nameplates`"
).fetchall() ).fetchall()
self.assertEqual(len(used), 1, used) self.assertEqual(len(used), 1, used)
self.assertEqual(used[0]["app_id"], u"appid2") self.assertEqual(used[0]["app_id"], u"appid2")
@ -1260,97 +1250,3 @@ class Help(unittest.TestCase):
result = CliRunner().invoke(cli.wormhole, ["--help"]) result = CliRunner().invoke(cli.wormhole, ["--help"])
self._check_top_level_help(result.output) self._check_top_level_help(result.output)
self.assertEqual(result.exit_code, 0) self.assertEqual(result.exit_code, 0)
class FakeConfig(object):
no_daemon = True
blur_usage = True
advertise_version = u"fake.version.1"
transit = str('tcp:4321')
rendezvous = str('tcp:1234')
signal_error = True
allow_list = False
relay_database_path = "relay.sqlite"
stats_json_path = "stats.json"
class Server(unittest.TestCase):
def setUp(self):
self.runner = CliRunner()
@mock.patch('wormhole.server.cmd_server.twistd')
def test_server_disallow_list(self, fake_twistd):
result = self.runner.invoke(server, ['start', '--no-daemon', '--disallow-list'])
self.assertEqual(0, result.exit_code)
def test_server_plugin(self):
cfg = FakeConfig()
plugin = MyPlugin(cfg)
relay = plugin.makeService(None)
self.assertEqual(False, relay._allow_list)
@mock.patch("wormhole.server.cmd_server.start_server")
def test_start_no_args(self, fake_start_server):
result = self.runner.invoke(server, ['start'])
self.assertEqual(0, result.exit_code)
cfg = fake_start_server.mock_calls[0][1][0]
MyPlugin(cfg).makeService(None)
@mock.patch("wormhole.server.cmd_server.restart_server")
def test_restart_no_args(self, fake_start_reserver):
result = self.runner.invoke(server, ['restart'])
self.assertEqual(0, result.exit_code)
cfg = fake_start_reserver.mock_calls[0][1][0]
MyPlugin(cfg).makeService(None)
def test_state_locations(self):
cfg = FakeConfig()
plugin = MyPlugin(cfg)
relay = plugin.makeService(None)
self.assertEqual('relay.sqlite', relay._db_url)
self.assertEqual('stats.json', relay._stats_file)
@mock.patch("wormhole.server.cmd_server.start_server")
def test_websocket_protocol_options(self, fake_start_server):
result = self.runner.invoke(
server, [
'start',
'--websocket-protocol-option=a=3',
'--websocket-protocol-option=b=true',
'--websocket-protocol-option=c=3.5',
'--websocket-protocol-option=d=["foo","bar"]',
'--websocket-protocol-option', 'e=["foof","barf"]',
])
self.assertEqual(0, result.exit_code)
cfg = fake_start_server.mock_calls[0][1][0]
self.assertEqual(
cfg.websocket_protocol_option,
[("a", 3), ("b", True), ("c", 3.5), ("d", ['foo', 'bar']),
("e", ['foof', 'barf']),
],
)
def test_broken_websocket_protocol_options(self):
result = self.runner.invoke(
server, [
'start',
'--websocket-protocol-option=a',
])
self.assertNotEqual(0, result.exit_code)
self.assertIn(
'Error: Invalid value for "--websocket-protocol-option": '
'format options as OPTION=VALUE',
result.output,
)
result = self.runner.invoke(
server, [
'start',
'--websocket-protocol-option=a=foo',
])
self.assertNotEqual(0, result.exit_code)
self.assertIn(
'Error: Invalid value for "--websocket-protocol-option": '
'could not parse JSON value for a',
result.output,
)

View File

@ -1,61 +0,0 @@
from __future__ import print_function, unicode_literals
import os
from twisted.python import filepath
from twisted.trial import unittest
from ..server import database
from ..server.database import get_db, TARGET_VERSION, dump_db
class DB(unittest.TestCase):
def test_create_default(self):
db_url = ":memory:"
db = get_db(db_url)
rows = db.execute("SELECT * FROM version").fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0]["version"], TARGET_VERSION)
def test_failed_create_allows_subsequent_create(self):
patch = self.patch(database, "get_schema", lambda version: b"this is a broken schema")
dbfile = filepath.FilePath(self.mktemp())
self.assertRaises(Exception, lambda: get_db(dbfile.path))
patch.restore()
get_db(dbfile.path)
def test_upgrade(self):
basedir = self.mktemp()
os.mkdir(basedir)
fn = os.path.join(basedir, "upgrade.db")
self.assertNotEqual(TARGET_VERSION, 2)
# create an old-version DB in a file
db = get_db(fn, 2)
rows = db.execute("SELECT * FROM version").fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0]["version"], 2)
del db
# then upgrade the file to the latest version
dbA = get_db(fn, TARGET_VERSION)
rows = dbA.execute("SELECT * FROM version").fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0]["version"], TARGET_VERSION)
dbA_text = dump_db(dbA)
del dbA
# make sure the upgrades got committed to disk
dbB = get_db(fn, TARGET_VERSION)
dbB_text = dump_db(dbB)
del dbB
self.assertEqual(dbA_text, dbB_text)
# The upgraded schema should be equivalent to that of a new DB.
# However a text dump will differ because ALTER TABLE always appends
# the new column to the end of a table, whereas our schema puts it
# somewhere in the middle (wherever it fits naturally). Also ALTER
# TABLE doesn't include comments.
if False:
latest_db = get_db(":memory:", TARGET_VERSION)
latest_text = dump_db(latest_db)
with open("up.sql","w") as f: f.write(dbA_text)
with open("new.sql","w") as f: f.write(latest_text)
# check with "diff -u _trial_temp/up.sql _trial_temp/new.sql"
self.assertEqual(dbA_text, latest_text)

File diff suppressed because it is too large Load Diff