start on usage-db

This commit is contained in:
Brian Warner 2017-11-04 12:54:49 -07:00
parent d36e0c44bd
commit a898a65b09
4 changed files with 275 additions and 95 deletions

View File

@ -0,0 +1,126 @@
from __future__ import unicode_literals
import os
import sqlite3
import tempfile
from pkg_resources import resource_string
from twisted.python import log
class DBError(Exception):
    """Raised when the database file cannot be opened, created, validated,
    or upgraded to the target schema version."""
    pass
def get_schema(version):
    """Return the schema-creation SQL (as unicode) for the given version."""
    raw = resource_string("wormhole.server",
                          "db-schemas/v%d.sql" % version)
    return raw.decode("utf-8")
def get_upgrader(new_version):
    """Return the SQL (as unicode) that upgrades a database from version
    new_version-1 to new_version."""
    raw = resource_string("wormhole.server",
                          "db-schemas/upgrade-to-v%d.sql" % new_version)
    return raw.decode("utf-8")
TARGET_VERSION = 1
def dict_factory(cursor, row):
    """sqlite3 row factory: turn each row into a dict keyed by column name."""
    return {col[0]: row[idx]
            for idx, col in enumerate(cursor.description)}
def _initialize_db_schema(db, target_version):
    """Populate an empty database with the application schema, and record
    the schema version in the `version` table.
    """
    log.msg("populating new database with schema v%s" % target_version)
    db.executescript(get_schema(target_version))
    db.execute("INSERT INTO version (version) VALUES (?)",
               (target_version,))
    db.commit()
def _initialize_db_connection(db):
    """Configure a fresh connection: install the dict row factory, enable
    foreign-key enforcement, and verify existing foreign-key integrity.

    Raises DBError if the database already contains foreign-key violations.
    """
    db.row_factory = dict_factory
    db.execute("PRAGMA foreign_keys = ON")
    violations = db.execute("PRAGMA foreign_key_check").fetchall()
    if violations:
        raise DBError("failed foreign key check: %s" % (violations,))
def _open_db_connection(dbfile):
    """Open the SQLite3 database at the given path and return a
    fully-initialized connection object.

    Raises DBError if the file cannot be created or opened.
    """
    try:
        connection = sqlite3.connect(dbfile)
    except (EnvironmentError, sqlite3.OperationalError) as e:
        raise DBError("Unable to create/open db file %s: %s" % (dbfile, e))
    else:
        _initialize_db_connection(connection)
        return connection
def _get_temporary_dbfile(dbfile):
"""Get a temporary filename near the given path.
"""
fd, name = tempfile.mkstemp(
prefix=os.path.basename(dbfile) + ".",
dir=os.path.dirname(dbfile)
)
os.close(fd)
return name
def _atomic_create_and_initialize_db(dbfile, target_version):
    """Create and return a new database, initialized with the application
    schema.

    The database is built in a temporary file in the same directory and
    only renamed onto ``dbfile`` once fully initialized, so if anything
    goes wrong, nothing is left at the ``dbfile`` path.
    """
    temp_dbfile = _get_temporary_dbfile(dbfile)
    try:
        db = _open_db_connection(temp_dbfile)
        _initialize_db_schema(db, target_version)
        db.close()
        os.rename(temp_dbfile, dbfile)
    except Exception:
        # BUGFIX: previously a failure during initialization leaked the
        # temporary file; remove it (best-effort) before re-raising.
        try:
            os.unlink(temp_dbfile)
        except OSError:
            pass
        raise
    return _open_db_connection(dbfile)
