diff --git a/misc/dump-timing.py b/misc/dump-timing.py index 90191a4..6e530e3 100644 --- a/misc/dump-timing.py +++ b/misc/dump-timing.py @@ -14,8 +14,9 @@ timeline = [] groups_out = [] for which,fn in enumerate(streams): with open(fn, "rb") as f: - for (start, finish, what, start_d, finish_d) in json.load(f): - timeline.append( (start, finish, which, what, start_d, finish_d) ) + for (start, sent, finish, what, start_d, finish_d) in json.load(f): + timeline.append( (start, sent, finish, which, what, + start_d, finish_d) ) print("%s is %s" % (labels[which], fn)) groups_out.append({"id": which, "content": fn, "className": "group-%d" % which}) @@ -25,7 +26,8 @@ timeline.sort(key=lambda row: row[0]) first = timeline[0][0] print("started at %s" % time.ctime(start)) viz_out = [] -for num, (start, finish, which, what, start_d, finish_d) in enumerate(timeline): +for num, (start, sent, finish, which, what, + start_d, finish_d) in enumerate(timeline): delta = start - first delta_s = "%.6f" % delta start_d_str = ", ".join(["%s=%s" % (name, start_d[name]) diff --git a/src/wormhole/blocking/transcribe.py b/src/wormhole/blocking/transcribe.py index b786187..0c1bdf8 100644 --- a/src/wormhole/blocking/transcribe.py +++ b/src/wormhole/blocking/transcribe.py @@ -27,7 +27,7 @@ def to_bytes(u): class Channel: def __init__(self, relay_url, appid, channelid, side, handle_welcome, - wait, timeout): + wait, timeout, timing): self._relay_url = relay_url self._appid = appid self._channelid = channelid @@ -38,6 +38,7 @@ class Channel: self._started = time.time() self._wait = wait self._timeout = timeout + self._timing = timing def _add_inbound_messages(self, messages): for msg in messages: @@ -65,10 +66,12 @@ class Channel: "phase": phase, "body": hexlify(msg).decode("ascii")} data = json.dumps(payload).encode("utf-8") + _sent = self._timing.add_event("send %s" % phase) r = requests.post(self._relay_url+"add", data=data, timeout=self._timeout) r.raise_for_status() resp = r.json() + self._timing.finish_event(_sent, resp.get("sent")) if "welcome" in resp: self._handle_welcome(resp["welcome"]) self._add_inbound_messages(resp["messages"]) @@ -86,6 +89,9 @@ class Channel: # wasn't one of our own messages. It will either come from # previously-received messages, or from an EventSource that we attach # to the corresponding URL + _sent = self._timing.add_event("get %s" % "/".join(sorted(phases))) + _server_sent = None + phase_and_body = self._find_inbound_message(phases) while phase_and_body is None: remaining = self._started + self._timeout - time.time() @@ -97,17 +103,20 @@ class Channel: remaining) # we loop here until the connection is lost, or we see the # message we want - for (eventtype, data) in f.iter_events(): + for (eventtype, line) in f.iter_events(): if eventtype == "welcome": - self._handle_welcome(json.loads(data)) + self._handle_welcome(json.loads(line)) if eventtype == "message": - self._add_inbound_messages([json.loads(data)]) + data = json.loads(line) + self._add_inbound_messages([data]) phase_and_body = self._find_inbound_message(phases) if phase_and_body: f.close() + _server_sent = data.get("sent") break if not phase_and_body: time.sleep(self._wait) + self._timing.finish_event(_sent, _server_sent) return phase_and_body def get(self, phase): @@ -124,47 +133,56 @@ class Channel: try: # ignore POST failure, don't call r.raise_for_status(), set a # short timeout and ignore failures - requests.post(self._relay_url+"deallocate", data=data, - timeout=5) + _sent = self._timing.add_event("close") + r = requests.post(self._relay_url+"deallocate", data=data, + timeout=5) + resp = r.json() + self._timing.finish_event(_sent, resp.get("sent")) except requests.exceptions.RequestException: pass class ChannelManager: - def __init__(self, relay_url, appid, side, handle_welcome, + def __init__(self, relay_url, appid, side, handle_welcome, timing=None, wait=0.5*SECOND, timeout=3*MINUTE): self._relay_url = relay_url self._appid = appid self._side = side self._handle_welcome = handle_welcome + self._timing = timing or DebugTiming() self._wait = wait self._timeout = timeout def list_channels(self): queryargs = urlencode([("appid", self._appid)]) + _sent = self._timing.add_event("list") r = requests.get(self._relay_url+"list?%s" % queryargs, timeout=self._timeout) r.raise_for_status() data = r.json() if "welcome" in data: self._handle_welcome(data["welcome"]) + self._timing.finish_event(_sent, data.get("sent")) channelids = data["channelids"] return channelids def allocate(self): data = json.dumps({"appid": self._appid, "side": self._side}).encode("utf-8") + _sent = self._timing.add_event("allocate") r = requests.post(self._relay_url+"allocate", data=data, timeout=self._timeout) r.raise_for_status() data = r.json() if "welcome" in data: self._handle_welcome(data["welcome"]) + self._timing.finish_event(_sent, data.get("sent")) channelid = data["channelid"] return channelid def connect(self, channelid): return Channel(self._relay_url, self._appid, channelid, self._side, - self._handle_welcome, self._wait, self._timeout) + self._handle_welcome, self._wait, self._timeout, + self._timing) def close_on_error(f): # method decorator # Clients report certain errors as "moods", so the server can make a @@ -208,6 +226,7 @@ class Wormhole: side = hexlify(os.urandom(5)).decode("ascii") self._channel_manager = ChannelManager(relay_url, appid, side, self.handle_welcome, + self._timing, self._wait, self._timeout) self._channel = None self.code = None @@ -249,9 +268,7 @@ class Wormhole: def get_code(self, code_length=2): if self.code is not None: raise UsageError - _start = self._timing.add_event("alloc channel") channelid = self._channel_manager.allocate() - self._timing.finish_event(_start) code = codes.make_code(channelid, code_length) assert isinstance(code, type(u"")), type(code) self._set_code_and_channelid(code) @@ -317,13 +334,8 @@ class Wormhole: def _get_key(self): if not self.key: - _sent = self._timing.add_event("send pake") self._channel.send(u"pake", self.msg1) - self._timing.finish_event(_sent) - - _sent = self._timing.add_event("get pake") pake_msg = self._channel.get(u"pake") - self._timing.finish_event(_sent) self.key = self.sp.finish(pake_msg) self.verifier = self.derive_key(u"wormhole:verifier") @@ -331,12 +343,10 @@ class Wormhole: if not self._send_confirm: return - _sent = self._timing.add_event("send confirmation") confkey = self.derive_key(u"wormhole:confirmation") nonce = os.urandom(CONFMSG_NONCE_LENGTH) confmsg = make_confmsg(confkey, nonce) self._channel.send(u"_confirm", confmsg) - self._timing.finish_event(_sent) @close_on_error def get_verifier(self): @@ -365,9 +375,7 @@ class Wormhole: self._get_key() data_key = self.derive_key(u"wormhole:phase:%s" % phase) outbound_encrypted = self._encrypt_data(data_key, outbound_data) - _sent2 = self._timing.add_event("send") self._channel.send(phase, outbound_encrypted) - self._timing.finish_event(_sent2) self._timing.finish_event(_sent) @close_on_error @@ -385,18 +393,14 @@ class Wormhole: if not self._got_confirmation: phases.append(u"_confirm") phases.append(phase) - _sent2 = self._timing.add_event("get", phases=phases) (got_phase, body) = self._channel.get_first_of(phases) - self._timing.finish_event(_sent2) if got_phase == u"_confirm": confkey = self.derive_key(u"wormhole:confirmation") nonce = body[:CONFMSG_NONCE_LENGTH] if body != make_confmsg(confkey, nonce): raise WrongPasswordError self._got_confirmation = True - _sent2 = self._timing.add_event("get", phases=[phase]) (got_phase, body) = self._channel.get_first_of([phase]) - self._timing.finish_event(_sent2) assert got_phase == phase self._timing.finish_event(_sent) try: @@ -414,6 +418,4 @@ class Wormhole: self._timing.finish_event(self._timing_started, mood=mood) c, self._channel = self._channel, None monitor.close(c) - _sent = self._timing.add_event("close") c.deallocate(mood) - self._timing.finish_event(_sent) diff --git a/src/wormhole/timing.py b/src/wormhole/timing.py index 7ea1852..3b67d1b 100644 --- a/src/wormhole/timing.py +++ b/src/wormhole/timing.py @@ -4,13 +4,18 @@ import json, time class DebugTiming: def __init__(self): self.data = [] - def add_event(self, name, **details): - # [ start, [stop], name, start_details{}, stop_details{} ] - self.data.append( [time.time(), None, name, details, {}] ) + def add_event(self, name, when=None, **details): + # [ start, [server_sent], [stop], name, start_details{}, stop_details{} ] + if when is None: + when = time.time() + when = float(when) + self.data.append( [when, None, None, name, details, {}] ) return len(self.data)-1 - def finish_event(self, index, **details): - self.data[index][1] = time.time() - self.data[index][4] = details + def finish_event(self, index, server_sent=None, **details): + if server_sent is not None: + self.data[index][1] = float(server_sent) + self.data[index][2] = time.time() + self.data[index][5] = details def write(self, fn, stderr): with open(fn, "wb") as f: json.dump(self.data, f) diff --git a/src/wormhole/twisted/transcribe.py b/src/wormhole/twisted/transcribe.py index d46d2de..96d7179 100644 --- a/src/wormhole/twisted/transcribe.py +++ b/src/wormhole/twisted/transcribe.py @@ -72,13 +72,14 @@ def get_json(agent, url): class Channel: def __init__(self, relay_url, appid, channelid, side, handle_welcome, - agent): + agent, timing): self._relay_url = relay_url self._appid = appid self._channelid = channelid self._side = side self._handle_welcome = handle_welcome self._agent = agent + self._timing = timing self._messages = set() # (phase,body) , body is bytes self._sent_messages = set() # (phase,body) @@ -108,8 +109,10 @@ class Channel: "side": self._side, "phase": phase, "body": hexlify(msg).decode("ascii")} + _sent = self._timing.add_event("send %s" % phase) d = post_json(self._agent, self._relay_url+"add", payload) def _maybe_handle_welcome(resp): + self._timing.finish_event(_sent, resp.get("sent")) if "welcome" in resp: self._handle_welcome(resp["welcome"]) return resp @@ -126,19 +129,24 @@ class Channel: # wasn't one of our own messages. It will either come from # previously-received messages, or from an EventSource that we attach # to the corresponding URL + _sent = self._timing.add_event("get %s" % "/".join(sorted(phases))) + phase_and_body = self._find_inbound_message(phases) if phase_and_body is not None: + self._timing.finish_event(_sent) return defer.succeed(phase_and_body) d = defer.Deferred() msgs = [] - def _handle(name, data): + def _handle(name, line): if name == "welcome": - self._handle_welcome(json.loads(data)) + self._handle_welcome(json.loads(line)) if name == "message": - self._add_inbound_messages([json.loads(data)]) + data = json.loads(line) + self._add_inbound_messages([data]) phase_and_body = self._find_inbound_message(phases) if phase_and_body is not None and not msgs: msgs.append(phase_and_body) + self._timing.finish_event(_sent, data.get("sent")) d.callback(None) queryargs = urlencode([("appid", self._appid), ("channelid", self._channelid)]) @@ -160,46 +168,58 @@ class Channel: def deallocate(self, mood=None): # only try once, no retries + _sent = self._timing.add_event("close") d = post_json(self._agent, self._relay_url+"deallocate", {"appid": self._appid, "channelid": self._channelid, "side": self._side, "mood": mood}) + def _done(resp): + self._timing.finish_event(_sent, resp.get("sent")) + d.addCallback(_done) d.addBoth(lambda _: None) # ignore POST failure return d class ChannelManager: - def __init__(self, relay, appid, side, handle_welcome): + def __init__(self, relay, appid, side, handle_welcome, timing=None): assert isinstance(relay, type(u"")) self._relay = relay self._appid = appid self._side = side self._handle_welcome = handle_welcome + self._timing = timing or DebugTiming() self._pool = web_client.HTTPConnectionPool(reactor, True) # persistent self._agent = web_client.Agent(reactor, pool=self._pool) @inlineCallbacks def allocate(self): url = self._relay + "allocate" + _sent = self._timing.add_event("allocate") data = yield post_json(self._agent, url, {"appid": self._appid, "side": self._side}) if "welcome" in data: self._handle_welcome(data["welcome"]) + self._timing.finish_event(_sent, data.get("sent")) returnValue(data["channelid"]) @inlineCallbacks def list_channels(self): queryargs = urlencode([("appid", self._appid)]) url = self._relay + u"list?%s" % queryargs + _sent = self._timing.add_event("list") r = yield get_json(self._agent, url) + self._timing.finish_event(_sent, r.get("sent")) returnValue(r["channelids"]) def connect(self, channelid): return Channel(self._relay, self._appid, channelid, self._side, - self._handle_welcome, self._agent) + self._handle_welcome, self._agent, self._timing) + @inlineCallbacks def shutdown(self): - return self._pool.closeCachedConnections() + _sent = self._timing.add_event("pool shutdown") + yield self._pool.closeCachedConnections() + self._timing.finish_event(_sent) def close_on_error(meth): # method decorator # Clients report certain errors as "moods", so the server can make a @@ -251,7 +271,8 @@ class Wormhole: def _set_side(self, side): self._side = side self._channel_manager = ChannelManager(self._relay_url, self._appid, - self._side, self.handle_welcome) + self._side, self.handle_welcome, + self._timing) self._channel = None def handle_welcome(self, welcome): @@ -281,9 +302,7 @@ class Wormhole: if self.code is not None: raise UsageError if self._started_get_code: raise UsageError self._started_get_code = True - _start = self._timing.add_event("alloc channel") channelid = yield self._channel_manager.allocate() - self._timing.finish_event(_start) code = codes.make_code(channelid, code_length) assert isinstance(code, type(u"")), type(code) self._set_code_and_channelid(code) @@ -369,12 +388,9 @@ class Wormhole: # TODO: prevent multiple invocation if self.key: returnValue(self.key) - _sent = self._timing.add_event("send pake") yield self._channel.send(u"pake", self.msg1) - self._timing.finish_event(_sent) - _sent = self._timing.add_event("get pake") pake_msg = yield self._channel.get(u"pake") - self._timing.finish_event(_sent) + key = self.sp.finish(pake_msg) self.key = key self.verifier = self.derive_key(u"wormhole:verifier") @@ -385,9 +401,7 @@ class Wormhole: confkey = self.derive_key(u"wormhole:confirmation") nonce = os.urandom(CONFMSG_NONCE_LENGTH) confmsg = make_confmsg(confkey, nonce) - _sent = self._timing.add_event("send confirmation") yield self._channel.send(u"_confirm", confmsg) - self._timing.finish_event(_sent) returnValue(key) @close_on_error @@ -418,9 +432,7 @@ class Wormhole: yield self._get_key() data_key = self.derive_key(u"wormhole:phase:%s" % phase) outbound_encrypted = self._encrypt_data(data_key, outbound_data) - _sent2 = self._timing.add_event("send") yield self._channel.send(phase, outbound_encrypted) - self._timing.finish_event(_sent2) self._timing.finish_event(_sent) @close_on_error @@ -439,9 +451,7 @@ class Wormhole: if not self._got_confirmation: phases.append(u"_confirm") phases.append(phase) - _sent2 = self._timing.add_event("get", phases=phases) phase_and_body = yield self._channel.get_first_of(phases) - self._timing.finish_event(_sent2) (got_phase, body) = phase_and_body if got_phase == u"_confirm": confkey = self.derive_key(u"wormhole:confirmation") @@ -449,9 +459,7 @@ class Wormhole: if body != make_confmsg(confkey, nonce): raise WrongPasswordError self._got_confirmation = True - _sent3 = self._timing.add_event("get", phases=[phase]) phase_and_body = yield self._channel.get_first_of([phase]) - self._timing.finish_event(_sent3) (got_phase, body) = phase_and_body self._timing.finish_event(_sent) assert got_phase == phase @@ -472,10 +480,6 @@ class Wormhole: self._timing.finish_event(self._timing_started, mood=mood) c, self._channel = self._channel, None monitor.close(c) - _sent = self._timing.add_event("close") yield c.deallocate(mood) - self._timing.finish_event(_sent) - _sent = self._timing.add_event("pool shutdown") yield self._channel_manager.shutdown() - self._timing.finish_event(_sent)