diff --git a/src/wormhole_transit_relay/server_tap.py b/src/wormhole_transit_relay/server_tap.py index b0a8b27..0ac04e7 100644 --- a/src/wormhole_transit_relay/server_tap.py +++ b/src/wormhole_transit_relay/server_tap.py @@ -2,7 +2,6 @@ from . import transit_server from twisted.internet import reactor from twisted.python import usage from twisted.application.internet import StreamServerEndpointService -from twisted.application.service import MultiService from twisted.internet import endpoints LONGDESC = """\ @@ -27,19 +26,24 @@ second matching side never appeared (and thus 'waiting_time' will be null). If --blur-usage= is provided, then 'started' will be rounded to the given time interval, and 'total_bytes' will be rounded as well. -If --stats-json is provided, the server will periodically write a simple JSON -dictionary to that file (atomically), with usage since last reboot. This -information is *not* blurred (the assumption is that it will be overwritten on -a regular basis, and is aggregated anyways). The keys are: +If --stats-file is provided, the server will periodically write a simple JSON +dictionary to that file (atomically), with cumulative usage data (since last +reboot, and all-time). This information is *not* blurred (the assumption is +that it will be overwritten on a regular basis, and is aggregated anyways). The +keys are: * active.connected: number of paired connections * active.waiting: number of not-yet-paired connections * since_reboot.bytes: sum of 'total_bytes' * since_reboot.total: number of completed connections * since_reboot.moods: dict mapping mood string to number of connections +* all_time.bytes: same +* all_time.total +* all_time.moods The server will write twistd.pid and twistd.log files as usual, if daemonized -by twistd. +by twistd. twistd.log will only contain startup, shutdown, and exception +messages. To record information about each connection, use --usage-logfile. """ class Options(usage.Options): @@ -48,9 +52,9 @@ class Options(usage.Options): optParameters = [ ("port", "p", "tcp:4001", "endpoint to listen on"), + ("blur-usage", None, None, "blur timestamps and data sizes in logs"), ("usage-logfile", None, None, "record usage data (JSON lines)"), - ("blur-usage", None, None, "blur logged timestamps and data sizes"), - ("stats-json", None, None, "record usage since-reboot"), + ("stats-file", None, None, "record usage in JSON format"), ] def opt_blur_usage(self, arg): @@ -58,11 +62,8 @@ class Options(usage.Options): def makeService(config, reactor=reactor): - s = MultiService() - t = transit_server.Transit(blur_usage=config["blur-usage"], - usage_logfile=config["usage-logfile"], - stats_json_path=config["stats-json"]) - s.setServiceParent(t) # for the timer ep = endpoints.serverFromString(reactor, config["port"]) # to listen - s.setServiceParent(StreamServerEndpointService(ep, t)) - return s + f = transit_server.Transit(blur_usage=config["blur-usage"], + usage_logfile=config["usage-logfile"], + stats_file=config["stats-file"]) + return StreamServerEndpointService(ep, f) diff --git a/src/wormhole_transit_relay/test/common.py b/src/wormhole_transit_relay/test/common.py new file mode 100644 index 0000000..6404532 --- /dev/null +++ b/src/wormhole_transit_relay/test/common.py @@ -0,0 +1,69 @@ +# no unicode_literals untill twisted update +from twisted.application import service +from twisted.internet import defer, task, reactor +from twisted.python import log +from click.testing import CliRunner +import mock +from ..cli import cli +from ..transit import allocate_tcp_port +from ..server.server import RelayServer + +class ServerBase: + def setUp(self): + self._setup_relay(None) + + def _setup_relay(self, error, advertise_version=None): + self.sp = service.MultiService() + self.sp.startService() + self.transitport = allocate_tcp_port() + # need to talk to twisted team about only using unicode in + # endpoints.serverFromString + s = RelayServer("tcp:%s:interface=127.0.0.1" % self.transitport, + advertise_version=advertise_version, + signal_error=error) + s.setServiceParent(self.sp) + self._relay_server = s + self._rendezvous = s._rendezvous + self._transit_server = s._transit + # ws://127.0.0.1:%d/wormhole-relay/ws + self.transit = u"tcp:127.0.0.1:%d" % self.transitport + + def tearDown(self): + # Unit tests that spawn a (blocking) client in a thread might still + # have threads running at this point, if one is stuck waiting for a + # message from a companion which has exited with an error. Our + # relay's .stopService() drops all connections, which ought to + # encourage those threads to terminate soon. If they don't, print a + # warning to ease debugging. + + # XXX FIXME there's something in _noclobber test that's not + # waiting for a close, I think -- was pretty relieably getting + # unclean-reactor, but adding a slight pause here stops it... + from twisted.internet import reactor + + tp = reactor.getThreadPool() + if not tp.working: + d = defer.succeed(None) + d.addCallback(lambda _: self.sp.stopService()) + d.addCallback(lambda _: task.deferLater(reactor, 0.1, lambda: None)) + return d + return self.sp.stopService() + # disconnect all callers + d = defer.maybeDeferred(self.sp.stopService) + wait_d = defer.Deferred() + # wait a second, then check to see if it worked + reactor.callLater(1.0, wait_d.callback, None) + def _later(res): + if len(tp.working): + log.msg("wormhole.test.common.ServerBase.tearDown:" + " I was unable to convince all threads to exit.") + tp.dumpStats() + print("tearDown warning: threads are still active") + print("This test will probably hang until one of the" + " clients gives up of their own accord.") + else: + log.msg("wormhole.test.common.ServerBase.tearDown:" + " I convinced all threads to exit.") + return d + wait_d.addCallback(_later) + return wait_d diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index acdca61..304fbde 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -5,7 +5,7 @@ from twisted.internet import protocol, reactor, defer from twisted.internet.endpoints import clientFromString, connectProtocol from twisted.web import client from .common import ServerBase -from ..server import transit_server +from .. import transit_server class Accumulator(protocol.Protocol): def __init__(self): diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 92265a5..417f74d 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -188,7 +188,7 @@ class TransitConnection(protocol.Protocol): self.factory.recordUsage(self._started, "errory", 0, total_time, None) -class Transit(protocol.ServerFactory, service.MultiService): +class Transit(protocol.ServerFactory): # I manage pairs of simultaneous connections to a secondary TCP port, # both forwarded to the other. Clients must begin each connection with # "please relay TOKEN for SIDE\n" (or a legacy form without the "for @@ -221,11 +221,13 @@ class Transit(protocol.ServerFactory, service.MultiService): MAXTIME = 60*SECONDS protocol = TransitConnection - def __init__(self, db, blur_usage): + def __init__(self, blur_usage, usage_logfile, stats_file): service.MultiService.__init__(self) - self._db = db self._blur_usage = blur_usage self._log_requests = blur_usage is None + if usage_logfile: + self._usage_logfile = open(usage_logfile, "a") + self._stats_file = stats_file self._pending_requests = {} # token -> set((side, TransitConnection)) self._active_connections = set() # TransitConnection self._counts = collections.defaultdict(int) @@ -264,10 +266,11 @@ class Transit(protocol.ServerFactory, service.MultiService): def recordUsage(self, started, result, total_bytes, total_time, waiting_time): if self._log_requests: - log.msg("Transit.recordUsage (%dB)" % total_bytes) + log.msg(format="Transit.recordUsage {bytes}B", bytes=total_bytes) if self._blur_usage: started = self._blur_usage * (started // self._blur_usage) total_bytes = blur_size(total_bytes) + if self._usage_logfile self._db.execute("INSERT INTO `transit_usage`" " (`started`, `total_time`, `waiting_time`," " `total_bytes`, `result`)"