def get_db(dbfile, target_version=TARGET_VERSION):
    """Open or create the given db file. The parent directory must exist.

    If the database's recorded schema version is older than
    `target_version`, upgrade scripts are applied one version at a time.
    Returns the db connection object, or raises DBError.
    """
    if dbfile == ":memory:":
        db = _open_db_connection(dbfile)
        _initialize_db_schema(db, target_version)
    elif os.path.exists(dbfile):
        db = _open_db_connection(dbfile)
    else:
        db = _atomic_create_and_initialize_db(dbfile, target_version)

    try:
        row = db.execute("SELECT version FROM version").fetchone()
        version = row["version"]
    except sqlite3.DatabaseError as e:
        # this indicates that the file is not a compatible database format.
        # Perhaps it was created with an old version, or it might be junk.
        raise DBError("db file is unusable: %s" % e)

    while version < target_version:
        next_version = version + 1
        log.msg(" need to upgrade from %s to %s" % (version, target_version))
        try:
            upgrader = get_upgrader(next_version)
        except ValueError:  # ResourceError??
            log.msg(" unable to upgrade %s to %s" % (version, next_version))
            raise DBError("Unable to upgrade %s to version %s, left at %s"
                          % (dbfile, next_version, version))
        log.msg(" executing upgrader v%s->v%s" % (version, next_version))
        db.executescript(upgrader)
        db.commit()
        version = next_version

    if version != target_version:
        raise DBError("Unable to handle db version %s" % version)
    return db
def dump_db(db):
    """Return the whole database as a single SQL-text dump.

    iterdump() does not work with our dict row factory installed, so the
    stock sqlite3.Row factory is swapped in for the duration and the
    original factory is restored afterwards.
    """
    saved_factory = db.row_factory
    db.row_factory = sqlite3.Row
    try:
        return "".join(db.iterdump())
    finally:
        db.row_factory = saved_factory

View File

@ -0,0 +1,30 @@
CREATE TABLE `version` -- contains one row
(
 `version` INTEGER -- set to 1
);

CREATE TABLE `current` -- contains one row
(
 `reboot` INTEGER, -- seconds since epoch of most recent reboot
 `last_update` INTEGER, -- when `current` was last updated
 `connected` INTEGER, -- number of current paired connections
 `waiting` INTEGER, -- number of not-yet-paired connections
 `incomplete_bytes` INTEGER -- bytes sent through not-yet-complete connections
);

