From b51237d958343a1c3f0d83c05191ae40b70f4502 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 00:06:19 -0700 Subject: [PATCH] start of refactoring usage-recording: pass one test --- src/wormhole_transit_relay/server_state.py | 85 +++++++++++++++++-- .../test/test_transit_server.py | 8 +- src/wormhole_transit_relay/transit_server.py | 15 +++- 3 files changed, 90 insertions(+), 18 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index ffa0819..21b2464 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -1,13 +1,18 @@ +import time from collections import defaultdict import automat from zope.interface import ( Interface, + Attribute, implementer, ) class ITransitClient(Interface): + + started_time = Attribute("timestamp when the connection was established") + def send(data): """ Send some byets to the client @@ -34,6 +39,11 @@ class ITransitClient(Interface): class TestClient(object): _partner = None _data = b"" + _started_time = time.time() + + @property + def started_time(self): + return _started_time def send_to_partner(self, data): print("{} GOT:{}".format(id(self), repr(data))) @@ -57,6 +67,53 @@ class TestClient(object): print("disconnect_partner: {}".format(id(self._partner))) +class UsageRecorder(object): + """ + Tracks usage statistics of connections + """ + + def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): + """ + :param int started: timestamp when our connection started + + :param int buddy_started: None, or the timestamp when our + partner's connection started (will be None if we don't yet + have a partner). + + :param str result: a label for the result of the connection + (one of the "moods"). + + :param int bytes_sent: number of bytes we sent + + :param int buddy_bytes: number of bytes our partner sent + """ + + # ideally self._reactor.seconds() or similar, but .. + finished = time.time() + if buddy_started is not None: + starts = [started, buddy_started] + total_time = finished - min(starts) + waiting_time = max(starts) - min(starts) + total_bytes = bytes_sent + buddy_bytes + else: + total_time = finished - started + waiting_time = None + total_bytes = bytes_sent + # probably want like "backends" here or something? original + # code logs some JSON (maybe) and writes to a database (maybe) + # and tests record in memory. + self.json_record({ + "started": started, + "total_time": total_time, + "waiting_time": waiting_time, + "total_bytes": total_bytes, + "mood": result, + }) + + def json_record(self, data): + pass + + class ActiveConnections(object): """ Tracks active connections. A connection is 'active' when both @@ -163,8 +220,9 @@ class TransitServerState(object): _mood = "empty" _total_sent = 0 - def __init__(self, pending_requests): + def __init__(self, pending_requests, usage_recorder): self._pending_requests = pending_requests + self._usage = usage_recorder def get_token(self): """ @@ -300,6 +358,17 @@ class TransitServerState(object): def _disconnect_partner(self): self._client.disconnect_partner() + # some outputs to record "usage" information .. + @_machine.output() + def _record_usage(self): + self._usage.record( + started=self._client.started_time, + buddy_started=self._buddy._client.started_time if self._buddy is not None else None, + result=self._mood, + bytes_sent=self._total_sent, + buddy_bytes=self._buddy._total_sent if self._buddy is not None else None + ) + # some outputs to record the "mood" .. @_machine.output() def _mood_happy(self): @@ -404,17 +473,17 @@ class TransitServerState(object): wait_relay.upon( bad_token, enter=done, - outputs=[_mood_errory, _send_bad, _disconnect], + outputs=[_mood_errory, _send_bad, _disconnect, _record_usage], ) wait_relay.upon( got_bytes, enter=done, - outputs=[_count_bytes, _mood_errory, _disconnect], + outputs=[_count_bytes, _mood_errory, _disconnect, _record_usage], ) wait_relay.upon( connection_lost, enter=done, - outputs=[_disconnect], + outputs=[_disconnect, _record_usage], ) wait_partner.upon( @@ -425,12 +494,12 @@ class TransitServerState(object): wait_partner.upon( connection_lost, enter=done, - outputs=[_mood_lonely, _unregister], + outputs=[_mood_lonely, _unregister, _record_usage], ) wait_partner.upon( got_bytes, enter=done, - outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister], + outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister, _record_usage], ) relaying.upon( @@ -441,12 +510,12 @@ class TransitServerState(object): relaying.upon( connection_lost, enter=done, - outputs=[_mood_happy_if_first, _disconnect_partner, _unregister], + outputs=[_mood_happy_if_first, _disconnect_partner, _unregister, _record_usage], ) relaying.upon( partner_connection_lost, enter=done, - outputs=[_mood_happy_if_second, _disconnect, _unregister], + outputs=[_mood_happy_if_second, _disconnect, _unregister, _record_usage], ) done.upon( diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index bca740e..d151b21 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -340,10 +340,7 @@ class Usage(ServerBase, unittest.TestCase): def setUp(self): super(Usage, self).setUp() self._usage = [] - def record(started, result, total_bytes, total_time, waiting_time): - self._usage.append((started, result, total_bytes, - total_time, waiting_time)) - self._transit_server.recordUsage = record + self._transit_server.usage.json_record = self._usage.append def test_empty(self): p1 = self.new_protocol() @@ -365,8 +362,7 @@ class Usage(ServerBase, unittest.TestCase): # that will log the "empty" usage event self.assertEqual(len(self._usage), 1, self._usage) - (started, result, total_bytes, total_time, waiting_time) = self._usage[0] - self.assertEqual(result, "empty", self._usage) + self.assertEqual("empty", self._usage[0]["mood"]) def test_errory(self): p1 = self.new_protocol() diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index abd6406..1727610 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -29,6 +29,7 @@ from wormhole_transit_relay.server_state import ( TransitServerState, PendingRequests, ActiveConnections, + UsageRecorder, ITransitClient, ) from zope.interface import implementer @@ -41,6 +42,7 @@ class TransitConnection(LineReceiver): # This must be >= to the longest possible handshake message. MAX_LENGTH = 1024 + started_time = None def send(self, data): """ @@ -78,9 +80,15 @@ class TransitConnection(LineReceiver): return d def connectionMade(self): - self._state = TransitServerState(self.factory.pending_requests) + # ideally more like self._reactor.seconds() ... but Twisted + # doesn't have a good way to get the reactor for a protocol + # (besides "use the global one") + self.started_time = time.time() + self._state = TransitServerState( + self.factory.pending_requests, + self.factory.usage, + ) self._state.connection_made(self) - self._started = time.time() self._log_requests = self.factory._log_requests try: self.transport.setTcpKeepAlive(True) @@ -154,8 +162,6 @@ class TransitConnection(LineReceiver): self.transport.loseConnection() def connectionLost(self, reason): - finished = time.time() - total_time = finished - self._started self._state.connection_lost() # XXX FIXME record usage @@ -249,6 +255,7 @@ class Transit(protocol.ServerFactory): def __init__(self, blur_usage, log_file, usage_db): self.active_connections = ActiveConnections() self.pending_requests = PendingRequests(self.active_connections) + self.usage = UsageRecorder() self._blur_usage = blur_usage self._log_requests = blur_usage is None if self._blur_usage: