This commit is contained in:
Brian Warner 2017-09-13 12:29:51 -07:00
parent c287175d38
commit ea9e24913c
4 changed files with 93 additions and 20 deletions

View File

@ -2,7 +2,6 @@ from . import transit_server
from twisted.internet import reactor from twisted.internet import reactor
from twisted.python import usage from twisted.python import usage
from twisted.application.internet import StreamServerEndpointService from twisted.application.internet import StreamServerEndpointService
from twisted.application.service import MultiService
from twisted.internet import endpoints from twisted.internet import endpoints
LONGDESC = """\ LONGDESC = """\
@ -27,19 +26,24 @@ second matching side never appeared (and thus 'waiting_time' will be null).
If --blur-usage= is provided, then 'started' will be rounded to the given time If --blur-usage= is provided, then 'started' will be rounded to the given time
interval, and 'total_bytes' will be rounded as well. interval, and 'total_bytes' will be rounded as well.
If --stats-json is provided, the server will periodically write a simple JSON If --stats-file is provided, the server will periodically write a simple JSON
dictionary to that file (atomically), with usage since last reboot. This dictionary to that file (atomically), with cumulative usage data (since last
information is *not* blurred (the assumption is that it will be overwritten on reboot, and all-time). This information is *not* blurred (the assumption is
a regular basis, and is aggregated anyways). The keys are: that it will be overwritten on a regular basis, and is aggregated anyways). The
keys are:
* active.connected: number of paired connections * active.connected: number of paired connections
* active.waiting: number of not-yet-paired connections * active.waiting: number of not-yet-paired connections
* since_reboot.bytes: sum of 'total_bytes' * since_reboot.bytes: sum of 'total_bytes'
* since_reboot.total: number of completed connections * since_reboot.total: number of completed connections
* since_reboot.moods: dict mapping mood string to number of connections * since_reboot.moods: dict mapping mood string to number of connections
* all_time.bytes: same
* all_time.total
* all_time.moods
The server will write twistd.pid and twistd.log files as usual, if daemonized The server will write twistd.pid and twistd.log files as usual, if daemonized
by twistd. by twistd. twistd.log will only contain startup, shutdown, and exception
messages. To record information about each connection, use --usage-logfile.
""" """
class Options(usage.Options): class Options(usage.Options):
@ -48,9 +52,9 @@ class Options(usage.Options):
optParameters = [ optParameters = [
("port", "p", "tcp:4001", "endpoint to listen on"), ("port", "p", "tcp:4001", "endpoint to listen on"),
("blur-usage", None, None, "blur timestamps and data sizes in logs"),
("usage-logfile", None, None, "record usage data (JSON lines)"), ("usage-logfile", None, None, "record usage data (JSON lines)"),
("blur-usage", None, None, "blur logged timestamps and data sizes"), ("stats-file", None, None, "record usage in JSON format"),
("stats-json", None, None, "record usage since-reboot"),
] ]
def opt_blur_usage(self, arg): def opt_blur_usage(self, arg):
@ -58,11 +62,8 @@ class Options(usage.Options):
def makeService(config, reactor=reactor): def makeService(config, reactor=reactor):
s = MultiService()
t = transit_server.Transit(blur_usage=config["blur-usage"],
usage_logfile=config["usage-logfile"],
stats_json_path=config["stats-json"])
s.setServiceParent(t) # for the timer
ep = endpoints.serverFromString(reactor, config["port"]) # to listen ep = endpoints.serverFromString(reactor, config["port"]) # to listen
s.setServiceParent(StreamServerEndpointService(ep, t)) f = transit_server.Transit(blur_usage=config["blur-usage"],
return s usage_logfile=config["usage-logfile"],
stats_file=config["stats-file"])
return StreamServerEndpointService(ep, f)

View File

