fix global stats-gathering / recording

This commit is contained in:
meejah 2021-02-12 18:16:30 -07:00
parent 60e70bac3c
commit ca55509763
5 changed files with 81 additions and 39 deletions

View File

@ -118,8 +118,10 @@ class DatabaseUsageRecorder:
" VALUES (?,?,?,?,?)", " VALUES (?,?,?,?,?)",
(started, total_time, waiting_time, total_bytes, mood) (started, total_time, waiting_time, total_bytes, mood)
) )
# XXX FIXME see comment in transit_server # original code did "self._update_stats()" here, thus causing
#self._update_stats() # "global" stats update on every connection update .. should
# we repeat this behavior, or really only record every
# 60-seconds with the timer?
self._db.commit() self._db.commit()
@ -227,6 +229,26 @@ class UsageTracker(object):
"mood": result, "mood": result,
}) })
def update_stats(self, rebooted, updated, connected, waiting,
incomplete_bytes):
"""
Update general statistics.
"""
# in original code, this is only recorded in the database
# .. perhaps a better way to do this, but ..
for backend in self._backends:
if isinstance(backend, DatabaseUsageRecorder):
backend._db.execute("DELETE FROM `current`")
backend._db.execute(
"INSERT INTO `current`"
" (`rebooted`, `updated`, `connected`, `waiting`,"
" `incomplete_bytes`)"
" VALUES (?, ?, ?, ?, ?)",
(int(rebooted), int(updated), connected, waiting,
incomplete_bytes)
)
def _notify_backends(self, data): def _notify_backends(self, data):
""" """
Internal helper. Tell every backend we have about a new usage record. Internal helper. Tell every backend we have about a new usage record.

View File

@ -44,8 +44,8 @@ def makeService(config, reactor=reactor):
log_file=log_file, log_file=log_file,
usage_db=db, usage_db=db,
) )
factory = transit_server.Transit(usage) factory = transit_server.Transit(usage, reactor.seconds)
parent = MultiService() parent = MultiService()
StreamServerEndpointService(ep, factory).setServiceParent(parent) StreamServerEndpointService(ep, factory).setServiceParent(parent)
### FIXME TODO TimerService(5*60.0, factory.timerUpdateStats).setServiceParent(parent) TimerService(5*60.0, factory.update_stats).setServiceParent(parent)
return parent return parent

View File

@ -70,7 +70,7 @@ class ServerBase:
log_file=log_file, log_file=log_file,
usage_db=usage_db, usage_db=usage_db,
) )
self._transit_server = Transit(usage) self._transit_server = Transit(usage, lambda: 123456789.0)
self._transit_server._debug_log = self.log_requests self._transit_server._debug_log = self.log_requests
def new_protocol(self): def new_protocol(self):

View File

