start of refactoring usage-recording: pass one test
This commit is contained in:
parent
0e64707459
commit
b51237d958
|
@ -1,13 +1,18 @@
|
||||||
|
import time
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
import automat
|
import automat
|
||||||
from zope.interface import (
|
from zope.interface import (
|
||||||
Interface,
|
Interface,
|
||||||
|
Attribute,
|
||||||
implementer,
|
implementer,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class ITransitClient(Interface):
|
class ITransitClient(Interface):
|
||||||
|
|
||||||
|
started_time = Attribute("timestamp when the connection was established")
|
||||||
|
|
||||||
def send(data):
|
def send(data):
|
||||||
"""
|
"""
|
||||||
Send some byets to the client
|
Send some byets to the client
|
||||||
|
@ -34,6 +39,11 @@ class ITransitClient(Interface):
|
||||||
class TestClient(object):
|
class TestClient(object):
|
||||||
_partner = None
|
_partner = None
|
||||||
_data = b""
|
_data = b""
|
||||||
|
_started_time = time.time()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def started_time(self):
|
||||||
|
return _started_time
|
||||||
|
|
||||||
def send_to_partner(self, data):
|
def send_to_partner(self, data):
|
||||||
print("{} GOT:{}".format(id(self), repr(data)))
|
print("{} GOT:{}".format(id(self), repr(data)))
|
||||||
|
@ -57,6 +67,53 @@ class TestClient(object):
|
||||||
print("disconnect_partner: {}".format(id(self._partner)))
|
print("disconnect_partner: {}".format(id(self._partner)))
|
||||||
|
|
||||||
|
|
||||||
|
class UsageRecorder(object):
|
||||||
|
"""
|
||||||
|
Tracks usage statistics of connections
|
||||||
|
"""
|
||||||
|
|
||||||
|
def record(self, started, buddy_started, result, bytes_sent, buddy_bytes):
|
||||||
|
"""
|
||||||
|
:param int started: timestamp when our connection started
|
||||||
|
|
||||||
|
:param int buddy_started: None, or the timestamp when our
|
||||||
|
partner's connection started (will be None if we don't yet
|
||||||
|
have a partner).
|
||||||
|
|
||||||
|
:param str result: a label for the result of the connection
|
||||||
|
(one of the "moods").
|
||||||
|
|
||||||
|
:param int bytes_sent: number of bytes we sent
|
||||||
|
|
||||||
|
:param int buddy_bytes: number of bytes our partner sent
|
||||||
|
"""
|
||||||
|
|
||||||
|
# ideally self._reactor.seconds() or similar, but ..
|
||||||
|
finished = time.time()
|
||||||
|
if buddy_started is not None:
|
||||||
|
starts = [started, buddy_started]
|
||||||
|
total_time = finished - min(starts)
|
||||||
|
waiting_time = max(starts) - min(starts)
|
||||||
|
total_bytes = bytes_sent + buddy_bytes
|
||||||
|
else:
|
||||||
|
total_time = finished - started
|
||||||
|
waiting_time = None
|
||||||
|
total_bytes = bytes_sent
|
||||||
|
# probably want like "backends" here or something? original
|
||||||
|
# code logs some JSON (maybe) and writes to a database (maybe)
|
||||||
|
# and tests record in memory.
|
||||||
|
self.json_record({
|
||||||
|
"started": started,
|
||||||
|
"total_time": total_time,
|
||||||
|
"waiting_time": waiting_time,
|
||||||
|
"total_bytes": total_bytes,
|
||||||
|
"mood": result,
|
||||||
|
})
|
||||||
|
|
||||||
|
def json_record(self, data):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class ActiveConnections(object):
|
class ActiveConnections(object):
|
||||||
"""
|
"""
|
||||||
Tracks active connections. A connection is 'active' when both
|
Tracks active connections. A connection is 'active' when both
|
||||||
|
@ -163,8 +220,9 @@ class TransitServerState(object):
|
||||||
_mood = "empty"
|
_mood = "empty"
|
||||||
_total_sent = 0
|
_total_sent = 0
|
||||||
|
|
||||||
def __init__(self, pending_requests):
|
def __init__(self, pending_requests, usage_recorder):
|
||||||
self._pending_requests = pending_requests
|
self._pending_requests = pending_requests
|
||||||
|
self._usage = usage_recorder
|
||||||
|
|
||||||
def get_token(self):
|
def get_token(self):
|
||||||
"""
|
"""
|
||||||
|
@ -300,6 +358,17 @@ class TransitServerState(object):
|
||||||
def _disconnect_partner(self):
|
def _disconnect_partner(self):
|
||||||
self._client.disconnect_partner()
|
self._client.disconnect_partner()
|
||||||
|
|
||||||
|
# some outputs to record "usage" information ..
|
||||||
|
@_machine.output()
|
||||||
|
def _record_usage(self):
|
||||||
|
self._usage.record(
|
||||||
|
started=self._client.started_time,
|
||||||
|
buddy_started=self._buddy._client.started_time if self._buddy is not None else None,
|
||||||
|
result=self._mood,
|
||||||
|
bytes_sent=self._total_sent,
|
||||||
|
buddy_bytes=self._buddy._total_sent if self._buddy is not None else None
|
||||||
|
)
|
||||||
|
|
||||||
# some outputs to record the "mood" ..
|
# some outputs to record the "mood" ..
|
||||||
@_machine.output()
|
@_machine.output()
|
||||||
def _mood_happy(self):
|
def _mood_happy(self):
|
||||||
|
@ -404,17 +473,17 @@ class TransitServerState(object):
|
||||||
wait_relay.upon(
|
wait_relay.upon(
|
||||||
bad_token,
|
bad_token,
|
||||||
enter=done,
|
enter=done,
|
||||||
outputs=[_mood_errory, _send_bad, _disconnect],
|
outputs=[_mood_errory, _send_bad, _disconnect, _record_usage],
|
||||||
)
|
)
|
||||||
wait_relay.upon(
|
wait_relay.upon(
|
||||||
got_bytes,
|
got_bytes,
|
||||||
enter=done,
|
enter=done,
|
||||||
outputs=[_count_bytes, _mood_errory, _disconnect],
|
outputs=[_count_bytes, _mood_errory, _disconnect, _record_usage],
|
||||||
)
|
)
|
||||||
wait_relay.upon(
|
wait_relay.upon(
|
||||||
connection_lost,
|
connection_lost,
|
||||||
enter=done,
|
enter=done,
|
||||||
outputs=[_disconnect],
|
outputs=[_disconnect, _record_usage],
|
||||||
)
|
)
|
||||||
|
|
||||||
wait_partner.upon(
|
wait_partner.upon(
|
||||||
|
@ -425,12 +494,12 @@ class TransitServerState(object):
|
||||||
wait_partner.upon(
|
wait_partner.upon(
|
||||||
connection_lost,
|
connection_lost,
|
||||||
enter=done,
|
enter=done,
|
||||||
outputs=[_mood_lonely, _unregister],
|
outputs=[_mood_lonely, _unregister, _record_usage],
|
||||||
)
|
)
|
||||||
wait_partner.upon(
|
wait_partner.upon(
|
||||||
got_bytes,
|
got_bytes,
|
||||||
enter=done,
|
enter=done,
|
||||||
outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister],
|
outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister, _record_usage],
|
||||||
)
|
)
|
||||||
|
|
||||||
relaying.upon(
|
relaying.upon(
|
||||||
|
@ -441,12 +510,12 @@ class TransitServerState(object):
|
||||||
relaying.upon(
|
relaying.upon(
|
||||||
connection_lost,
|
connection_lost,
|
||||||
enter=done,
|
enter=done,
|
||||||
outputs=[_mood_happy_if_first, _disconnect_partner, _unregister],
|
outputs=[_mood_happy_if_first, _disconnect_partner, _unregister, _record_usage],
|
||||||
)
|
)
|
||||||
relaying.upon(
|
relaying.upon(
|
||||||
partner_connection_lost,
|
partner_connection_lost,
|
||||||
enter=done,
|
enter=done,
|
||||||
outputs=[_mood_happy_if_second, _disconnect, _unregister],
|
outputs=[_mood_happy_if_second, _disconnect, _unregister, _record_usage],
|
||||||
)
|
)
|
||||||
|
|
||||||
done.upon(
|
done.upon(
|
||||||
|
|
|
@ -340,10 +340,7 @@ class Usage(ServerBase, unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(Usage, self).setUp()
|
super(Usage, self).setUp()
|
||||||
self._usage = []
|
self._usage = []
|
||||||
def record(started, result, total_bytes, total_time, waiting_time):
|
self._transit_server.usage.json_record = self._usage.append
|
||||||
self._usage.append((started, result, total_bytes,
|
|
||||||
total_time, waiting_time))
|
|
||||||
self._transit_server.recordUsage = record
|
|
||||||
|
|
||||||
def test_empty(self):
|
def test_empty(self):
|
||||||
p1 = self.new_protocol()
|
p1 = self.new_protocol()
|
||||||
|
@ -365,8 +362,7 @@ class Usage(ServerBase, unittest.TestCase):
|
||||||
|
|
||||||
# that will log the "empty" usage event
|
# that will log the "empty" usage event
|
||||||
self.assertEqual(len(self._usage), 1, self._usage)
|
self.assertEqual(len(self._usage), 1, self._usage)
|
||||||
(started, result, total_bytes, total_time, waiting_time) = self._usage[0]
|
self.assertEqual("empty", self._usage[0]["mood"])
|
||||||
self.assertEqual(result, "empty", self._usage)
|
|
||||||
|
|
||||||
def test_errory(self):
|
def test_errory(self):
|
||||||
p1 = self.new_protocol()
|
p1 = self.new_protocol()
|
||||||
|
|
|
@ -29,6 +29,7 @@ from wormhole_transit_relay.server_state import (
|
||||||
TransitServerState,
|
TransitServerState,
|
||||||
PendingRequests,
|
PendingRequests,
|
||||||
ActiveConnections,
|
ActiveConnections,
|
||||||
|
UsageRecorder,
|
||||||
ITransitClient,
|
ITransitClient,
|
||||||
)
|
)
|
||||||
from zope.interface import implementer
|
from zope.interface import implementer
|
||||||
|
@ -41,6 +42,7 @@ class TransitConnection(LineReceiver):
|
||||||
# This must be >= to the longest possible handshake message.
|
# This must be >= to the longest possible handshake message.
|
||||||
|
|
||||||
MAX_LENGTH = 1024
|
MAX_LENGTH = 1024
|
||||||
|
started_time = None
|
||||||
|
|
||||||
def send(self, data):
|
def send(self, data):
|
||||||
"""
|
"""
|
||||||
|
@ -78,9 +80,15 @@ class TransitConnection(LineReceiver):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def connectionMade(self):
|
def connectionMade(self):
|
||||||
self._state = TransitServerState(self.factory.pending_requests)
|
# ideally more like self._reactor.seconds() ... but Twisted
|
||||||
|
# doesn't have a good way to get the reactor for a protocol
|
||||||
|
# (besides "use the global one")
|
||||||
|
self.started_time = time.time()
|
||||||
|
self._state = TransitServerState(
|
||||||
|
self.factory.pending_requests,
|
||||||
|
self.factory.usage,
|
||||||
|
)
|
||||||
self._state.connection_made(self)
|
self._state.connection_made(self)
|
||||||
self._started = time.time()
|
|
||||||
self._log_requests = self.factory._log_requests
|
self._log_requests = self.factory._log_requests
|
||||||
try:
|
try:
|
||||||
self.transport.setTcpKeepAlive(True)
|
self.transport.setTcpKeepAlive(True)
|
||||||
|
@ -154,8 +162,6 @@ class TransitConnection(LineReceiver):
|
||||||
self.transport.loseConnection()
|
self.transport.loseConnection()
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
finished = time.time()
|
|
||||||
total_time = finished - self._started
|
|
||||||
self._state.connection_lost()
|
self._state.connection_lost()
|
||||||
|
|
||||||
# XXX FIXME record usage
|
# XXX FIXME record usage
|
||||||
|
@ -249,6 +255,7 @@ class Transit(protocol.ServerFactory):
|
||||||
def __init__(self, blur_usage, log_file, usage_db):
|
def __init__(self, blur_usage, log_file, usage_db):
|
||||||
self.active_connections = ActiveConnections()
|
self.active_connections = ActiveConnections()
|
||||||
self.pending_requests = PendingRequests(self.active_connections)
|
self.pending_requests = PendingRequests(self.active_connections)
|
||||||
|
self.usage = UsageRecorder()
|
||||||
self._blur_usage = blur_usage
|
self._blur_usage = blur_usage
|
||||||
self._log_requests = blur_usage is None
|
self._log_requests = blur_usage is None
|
||||||
if self._blur_usage:
|
if self._blur_usage:
|
||||||
|
|
Loading…
Reference in New Issue
Block a user