Merge branch 'transit-usage'
This commit is contained in:
@ -21,14 +21,21 @@ CREATE INDEX `messages_idx` ON `messages` (`appid`, `channelid`);
`type` VARCHAR, -- "rendezvous" or "transit"
`started` INTEGER, -- seconds since epoch, rounded to one day
`result` VARCHAR, -- happy, scary, lonely, errory, pruney
-- "happy": both sides close with mood=happy
-- "scary": any side closes with mood=scary (bad MAC, probably wrong pw)
-- "lonely": any side closes with mood=lonely (no response from 2nd side)
-- "errory": any side closes with mood=errory (other errors)
-- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved
-- rendezvous moods:
-- "happy": both sides close with mood=happy
-- "scary": any side closes with mood=scary (bad MAC, probably wrong pw)
-- "lonely": any side closes with mood=lonely (no response from 2nd side)
-- "errory": any side closes with mood=errory (other errors)
-- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved
-- transit moods:
-- "errory": this side have the wrong handshake
-- "lonely": good handshake, but the other side never showed up
-- "happy": both sides gave correct handshake
`total_bytes` INTEGER, -- for transit, total bytes relayed (both directions)
`total_time` INTEGER, -- seconds from start to closed, or None
`waiting_time` INTEGER -- seconds from start to 2nd side appearing, or None
@ -13,16 +13,44 @@ def abbrev(t):
return "%.1fms" % (t*1e3)
return "%.1fus" % (t*1e6)
def abbreviate_space(s, SI=True):
if s is None:
return "-"
if SI:
U = 1000.0
isuffix = "B"
U = 1024.0
isuffix = "iB"
def r(count, suffix):
return "%.2f %s%s" % (count, suffix, isuffix)
if s < 1024: # 1000-1023 get emitted as bytes, even in SI mode
return "%d B" % s
if s < U*U:
return r(s/U, "k")
if s < U*U*U:
return r(s/(U*U), "M")
if s < U*U*U*U:
return r(s/(U*U*U), "G")
if s < U*U*U*U*U:
return r(s/(U*U*U*U), "T")
if s < U*U*U*U*U*U:
return r(s/(U*U*U*U*U), "P")
return r(s/(U*U*U*U*U*U), "E")
def print_event(event):
started, result, waiting_time, total_time = event
event_type, started, result, total_bytes, waiting_time, total_time = event
followthrough = None
if waiting_time and total_time:
followthrough = total_time - waiting_time
print("%s: %-6s total=%7s wait=%7s ft=%7s" %
(time.ctime(started), result,
print(" %16s: total=%7s wait=%7s ft=%7s size=%s (%s)" %
("%s-%s" % (event_type, result),
def show_usage(args):
@ -30,30 +58,58 @@ def show_usage(args):
raise UsageError("cannot find relay.sqlite, please run from the server directory")
oldest = None
newest = None
counters = defaultdict(int)
rendezvous_counters = defaultdict(int)
transit_counters = defaultdict(int)
total_transit_bytes = 0
db = get_db("relay.sqlite")
c = db.execute("SELECT * FROM `usage` ORDER BY `started` ASC LIMIT ?",
c = db.execute("SELECT * FROM `usage`"
" ORDER BY `started` ASC LIMIT ?",
for row in c.fetchall():
if row["type"] == u"rendezvous":
counters = rendezvous_counters
elif row["type"] == u"transit":
counters = transit_counters
total_transit_bytes += row["total_bytes"]
counters["total"] += 1
counters[row["result"]] += 1
if oldest is None or row["started"] < oldest:
oldest = row["started"]
if newest is None or row["started"] > newest:
newest = row["started"]
event = (row["started"], row["result"],
row["waiting_time"], row["total_time"])
event = (row["type"], row["started"], row["result"],
row["total_bytes"], row["waiting_time"], row["total_time"])
total = counters["total"]
if total:
if rendezvous_counters["total"] or transit_counters["total"]:
print("(most recent started %s ago)" % abbrev(time.time() - newest))
if rendezvous_counters["total"]:
print("rendezvous events:")
counters = rendezvous_counters
elapsed = time.time() - oldest
print("%d events in %s (%.2f per hour)" % (total, abbrev(elapsed),
(3600 * total / elapsed)))
print(", ".join(["%s=%d (%d%%)" %
(k, counters[k], (100.0 * counters[k] / total))
for k in sorted(counters)
if k != "total"]))
total = counters["total"]
print(" %d events in %s (%.2f per hour)" % (total, abbrev(elapsed),
(3600 * total / elapsed)))
print("", ", ".join(["%s=%d (%d%%)" %
(k, counters[k], (100.0 * counters[k] / total))
for k in sorted(counters)
if k != "total"]))
if transit_counters["total"]:
print("transit events:")
counters = transit_counters
elapsed = time.time() - oldest
total = counters["total"]
print(" %d events in %s (%.2f per hour)" % (total, abbrev(elapsed),
(3600 * total / elapsed)))
rate = total_transit_bytes / elapsed
print(" %s total bytes, %sps" % (abbreviate_space(total_transit_bytes),
print("", ", ".join(["%s=%d (%d%%)" %
(k, counters[k], (100.0 * counters[k] / total))
for k in sorted(counters)
if k != "total"]))
return 0
def tail_usage(args):
@ -63,15 +119,20 @@ def tail_usage(args):
# we don't seem to have unique row IDs, so this is an inaccurate and
# inefficient hack
seen = set()
while True:
old = time.time() - 2*60*60
c = db.execute("SELECT * FROM `usage` WHERE `started` > ?"
" ORDER BY `started` ASC", (old,))
for row in c.fetchall():
event = (row["started"], row["result"],
row["waiting_time"], row["total_time"])
if event not in seen:
while True:
old = time.time() - 2*60*60
c = db.execute("SELECT * FROM `usage`"
" WHERE `started` > ?"
" ORDER BY `started` ASC", (old,))
for row in c.fetchall():
event = (row["type"], row["started"], row["result"],
row["total_bytes"], row["waiting_time"],
if event not in seen:
except KeyboardInterrupt:
return 0
return 0
@ -298,9 +298,11 @@ class Channel:
def _store_summary(self, summary):
(started, result, total_time, waiting_time) = summary
self._db.execute("INSERT INTO `usage`"
" (`started`, `result`, `total_time`, `waiting_time`)"
" VALUES (?,?,?,?)",
(started, result, total_time, waiting_time))
" (`type`, `started`, `result`,"
" `total_time`, `waiting_time`)"
" VALUES (?,?,?, ?,?)",
(u"rendezvous", started, result,
total_time, waiting_time))
def _summarize(self, messages, delete_time):
@ -38,7 +38,7 @@ class RelayServer(service.MultiService):
self.relay = Relay(self.db, welcome) # accessible from tests
self.root.putChild(b"wormhole-relay", self.relay)
if transitport:
self.transit = Transit()
self.transit = Transit(self.db)
self.transit.setServiceParent(self) # for the timer
t = endpoints.serverFromString(reactor, transitport)
self.transport_service = ServerEndpointService(t, self.transit)
@ -1,5 +1,5 @@
from __future__ import print_function
import re
import re, time
from twisted.python import log
from twisted.internet import protocol
from twisted.application import service
@ -16,8 +16,12 @@ class TransitConnection(protocol.Protocol):
self._token_buffer = b""
self._sent_ok = False
self._buddy = None
self._had_buddy = False
self._total_sent = 0
def connectionMade(self):
self._started = time.time()
def dataReceived(self, data):
if self._sent_ok:
# We are an IPushProducer to our buddy's IConsumer, so they'll
@ -29,10 +33,12 @@ class TransitConnection(protocol.Protocol):
self._total_sent += len(data)
if self._got_token: # but not yet sent_ok
log.msg("transit impatience failure")
return self.disconnect() # impatience yields failure
# else this should be (part of) the token
self._token_buffer += data
buf = self._token_buffer
@ -59,6 +65,7 @@ class TransitConnection(protocol.Protocol):
def buddy_connected(self, them):
self._buddy = them
self._had_buddy = True
self._sent_ok = True
# Connect the two as a producer/consumer pair. We use streaming=True,
@ -77,11 +84,37 @@ class TransitConnection(protocol.Protocol):
log.msg("connectionLost %r %s" % (self, reason))
if self._buddy:
self.factory.transitFinished(self, self._total_sent)
# Record usage. There are four cases:
# * 1: we connected, never had a buddy
# * 2: we connected first, we disconnect before the buddy
# * 3: we connected first, buddy disconnects first
# * 4: buddy connected first, we disconnect before buddy
# * 5: buddy connected first, buddy disconnects first
# whoever disconnects first gets to write the usage record (1,2,4)
finished = time.time()
if not self._had_buddy: # 1
total_time = finished - self._started
self.factory.recordUsage(self._started, u"lonely", 0,
total_time, None)
if self._had_buddy and self._buddy: # 2,4
total_bytes = self._total_sent + self._buddy._total_sent
starts = [self._started, self._buddy._started]
total_time = finished - min(starts)
waiting_time = max(starts) - min(starts)
self.factory.recordUsage(self._started, u"happy", total_bytes,
total_time, waiting_time)
def disconnect(self):
finished = time.time()
total_time = finished - self._started
self.factory.recordUsage(self._started, u"errory", 0,
total_time, None)
class Transit(protocol.ServerFactory, service.MultiService):
# I manage pairs of simultaneous connections to a secondary TCP port,
@ -110,8 +143,9 @@ class Transit(protocol.ServerFactory, service.MultiService):
protocol = TransitConnection
def __init__(self):
def __init__(self, db):
self._db = db
self._pending_requests = {} # token -> TransitConnection
self._active_connections = set() # TransitConnection
@ -127,8 +161,20 @@ class Transit(protocol.ServerFactory, service.MultiService):
self._pending_requests[token] = p
log.msg("transit relay 1: %r" % token)
# TODO: timer
def transitFinished(self, p, total_sent):
log.msg("transitFinished (%dB) %r" % (total_sent, p))
def recordUsage(self, started, result, total_bytes,
total_time, waiting_time):
log.msg("Transit.recordUsage (%dB)" % total_bytes)
self._db.execute("INSERT INTO `usage`"
" (`type`, `started`, `result`, `total_bytes`,"
" `total_time`, `waiting_time`)"
" VALUES (?,?,?,?, ?,?)",
(u"transit", started, result, total_bytes,
total_time, waiting_time))
def transitFinished(self, p):
log.msg("transitFinished %r" % (p,))
for token,tc in self._pending_requests.items():
if tc is p:
del self._pending_requests[token]
@ -15,6 +15,7 @@ class ServerBase:
self._relay_server = s.relay
self._transit_server = s.transit
self.relayurl = u"" % relayport
self.transit = u"tcp:" % transitport
@ -6,7 +6,9 @@ from twisted.internet.threads import deferToThread
from ..blocking.transcribe import (Wormhole, UsageError, ChannelManager,
from ..blocking.eventsource import EventSourceFollower
from ..blocking.transit import TransitSender, TransitReceiver
from ..blocking.transit import (TransitSender, TransitReceiver,
from .common import ServerBase
APPID = u"appid"
@ -461,6 +463,8 @@ class Transit(_DoBothMixin, ServerBase, unittest.TestCase):
# it'd be nice to factor this chunk out with 'yield from', but that
# didn't appear until python-3.3, and isn't in py2 at all.
(sp, rp) = yield self.doBoth([s.connect], [r.connect])
yield deferToThread(sp.send_record, b"01234")
rec = yield deferToThread(rp.receive_record)
@ -508,3 +512,28 @@ class Transit(_DoBothMixin, ServerBase, unittest.TestCase):
self.assertEqual(rec, b"01234")
yield deferToThread(sp.close)
yield deferToThread(rp.close)
# TODO: this may be racy if we don't poll the server to make sure
# it's witnessed the first connection closing before querying the DB
#import time
#yield deferToThread(time.sleep, 1)
# check the transit relay's DB, make sure it counted the bytes
db = self._transit_server._db
c = db.execute("SELECT * FROM `usage` WHERE `type`=?", (u"transit",))
rows = c.fetchall()
self.assertEqual(len(rows), 1)
row = rows[0]
self.assertEqual(row["result"], u"happy")
# Sender first writes relay_handshake and waits for OK, but that's
# not counted by the transit server. Then sender writes
# sender_handshake and waits for receiver_handshake. Then sender
# writes GO and the body. Body is length-prefixed SecretBox, so
# includes 4-byte length, 24-byte nonce, and 16-byte MAC.
sender_count = (len(build_sender_handshake(b""))+
# Receiver first writes relay_handshake and waits for OK, but that's
# not counted. Then receiver writes receiver_handshake and waits for
# sender_handshake+GO.
receiver_count = len(build_receiver_handshake(b""))
self.assertEqual(row["total_bytes"], sender_count+receiver_count)
Reference in New Issue
Block a user