@ -0,0 +1,69 @@
# no unicode_literals untill twisted update
from twisted.application import service
from twisted.internet import defer, task, reactor
from twisted.python import log
from click.testing import CliRunner
import mock
from ..cli import cli
from ..transit import allocate_tcp_port
from ..server.server import RelayServer
class ServerBase:
def setUp(self):
self._setup_relay(None)
def _setup_relay(self, error, advertise_version=None):
self.sp = service.MultiService()
self.sp.startService()
self.transitport = allocate_tcp_port()
# need to talk to twisted team about only using unicode in
# endpoints.serverFromString
s = RelayServer("tcp:%s:interface=127.0.0.1" % self.transitport,
advertise_version=advertise_version,
signal_error=error)
s.setServiceParent(self.sp)
self._relay_server = s
self._rendezvous = s._rendezvous
self._transit_server = s._transit
# ws://127.0.0.1:%d/wormhole-relay/ws
self.transit = u"tcp:127.0.0.1:%d" % self.transitport
def tearDown(self):
# Unit tests that spawn a (blocking) client in a thread might still
# have threads running at this point, if one is stuck waiting for a
# message from a companion which has exited with an error. Our
# relay's .stopService() drops all connections, which ought to
# encourage those threads to terminate soon. If they don't, print a
# warning to ease debugging.
# XXX FIXME there's something in _noclobber test that's not
# waiting for a close, I think -- was pretty relieably getting
# unclean-reactor, but adding a slight pause here stops it...
from twisted.internet import reactor
tp = reactor.getThreadPool()
if not tp.working:
d = defer.succeed(None)
d.addCallback(lambda _: self.sp.stopService())
d.addCallback(lambda _: task.deferLater(reactor, 0.1, lambda: None))
return d
return self.sp.stopService()
# disconnect all callers
d = defer.maybeDeferred(self.sp.stopService)
wait_d = defer.Deferred()
# wait a second, then check to see if it worked
reactor.callLater(1.0, wait_d.callback, None)
def _later(res):
if len(tp.working):
log.msg("wormhole.test.common.ServerBase.tearDown:"
" I was unable to convince all threads to exit.")
tp.dumpStats()
print("tearDown warning: threads are still active")
print("This test will probably hang until one of the"
" clients gives up of their own accord.")
else:
log.msg("wormhole.test.common.ServerBase.tearDown:"
" I convinced all threads to exit.")
return d
wait_d.addCallback(_later)
return wait_d

View File

@ -5,7 +5,7 @@ from twisted.internet import protocol, reactor, defer
from twisted.internet.endpoints import clientFromString, connectProtocol from twisted.internet.endpoints import clientFromString, connectProtocol
from twisted.web import client from twisted.web import client
from .common import ServerBase from .common import ServerBase
from ..server import transit_server from .. import transit_server
class Accumulator(protocol.Protocol): class Accumulator(protocol.Protocol):
def __init__(self): def __init__(self):

View File

@ -188,7 +188,7 @@ class TransitConnection(protocol.Protocol):
self.factory.recordUsage(self._started, "errory", 0, self.factory.recordUsage(self._started, "errory", 0,
total_time, None) total_time, None)
class Transit(protocol.ServerFactory, service.MultiService): class Transit(protocol.ServerFactory):
# I manage pairs of simultaneous connections to a secondary TCP port, # I manage pairs of simultaneous connections to a secondary TCP port,
# both forwarded to the other. Clients must begin each connection with # both forwarded to the other. Clients must begin each connection with
# "please relay TOKEN for SIDE\n" (or a legacy form without the "for # "please relay TOKEN for SIDE\n" (or a legacy form without the "for
@ -221,11 +221,13 @@ class Transit(protocol.ServerFactory, service.MultiService):
MAXTIME = 60*SECONDS MAXTIME = 60*SECONDS
protocol = TransitConnection protocol = TransitConnection
def __init__(self, db, blur_usage): def __init__(self, blur_usage, usage_logfile, stats_file):
service.MultiService.__init__(self) service.MultiService.__init__(self)
self._db = db
self._blur_usage = blur_usage self._blur_usage = blur_usage
self._log_requests = blur_usage is None self._log_requests = blur_usage is None
if usage_logfile:
self._usage_logfile = open(usage_logfile, "a")
self._stats_file = stats_file
self._pending_requests = {} # token -> set((side, TransitConnection)) self._pending_requests = {} # token -> set((side, TransitConnection))
self._active_connections = set() # TransitConnection self._active_connections = set() # TransitConnection
self._counts = collections.defaultdict(int) self._counts = collections.defaultdict(int)
@ -264,10 +266,11 @@ class Transit(protocol.ServerFactory, service.MultiService):
def recordUsage(self, started, result, total_bytes, def recordUsage(self, started, result, total_bytes,
total_time, waiting_time): total_time, waiting_time):
if self._log_requests: if self._log_requests:
log.msg("Transit.recordUsage (%dB)" % total_bytes) log.msg(format="Transit.recordUsage {bytes}B", bytes=total_bytes)
if self._blur_usage: if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage) started = self._blur_usage * (started // self._blur_usage)
total_bytes = blur_size(total_bytes) total_bytes = blur_size(total_bytes)
if self._usage_logfile
self._db.execute("INSERT INTO `transit_usage`" self._db.execute("INSERT INTO `transit_usage`"
" (`started`, `total_time`, `waiting_time`," " (`started`, `total_time`, `waiting_time`,"
" `total_bytes`, `result`)" " `total_bytes`, `result`)"