This commit is contained in:
Brian Warner 2017-11-04 12:40:54 -07:00
parent d7800f6337
commit 790f29d4ba
5 changed files with 217 additions and 25 deletions

8
docs/running.md Normal file
View File

@ -0,0 +1,8 @@
# How to Run the Transit Relay
```
pip install magic-wormhole-transit-relay
twist wormhole-transit-relay --port tcp:4001
```
The relay runs as a twist/twistd plugin. To

View File

@ -0,0 +1,126 @@
from __future__ import unicode_literals
import os
import sqlite3
import tempfile
from pkg_resources import resource_string
from twisted.python import log
class DBError(Exception):
    """Raised when the database file cannot be opened, created, or upgraded."""
def get_schema(version):
    """Return the full schema SQL (as text) for database *version*.

    NOTE(review): the resource package is "wormhole.server", which looks
    copied from the magic-wormhole server project — confirm this matches
    the package the schema files actually ship in.
    """
    raw = resource_string("wormhole.server",
                          "db-schemas/v%d.sql" % version)
    return raw.decode("utf-8")
def get_upgrader(new_version):
    """Return the SQL (as text) that upgrades a database to *new_version*.

    NOTE(review): as with get_schema, the "wormhole.server" package name
    should be confirmed against this project's packaging.
    """
    raw = resource_string("wormhole.server",
                          "db-schemas/upgrade-to-v%d.sql" % new_version)
    return raw.decode("utf-8")
# schema version that get_db() upgrades every database to
TARGET_VERSION = 3

def dict_factory(cursor, row):
    """sqlite3 row factory: return each row as a {column_name: value} dict."""
    return {desc[0]: value
            for desc, value in zip(cursor.description, row)}
def _initialize_db_schema(db, target_version):
    """Create the application schema (at *target_version*) in *db*.

    Runs the full schema script, records the version in the `version`
    table, and commits.
    """
    log.msg("populating new database with schema v%s" % target_version)
    db.executescript(get_schema(target_version))
    db.execute("INSERT INTO version (version) VALUES (?)",
               (target_version,))
    db.commit()
def _initialize_db_connection(db):
    """Configure a fresh connection: dict rows plus foreign-key enforcement.

    Raises DBError if the database already contains foreign-key violations.
    """
    db.row_factory = dict_factory
    db.execute("PRAGMA foreign_keys = ON")
    violations = db.execute("PRAGMA foreign_key_check").fetchall()
    if violations:
        raise DBError("failed foreign key check: %s" % (violations,))
def _open_db_connection(dbfile):
    """Open (or create) the SQLite3 database at *dbfile* and configure it.

    Returns the connection object, or raises DBError if the file cannot
    be opened.
    """
    try:
        connection = sqlite3.connect(dbfile)
    except (EnvironmentError, sqlite3.OperationalError) as e:
        raise DBError("Unable to create/open db file %s: %s" % (dbfile, e))
    _initialize_db_connection(connection)
    return connection
def _get_temporary_dbfile(dbfile):
"""Get a temporary filename near the given path.
"""
fd, name = tempfile.mkstemp(
prefix=os.path.basename(dbfile) + ".",
dir=os.path.dirname(dbfile)
)
os.close(fd)
return name
def _atomic_create_and_initialize_db(dbfile, target_version):
    """Create and return a new database, initialized with the application
    schema.

    The schema is built in a temporary file which is only renamed into
    place once initialization succeeds, so if anything goes wrong,
    nothing is left at the ``dbfile`` path — and the temporary file is
    cleaned up too (the original leaked it on failure).
    """
    temp_dbfile = _get_temporary_dbfile(dbfile)
    db = _open_db_connection(temp_dbfile)
    try:
        _initialize_db_schema(db, target_version)
        db.close()
        os.rename(temp_dbfile, dbfile)
    except Exception:
        # don't leave a half-initialized temp file behind
        db.close()
        try:
            os.unlink(temp_dbfile)
        except OSError:
            pass
        raise
    return _open_db_connection(dbfile)
