dump-timing: store server-sent time too

Adjust dump-timing to ignore the extra data, for now. Also do some
general instrumentation cleanup.
This commit is contained in:
Brian Warner 2016-03-03 18:03:27 -08:00
parent aaf4e70a33
commit c5415495c0
4 changed files with 73 additions and 60 deletions

View File

@ -14,8 +14,9 @@ timeline = []
groups_out = []
for which,fn in enumerate(streams):
with open(fn, "rb") as f:
for (start, finish, what, start_d, finish_d) in json.load(f):
timeline.append( (start, finish, which, what, start_d, finish_d) )
for (start, sent, finish, what, start_d, finish_d) in json.load(f):
timeline.append( (start, sent, finish, which, what,
start_d, finish_d) )
print("%s is %s" % (labels[which], fn))
groups_out.append({"id": which, "content": fn,
"className": "group-%d" % which})
@ -25,7 +26,8 @@ timeline.sort(key=lambda row: row[0])
first = timeline[0][0]
print("started at %s" % time.ctime(start))
viz_out = []
for num, (start, finish, which, what, start_d, finish_d) in enumerate(timeline):
for num, (start, sent, finish, which, what,
start_d, finish_d) in enumerate(timeline):
delta = start - first
delta_s = "%.6f" % delta
start_d_str = ", ".join(["%s=%s" % (name, start_d[name])

View File

@ -27,7 +27,7 @@ def to_bytes(u):
class Channel:
def __init__(self, relay_url, appid, channelid, side, handle_welcome,
wait, timeout):
wait, timeout, timing):
self._relay_url = relay_url
self._appid = appid
self._channelid = channelid
@ -38,6 +38,7 @@ class Channel:
self._started = time.time()
self._wait = wait
self._timeout = timeout
self._timing = timing
def _add_inbound_messages(self, messages):
for msg in messages:
@ -65,10 +66,12 @@ class Channel:
"phase": phase,
"body": hexlify(msg).decode("ascii")}
data = json.dumps(payload).encode("utf-8")
_sent = self._timing.add_event("send %s" % phase)
r = requests.post(self._relay_url+"add", data=data,
timeout=self._timeout)
r.raise_for_status()
resp = r.json()
self._timing.finish_event(_sent, resp.get("sent"))
if "welcome" in resp:
self._handle_welcome(resp["welcome"])
self._add_inbound_messages(resp["messages"])
@ -86,6 +89,9 @@ class Channel:
# wasn't one of our own messages. It will either come from
# previously-received messages, or from an EventSource that we attach
# to the corresponding URL
_sent = self._timing.add_event("get %s" % "/".join(sorted(phases)))
_server_sent = None
phase_and_body = self._find_inbound_message(phases)
while phase_and_body is None:
remaining = self._started + self._timeout - time.time()
@ -97,17 +103,20 @@ class Channel:
remaining)
# we loop here until the connection is lost, or we see the
# message we want
for (eventtype, data) in f.iter_events():
for (eventtype, line) in f.iter_events():
if eventtype == "welcome":
self._handle_welcome(json.loads(data))
self._handle_welcome(json.loads(line))
if eventtype == "message":
self._add_inbound_messages([json.loads(data)])
data = json.loads(line)
self._add_inbound_messages([data])
phase_and_body = self._find_inbound_message(phases)
if phase_and_body:
f.close()
_server_sent = data.get("sent")
break
if not phase_and_body:
time.sleep(self._wait)
self._timing.finish_event(_sent, _server_sent)
return phase_and_body
def get(self, phase):
@ -124,47 +133,56 @@ class Channel:
try:
# ignore POST failure, don't call r.raise_for_status(), set a
# short timeout and ignore failures
requests.post(self._relay_url+"deallocate", data=data,
timeout=5)
_sent = self._timing.add_event("close")
r = requests.post(self._relay_url+"deallocate", data=data,
timeout=5)
resp = r.json()
self._timing.finish_event(_sent, resp.get("sent"))
except requests.exceptions.RequestException:
pass
class ChannelManager:
def __init__(self, relay_url, appid, side, handle_welcome,
def __init__(self, relay_url, appid, side, handle_welcome, timing=None,
wait=0.5*SECOND, timeout=3*MINUTE):
self._relay_url = relay_url
self._appid = appid
self._side = side
self._handle_welcome = handle_welcome
self._timing = timing or DebugTiming()
self._wait = wait
self._timeout = timeout
def list_channels(self):
queryargs = urlencode([("appid", self._appid)])
_sent = self._timing.add_event("list")
r = requests.get(self._relay_url+"list?%s" % queryargs,
timeout=self._timeout)
r.raise_for_status()
data = r.json()
if "welcome" in data:
self._handle_welcome(data["welcome"])
self._timing.finish_event(_sent, data.get("sent"))
channelids = data["channelids"]
return channelids
def allocate(self):
data = json.dumps({"appid": self._appid,
"side": self._side}).encode("utf-8")
_sent = self._timing.add_event("allocate")
r = requests.post(self._relay_url+"allocate", data=data,
timeout=self._timeout)
r.raise_for_status()
data = r.json()
if "welcome" in data:
self._handle_welcome(data["welcome"])
self._timing.finish_event(_sent, data.get("sent"))
channelid = data["channelid"]
return channelid
def connect(self, channelid):
return Channel(self._relay_url, self._appid, channelid, self._side,
self._handle_welcome, self._wait, self._timeout)
self._handle_welcome, self._wait, self._timeout,
self._timing)
def close_on_error(f): # method decorator
# Clients report certain errors as "moods", so the server can make a
@ -208,6 +226,7 @@ class Wormhole:
side = hexlify(os.urandom(5)).decode("ascii")
self._channel_manager = ChannelManager(relay_url, appid, side,
self.handle_welcome,
self._timing,
self._wait, self._timeout)
self._channel = None
self.code = None
@ -249,9 +268,7 @@ class Wormhole:
def get_code(self, code_length=2):
if self.code is not None: raise UsageError
_start = self._timing.add_event("alloc channel")
channelid = self._channel_manager.allocate()
self._timing.finish_event(_start)
code = codes.make_code(channelid, code_length)
assert isinstance(code, type(u"")), type(code)
self._set_code_and_channelid(code)
@ -317,13 +334,8 @@ class Wormhole:
def _get_key(self):
if not self.key:
_sent = self._timing.add_event("send pake")
self._channel.send(u"pake", self.msg1)
self._timing.finish_event(_sent)
_sent = self._timing.add_event("get pake")
pake_msg = self._channel.get(u"pake")
self._timing.finish_event(_sent)
self.key = self.sp.finish(pake_msg)
self.verifier = self.derive_key(u"wormhole:verifier")
@ -331,12 +343,10 @@ class Wormhole:
if not self._send_confirm:
return
_sent = self._timing.add_event("send confirmation")
confkey = self.derive_key(u"wormhole:confirmation")
nonce = os.urandom(CONFMSG_NONCE_LENGTH)
confmsg = make_confmsg(confkey, nonce)
self._channel.send(u"_confirm", confmsg)
self._timing.finish_event(_sent)
@close_on_error
def get_verifier(self):
@ -365,9 +375,7 @@ class Wormhole:
self._get_key()
data_key = self.derive_key(u"wormhole:phase:%s" % phase)
outbound_encrypted = self._encrypt_data(data_key, outbound_data)
_sent2 = self._timing.add_event("send")
self._channel.send(phase, outbound_encrypted)
self._timing.finish_event(_sent2)
self._timing.finish_event(_sent)
@close_on_error
@ -385,18 +393,14 @@ class Wormhole:
if not self._got_confirmation:
phases.append(u"_confirm")
phases.append(phase)
_sent2 = self._timing.add_event("get", phases=phases)
(got_phase, body) = self._channel.get_first_of(phases)
self._timing.finish_event(_sent2)
if got_phase == u"_confirm":
confkey = self.derive_key(u"wormhole:confirmation")
nonce = body[:CONFMSG_NONCE_LENGTH]
if body != make_confmsg(confkey, nonce):
raise WrongPasswordError
self._got_confirmation = True
_sent2 = self._timing.add_event("get", phases=[phase])
(got_phase, body) = self._channel.get_first_of([phase])
self._timing.finish_event(_sent2)
assert got_phase == phase
self._timing.finish_event(_sent)
try:
@ -414,6 +418,4 @@ class Wormhole:
self._timing.finish_event(self._timing_started, mood=mood)
c, self._channel = self._channel, None
monitor.close(c)
_sent = self._timing.add_event("close")
c.deallocate(mood)
self._timing.finish_event(_sent)