@ -12,19 +12,31 @@ from .. import database
class DB(unittest.TestCase): class DB(unittest.TestCase):
def test_db(self): def test_db(self):
T = 1519075308.0 T = 1519075308.0
class Timer:
t = T
def __call__(self):
return self.t
get_time = Timer()
d = self.mktemp() d = self.mktemp()
os.mkdir(d) os.mkdir(d)
usage_db = os.path.join(d, "usage.sqlite") usage_db = os.path.join(d, "usage.sqlite")
db = database.get_db(usage_db) db = database.get_db(usage_db)
with mock.patch("time.time", return_value=T+0): t = Transit(
t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=db)) create_usage_tracker(blur_usage=None, log_file=None, usage_db=db),
get_time,
)
self.assertEqual(len(t.usage._backends), 1) self.assertEqual(len(t.usage._backends), 1)
usage = list(t.usage._backends)[0] usage = list(t.usage._backends)[0]
with mock.patch("time.time", return_value=T+1): get_time.t = T + 1
usage.record_usage(started=123, mood="happy", total_bytes=100, usage.record_usage(started=123, mood="happy", total_bytes=100,
total_time=10, waiting_time=2) total_time=10, waiting_time=2)
t.update_stats()
self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(), self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(),
[dict(result="happy", started=123, [dict(result="happy", started=123,
total_bytes=100, total_time=10, waiting_time=2), total_bytes=100, total_time=10, waiting_time=2),
@ -34,9 +46,10 @@ class DB(unittest.TestCase):
incomplete_bytes=0, incomplete_bytes=0,
waiting=0, connected=0)) waiting=0, connected=0))
with mock.patch("time.time", return_value=T+2): get_time.t = T + 2
usage.record_usage(started=150, mood="errory", total_bytes=200, usage.record_usage(started=150, mood="errory", total_bytes=200,
total_time=11, waiting_time=3) total_time=11, waiting_time=3)
t.update_stats()
self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(), self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(),
[dict(result="happy", started=123, [dict(result="happy", started=123,
total_bytes=100, total_time=10, waiting_time=2), total_bytes=100, total_time=10, waiting_time=2),
@ -48,15 +61,18 @@ class DB(unittest.TestCase):
incomplete_bytes=0, incomplete_bytes=0,
waiting=0, connected=0)) waiting=0, connected=0))
with mock.patch("time.time", return_value=T+3): get_time.t = T + 3
t.timerUpdateStats() t.update_stats()
self.assertEqual(db.execute("SELECT * FROM `current`").fetchone(), self.assertEqual(db.execute("SELECT * FROM `current`").fetchone(),
dict(rebooted=T+0, updated=T+3, dict(rebooted=T+0, updated=T+3,
incomplete_bytes=0, incomplete_bytes=0,
waiting=0, connected=0)) waiting=0, connected=0))
def test_no_db(self): def test_no_db(self):
t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=None)) t = Transit(
create_usage_tracker(blur_usage=None, log_file=None, usage_db=None),
lambda: 0,
)
self.assertEqual(0, len(t.usage._backends)) self.assertEqual(0, len(t.usage._backends))
@ -64,7 +80,10 @@ class LogToStdout(unittest.TestCase):
def test_log(self): def test_log(self):
# emit lines of JSON to log_file, if set # emit lines of JSON to log_file, if set
log_file = io.StringIO() log_file = io.StringIO()
t = Transit(create_usage_tracker(blur_usage=None, log_file=log_file, usage_db=None)) t = Transit(
create_usage_tracker(blur_usage=None, log_file=log_file, usage_db=None),
lambda: 0,
)
with mock.patch("time.time", return_value=133): with mock.patch("time.time", return_value=133):
t.usage.record( t.usage.record(
started=123, started=123,
@ -82,7 +101,10 @@ class LogToStdout(unittest.TestCase):
# if blurring is enabled, timestamps should be rounded to the # if blurring is enabled, timestamps should be rounded to the
# requested amount, and sizes should be rounded up too # requested amount, and sizes should be rounded up too
log_file = io.StringIO() log_file = io.StringIO()
t = Transit(create_usage_tracker(blur_usage=60, log_file=log_file, usage_db=None)) t = Transit(
create_usage_tracker(blur_usage=60, log_file=log_file, usage_db=None),
lambda: 0,
)
with mock.patch("time.time", return_value=123 + 10): with mock.patch("time.time", return_value=123 + 10):
t.usage.record( t.usage.record(
@ -99,7 +121,10 @@ class LogToStdout(unittest.TestCase):
"mood": "happy"}) "mood": "happy"})
def test_do_not_log(self): def test_do_not_log(self):
t = Transit(create_usage_tracker(blur_usage=60, log_file=None, usage_db=None)) t = Transit(
create_usage_tracker(blur_usage=60, log_file=None, usage_db=None),
lambda: 0,
)
t.usage.record( t.usage.record(
started=123, started=123,
buddy_started=124, buddy_started=124,

View File

@ -193,33 +193,28 @@ class Transit(protocol.ServerFactory):
MAXTIME = 60*SECONDS MAXTIME = 60*SECONDS
protocol = TransitConnection protocol = TransitConnection
def __init__(self, usage): def __init__(self, usage, get_timestamp):
self.active_connections = ActiveConnections() self.active_connections = ActiveConnections()
self.pending_requests = PendingRequests(self.active_connections) self.pending_requests = PendingRequests(self.active_connections)
self.usage = usage self.usage = usage
self._debug_log = False self._debug_log = False
self._timestamp = get_timestamp
self._rebooted = self._timestamp()
self._rebooted = time.time() def update_stats(self):
# XXX TODO self._rebooted and the below could be in a separate
# object? or in the DatabaseUsageRecorder .. but not here
def _update_stats(self):
# current status: should be zero when idle
rebooted = self._rebooted
updated = time.time()
connected = len(self._active_connections) / 2
# TODO: when a connection is half-closed, len(active) will be odd. a # TODO: when a connection is half-closed, len(active) will be odd. a
# moment later (hopefully) the other side will disconnect, but # moment later (hopefully) the other side will disconnect, but
# _update_stats isn't updated until later. # _update_stats isn't updated until later.
waiting = len(self._pending_requests)
# "waiting" doesn't count multiple parallel connections from the same # "waiting" doesn't count multiple parallel connections from the same
# side # side
incomplete_bytes = sum(tc._total_sent self.usage.update_stats(
for tc in self._active_connections) rebooted=self._rebooted,
self._db.execute("DELETE FROM `current`") updated=self._timestamp(),
self._db.execute("INSERT INTO `current`" connected=len(self.active_connections._connections),
" (`rebooted`, `updated`, `connected`, `waiting`," waiting=len(self.pending_requests._requests),
" `incomplete_bytes`)" incomplete_bytes=sum(
" VALUES (?, ?, ?, ?, ?)", tc._total_sent
(rebooted, updated, connected, waiting, for tc in self.active_connections._connections
incomplete_bytes)) ),
)