def get_db(dbfile, target_version=TARGET_VERSION):
    """Open or create the given db file. The parent directory must exist.

    The database is upgraded in place, if necessary, until it reaches
    *target_version*. Returns the db connection object, or raises DBError.
    """
    if dbfile == ":memory:":
        db = _open_db_connection(dbfile)
        _initialize_db_schema(db, target_version)
    elif os.path.exists(dbfile):
        db = _open_db_connection(dbfile)
    else:
        db = _atomic_create_and_initialize_db(dbfile, target_version)

    try:
        row = db.execute("SELECT version FROM version").fetchone()
    except sqlite3.DatabaseError as e:
        # this indicates that the file is not a compatible database format.
        # Perhaps it was created with an old version, or it might be junk.
        raise DBError("db file is unusable: %s" % e)
    if row is None:
        # the version table exists but has no row: raise DBError instead of
        # crashing with a TypeError on the subscript below
        raise DBError("db file is unusable: no version row")
    version = row["version"]

    while version < target_version:
        log.msg(" need to upgrade from %s to %s" % (version, target_version))
        try:
            upgrader = get_upgrader(version+1)
        except (ValueError, EnvironmentError):
            # pkg_resources raises an IOError (an EnvironmentError) for a
            # missing resource; the original caught only ValueError and
            # flagged the uncertainty with "# ResourceError??" — accept both
            log.msg(" unable to upgrade %s to %s" % (version, version+1))
            raise DBError("Unable to upgrade %s to version %s, left at %s"
                          % (dbfile, version+1, version))
        log.msg(" executing upgrader v%s->v%s" % (version, version+1))
        db.executescript(upgrader)
        db.commit()
        version = version+1

    if version != target_version:
        raise DBError("Unable to handle db version %s" % version)

    return db
def dump_db(db):
    """Return the entire database contents as a single SQL text dump."""
    # _iterdump only works with the stock row factory, so install
    # sqlite3.Row for the duration and restore the caller's factory after
    saved_factory = db.row_factory
    db.row_factory = sqlite3.Row
    try:
        return "".join(db.iterdump())
    finally:
        db.row_factory = saved_factory

View File

@ -0,0 +1,30 @@
CREATE TABLE `version` -- contains one row
(
 `version` INTEGER -- set to 1
);

CREATE TABLE `current` -- contains one row
(
 `reboot` INTEGER, -- seconds since epoch of most recent reboot
 `last_update` INTEGER, -- when `current` was last updated
 `connected` INTEGER, -- number of current paired connections
 `waiting` INTEGER, -- number of not-yet-paired connections
 `incomplete_bytes` INTEGER -- bytes sent through not-yet-complete connections
);

