more stats / recording works
This commit is contained in:
parent
53864f57f0
commit
b7bcdfdca3
|
@ -1,4 +1,5 @@
|
|||
import time
|
||||
import json
|
||||
from collections import defaultdict
|
||||
|
||||
import automat
|
||||
|
@ -126,29 +127,54 @@ class LogFileUsageRecorder:
|
|||
"total_bytes": total_bytes,
|
||||
"mood": mood,
|
||||
}
|
||||
self._file.write(json.dumps(data))
|
||||
|
||||
self._file.write(json.dumps(data) + "\n")
|
||||
self._file.flush()
|
||||
|
||||
|
||||
@implementer(IUsageWriter)
|
||||
class DatabaseUsageRecorder:
|
||||
|
||||
def __init__(self, _db):
|
||||
def __init__(self, db):
|
||||
self._db = db
|
||||
|
||||
def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None):
|
||||
"""
|
||||
IUsageWriter.
|
||||
"""
|
||||
self._db.execute(
|
||||
"INSERT INTO `usage`"
|
||||
" (`started`, `total_time`, `waiting_time`,"
|
||||
" `total_bytes`, `result`)"
|
||||
" VALUES (?,?,?,?,?)",
|
||||
(started, total_time, waiting_time, total_bytes, mood)
|
||||
)
|
||||
# XXX FIXME see comment in transit_server
|
||||
#self._update_stats()
|
||||
self._db.commit()
|
||||
|
||||
|
||||
class UsageRecorder(object):
|
||||
def round_to(size, coarseness):
|
||||
return int(coarseness*(1+int((size-1)/coarseness)))
|
||||
|
||||
|
||||
def blur_size(size):
|
||||
if size == 0:
|
||||
return 0
|
||||
if size < 1e6:
|
||||
return round_to(size, 10e3)
|
||||
if size < 1e9:
|
||||
return round_to(size, 1e6)
|
||||
return round_to(size, 100e6)
|
||||
|
||||
|
||||
class UsageTracker(object):
|
||||
"""
|
||||
Tracks usage statistics of connections
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, blur_usage):
|
||||
self._backends = set()
|
||||
self._blur_usage = blur_usage
|
||||
|
||||
def add_backend(self, backend):
|
||||
"""
|
||||
|
@ -181,7 +207,6 @@ class UsageRecorder(object):
|
|||
|
||||
:param int buddy_bytes: number of bytes our partner sent
|
||||
"""
|
||||
|
||||
# ideally self._reactor.seconds() or similar, but ..
|
||||
finished = time.time()
|
||||
if buddy_started is not None:
|
||||
|
@ -193,9 +218,11 @@ class UsageRecorder(object):
|
|||
total_time = finished - started
|
||||
waiting_time = None
|
||||
total_bytes = bytes_sent
|
||||
# probably want like "backends" here or something? original
|
||||
# code logs some JSON (maybe) and writes to a database (maybe)
|
||||
# and tests record in memory.
|
||||
|
||||
if self._blur_usage:
|
||||
started = self._blur_usage * (started // self._blur_usage)
|
||||
total_bytes = blur_size(total_bytes)
|
||||
|
||||
self._notify_backends({
|
||||
"started": started,
|
||||
"total_time": total_time,
|
||||
|
|
|
@ -22,9 +22,10 @@ class DB(unittest.TestCase):
|
|||
with mock.patch("time.time", return_value=T+0):
|
||||
t = Transit(blur_usage=None, log_file=None, usage_db=usage_db)
|
||||
db = self.open_db(usage_db)
|
||||
usage = list(t.usage._backends)[0]
|
||||
|
||||
with mock.patch("time.time", return_value=T+1):
|
||||
t.recordUsage(started=123, result="happy", total_bytes=100,
|
||||
usage.record_usage(started=123, mood="happy", total_bytes=100,
|
||||
total_time=10, waiting_time=2)
|
||||
self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(),
|
||||
[dict(result="happy", started=123,
|
||||
|
@ -36,7 +37,7 @@ class DB(unittest.TestCase):
|
|||
waiting=0, connected=0))
|
||||
|
||||
with mock.patch("time.time", return_value=T+2):
|
||||
t.recordUsage(started=150, result="errory", total_bytes=200,
|
||||
usage.record_usage(started=150, mood="errory", total_bytes=200,
|
||||
total_time=11, waiting_time=3)
|
||||
self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(),
|
||||
[dict(result="happy", started=123,
|
||||
|
@ -58,18 +59,22 @@ class DB(unittest.TestCase):
|
|||
|
||||
def test_no_db(self):
|
||||
t = Transit(blur_usage=None, log_file=None, usage_db=None)
|
||||
self.assertEqual(0, len(t.usage._backends))
|
||||
|
||||
t.recordUsage(started=123, result="happy", total_bytes=100,
|
||||
total_time=10, waiting_time=2)
|
||||
t.timerUpdateStats()
|
||||
|
||||
class LogToStdout(unittest.TestCase):
|
||||
def test_log(self):
|
||||
# emit lines of JSON to log_file, if set
|
||||
log_file = io.StringIO()
|
||||
t = Transit(blur_usage=None, log_file=log_file, usage_db=None)
|
||||
t.recordUsage(started=123, result="happy", total_bytes=100,
|
||||
total_time=10, waiting_time=2)
|
||||
with mock.patch("time.time", return_value=133):
|
||||
t.usage.record(
|
||||
started=123,
|
||||
buddy_started=125,
|
||||
result="happy",
|
||||
bytes_sent=100,
|
||||
buddy_bytes=0,
|
||||
)
|
||||
self.assertEqual(json.loads(log_file.getvalue()),
|
||||
{"started": 123, "total_time": 10,
|
||||
"waiting_time": 2, "total_bytes": 100,
|
||||
|
@ -80,8 +85,16 @@ class LogToStdout(unittest.TestCase):
|
|||
# requested amount, and sizes should be rounded up too
|
||||
log_file = io.StringIO()
|
||||
t = Transit(blur_usage=60, log_file=log_file, usage_db=None)
|
||||
t.recordUsage(started=123, result="happy", total_bytes=11999,
|
||||
total_time=10, waiting_time=2)
|
||||
|
||||
with mock.patch("time.time", return_value=123 + 10):
|
||||
t.usage.record(
|
||||
started=123,
|
||||
buddy_started=125,
|
||||
result="happy",
|
||||
bytes_sent=11999,
|
||||
buddy_bytes=8001,
|
||||
)
|
||||
print(log_file.getvalue())
|
||||
self.assertEqual(json.loads(log_file.getvalue()),
|
||||
{"started": 120, "total_time": 10,
|
||||
"waiting_time": 2, "total_bytes": 20000,
|
||||
|
@ -89,5 +102,10 @@ class LogToStdout(unittest.TestCase):
|
|||
|
||||
def test_do_not_log(self):
|
||||
t = Transit(blur_usage=60, log_file=None, usage_db=None)
|
||||
t.recordUsage(started=123, result="happy", total_bytes=11999,
|
||||
total_time=10, waiting_time=2)
|
||||
t.usage.record(
|
||||
started=123,
|
||||
buddy_started=124,
|
||||
result="happy",
|
||||
bytes_sent=11999,
|
||||
buddy_bytes=12,
|
||||
)
|
||||
|
|
|
@ -3,7 +3,10 @@ from binascii import hexlify
|
|||
from twisted.trial import unittest
|
||||
from .common import ServerBase
|
||||
from .. import transit_server
|
||||
from ..server_state import MemoryUsageRecorder
|
||||
from ..server_state import (
|
||||
MemoryUsageRecorder,
|
||||
blur_size,
|
||||
)
|
||||
|
||||
def handshake(token, side=None):
|
||||
hs = b"please relay " + hexlify(token)
|
||||
|
@ -21,22 +24,21 @@ class _Transit:
|
|||
])
|
||||
|
||||
def test_blur_size(self):
|
||||
blur = transit_server.blur_size
|
||||
self.failUnlessEqual(blur(0), 0)
|
||||
self.failUnlessEqual(blur(1), 10e3)
|
||||
self.failUnlessEqual(blur(10e3), 10e3)
|
||||
self.failUnlessEqual(blur(10e3+1), 20e3)
|
||||
self.failUnlessEqual(blur(15e3), 20e3)
|
||||
self.failUnlessEqual(blur(20e3), 20e3)
|
||||
self.failUnlessEqual(blur(1e6), 1e6)
|
||||
self.failUnlessEqual(blur(1e6+1), 2e6)
|
||||
self.failUnlessEqual(blur(1.5e6), 2e6)
|
||||
self.failUnlessEqual(blur(2e6), 2e6)
|
||||
self.failUnlessEqual(blur(900e6), 900e6)
|
||||
self.failUnlessEqual(blur(1000e6), 1000e6)
|
||||
self.failUnlessEqual(blur(1050e6), 1100e6)
|
||||
self.failUnlessEqual(blur(1100e6), 1100e6)
|
||||
self.failUnlessEqual(blur(1150e6), 1200e6)
|
||||
self.failUnlessEqual(blur_size(0), 0)
|
||||
self.failUnlessEqual(blur_size(1), 10e3)
|
||||
self.failUnlessEqual(blur_size(10e3), 10e3)
|
||||
self.failUnlessEqual(blur_size(10e3+1), 20e3)
|
||||
self.failUnlessEqual(blur_size(15e3), 20e3)
|
||||
self.failUnlessEqual(blur_size(20e3), 20e3)
|
||||
self.failUnlessEqual(blur_size(1e6), 1e6)
|
||||
self.failUnlessEqual(blur_size(1e6+1), 2e6)
|
||||
self.failUnlessEqual(blur_size(1.5e6), 2e6)
|
||||
self.failUnlessEqual(blur_size(2e6), 2e6)
|
||||
self.failUnlessEqual(blur_size(900e6), 900e6)
|
||||
self.failUnlessEqual(blur_size(1000e6), 1000e6)
|
||||
self.failUnlessEqual(blur_size(1050e6), 1100e6)
|
||||
self.failUnlessEqual(blur_size(1100e6), 1100e6)
|
||||
self.failUnlessEqual(blur_size(1150e6), 1200e6)
|
||||
|
||||
def test_register(self):
|
||||
p1 = self.new_protocol()
|
||||
|
@ -331,12 +333,15 @@ class _Transit:
|
|||
# hang up before sending anything
|
||||
p1.disconnect()
|
||||
|
||||
|
||||
class TransitWithLogs(_Transit, ServerBase, unittest.TestCase):
|
||||
log_requests = True
|
||||
|
||||
|
||||
class TransitWithoutLogs(_Transit, ServerBase, unittest.TestCase):
|
||||
log_requests = False
|
||||
|
||||
|
||||
class Usage(ServerBase, unittest.TestCase):
|
||||
log_requests = True
|
||||
|
||||
|
@ -344,6 +349,7 @@ class Usage(ServerBase, unittest.TestCase):
|
|||
super(Usage, self).setUp()
|
||||
self._usage = MemoryUsageRecorder()
|
||||
self._transit_server.usage.add_backend(self._usage)
|
||||
## self._transit_server.usage._blur_usage = None
|
||||
|
||||
def test_empty(self):
|
||||
p1 = self.new_protocol()
|
||||
|
|
|
@ -12,24 +12,14 @@ HOUR = 60*MINUTE
|
|||
DAY = 24*HOUR
|
||||
MB = 1000*1000
|
||||
|
||||
def round_to(size, coarseness):
|
||||
return int(coarseness*(1+int((size-1)/coarseness)))
|
||||
|
||||
def blur_size(size):
|
||||
if size == 0:
|
||||
return 0
|
||||
if size < 1e6:
|
||||
return round_to(size, 10e3)
|
||||
if size < 1e9:
|
||||
return round_to(size, 1e6)
|
||||
return round_to(size, 100e6)
|
||||
|
||||
|
||||
from wormhole_transit_relay.server_state import (
|
||||
TransitServerState,
|
||||
PendingRequests,
|
||||
ActiveConnections,
|
||||
UsageRecorder,
|
||||
UsageTracker,
|
||||
DatabaseUsageRecorder,
|
||||
LogFileUsageRecorder,
|
||||
ITransitClient,
|
||||
)
|
||||
from zope.interface import implementer
|
||||
|
@ -255,7 +245,7 @@ class Transit(protocol.ServerFactory):
|
|||
def __init__(self, blur_usage, log_file, usage_db):
|
||||
self.active_connections = ActiveConnections()
|
||||
self.pending_requests = PendingRequests(self.active_connections)
|
||||
self.usage = UsageRecorder()
|
||||
self.usage = UsageTracker(blur_usage)
|
||||
self._blur_usage = blur_usage
|
||||
self._log_requests = blur_usage is None
|
||||
if self._blur_usage:
|
||||
|
@ -264,10 +254,13 @@ class Transit(protocol.ServerFactory):
|
|||
else:
|
||||
log.msg("not blurring access times")
|
||||
self._debug_log = False
|
||||
self._log_file = log_file
|
||||
## self._log_file = log_file
|
||||
self._db = None
|
||||
if usage_db:
|
||||
self._db = get_db(usage_db)
|
||||
self.usage.add_backend(DatabaseUsageRecorder(self._db))
|
||||
if log_file:
|
||||
self.usage.add_backend(LogFileUsageRecorder(log_file))
|
||||
self._rebooted = time.time()
|
||||
# we don't track TransitConnections until they submit a token
|
||||
## self._pending_requests = defaultdict(set) # token -> set((side, TransitConnection))
|
||||
|
@ -317,6 +310,9 @@ class Transit(protocol.ServerFactory):
|
|||
" VALUES (?,?,?, ?,?)",
|
||||
(started, total_time, waiting_time,
|
||||
total_bytes, result))
|
||||
# XXXX aaaaaAA! okay, so just this one type of usage also
|
||||
# does some other random stats-stuff; need more
|
||||
# refactorizing
|
||||
self._update_stats()
|
||||
self._db.commit()
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user