CREATE TABLE `usage` -- one row per closed connection
(
 `started` INTEGER, -- seconds since epoch, rounded to "blur time"
 `total_time` INTEGER, -- seconds from open to last close
 `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
 `total_bytes` INTEGER, -- total bytes relayed (both directions)
 `result` VARCHAR -- happy, scary, lonely, errory, pruney
 -- transit moods:
 --  "errory": one side gave the wrong handshake
 --  "lonely": good handshake, but the other side never showed up
 --  "happy": both sides gave correct handshake
);

-- BUGFIX: these indexes previously referenced a non-existent
-- `transit_usage` table, so executing this schema script failed with
-- "no such table"; they must target the `usage` table defined above.
CREATE INDEX `usage_started_idx` ON `usage` (`started`);
CREATE INDEX `usage_result_idx` ON `usage` (`result`);

View File

@ -1,7 +1,8 @@
from . import transit_server from . import transit_server
from twisted.internet import reactor from twisted.internet import reactor
from twisted.python import usage from twisted.python import usage
from twisted.application.internet import StreamServerEndpointService from twisted.application.internet import (TimerService,
StreamServerEndpointService)
from twisted.internet import endpoints from twisted.internet import endpoints
LONGDESC = """\ LONGDESC = """\
@ -9,10 +10,10 @@ This plugin sets up a 'Transit Relay' server for magic-wormhole. This service
listens for TCP connections, finds pairs which present the same handshake, and listens for TCP connections, finds pairs which present the same handshake, and
glues the two TCP sockets together. glues the two TCP sockets together.
If --usage-logfile= is provided, a line will be written to the given file after If --log-stdout is provided, a line will be written to stdout after each
each connection is done. This line will be a complete JSON object (starting connection is done. This line will be a complete JSON object (starting with
with "{", ending with "}\n", and containing no internal newlines). The keys "{", ending with "}\n", and containing no internal newlines). The keys will
will be: be:
* 'started': number, seconds since epoch * 'started': number, seconds since epoch
* 'total_time': number, seconds from open to last close * 'total_time': number, seconds from open to last close
@ -27,35 +28,62 @@ second matching side never appeared (and thus 'waiting_time' will be null).
If --blur-usage= is provided, then 'started' will be rounded to the given time If --blur-usage= is provided, then 'started' will be rounded to the given time
interval, and 'total_bytes' will be rounded as well. interval, and 'total_bytes' will be rounded as well.
If --stats-file is provided, the server will periodically write a simple JSON If --usage-db= is provided, the server will maintain a SQLite database in the
dictionary to that file (atomically), with cumulative usage data (since last given file. Current, recent, and historical usage data will be written to the
reboot, and all-time). This information is *not* blurred (the assumption is database, and external tools can query the DB for metrics: the munin plugins
that it will be overwritten on a regular basis, and is aggregated anyways). The in misc/ may be useful. Timestamps and sizes in this file will respect
keys are: --blur-usage. The four tables are:
* active.connected: number of paired connections "current" contains a single row, with these columns:
* active.waiting: number of not-yet-paired connections
* since_reboot.bytes: sum of 'total_bytes'
* since_reboot.total: number of completed connections
* since_reboot.moods: dict mapping mood string to number of connections
* all_time.bytes: same
* all_time.total
* all_time.moods
The server will write twistd.pid and twistd.log files as usual, if daemonized * connected: number of paired connections
by twistd. twistd.log will only contain startup, shutdown, and exception * waiting: number of not-yet-paired connections
messages. To record information about each connection, use --usage-logfile. * incomplete_bytes: bytes transmitted over not-yet-complete connections
"since_reboot" contains a single row, with these columns:
* bytes: sum of 'total_bytes'
* connections: number of completed connections
* mood_happy: count of connections that finished "happy": both sides gave correct handshake
* mood_lonely: one side gave good handshake, other side never showed up
* mood_errory: one side gave a bad handshake
"all_time" contains a single row, with these columns:
* bytes:
* connections:
* mood_happy:
* mood_lonely:
* mood_errory:
"usage" contains one row per closed connection, with these columns:
* started: seconds since epoch, rounded to "blur time"
* total_time: seconds from first open to last close
* waiting_time: seconds from first open to second open, or None
* bytes: total bytes relayed (in both directions)
* result: (string) the mood: happy, lonely, errory
All tables will be updated after each connection is finished. In addition,
the "current" table will be updated at least once every 5 minutes.
If daemonized by twistd, the server will write twistd.pid and twistd.log
files as usual. By default twistd.log will only contain startup, shutdown,
and exception messages. Adding --log-stdout will add per-connection JSON
lines to twistd.log.
""" """
class Options(usage.Options): class Options(usage.Options):
#synopsis = "[--port=] [--usage-logfile=] [--blur-usage=] [--stats-json=]" #synopsis = "[--port=] [--log-stdout] [--blur-usage=] [--usage-db=]"
longdesc = LONGDESC longdesc = LONGDESC
optFlags = {
("log-stdout", None, "write JSON usage logs to stdout"),
}
optParameters = [ optParameters = [
("port", "p", "tcp:4001", "endpoint to listen on"), ("port", "p", "tcp:4001", "endpoint to listen on"),
("blur-usage", None, None, "blur timestamps and data sizes in logs"), ("blur-usage", None, None, "blur timestamps and data sizes in logs"),
("usage-logfile", None, None, "record usage data (JSON lines)"), ("usage-db", None, None, "record usage data (SQLite)"),
("stats-file", None, None, "record usage in JSON format"),
] ]
def opt_blur_usage(self, arg): def opt_blur_usage(self, arg):
@ -65,6 +93,9 @@ class Options(usage.Options):
def makeService(config, reactor=reactor): def makeService(config, reactor=reactor):
ep = endpoints.serverFromString(reactor, config["port"]) # to listen ep = endpoints.serverFromString(reactor, config["port"]) # to listen
f = transit_server.Transit(blur_usage=config["blur-usage"], f = transit_server.Transit(blur_usage=config["blur-usage"],
usage_logfile=config["usage-logfile"], log_stdout=config["log-stdout"],
stats_file=config["stats-file"]) usage_db=config["usage-db"])
return StreamServerEndpointService(ep, f) parent = service.MultiService()
StreamServerEndpointService(ep, f).setServiceParent(parent)
TimerService(5.0, f.timerUpdateStats).setServiceParent(parent)
return parent

View File

@ -2,6 +2,7 @@ from __future__ import print_function, unicode_literals
import os, re, time, json import os, re, time, json
from twisted.python import log from twisted.python import log
from twisted.internet import protocol from twisted.internet import protocol
from .database import get_db
SECONDS = 1.0 SECONDS = 1.0
MINUTE = 60*SECONDS MINUTE = 60*SECONDS
@ -220,15 +221,16 @@ class Transit(protocol.ServerFactory):
MAXTIME = 60*SECONDS MAXTIME = 60*SECONDS
protocol = TransitConnection protocol = TransitConnection
def __init__(self, blur_usage, usage_logfile, stats_file): def __init__(self, blur_usage, log_stdout, usage_db):
self._blur_usage = blur_usage self._blur_usage = blur_usage
self._log_requests = blur_usage is None self._debug_log = False
self._usage_logfile = open(usage_logfile, "a") if usage_logfile else None self._log_stdout = log_stdout
self._stats_file = stats_file self._db = None
if usage_db:
self._db = get_db(usage_db)
# we don't track TransitConnections until they submit a token
self._pending_requests = {} # token -> set((side, TransitConnection)) self._pending_requests = {} # token -> set((side, TransitConnection))
self._active_connections = set() # TransitConnection self._active_connections = set() # TransitConnection
self._counts = {"lonely": 0, "happy": 0, "errory": 0}
self._count_bytes = 0
def connection_got_token(self, token, new_side, new_tc): def connection_got_token(self, token, new_side, new_tc):
if token not in self._pending_requests: if token not in self._pending_requests:
@ -240,7 +242,7 @@ class Transit(protocol.ServerFactory):
or (new_side is None) or (new_side is None)
or (old_side != new_side)): or (old_side != new_side)):
# we found a match # we found a match
if self._log_requests: if self._debug_log:
log.msg("transit relay 2: %s" % new_tc.describeToken()) log.msg("transit relay 2: %s" % new_tc.describeToken())
# drop and stop tracking the rest # drop and stop tracking the rest
@ -255,33 +257,11 @@ class Transit(protocol.ServerFactory):
new_tc.buddy_connected(old_tc) new_tc.buddy_connected(old_tc)
old_tc.buddy_connected(new_tc) old_tc.buddy_connected(new_tc)
return return
if self._log_requests: if self._debug_log:
log.msg("transit relay 1: %s" % new_tc.describeToken()) log.msg("transit relay 1: %s" % new_tc.describeToken())
potentials.add((new_side, new_tc)) potentials.add((new_side, new_tc))
# TODO: timer # TODO: timer
def recordUsage(self, started, result, total_bytes,
total_time, waiting_time):
self._counts[result] += 1
self._count_bytes += total_bytes
if self._log_requests:
log.msg(format="Transit.recordUsage {bytes}B", bytes=total_bytes)
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
total_bytes = blur_size(total_bytes)
if self._usage_logfile:
data = {"started": started,
"total_time": total_time,
"waiting_time": waiting_time,
"total_bytes": total_bytes,
"mood": result,
}
self._usage_logfile.write(json.dumps(data))
self._usage_logfile.write("\n")
self._usage_logfile.flush()
if self._stats_file:
self._update_stats(total_bytes, result)
def transitFinished(self, tc, token, side, description): def transitFinished(self, tc, token, side, description):
if token in self._pending_requests: if token in self._pending_requests:
side_tc = (side, tc) side_tc = (side, tc)
@ -289,50 +269,63 @@ class Transit(protocol.ServerFactory):
self._pending_requests[token].remove(side_tc) self._pending_requests[token].remove(side_tc)
if not self._pending_requests[token]: # set is now empty if not self._pending_requests[token]: # set is now empty
del self._pending_requests[token] del self._pending_requests[token]
if self._log_requests: if self._debug_log:
log.msg("transitFinished %s" % (description,)) log.msg("transitFinished %s" % (description,))
self._active_connections.discard(tc) self._active_connections.discard(tc)
def transitFailed(self, p): def transitFailed(self, p):
if self._log_requests: if self._debug_log:
log.msg("transitFailed %r" % p) log.msg("transitFailed %r" % p)
pass pass
def _update_stats(self, total_bytes, mood): def recordUsage(self, started, result, total_bytes,
try: total_time, waiting_time):
with open(self._stats_file, "r") as f: if self._debug_log:
stats = json.load(f) log.msg(format="Transit.recordUsage {bytes}B", bytes=total_bytes)
except (EnvironmentError, ValueError): if self._blur_usage:
stats = {} started = self._blur_usage * (started // self._blur_usage)
total_bytes = blur_size(total_bytes)
# current status: expected to be zero most of the time if self._log_stdout:
stats["active"] = {"connected": len(self._active_connections) / 2, data = {"started": started,
"waiting": len(self._pending_requests), "total_time": total_time,
"waiting_time": waiting_time,
"total_bytes": total_bytes,
"mood": result,
} }
sys.stdout.write(json.dumps(data))
sys.stdout.write("\n")
sys.stdout.flush()
if self._db:
self._db.execute("INSERT INTO `usage`"
" (`started`, `total_time`, `waiting_time`,"
" `total_bytes`, `result`)"
" VALUES (?,?,?, ?,?)",
(started, total_time, waiting_time,
total_bytes, result))
self._update_stats()
self._db.commit()
# usage since last reboot def timerUpdateStats(self):
rb = stats["since_reboot"] = {} self._update_stats()
rb["bytes"] = self._count_bytes self._db.commit()
rb["total"] = sum(self._counts.values(), 0)
rbm = rb["moods"] = {}
for result, count in self._counts.items():
rbm[result] = count
# historical usage (all-time) def _update_stats(self):
if "all_time" not in stats: # current status: should be zero when idle
stats["all_time"] = {} reboot = self._reboot
u = stats["all_time"] last_update = time.time()
u["total"] = u.get("total", 0) + 1 connected = len(self._active_connections) / 2
u["bytes"] = u.get("bytes", 0) + total_bytes # TODO: when a connection is half-closed, len(active) will be odd. a
if "moods" not in u: # moment later (hopefully) the other side will disconnect, but
u["moods"] = {} # _update_stats isn't updated until later.
um = u["moods"] waiting = len(self._pending_tokens)
for m in "happy", "lonely", "errory": # "waiting" doesn't count multiple parallel connections from the same
if m not in um: # side
um[m] = 0 incomplete_bytes = sum(tc._total_sent
um[mood] += 1 for tc in self._active_connections)
tmpfile = self._stats_file + ".tmp" self._db.execute("DELETE FROM `current`")
with open(tmpfile, "w") as f: self._db.execute("INSERT INTO `current`"
f.write(json.dumps(stats)) " (`reboot`, `last_update`, `connected`, `waiting`,"
f.write("\n") " `incomplete_bytes`)"
os.rename(tmpfile, self._stats_file) " VALUES (?, ?, ?, ?, ?)",
(reboot, last_update, connected, waiting,
incomplete_bytes))