diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 442adbf..1be9146 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -8,6 +8,7 @@ from zope.interface import ( Attribute, implementer, ) +from .database import get_db class ITransitClient(Interface): @@ -167,12 +168,41 @@ def blur_size(size): return round_to(size, 100e6) +def create_usage_tracker(blur_usage, log_file, usage_db): + """ + :param int blur_usage: see UsageTracker + + :param log_file: None or a file-like object to write JSON-encoded + lines of usage information to. + + :param usage_db: None or an sqlite3 database connection + + :returns: a new UsageTracker instance configured with backends. + """ + tracker = UsageTracker(blur_usage) + if usage_db: + db = get_db(usage_db) + tracker.add_backend(DatabaseUsageRecorder(db)) + if log_file: + tracker.add_backend(LogFileUsageRecorder(log_file)) + return tracker + + + + class UsageTracker(object): """ Tracks usage statistics of connections """ def __init__(self, blur_usage): + """ + :param int blur_usage: None or the number of seconds to use as a + window around which to blur time statistics (e.g. "60" means times + will be rounded to 1 minute intervals). When blur_usage is + non-zero, sizes will also be rounded into buckets of "one + megabyte", "one gigabyte" or "lots" + """ self._backends = set() self._blur_usage = blur_usage @@ -223,6 +253,9 @@ class UsageTracker(object): started = self._blur_usage * (started // self._blur_usage) total_bytes = blur_size(total_bytes) + # This is "a dict" instead of "kwargs" because we have to make + # it into a dict for the log use-case and in-memory/testing + # use-case anyway so this is less repeats of the names. self._notify_backends({ "started": started, "total_time": total_time, @@ -233,7 +266,7 @@ class UsageTracker(object): def _notify_backends(self, data): """ - Internal helper. 
Tell every backend we have about a new usage. + Internal helper. Tell every backend we have about a new usage record. """ for backend in self._backends: backend.record_usage(**data) @@ -241,8 +274,11 @@ class UsageTracker(object): class ActiveConnections(object): """ - Tracks active connections. A connection is 'active' when both - sides have shown up and they are glued together. + Tracks active connections. + + A connection is 'active' when both sides have shown up and they + are glued together (and thus could be passing data back and forth + if any is flowing). """ def __init__(self): self._connections = set() @@ -268,12 +304,20 @@ class ActiveConnections(object): class PendingRequests(object): """ - Tracks the tokens we have received from client connections and - maps them to their partner connections for requests that haven't - yet been 'glued together' (that is, one side hasn't yet shown up). + Tracks outstanding (non-"active") requests. + + We register client connections against the tokens we have + received. When the other side shows up we can thus match it to the + correct partner connection. At this point, the connection becomes + "active" and is thus no longer "pending" and so will no longer + be in this collection. """ def __init__(self, active_connections): + """ + :param active_connections: an instance of ActiveConnections where + connections are put when both sides arrive. + """ self._requests = defaultdict(set) # token -> set((side, TransitConnection)) self._active = active_connections @@ -285,16 +329,23 @@ class PendingRequests(object): if token in self._requests: self._requests[token].discard((side, tc)) if not self._requests[token]: + # no more sides; token is dead del self._requests[token] self._active.unregister(tc) - def register_token(self, token, new_side, new_tc): + def register(self, token, new_side, new_tc): """ A client has connected and successfully offered a token (and optional 'side' token). 
If this is the first one for this token, we merely remember it. If it is the second side for this token we connect them together. + :param bytes token: the token for this connection. + + :param bytes new_side: None or the side token for this connection + + :param TransitServerState new_tc: the state-machine of the connection + :returns bool: True if we are the first side to register this token """ @@ -562,7 +613,7 @@ class TransitServerState(object): """ self._token = token self._side = side - self._first = self._pending_requests.register_token(token, side, self) + self._first = self._pending_requests.register(token, side, self) @_machine.state(initial=True) def listening(self): diff --git a/src/wormhole_transit_relay/server_tap.py b/src/wormhole_transit_relay/server_tap.py index 8fbfde2..cbf3efa 100644 --- a/src/wormhole_transit_relay/server_tap.py +++ b/src/wormhole_transit_relay/server_tap.py @@ -6,6 +6,7 @@ from twisted.application.internet import (TimerService, StreamServerEndpointService) from twisted.internet import endpoints from . 
import transit_server +from .server_state import create_usage_tracker from .increase_rlimits import increase_rlimits LONGDESC = """\ @@ -32,13 +33,18 @@ class Options(usage.Options): def makeService(config, reactor=reactor): increase_rlimits() ep = endpoints.serverFromString(reactor, config["port"]) # to listen - log_file = (os.fdopen(int(config["log-fd"]), "w") - if config["log-fd"] is not None - else None) - f = transit_server.Transit(blur_usage=config["blur-usage"], - log_file=log_file, - usage_db=config["usage-db"]) + log_file = ( + os.fdopen(int(config["log-fd"]), "w") + if config["log-fd"] is not None + else None + ) + usage = create_usage_tracker( + blur_usage=config["blur-usage"], + log_file=log_file, + usage_db=config["usage-db"], + ) + factory = transit_server.Transit(usage) parent = MultiService() - StreamServerEndpointService(ep, f).setServiceParent(parent) - TimerService(5*60.0, f.timerUpdateStats).setServiceParent(parent) + StreamServerEndpointService(ep, factory).setServiceParent(parent) +### FIXME TODO TimerService(5*60.0, factory.timerUpdateStats).setServiceParent(parent) return parent diff --git a/src/wormhole_transit_relay/test/common.py b/src/wormhole_transit_relay/test/common.py index 8073ee0..adbecf8 100644 --- a/src/wormhole_transit_relay/test/common.py +++ b/src/wormhole_transit_relay/test/common.py @@ -11,6 +11,8 @@ from zope.interface import ( from ..transit_server import ( Transit, ) +from ..transit_server import Transit +from ..server_state import create_usage_tracker class IRelayTestClient(Interface): @@ -42,6 +44,7 @@ class IRelayTestClient(Interface): Erase any received data to this point. 
""" + class ServerBase: log_requests = False @@ -62,11 +65,12 @@ class ServerBase: self.flush() def _setup_relay(self, blur_usage=None, log_file=None, usage_db=None): - self._transit_server = Transit( + usage = create_usage_tracker( blur_usage=blur_usage, log_file=log_file, usage_db=usage_db, ) + self._transit_server = Transit(usage) self._transit_server._debug_log = self.log_requests def new_protocol(self): diff --git a/src/wormhole_transit_relay/test/test_service.py b/src/wormhole_transit_relay/test/test_service.py index 003de32..f84b01a 100644 --- a/src/wormhole_transit_relay/test/test_service.py +++ b/src/wormhole_transit_relay/test/test_service.py @@ -11,7 +11,7 @@ class Service(unittest.TestCase): def test_defaults(self): o = server_tap.Options() o.parseOptions([]) - with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t: + with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t: s = server_tap.makeService(o) self.assertEqual(t.mock_calls, [mock.call(blur_usage=None, @@ -21,7 +21,7 @@ class Service(unittest.TestCase): def test_blur(self): o = server_tap.Options() o.parseOptions(["--blur-usage=60"]) - with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t: + with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t: server_tap.makeService(o) self.assertEqual(t.mock_calls, [mock.call(blur_usage=60, @@ -31,7 +31,7 @@ class Service(unittest.TestCase): o = server_tap.Options() o.parseOptions(["--log-fd=99"]) fd = object() - with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t: + with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t: with mock.patch("wormhole_transit_relay.server_tap.os.fdopen", return_value=fd) as f: server_tap.makeService(o) diff --git a/src/wormhole_transit_relay/test/test_stats.py b/src/wormhole_transit_relay/test/test_stats.py index 4c0b036..bce450b 100644 --- 
a/src/wormhole_transit_relay/test/test_stats.py +++ b/src/wormhole_transit_relay/test/test_stats.py @@ -6,6 +6,7 @@ except ImportError: import mock from twisted.trial import unittest from ..transit_server import Transit +from ..server_state import create_usage_tracker from .. import database class DB(unittest.TestCase): @@ -20,7 +21,7 @@ class DB(unittest.TestCase): os.mkdir(d) usage_db = os.path.join(d, "usage.sqlite") with mock.patch("time.time", return_value=T+0): - t = Transit(blur_usage=None, log_file=None, usage_db=usage_db) + t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=usage_db)) db = self.open_db(usage_db) usage = list(t.usage._backends)[0] @@ -58,7 +59,7 @@ class DB(unittest.TestCase): waiting=0, connected=0)) def test_no_db(self): - t = Transit(blur_usage=None, log_file=None, usage_db=None) + t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=None)) self.assertEqual(0, len(t.usage._backends)) @@ -66,7 +67,7 @@ class LogToStdout(unittest.TestCase): def test_log(self): # emit lines of JSON to log_file, if set log_file = io.StringIO() - t = Transit(blur_usage=None, log_file=log_file, usage_db=None) + t = Transit(create_usage_tracker(blur_usage=None, log_file=log_file, usage_db=None)) with mock.patch("time.time", return_value=133): t.usage.record( started=123, @@ -84,7 +85,7 @@ class LogToStdout(unittest.TestCase): # if blurring is enabled, timestamps should be rounded to the # requested amount, and sizes should be rounded up too log_file = io.StringIO() - t = Transit(blur_usage=60, log_file=log_file, usage_db=None) + t = Transit(create_usage_tracker(blur_usage=60, log_file=log_file, usage_db=None)) with mock.patch("time.time", return_value=123 + 10): t.usage.record( @@ -101,7 +102,7 @@ class LogToStdout(unittest.TestCase): "mood": "happy"}) def test_do_not_log(self): - t = Transit(blur_usage=60, log_file=None, usage_db=None) + t = Transit(create_usage_tracker(blur_usage=60, log_file=None, usage_db=None)) 
t.usage.record( started=123, buddy_started=124, diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index ee71b06..a897bd4 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -4,7 +4,6 @@ from collections import defaultdict from twisted.python import log from twisted.internet import protocol from twisted.protocols.basic import LineReceiver -from .database import get_db SECONDS = 1.0 MINUTE = 60*SECONDS @@ -79,7 +78,7 @@ class TransitConnection(LineReceiver): self.factory.usage, ) self._state.connection_made(self) - self._log_requests = self.factory._log_requests +## self._log_requests = self.factory._log_requests try: self.transport.setTcpKeepAlive(True) except AttributeError: @@ -131,8 +130,8 @@ class TransitConnection(LineReceiver): # there will be two producer/consumer pairs. def __buddy_disconnected(self): - if self._log_requests: - log.msg("buddy_disconnected %s" % self.describeToken()) +## if self._log_requests: +## log.msg("buddy_disconnected %s" % self.describeToken()) self._buddy = None self._mood = "jilted" self.transport.loseConnection() @@ -210,117 +209,61 @@ class TransitConnection(LineReceiver): class Transit(protocol.ServerFactory): - # I manage pairs of simultaneous connections to a secondary TCP port, - # both forwarded to the other. Clients must begin each connection with - # "please relay TOKEN for SIDE\n" (or a legacy form without the "for - # SIDE"). Two connections match if they use the same TOKEN and have - # different SIDEs (the redundant connections are dropped when a match is - # made). Legacy connections match any with the same TOKEN, ignoring SIDE - # (so two legacy connections will match each other). + """ + I manage pairs of simultaneous connections to a secondary TCP port, + both forwarded to the other. Clients must begin each connection with + "please relay TOKEN for SIDE\n" (or a legacy form without the "for + SIDE"). 
Two connections match if they use the same TOKEN and have + different SIDEs (the redundant connections are dropped when a match is + made). Legacy connections match any with the same TOKEN, ignoring SIDE + (so two legacy connections will match each other). - # I will send "ok\n" when the matching connection is established, or - # disconnect if no matching connection is made within MAX_WAIT_TIME - # seconds. I will disconnect if you send data before the "ok\n". All data - # you get after the "ok\n" will be from the other side. You will not - # receive "ok\n" until the other side has also connected and submitted a - # matching token (and differing SIDE). + I will send "ok\n" when the matching connection is established, or + disconnect if no matching connection is made within MAX_WAIT_TIME + seconds. I will disconnect if you send data before the "ok\n". All data + you get after the "ok\n" will be from the other side. You will not + receive "ok\n" until the other side has also connected and submitted a + matching token (and differing SIDE). - # In addition, the connections will be dropped after MAXLENGTH bytes have - # been sent by either side, or MAXTIME seconds have elapsed after the - # matching connections were established. A future API will reveal these - # limits to clients instead of causing mysterious spontaneous failures. + In addition, the connections will be dropped after MAXLENGTH bytes have + been sent by either side, or MAXTIME seconds have elapsed after the + matching connections were established. A future API will reveal these + limits to clients instead of causing mysterious spontaneous failures. - # These relay connections are not half-closeable (unlike full TCP - # connections, applications will not receive any data after half-closing - # their outgoing side). Applications must negotiate shutdown with their - # peer and not close the connection until all data has finished - # transferring in both directions. 
Applications which only need to send - # data in one direction can use close() as usual. + These relay connections are not half-closeable (unlike full TCP + connections, applications will not receive any data after half-closing + their outgoing side). Applications must negotiate shutdown with their + peer and not close the connection until all data has finished + transferring in both directions. Applications which only need to send + data in one direction can use close() as usual. + """ + # TODO: unused MAX_WAIT_TIME = 30*SECONDS + # TODO: unused MAXLENGTH = 10*MB + # TODO: unused MAXTIME = 60*SECONDS protocol = TransitConnection - def __init__(self, blur_usage, log_file, usage_db): + def __init__(self, usage): self.active_connections = ActiveConnections() self.pending_requests = PendingRequests(self.active_connections) - self.usage = UsageTracker(blur_usage) - self._blur_usage = blur_usage - self._log_requests = blur_usage is None - if self._blur_usage: - log.msg("blurring access times to %d seconds" % self._blur_usage) - log.msg("not logging Transit connections to Twisted log") - else: - log.msg("not blurring access times") + self.usage = usage + if False: + # these log messages should be emitted by the usage-tracker + # .. or in the "tap" setup? 
+ if blur_usage: + log.msg("blurring access times to %d seconds" % self._blur_usage) + log.msg("not logging Transit connections to Twisted log") + else: + log.msg("not blurring access times") self._debug_log = False -## self._log_file = log_file - self._db = None - if usage_db: - self._db = get_db(usage_db) - self.usage.add_backend(DatabaseUsageRecorder(self._db)) - if log_file: - self.usage.add_backend(LogFileUsageRecorder(log_file)) + self._rebooted = time.time() - # we don't track TransitConnections until they submit a token -## self._pending_requests = defaultdict(set) # token -> set((side, TransitConnection)) -## self._active_connections = set() # TransitConnection - - def transitFinished(self, tc, token, side, description): - if token in self._pending_requests: - side_tc = (side, tc) - self._pending_requests[token].discard(side_tc) - if not self._pending_requests[token]: # set is now empty - del self._pending_requests[token] - if self._debug_log: - log.msg("transitFinished %s" % (description,)) - self._active_connections.discard(tc) - # we could update the usage database "current" row immediately, or wait - # until the 5-minute timer updates it. If we update it now, just after - # losing a connection, we should probably also update it just after - # establishing one (at the end of connection_got_token). For now I'm - # going to omit these, but maybe someday we'll turn them both on. The - # consequence is that a manual execution of the munin scripts ("munin - # run wormhole_transit_active") will give the wrong value just after a - # connect/disconnect event. Actual munin graphs should accurately - # report connections that last longer than the 5-minute sampling - # window, which is what we actually care about. 
- #self.timerUpdateStats() - - def recordUsage(self, started, result, total_bytes, - total_time, waiting_time): - if self._debug_log: - log.msg(format="Transit.recordUsage {bytes}B", bytes=total_bytes) - if self._blur_usage: - started = self._blur_usage * (started // self._blur_usage) - total_bytes = blur_size(total_bytes) - if self._log_file is not None: - data = {"started": started, - "total_time": total_time, - "waiting_time": waiting_time, - "total_bytes": total_bytes, - "mood": result, - } - self._log_file.write(json.dumps(data)+"\n") - self._log_file.flush() - if self._db: - self._db.execute("INSERT INTO `usage`" - " (`started`, `total_time`, `waiting_time`," - " `total_bytes`, `result`)" - " VALUES (?,?,?, ?,?)", - (started, total_time, waiting_time, - total_bytes, result)) - # XXXX aaaaaAA! okay, so just this one type of usage also - # does some other random stats-stuff; need more - # refactorizing - self._update_stats() - self._db.commit() - - def timerUpdateStats(self): - if self._db: - self._update_stats() - self._db.commit() + # XXX TODO self._rebooted and the below could be in a separate + # object? or in the DatabaseUsageRecorder .. but not here def _update_stats(self): # current status: should be zero when idle rebooted = self._rebooted