View File

@ -4,13 +4,18 @@ import json, time
class DebugTiming:
def __init__(self):
self.data = []
def add_event(self, name, **details):
# [ start, [stop], name, start_details{}, stop_details{} ]
self.data.append( [time.time(), None, name, details, {}] )
def add_event(self, name, when=None, **details):
# [ start, [server_sent], [stop], name, start_details{}, stop_details{} ]
if when is None:
when = time.time()
when = float(when)
self.data.append( [when, None, None, name, details, {}] )
return len(self.data)-1
def finish_event(self, index, **details):
self.data[index][1] = time.time()
self.data[index][4] = details
def finish_event(self, index, server_sent=None, **details):
if server_sent is not None:
self.data[index][1] = float(server_sent)
self.data[index][2] = time.time()
self.data[index][5] = details
def write(self, fn, stderr):
with open(fn, "wb") as f:
json.dump(self.data, f)

View File

@ -72,13 +72,14 @@ def get_json(agent, url):
class Channel:
def __init__(self, relay_url, appid, channelid, side, handle_welcome,
agent):
agent, timing):
self._relay_url = relay_url
self._appid = appid
self._channelid = channelid
self._side = side
self._handle_welcome = handle_welcome
self._agent = agent
self._timing = timing
self._messages = set() # (phase,body) , body is bytes
self._sent_messages = set() # (phase,body)
@ -108,8 +109,10 @@ class Channel:
"side": self._side,
"phase": phase,
"body": hexlify(msg).decode("ascii")}
_sent = self._timing.add_event("send %s" % phase)
d = post_json(self._agent, self._relay_url+"add", payload)
def _maybe_handle_welcome(resp):
self._timing.finish_event(_sent, resp.get("sent"))
if "welcome" in resp:
self._handle_welcome(resp["welcome"])
return resp
@ -126,19 +129,24 @@ class Channel:
# wasn't one of our own messages. It will either come from
# previously-received messages, or from an EventSource that we attach
# to the corresponding URL
_sent = self._timing.add_event("get %s" % "/".join(sorted(phases)))
phase_and_body = self._find_inbound_message(phases)
if phase_and_body is not None:
self._timing.finish_event(_sent)
return defer.succeed(phase_and_body)
d = defer.Deferred()
msgs = []
def _handle(name, data):
def _handle(name, line):
if name == "welcome":
self._handle_welcome(json.loads(data))
self._handle_welcome(json.loads(line))
if name == "message":
self._add_inbound_messages([json.loads(data)])
data = json.loads(line)
self._add_inbound_messages([data])
phase_and_body = self._find_inbound_message(phases)
if phase_and_body is not None and not msgs:
msgs.append(phase_and_body)
self._timing.finish_event(_sent, data.get("sent"))
d.callback(None)
queryargs = urlencode([("appid", self._appid),
("channelid", self._channelid)])
@ -160,46 +168,58 @@ class Channel:
def deallocate(self, mood=None):
# only try once, no retries
_sent = self._timing.add_event("close")
d = post_json(self._agent, self._relay_url+"deallocate",
{"appid": self._appid,
"channelid": self._channelid,
"side": self._side,
"mood": mood})
def _done(resp):
self._timing.finish_event(_sent, resp.get("sent"))
d.addCallback(_done)
d.addBoth(lambda _: None) # ignore POST failure
return d
class ChannelManager:
def __init__(self, relay, appid, side, handle_welcome):
def __init__(self, relay, appid, side, handle_welcome, timing=None):
assert isinstance(relay, type(u""))
self._relay = relay
self._appid = appid
self._side = side
self._handle_welcome = handle_welcome
self._timing = timing or DebugTiming()
self._pool = web_client.HTTPConnectionPool(reactor, True) # persistent
self._agent = web_client.Agent(reactor, pool=self._pool)
@inlineCallbacks
def allocate(self):
url = self._relay + "allocate"
_sent = self._timing.add_event("allocate")
data = yield post_json(self._agent, url, {"appid": self._appid,
"side": self._side})
if "welcome" in data:
self._handle_welcome(data["welcome"])
self._timing.finish_event(_sent, data.get("sent"))
returnValue(data["channelid"])
@inlineCallbacks
def list_channels(self):
queryargs = urlencode([("appid", self._appid)])
url = self._relay + u"list?%s" % queryargs
_sent = self._timing.add_event("list")
r = yield get_json(self._agent, url)
self._timing.finish_event(_sent, r.get("sent"))
returnValue(r["channelids"])
def connect(self, channelid):
return Channel(self._relay, self._appid, channelid, self._side,
self._handle_welcome, self._agent)
self._handle_welcome, self._agent, self._timing)
@inlineCallbacks
def shutdown(self):
return self._pool.closeCachedConnections()
_sent = self._timing.add_event("pool shutdown")
yield self._pool.closeCachedConnections()
self._timing.finish_event(_sent)
def close_on_error(meth): # method decorator
# Clients report certain errors as "moods", so the server can make a
@ -251,7 +271,8 @@ class Wormhole:
def _set_side(self, side):
self._side = side
self._channel_manager = ChannelManager(self._relay_url, self._appid,
self._side, self.handle_welcome)
self._side, self.handle_welcome,
self._timing)
self._channel = None
def handle_welcome(self, welcome):
@ -281,9 +302,7 @@ class Wormhole:
if self.code is not None: raise UsageError
if self._started_get_code: raise UsageError
self._started_get_code = True
_start = self._timing.add_event("alloc channel")
channelid = yield self._channel_manager.allocate()
self._timing.finish_event(_start)
code = codes.make_code(channelid, code_length)
assert isinstance(code, type(u"")), type(code)
self._set_code_and_channelid(code)
@ -369,12 +388,9 @@ class Wormhole:
# TODO: prevent multiple invocation
if self.key:
returnValue(self.key)
_sent = self._timing.add_event("send pake")
yield self._channel.send(u"pake", self.msg1)
self._timing.finish_event(_sent)
_sent = self._timing.add_event("get pake")
pake_msg = yield self._channel.get(u"pake")
self._timing.finish_event(_sent)
key = self.sp.finish(pake_msg)
self.key = key
self.verifier = self.derive_key(u"wormhole:verifier")
@ -385,9 +401,7 @@ class Wormhole:
confkey = self.derive_key(u"wormhole:confirmation")
nonce = os.urandom(CONFMSG_NONCE_LENGTH)
confmsg = make_confmsg(confkey, nonce)
_sent = self._timing.add_event("send confirmation")
yield self._channel.send(u"_confirm", confmsg)
self._timing.finish_event(_sent)
returnValue(key)
@close_on_error
@ -418,9 +432,7 @@ class Wormhole:
yield self._get_key()
data_key = self.derive_key(u"wormhole:phase:%s" % phase)
outbound_encrypted = self._encrypt_data(data_key, outbound_data)
_sent2 = self._timing.add_event("send")
yield self._channel.send(phase, outbound_encrypted)
self._timing.finish_event(_sent2)
self._timing.finish_event(_sent)
@close_on_error
@ -439,9 +451,7 @@ class Wormhole:
if not self._got_confirmation:
phases.append(u"_confirm")
phases.append(phase)
_sent2 = self._timing.add_event("get", phases=phases)
phase_and_body = yield self._channel.get_first_of(phases)
self._timing.finish_event(_sent2)
(got_phase, body) = phase_and_body
if got_phase == u"_confirm":
confkey = self.derive_key(u"wormhole:confirmation")
@ -449,9 +459,7 @@ class Wormhole:
if body != make_confmsg(confkey, nonce):
raise WrongPasswordError
self._got_confirmation = True
_sent3 = self._timing.add_event("get", phases=[phase])
phase_and_body = yield self._channel.get_first_of([phase])
self._timing.finish_event(_sent3)
(got_phase, body) = phase_and_body
self._timing.finish_event(_sent)
assert got_phase == phase
@ -472,10 +480,6 @@ class Wormhole:
self._timing.finish_event(self._timing_started, mood=mood)
c, self._channel = self._channel, None
monitor.close(c)
_sent = self._timing.add_event("close")
yield c.deallocate(mood)
self._timing.finish_event(_sent)
_sent = self._timing.add_event("pool shutdown")
yield self._channel_manager.shutdown()
self._timing.finish_event(_sent)