CREATE TABLE `usage`
(
 `started` INTEGER, -- seconds since epoch, rounded to "blur time"
 `total_time` INTEGER, -- seconds from open to last close
 `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
 `total_bytes` INTEGER, -- total bytes relayed (both directions)
 `result` VARCHAR -- happy, scary, lonely, errory, pruney
 -- transit moods:
 --  "errory": one side gave the wrong handshake
 --  "lonely": good handshake, but the other side never showed up
 --  "happy": both sides gave correct handshake
);

-- the indexes must target the `usage` table defined above; the original
-- referenced a nonexistent `transit_usage` table, which would make this
-- whole script fail when executed
CREATE INDEX `transit_usage_idx` ON `usage` (`started`);
CREATE INDEX `transit_usage_result_idx` ON `usage` (`result`);

View File

@ -9,10 +9,10 @@ This plugin sets up a 'Transit Relay' server for magic-wormhole. This service
listens for TCP connections, finds pairs which present the same handshake, and listens for TCP connections, finds pairs which present the same handshake, and
glues the two TCP sockets together. glues the two TCP sockets together.
If --usage-logfile= is provided, a line will be written to the given file after If --log-stdout is provided, a line will be written to stdout after each
each connection is done. This line will be a complete JSON object (starting connection is done. This line will be a complete JSON object (starting with
with "{", ending with "}\n", and containing no internal newlines). The keys "{", ending with "}\n", and containing no internal newlines). The keys will
will be: be:
* 'started': number, seconds since epoch * 'started': number, seconds since epoch
* 'total_time': number, seconds from open to last close * 'total_time': number, seconds from open to last close
@ -27,35 +27,62 @@ second matching side never appeared (and thus 'waiting_time' will be null).
If --blur-usage= is provided, then 'started' will be rounded to the given time If --blur-usage= is provided, then 'started' will be rounded to the given time
interval, and 'total_bytes' will be rounded as well. interval, and 'total_bytes' will be rounded as well.
If --stats-file is provided, the server will periodically write a simple JSON If --usage-db= is provided, the server will maintain a SQLite database in the
dictionary to that file (atomically), with cumulative usage data (since last given file. Current, recent, and historical usage data will be written to the
reboot, and all-time). This information is *not* blurred (the assumption is database, and external tools can query the DB for metrics: the munin plugins
that it will be overwritten on a regular basis, and is aggregated anyways). The in misc/ may be useful. Timestamps and sizes in this file will respect
keys are: --blur-usage. The four tables are:
* active.connected: number of paired connections "current" contains a single row, with these columns:
* active.waiting: number of not-yet-paired connections
* since_reboot.bytes: sum of 'total_bytes'
* since_reboot.total: number of completed connections
* since_reboot.moods: dict mapping mood string to number of connections
* all_time.bytes: same
* all_time.total
* all_time.moods
The server will write twistd.pid and twistd.log files as usual, if daemonized * connected: number of paired connections
by twistd. twistd.log will only contain startup, shutdown, and exception * waiting: number of not-yet-paired connections
messages. To record information about each connection, use --usage-logfile. * partial_bytes: bytes transmitted over not-yet-complete connections
"since_reboot" contains a single row, with these columns:
* bytes: sum of 'total_bytes'
* connections: number of completed connections
* mood_happy: count of connections that finished "happy": both sides gave correct handshake
* mood_lonely: one side gave good handshake, other side never showed up
* mood_errory: one side gave a bad handshake
"all_time" contains a single row, with these columns:
* bytes:
* connections:
* mood_happy:
* mood_lonely:
* mood_errory:
"usage" contains one row per closed connection, with these columns:
* started: seconds since epoch, rounded to "blur time"
* total_time: seconds from first open to last close
* waiting_time: seconds from first open to second open, or None
* bytes: total bytes relayed (in both directions)
* result: (string) the mood: happy, lonely, errory
All tables will be updated after each connection is finished. In addition,
the "current" table will be updated at least once every 5 minutes.
If daemonized by twistd, the server will write twistd.pid and twistd.log
files as usual. By default twistd.log will only contain startup, shutdown,
and exception messages. Adding --log-stdout will add per-connection JSON
lines to twistd.log.
""" """
class Options(usage.Options): class Options(usage.Options):
#synopsis = "[--port=] [--usage-logfile=] [--blur-usage=] [--stats-json=]" #synopsis = "[--port=] [--log-stdout] [--blur-usage=] [--usage-db=]"
longdesc = LONGDESC longdesc = LONGDESC
optFlags = {
("log-stdout", None, "write JSON usage logs to stdout"),
}
optParameters = [ optParameters = [
("port", "p", "tcp:4001", "endpoint to listen on"), ("port", "p", "tcp:4001", "endpoint to listen on"),
("blur-usage", None, None, "blur timestamps and data sizes in logs"), ("blur-usage", None, None, "blur timestamps and data sizes in logs"),
("usage-logfile", None, None, "record usage data (JSON lines)"), ("usage-db", None, None, "record usage data (SQLite)"),
("stats-file", None, None, "record usage in JSON format"),
] ]
def opt_blur_usage(self, arg): def opt_blur_usage(self, arg):
@ -65,6 +92,6 @@ class Options(usage.Options):
def makeService(config, reactor=reactor): def makeService(config, reactor=reactor):
ep = endpoints.serverFromString(reactor, config["port"]) # to listen ep = endpoints.serverFromString(reactor, config["port"]) # to listen
f = transit_server.Transit(blur_usage=config["blur-usage"], f = transit_server.Transit(blur_usage=config["blur-usage"],
usage_logfile=config["usage-logfile"], log_stdout=config["log-stdout"],
stats_file=config["stats-file"]) usage_db=config["usage-db"])
return StreamServerEndpointService(ep, f) return StreamServerEndpointService(ep, f)

View File

@ -2,6 +2,7 @@ from __future__ import print_function, unicode_literals
import os, re, time, json import os, re, time, json
from twisted.python import log from twisted.python import log
from twisted.internet import protocol from twisted.internet import protocol
from .database import get_db
SECONDS = 1.0 SECONDS = 1.0
MINUTE = 60*SECONDS MINUTE = 60*SECONDS