From b7bcdfdca3d762a1e9bcdb8239badb16bb17bc28 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 02:18:36 -0700 Subject: [PATCH] more stats / recording works --- src/wormhole_transit_relay/server_state.py | 45 +++++++++++++++---- src/wormhole_transit_relay/test/test_stats.py | 40 ++++++++++++----- .../test/test_transit_server.py | 40 ++++++++++------- src/wormhole_transit_relay/transit_server.py | 26 +++++------ 4 files changed, 99 insertions(+), 52 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index b98615e..442adbf 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -1,4 +1,5 @@ import time +import json from collections import defaultdict import automat @@ -126,29 +127,54 @@ class LogFileUsageRecorder: "total_bytes": total_bytes, "mood": mood, } - self._file.write(json.dumps(data)) - + self._file.write(json.dumps(data) + "\n") + self._file.flush() @implementer(IUsageWriter) class DatabaseUsageRecorder: - def __init__(self, _db): + def __init__(self, db): self._db = db def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None): """ IUsageWriter. """ + self._db.execute( + "INSERT INTO `usage`" + " (`started`, `total_time`, `waiting_time`," + " `total_bytes`, `result`)" + " VALUES (?,?,?,?,?)", + (started, total_time, waiting_time, total_bytes, mood) + ) + # XXX FIXME see comment in transit_server + #self._update_stats() + self._db.commit() -class UsageRecorder(object): +def round_to(size, coarseness): + return int(coarseness*(1+int((size-1)/coarseness))) + + +def blur_size(size): + if size == 0: + return 0 + if size < 1e6: + return round_to(size, 10e3) + if size < 1e9: + return round_to(size, 1e6) + return round_to(size, 100e6) + + +class UsageTracker(object): """ Tracks usage statistics of connections """ - def __init__(self): + def __init__(self, blur_usage): self._backends = set() + self._blur_usage = blur_usage def add_backend(self, backend): """ @@ -181,7 +207,6 @@ class UsageRecorder(object): :param int buddy_bytes: number of bytes our partner sent """ - # ideally self._reactor.seconds() or similar, but .. finished = time.time() if buddy_started is not None: @@ -193,9 +218,11 @@ class UsageRecorder(object): total_time = finished - started waiting_time = None total_bytes = bytes_sent - # probably want like "backends" here or something? original - # code logs some JSON (maybe) and writes to a database (maybe) - # and tests record in memory. + + if self._blur_usage: + started = self._blur_usage * (started // self._blur_usage) + total_bytes = blur_size(total_bytes) + self._notify_backends({ "started": started, "total_time": total_time, diff --git a/src/wormhole_transit_relay/test/test_stats.py b/src/wormhole_transit_relay/test/test_stats.py index 1f114b1..4c0b036 100644 --- a/src/wormhole_transit_relay/test/test_stats.py +++ b/src/wormhole_transit_relay/test/test_stats.py @@ -22,9 +22,10 @@ class DB(unittest.TestCase): with mock.patch("time.time", return_value=T+0): t = Transit(blur_usage=None, log_file=None, usage_db=usage_db) db = self.open_db(usage_db) + usage = list(t.usage._backends)[0] with mock.patch("time.time", return_value=T+1): - t.recordUsage(started=123, result="happy", total_bytes=100, + usage.record_usage(started=123, mood="happy", total_bytes=100, total_time=10, waiting_time=2) self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(), [dict(result="happy", started=123, @@ -36,7 +37,7 @@ class DB(unittest.TestCase): waiting=0, connected=0)) with mock.patch("time.time", return_value=T+2): - t.recordUsage(started=150, result="errory", total_bytes=200, + usage.record_usage(started=150, mood="errory", total_bytes=200, total_time=11, waiting_time=3) self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(), [dict(result="happy", started=123, @@ -58,18 +59,22 @@ class DB(unittest.TestCase): def test_no_db(self): t = Transit(blur_usage=None, log_file=None, usage_db=None) + self.assertEqual(0, len(t.usage._backends)) - t.recordUsage(started=123, result="happy", total_bytes=100, - total_time=10, waiting_time=2) - t.timerUpdateStats() class LogToStdout(unittest.TestCase): def test_log(self): # emit lines of JSON to log_file, if set log_file = io.StringIO() t = Transit(blur_usage=None, log_file=log_file, usage_db=None) - t.recordUsage(started=123, result="happy", total_bytes=100, - total_time=10, waiting_time=2) + with mock.patch("time.time", return_value=133): + t.usage.record( + started=123, + buddy_started=125, + result="happy", + bytes_sent=100, + buddy_bytes=0, + ) self.assertEqual(json.loads(log_file.getvalue()), {"started": 123, "total_time": 10, "waiting_time": 2, "total_bytes": 100, @@ -80,8 +85,16 @@ class LogToStdout(unittest.TestCase): # requested amount, and sizes should be rounded up too log_file = io.StringIO() t = Transit(blur_usage=60, log_file=log_file, usage_db=None) - t.recordUsage(started=123, result="happy", total_bytes=11999, - total_time=10, waiting_time=2) + + with mock.patch("time.time", return_value=123 + 10): + t.usage.record( + started=123, + buddy_started=125, + result="happy", + bytes_sent=11999, + buddy_bytes=8001, + ) + print(log_file.getvalue()) self.assertEqual(json.loads(log_file.getvalue()), {"started": 120, "total_time": 10, "waiting_time": 2, "total_bytes": 20000, @@ -89,5 +102,10 @@ class LogToStdout(unittest.TestCase): def test_do_not_log(self): t = Transit(blur_usage=60, log_file=None, usage_db=None) - t.recordUsage(started=123, result="happy", total_bytes=11999, - total_time=10, waiting_time=2) + t.usage.record( + started=123, + buddy_started=124, + result="happy", + bytes_sent=11999, + buddy_bytes=12, + ) diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index 22c8ee4..60cc8c9 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -3,7 +3,10 @@ from binascii import hexlify from twisted.trial import unittest from .common import ServerBase from .. import transit_server -from ..server_state import MemoryUsageRecorder +from ..server_state import ( + MemoryUsageRecorder, + blur_size, +) def handshake(token, side=None): hs = b"please relay " + hexlify(token) @@ -21,22 +24,21 @@ class _Transit: ]) def test_blur_size(self): - blur = transit_server.blur_size - self.failUnlessEqual(blur(0), 0) - self.failUnlessEqual(blur(1), 10e3) - self.failUnlessEqual(blur(10e3), 10e3) - self.failUnlessEqual(blur(10e3+1), 20e3) - self.failUnlessEqual(blur(15e3), 20e3) - self.failUnlessEqual(blur(20e3), 20e3) - self.failUnlessEqual(blur(1e6), 1e6) - self.failUnlessEqual(blur(1e6+1), 2e6) - self.failUnlessEqual(blur(1.5e6), 2e6) - self.failUnlessEqual(blur(2e6), 2e6) - self.failUnlessEqual(blur(900e6), 900e6) - self.failUnlessEqual(blur(1000e6), 1000e6) - self.failUnlessEqual(blur(1050e6), 1100e6) - self.failUnlessEqual(blur(1100e6), 1100e6) - self.failUnlessEqual(blur(1150e6), 1200e6) + self.failUnlessEqual(blur_size(0), 0) + self.failUnlessEqual(blur_size(1), 10e3) + self.failUnlessEqual(blur_size(10e3), 10e3) + self.failUnlessEqual(blur_size(10e3+1), 20e3) + self.failUnlessEqual(blur_size(15e3), 20e3) + self.failUnlessEqual(blur_size(20e3), 20e3) + self.failUnlessEqual(blur_size(1e6), 1e6) + self.failUnlessEqual(blur_size(1e6+1), 2e6) + self.failUnlessEqual(blur_size(1.5e6), 2e6) + self.failUnlessEqual(blur_size(2e6), 2e6) + self.failUnlessEqual(blur_size(900e6), 900e6) + self.failUnlessEqual(blur_size(1000e6), 1000e6) + self.failUnlessEqual(blur_size(1050e6), 1100e6) + self.failUnlessEqual(blur_size(1100e6), 1100e6) + self.failUnlessEqual(blur_size(1150e6), 1200e6) def test_register(self): p1 = self.new_protocol() @@ -331,12 +333,15 @@ class _Transit: # hang up before sending anything p1.disconnect() + class TransitWithLogs(_Transit, ServerBase, unittest.TestCase): log_requests = True + class TransitWithoutLogs(_Transit, ServerBase, unittest.TestCase): log_requests = False + class Usage(ServerBase, unittest.TestCase): log_requests = True @@ -344,6 +349,7 @@ class Usage(ServerBase, unittest.TestCase): super(Usage, self).setUp() self._usage = MemoryUsageRecorder() self._transit_server.usage.add_backend(self._usage) +## self._transit_server.usage._blur_usage = None def test_empty(self): p1 = self.new_protocol() diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index fba84be..ee71b06 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -12,24 +12,14 @@ HOUR = 60*MINUTE DAY = 24*HOUR MB = 1000*1000 -def round_to(size, coarseness): - return int(coarseness*(1+int((size-1)/coarseness))) - -def blur_size(size): - if size == 0: - return 0 - if size < 1e6: - return round_to(size, 10e3) - if size < 1e9: - return round_to(size, 1e6) - return round_to(size, 100e6) - from wormhole_transit_relay.server_state import ( TransitServerState, PendingRequests, ActiveConnections, - UsageRecorder, + UsageTracker, + DatabaseUsageRecorder, + LogFileUsageRecorder, ITransitClient, ) from zope.interface import implementer @@ -255,7 +245,7 @@ class Transit(protocol.ServerFactory): def __init__(self, blur_usage, log_file, usage_db): self.active_connections = ActiveConnections() self.pending_requests = PendingRequests(self.active_connections) - self.usage = UsageRecorder() + self.usage = UsageTracker(blur_usage) self._blur_usage = blur_usage self._log_requests = blur_usage is None if self._blur_usage: @@ -264,10 +254,13 @@ class Transit(protocol.ServerFactory): else: log.msg("not blurring access times") self._debug_log = False - self._log_file = log_file +## self._log_file = log_file self._db = None if usage_db: self._db = get_db(usage_db) + self.usage.add_backend(DatabaseUsageRecorder(self._db)) + if log_file: + self.usage.add_backend(LogFileUsageRecorder(log_file)) self._rebooted = time.time() # we don't track TransitConnections until they submit a token ## self._pending_requests = defaultdict(set) # token -> set((side, TransitConnection)) @@ -317,6 +310,9 @@ class Transit(protocol.ServerFactory): " VALUES (?,?,?, ?,?)", (started, total_time, waiting_time, total_bytes, result)) + # XXXX aaaaaAA! okay, so just this one type of usage also + # does some other random stats-stuff; need more + # refactorizing self._update_stats() self._db.commit()