diff --git a/misc/dump-timing.py b/misc/dump-timing.py index 6a222a8..c36e44d 100644 --- a/misc/dump-timing.py +++ b/misc/dump-timing.py @@ -1,78 +1,122 @@ # To use the web() option, you should do: # * cd misc -# * npm install vis zepto +# * npm install d3 zepto from __future__ import print_function -import os, sys, time, json +import os, sys, time, json, random streams = sys.argv[1:] +if not streams: + print("run like: python dump-timing.py tx.json rx.json") + sys.exit(1) num_streams = len(streams) labels = dict([(num, " "*num + "[%d]" % (num+1) + " "*(num_streams-1-num)) for num in range(num_streams)]) -timeline = [] -groups_out = [] -for which,fn in enumerate(streams): +abs_timeline = [] +sides = [] + +for side,fn in enumerate(streams): with open(fn, "rb") as f: - for (start, sent, finish, what, start_d, finish_d) in json.load(f): - timeline.append( (start, sent, finish, which, what, - start_d, finish_d) ) - print("%s is %s" % (labels[which], fn)) - groups_out.append({"id": which, "content": fn, - "className": "group-%d" % which}) + for (start, sent, finish, what, details) in json.load(f): + abs_timeline.append( (start, sent, finish, side, what, details) ) + print("%s is %s" % (labels[side], fn)) + sides.append(os.path.basename(fn)) +# relativize all timestamps +all_times = [e[0] for e in abs_timeline] + [e[2] for e in abs_timeline if e[2]] +all_times.sort() +earliest = all_times[0] +def rel(t): + if t is None: return None + return t - earliest +timeline = [ (rel(start), rel(sent), rel(finish), side, what, details) + for (start, sent, finish, side, what, details) + in abs_timeline ] +data = {} -timeline.sort(key=lambda row: row[0]) -first = timeline[0][0] -print("started at %s" % time.ctime(start)) -viz_out = [] -for num, (start, sent, finish, which, what, - start_d, finish_d) in enumerate(timeline): - delta = start - first - delta_s = "%.6f" % delta - start_d_str = ", ".join(["%s=%s" % (name, start_d[name]) - for name in sorted(start_d)]) - finish_d_str = ", ".join(["%s=%s" % (name, finish_d[name]) - for name in sorted(finish_d)]) - details_str = start_d_str - if finish_d_str: - details_str += "/" + finish_d_str - finish_str = "" - if finish is not None: - finish_str = " +%.6f" % (finish - start) - print("%9s: %s %s %s%s" % (delta_s, labels[which], what, details_str, - finish_str)) - viz_start = start*1000 - viz_end = None if finish is None else finish*1000 - viz_type = "range" if finish else "point" - if what == "wormhole started" or what == "wormhole": - viz_type = "background" - viz_content = '%s' % (details_str or "(No details)", - what) # sigh - viz_className = "item-group-%d" % which - if "waiting" in start_d: - viz_className += " wait-%s" % start_d["waiting"] - viz_out.append({"id": num, "start": viz_start, "end": viz_end, - "group": which, #"subgroup": num, - "content": viz_content, - "className": viz_className, # or style: - "type": viz_type, - }) - if sent is not None: - viz_out.append({"id": "%d.sent" % num, "start": sent*1000, - "group": which, #"subgroup": num, - "content": "sent", - "className": viz_className, - "type": "point"}) +# we pre-calculate the "lane" that each item uses, here in python, rather +# than leaving that up to the javascript. +data["lanes"] = ["proc", # 0 gets high-level events and spans: process start, + # imports, command dispatch, code established, key + # established, transit connected, process exit + "API", # 1 gets API call spans (apps usually only wait for + # one at a time, so they won't overlap): get_code, + # input_code, get_verifier, get_data, send_data, + # close + "wait", # 2 shows waiting-for-human: input code, get + # permission + "app", # 3: file-xfer events + "skt", # 4: websocket message send/receives + "misc", # 5: anything else + ] +data["bounds"] = {"min": rel(all_times[0]), "max": rel(all_times[-1]), + } +data["sides"] = sides +print("started at %s" % time.ctime(earliest)) +print("duration %s seconds" % data["bounds"]["max"]) +items = data["items"] = [] +for num, (start, sent, finish, side, what, details) in enumerate(timeline): + background = False + if what in ["wormhole",]: + # background region for wormhole lifetime + lane = 0 + background = True + elif what in ["process start", "import", "command dispatch", + "code established", "key established", "transit connected", + "exit"]: + lane = 0 + elif what in ["API get_code", "API input_code", "API set_code", + "API get_verifier", "API get_data", "API send_data", + #"API get data", "API send data", + "API close"]: + lane = 1 + elif details.get("waiting") in ["user", "crypto"]: # permission or math + lane = 2 + elif what in ["tx file", "get ack", "rx file", "unpack zip", "send ack"]: + lane = 3 + elif what in ["websocket"]: + # connection establishment + lane = 4 + background = True + elif (what in ["welcome", "error"] # rendezvous message receives + or what in ["allocate", "list", "get", "add", "deallocate"] + # rendezvous message sends + ): + lane = 4 + else: + lane = 5 # unknown + + if background: + continue # disable until I figure out how to draw these better + + details_str = ", ".join(["%s=%s" % (name, details[name]) + for name in sorted(details)]) + + items.append({"side": side, + "lane": lane, + "start_time": start, + "server_sent": sent, + "finish_time": finish, # maybe None + "what": what, + "details": details, + "details_str": details_str, + "wiggle": random.randint(0,4), + }) + + #if "waiting" in details: + # viz_className += " wait-%s" % details["waiting"] + +from pprint import pprint +pprint(data) here = os.path.dirname(__file__) web_root = os.path.join(here, "web") -vis_root = os.path.join(here, "node_modules", "vis", "dist") -zepto_root = os.path.join(here, "node_modules", "zepto") -if not os.path.isdir(vis_root) or not os.path.isdir(zepto_root): - print("Cannot find 'vis' and 'zepto' in misc/node_modules/") - print("Please run 'npm install vis zepto' from the misc/ directory.") +lib_root = os.path.join(here, "node_modules") +if not os.path.isdir(lib_root): + print("Cannot find 'd3' and 'd3-tip' in misc/node_modules/") + print("Please run 'npm install d3 d3-tip zepto' from the misc/ directory.") sys.exit(1) def web(): @@ -82,15 +126,14 @@ def web(): from twisted.internet import reactor, endpoints ep = endpoints.serverFromString(reactor, "tcp:8066:interface=127.0.0.1") root = static.File(web_root) - data_json = {"items": viz_out, "groups": groups_out} - data = json.dumps(data_json).encode("utf-8") - root.putChild("data.json", static.Data(data, "application/json")) - root.putChild("vis", static.File(vis_root)) - root.putChild("zepto", static.File(zepto_root)) + root.putChild("data.json", static.Data(json.dumps(data).encode("utf-8"), + "application/json")) + root.putChild("lib", static.File(lib_root)) class Shutdown(resource.Resource): def render_GET(self, request): - print("timeline ready, server shutting down") - reactor.stop() + if 0: + print("timeline ready, server shutting down") + reactor.stop() return "shutting down" root.putChild("done", Shutdown()) site = server.Site(root) diff --git a/misc/web/timeline.css b/misc/web/timeline.css index c49565d..ea90e15 100644 --- a/misc/web/timeline.css +++ b/misc/web/timeline.css @@ -1,20 +1,51 @@ -div.item-group-0 { - background-color: #fcc; +rect.bar { + stroke: black; } -div.item-group-1 { - background-color: #cfc; +.lane-0 { + fill: #fcc; } -div.item-group-2 { - background-color: #ccf; +.lane-1 { + fill: #cfc; } -div.wait-user { - background-color: #ccc; +.lane-2 { + fill: #ccf; +} + +.lane-3 { + fill: #ccf; +} + +.lane-4 { + fill: #ccf; +} + +.lane-5 { + fill: #ccf; +} + +.lane-6 { + fill: #ccf; +} + +rect.wait-user { + fill: #ccc; +} + +rect.wait-crypto { + fill: #bbe; } .vis-item .vis-item-overflow { overflow: visible; } + +.d3-tip { + margin: 4px; + padding: 2px; + background: #111; + color: #fff; +} diff --git a/misc/web/timeline.html b/misc/web/timeline.html index 190fdd9..41e92fe 100644 --- a/misc/web/timeline.html +++ b/misc/web/timeline.html @@ -2,9 +2,9 @@ Timeline Visualizer - - - + + + diff --git a/misc/web/timeline.js b/misc/web/timeline.js index 1554f29..0c9978f 100644 --- a/misc/web/timeline.js +++ b/misc/web/timeline.js @@ -1,59 +1,356 @@ -var vis, $; // hush +var d3, $; // hush -var container = document.getElementById("viz"); -var options = {editable: false, - showCurrentTime: false, - snap: null, - order: function(a,b) { return a.id - b.id; } - }; -var timeline = new vis.Timeline(container, options); +var container = d3.select("#viz"); +var data; var items; +var globals = {}; -$.getJSON("data.json", function(data) { - items = new vis.DataSet(data.items); - timeline.setData({groups: new vis.DataSet(data.groups), - items: items}); - var start = data.items[0].start; - var end = data.items[data.items.length-1].start; - var span = end - start; - timeline.setWindow(start - (span/10), end + (span/10)); - //timeline.fit(); // doesn't leave space on the ends - timeline.setOptions({min: start - (span/10), - max: end + (span/10), - zoomMin: 50, - zoomMax: 1.2*span}); - var bar = timeline.addCustomTime(start, "cursor"); - timeline.on("timechange", update_cursor); - update_cursor({time: new Date(start)}); - timeline.on("doubleClick", zoom); - timeline.on("select", select_item); +var zoom = d3.behavior.zoom().scaleExtent([1, Infinity]); +function zoomin() { + //var w = Number(container.style("width").slice(0,-2)); + //console.log("zoomin", w); + //zoom.center([w/2, 20]); // doesn't work yet + zoom.scale(zoom.scale() * 2); + globals.redraw(); +} +function zoomout() { + zoom.scale(zoom.scale() * 0.5); + globals.redraw(); +} + +$.getJSON("data.json", function(d) { + data = d; + + const LANE_HEIGHT = 30; + const RECT_HEIGHT = 20; + + // the Y axis is as follows: + // * each lane is LANE_HEIGHT tall (e.g. 30px) + // * the actual rects are RECT_HEIGHT tall (e.g. 20px) + // * there is a one-lane-tall gap at the top of the chart + // * there are data.sides.length sides (e.g. one tx, one rx) + // * there are data.lanes.length lanes (e.g. 6), each with a name + // * there is a one-lane-tall gap between each side + // * there is a one-lane-tall gap after the last lane + // * the horizontal scale markers begin after that gap + // * the tick marks extend another 6 pixels down + + var w = Number(container.style("width").slice(0,-2)); + + function y_off(d) { + return (LANE_HEIGHT * (d.side*(data.lanes.length+1) + d.lane) + + d.wiggle); + } + var bottom_rule_y = LANE_HEIGHT * data.sides.length * (data.lanes.length+1); + var bottom_y = bottom_rule_y + 45; + + var tip = d3.tip() + .attr("class", "d3-tip") + .html(function(d) { return "" + d.details_str + ""; }) + .direction("s") + ; + + var chart = container.append("svg:svg") + .attr("id", "outer_chart") + .attr("width", w) + .attr("pointer-events", "all") + .call(zoom) + .call(tip) + ; + //var chart_g = chart.append("svg:g"); + + // this "backboard" rect lets us catch mouse events anywhere in the + // chart, even between the bars. Without it, we only see events on solid + // objects like bars and text, but not in the gaps between. + chart.append("svg:rect") + .attr("id", "outer_rect") + .attr("width", w).attr("height", bottom_y).attr("fill", "none"); + + // but the stuff we put inside it should have some room + w = w-50; + + chart.selectAll("text.sides-label").data(data.sides).enter() + .append("svg:text") + .attr("class", "sides-label") + .attr("x", "0px") + .attr("y", function(d,idx) { + return y_off({side: idx, lane: data.lanes.length/2, + wiggle: 0}) ;}) + .attr("text-anchor", "start") // anchor at top-left + .attr("dy", ".71em") + .attr("fill", "black") + .text(function(d) { return d; }) + ; + + var lanes_by_sides = []; + data.sides.forEach(function(side, side_index) { + data.lanes.forEach(function(lane, lane_index) { + lanes_by_sides.push({side: side, side_index: side_index, + lane: lane, lane_index: lane_index}); + }); + }); + + chart.selectAll("text.lanes-label").data(lanes_by_sides).enter() + .append("svg:text") + .attr("class", "lanes-label") + .attr("x", "50px") + .attr("y", function(d) { + return y_off({side: d.side_index, lane: d.lane_index, + wiggle: 0}) ;}) + .attr("text-anchor", "start") // anchor at top-left + .attr("dy", ".91em") + .attr("fill", "#f88") + .text(function(d) { return d.lane; }) + ; + + chart.append("svg:text") + .attr("class", "seconds-label") + //.attr("x", w/2).attr("y", y + 35) + .attr("text-anchor", "middle") + .attr("fill", "black") + .text("seconds"); + + d3.select("#outer_chart").attr("height", bottom_y); + d3.select("#outer_rect").attr("height", bottom_y); + d3.select("#zoom").attr("transform", "translate("+(w-10)+","+10+")"); + + function reltime(t) {return t-data.bounds.min;} + var last = data.bounds.max - data.bounds.min; + //last = reltime(d3.max(data.dyhb, function(d){return d.finish_time;})); + last = last * 1.05; + // long downloads are likely to have too much info, start small + if (last > 10.0) + last = 10.0; + // d3.time.scale() has no support for ms or us. + var xOFF = d3.time.scale().domain([data.bounds.min, data.bounds.max]) + .range([0,w]); + var x = d3.scale.linear().domain([-last*0.05, last]) + .range([0,w]); + zoom.x(x); + function tx(d) { return "translate(" +x(d) + ",0)"; } + function left(d) { return x(reltime(d.start_time)); } + function left_server(d) { return x(reltime(d.server_sent)); } + function right(d) { + return d.finish_time ? x(reltime(d.finish_time)) : "1px"; + } + function width(d) { + return d.finish_time ? x(reltime(d.finish_time))-x(reltime(d.start_time)) : "1px"; + } + function halfwidth(d) { + if (d.finish_time) + return (x(reltime(d.finish_time))-x(reltime(d.start_time)))/2; + return "1px"; + } + function middle(d) { + if (d.finish_time) + return (x(reltime(d.start_time))+x(reltime(d.finish_time)))/2; + else + return x(reltime(d.start_time)) + 1; + } + function color(d) { return data.server_info[d.serverid].color; } + function servername(d) { return data.server_info[d.serverid].short; } + function timeformat(duration) { + // TODO: trim to microseconds, maybe humanize + return duration; + } + + function redraw() { + // at this point zoom/pan must be fixed + var min = data.bounds.min + x.domain()[0]; + var max = data.bounds.min + x.domain()[1]; + function inside(d) { + var finish_time = d.finish_time || d.start_time; + if (Math.max(d.start_time, min) <= Math.min(finish_time, max)) + return true; + return false; + } + + // from the data, build a list of bars, dots, and lines + var clipped = {bars: [], dots: [], lines: []}; + data.items.filter(inside).forEach(function(d) { + if (!d.finish_time) { + clipped.dots.push(d); + } else { + clipped.bars.push(d); + if (!!d.server_sent) { + clipped.lines.push(d); + } + } + }); + globals.clipped = clipped; + + //chart.select(".dyhb-label") + // .attr("x", x(0))//"20px") + // .attr("y", y); + + // Panning and zooming will re-run this function multiple times, and + // bars will come and go, so we must process all three selections + // (including enter() and exit()). + + // TODO: add dots for events that have only start, not finish. Add + // the server-sent bar (a vertical line, half height, centered + // vertically) for events that have server-sent as well as finish. + // This probably requires creating a dot for everything, but making + // it invisible if finished is non-null, likewise for the server-sent + // bar. + + // each item gets an SVG group (g.bars), translated left and down + // to match the start time and side/lane of the event + var bars = chart.selectAll("g.bars") + .data(clipped.bars, function(d) { return d.start_time; }) + .attr("transform", function(d) { + return "translate("+left(d)+","+y_off(d)+")"; }) + ; + // update the variable parts of each bar, which depends upon the + // current pan/zoom values + bars.select("rect") + .attr("width", width); + bars.select("text") + .attr("x", halfwidth); + bars.exit().remove(); + var new_bars = bars.enter() + .append("svg:g") + .attr("class", "bars") + .attr("transform", function(d) { + return "translate("+left(d)+","+y_off(d)+")"; }) + ; + + // inside the group, we have a rect with a width for the duration of + // the event, and a fixed height. The fill and stroke color depend + // upon the event, and the title has the details. We append the rects + // first, so the text is drawn on top (higher z-order) + //y += 30*(1+d3.max(data.bars, function(d){return d.row;})); + new_bars.append("svg:rect") + .attr("width", width) + .attr("height", RECT_HEIGHT) + .attr("class", function(d) { + var c = ["bar", "lane-" + d.lane]; + if (d.details.waiting) + c.push("wait-" + d.details.waiting); + return c.join(" "); + }) + .on("mouseover", function(d) {if (d.details_str) tip.show(d);}) + .on("mouseout", tip.hide) + //.attr("title", function(d) {return d.details_str;}) + ; + + // each group also has a text, with 'x' set to place it in the middle + // of the rect, and text contents that are drawn in the rect + new_bars.append("svg:text") + .attr("x", halfwidth) + .attr("text-anchor", "middle") + .attr("dy", "0.9em") + //.attr("fill", "black") + .text((d) => d.what) + .on("mouseover", function(d) {if (d.details_str) tip.show(d);}) + .on("mouseout", tip.hide) + ; + + // dots: events that have a single timestamp, rather than a range. + // These get an SVG group, and a circle and some text. + var dots = chart.selectAll("g.dots") + .data(clipped.dots, (d) => d.start_time) + .attr("transform", + (d) => "translate("+left(d)+","+(y_off(d)+LANE_HEIGHT/3)+")") + ; + dots.exit().remove(); + var new_dots = dots.enter() + .append("svg:g") + .attr("class", "dots") + .attr("transform", + (d) => "translate("+left(d)+","+(y_off(d)+LANE_HEIGHT/3)+")") + ; + new_dots.append("svg:circle") + .attr("r", "5") + .attr("class", (d) => "dot lane-"+d.lane) + .attr("fill", "#888") + .attr("stroke", "black") + .on("mouseover", function(d) {if (d.details_str) tip.show(d);}) + .on("mouseout", tip.hide) + ; + new_dots.append("svg:text") + .attr("x", "5px") + .attr("text-anchor", "start") + .attr("dy", "0.2em") + .text((d) => d.what) + .on("mouseover", function(d) {if (d.details_str) tip.show(d);}) + .on("mouseout", tip.hide) + ; + + // lines: these represent the time at which the server sent a message + // which finished a bar. These get an SVG group, and a line + var lines = chart.selectAll("g.lines") + .data(clipped.lines, (d) => d.start_time) + .attr("transform", + (d) => "translate("+left_server(d)+","+y_off(d)+")") + ; + lines.exit().remove(); + var new_lines = lines.enter() + .append("svg:g") + .attr("class", "lines") + .attr("transform", + (d) => "translate("+left_server(d)+","+(y_off(d))+")") + ; + new_lines.append("svg:line") + .attr("x1", 0).attr("y1", -5).attr("x2", "0").attr("y2", LANE_HEIGHT) + .attr("class", (d) => "line lane-"+d.lane) + .attr("stroke", "red") + ; + + + + + // horizontal scale markers: vertical lines at rational timestamps + var rules = chart.selectAll("g.rule") + .data(x.ticks(10)) + .attr("transform", tx); + rules.select("text").text(x.tickFormat(10)); + + var newrules = rules.enter().insert("svg:g") + .attr("class", "rule") + .attr("transform", tx) + ; + + newrules.append("svg:line") + .attr("class", "rule-tick") + .attr("stroke", "black"); + chart.selectAll("line.rule-tick") + .attr("y1", bottom_rule_y) + .attr("y2", bottom_rule_y + 6); + newrules.append("svg:line") + .attr("class", "rule-red") + .attr("stroke", "red") + .attr("stroke-opacity", .3); + chart.selectAll("line.rule-red") + .attr("y1", 0) + .attr("y2", bottom_rule_y); + newrules.append("svg:text") + .attr("class", "rule-text") + .attr("dy", ".71em") + .attr("text-anchor", "middle") + .attr("fill", "black") + .text(x.tickFormat(10)); + chart.selectAll("text.rule-text") + .attr("y", bottom_rule_y + 9); + rules.exit().remove(); + chart.select(".seconds-label") + .attr("x", w/2) + .attr("y", bottom_rule_y + 35); + + } + globals.x = x; + globals.redraw = redraw; + + zoom.on("zoom", redraw); + + d3.select("#zoom_in_button").on("click", zoomin); + d3.select("#zoom_out_button").on("click", zoomout); + d3.select("#reset_button").on("click", + function() { + x.domain([-last*0.05, last]).range([0,w]); + redraw(); + }); + + redraw(); $.get("done", function(_) {}); }); - -function zoom(properties) { - var target = properties.time.valueOf(); - var w = timeline.getWindow(); - var span = w.end - w.start; - var new_span = span / 2; - var new_start = target - new_span/2; - var new_end = target + new_span/2; - timeline.setWindow(new_start, new_end, {animation: true}); -} - -function update_cursor(properties) { - var t = properties.time; - document.getElementById("cursor_date").innerText = t; - var m = vis.moment(t); - document.getElementById("cursor_time").innerText = m.format("ss.SSSSSS"); -} - -function select_item(properties) { - var item_id = properties.items[0]; - var i = items.get(item_id); - if (i.end) { - var elapsed = (i.end - i.start) / 1000; - $("div#elapsed").text("elapsed: " + elapsed + " s"); - } else { - $("div#elapsed").text(""); - } -} diff --git a/src/wormhole/blocking/transcribe.py b/src/wormhole/blocking/transcribe.py index d4b0935..21f576f 100644 --- a/src/wormhole/blocking/transcribe.py +++ b/src/wormhole/blocking/transcribe.py @@ -69,12 +69,12 @@ class Channel: "phase": phase, "body": hexlify(msg).decode("ascii")} data = json.dumps(payload).encode("utf-8") - _sent = self._timing.add_event("send %s" % phase) - r = requests.post(self._relay_url+"add", data=data, - timeout=self._timeout) - r.raise_for_status() - resp = r.json() - self._timing.finish_event(_sent, resp.get("sent")) + with self._timing.add("send %s" % phase) as t: + r = requests.post(self._relay_url+"add", data=data, + timeout=self._timeout) + r.raise_for_status() + resp = r.json() + t.server_sent(resp.get("sent")) if "welcome" in resp: self._handle_welcome(resp["welcome"]) self._add_inbound_messages(resp["messages"]) @@ -92,34 +92,31 @@ class Channel: # wasn't one of our own messages. It will either come from # previously-received messages, or from an EventSource that we attach # to the corresponding URL - _sent = self._timing.add_event("get %s" % "/".join(sorted(phases))) - _server_sent = None - - phase_and_body = self._find_inbound_message(phases) - while phase_and_body is None: - remaining = self._started + self._timeout - time.time() - if remaining < 0: - raise Timeout - queryargs = urlencode([("appid", self._appid), - ("channelid", self._channelid)]) - f = EventSourceFollower(self._relay_url+"watch?%s" % queryargs, - remaining) - # we loop here until the connection is lost, or we see the - # message we want - for (eventtype, line) in f.iter_events(): - if eventtype == "welcome": - self._handle_welcome(json.loads(line)) - if eventtype == "message": - data = json.loads(line) - self._add_inbound_messages([data]) - phase_and_body = self._find_inbound_message(phases) - if phase_and_body: - f.close() - _server_sent = data.get("sent") - break - if not phase_and_body: - time.sleep(self._wait) - self._timing.finish_event(_sent, _server_sent) + with self._timing.add("get %s" % "/".join(sorted(phases))) as t: + phase_and_body = self._find_inbound_message(phases) + while phase_and_body is None: + remaining = self._started + self._timeout - time.time() + if remaining < 0: + raise Timeout + queryargs = urlencode([("appid", self._appid), + ("channelid", self._channelid)]) + f = EventSourceFollower(self._relay_url+"watch?%s" % queryargs, + remaining) + # we loop here until the connection is lost, or we see the + # message we want + for (eventtype, line) in f.iter_events(): + if eventtype == "welcome": + self._handle_welcome(json.loads(line)) + if eventtype == "message": + data = json.loads(line) + self._add_inbound_messages([data]) + phase_and_body = self._find_inbound_message(phases) + if phase_and_body: + f.close() + t.server_sent(data.get("sent")) + break + if not phase_and_body: + time.sleep(self._wait) return phase_and_body def get(self, phase): @@ -136,11 +133,11 @@ class Channel: try: # ignore POST failure, don't call r.raise_for_status(), set a # short timeout and ignore failures - _sent = self._timing.add_event("close") - r = requests.post(self._relay_url+"deallocate", data=data, - timeout=5) - resp = r.json() - self._timing.finish_event(_sent, resp.get("sent")) + with self._timing.add("close") as t: + r = requests.post(self._relay_url+"deallocate", data=data, + timeout=5) + resp = r.json() + t.server_sent(resp.get("sent")) except requests.exceptions.RequestException: pass @@ -157,28 +154,28 @@ class ChannelManager: def list_channels(self): queryargs = urlencode([("appid", self._appid)]) - _sent = self._timing.add_event("list") - r = requests.get(self._relay_url+"list?%s" % queryargs, - timeout=self._timeout) - r.raise_for_status() - data = r.json() - if "welcome" in data: - self._handle_welcome(data["welcome"]) - self._timing.finish_event(_sent, data.get("sent")) + with self._timing.add("list") as t: + r = requests.get(self._relay_url+"list?%s" % queryargs, + timeout=self._timeout) + r.raise_for_status() + data = r.json() + if "welcome" in data: + self._handle_welcome(data["welcome"]) + t.server_sent(data.get("sent")) channelids = data["channelids"] return channelids def allocate(self): data = json.dumps({"appid": self._appid, "side": self._side}).encode("utf-8") - _sent = self._timing.add_event("allocate") - r = requests.post(self._relay_url+"allocate", data=data, - timeout=self._timeout) - r.raise_for_status() - data = r.json() - if "welcome" in data: - self._handle_welcome(data["welcome"]) - self._timing.finish_event(_sent, data.get("sent")) + with self._timing.add("allocate") as t: + r = requests.post(self._relay_url+"allocate", data=data, + timeout=self._timeout) + r.raise_for_status() + data = r.json() + if "welcome" in data: + self._handle_welcome(data["welcome"]) + t.server_sent(data.get("sent")) channelid = data["channelid"] return channelid @@ -239,7 +236,7 @@ class Wormhole: self._got_data = set() self._got_confirmation = False self._closed = False - self._timing_started = self._timing.add_event("wormhole") + self._timing_started = self._timing.add("wormhole") def __enter__(self): return self @@ -284,11 +281,10 @@ class Wormhole: # discover the welcome message (and warn the user about an obsolete # client) initial_channelids = lister() - _start = self._timing.add_event("input code", waiting="user") - code = codes.input_code_with_completion(prompt, - initial_channelids, lister, - code_length) - self._timing.finish_event(_start) + with self._timing.add("input code", waiting="user"): + code = codes.input_code_with_completion(prompt, + initial_channelids, lister, + code_length) return code def set_code(self, code): # used for human-made pre-generated codes @@ -299,7 +295,7 @@ class Wormhole: def _set_code_and_channelid(self, code): if self.code is not None: raise UsageError - self._timing.add_event("code established") + self._timing.add("code established") mo = re.search(r'^(\d+)-', code) if not mo: raise ValueError("code (%s) must start with NN-" % code) @@ -342,7 +338,7 @@ class Wormhole: self.key = self.sp.finish(pake_msg) self.verifier = self.derive_key(u"wormhole:verifier") - self._timing.add_event("key established") + self._timing.add("key established") if not self._send_confirm: return @@ -369,17 +365,16 @@ class Wormhole: if phase.startswith(u"_"): raise UsageError # reserved for internals if self.code is None: raise UsageError if self._channel is None: raise UsageError - _sent = self._timing.add_event("API send data", phase=phase) - # Without predefined roles, we can't derive predictably unique keys - # for each side, so we use the same key for both. We use random - # nonces to keep the messages distinct, and the Channel automatically - # ignores reflections. - self._sent_data.add(phase) - self._get_key() - data_key = self.derive_key(u"wormhole:phase:%s" % phase) - outbound_encrypted = self._encrypt_data(data_key, outbound_data) - self._channel.send(phase, outbound_encrypted) - self._timing.finish_event(_sent) + with self._timing.add("API send data", phase=phase): + # Without predefined roles, we can't derive predictably unique + # keys for each side, so we use the same key for both. We use + # random nonces to keep the messages distinct, and the Channel + # automatically ignores reflections. + self._sent_data.add(phase) + self._get_key() + data_key = self.derive_key(u"wormhole:phase:%s" % phase) + outbound_encrypted = self._encrypt_data(data_key, outbound_data) + self._channel.send(phase, outbound_encrypted) @close_on_error def get_data(self, phase=u"data"): @@ -389,23 +384,22 @@ class Wormhole: if self._closed: raise UsageError if self.code is None: raise UsageError if self._channel is None: raise UsageError - _sent = self._timing.add_event("API get data", phase=phase) - self._got_data.add(phase) - self._get_key() - phases = [] - if not self._got_confirmation: - phases.append(u"_confirm") - phases.append(phase) - (got_phase, body) = self._channel.get_first_of(phases) - if got_phase == u"_confirm": - confkey = self.derive_key(u"wormhole:confirmation") - nonce = body[:CONFMSG_NONCE_LENGTH] - if body != make_confmsg(confkey, nonce): - raise WrongPasswordError - self._got_confirmation = True - (got_phase, body) = self._channel.get_first_of([phase]) - assert got_phase == phase - self._timing.finish_event(_sent) + with self._timing.add("API get data", phase=phase): + self._got_data.add(phase) + self._get_key() + phases = [] + if not self._got_confirmation: + phases.append(u"_confirm") + phases.append(phase) + (got_phase, body) = self._channel.get_first_of(phases) + if got_phase == u"_confirm": + confkey = self.derive_key(u"wormhole:confirmation") + nonce = body[:CONFMSG_NONCE_LENGTH] + if body != make_confmsg(confkey, nonce): + raise WrongPasswordError + self._got_confirmation = True + (got_phase, body) = self._channel.get_first_of([phase]) + assert got_phase == phase try: data_key = self.derive_key(u"wormhole:phase:%s" % phase) inbound_data = self._decrypt_data(data_key, body) @@ -418,7 +412,7 @@ class Wormhole: raise TypeError(type(mood)) self._closed = True if self._channel: - self._timing.finish_event(self._timing_started, mood=mood) + self._timing_started.finish(mood=mood) c, self._channel = self._channel, None monitor.close(c) c.deallocate(mood) diff --git a/src/wormhole/cli/cmd_receive.py b/src/wormhole/cli/cmd_receive.py index 8dadd02..0de21cd 100644 --- a/src/wormhole/cli/cmd_receive.py +++ b/src/wormhole/cli/cmd_receive.py @@ -37,9 +37,8 @@ class TwistedReceiver: def go(self): tor_manager = None if self.args.tor: - _start = self.args.timing.add_event("import TorManager") - from ..twisted.tor_manager import TorManager - self.args.timing.finish_event(_start) + with self.args.timing.add("import", which="tor_manager"): + from ..twisted.tor_manager import TorManager tor_manager = TorManager(self._reactor, timing=self.args.timing) # For now, block everything until Tor has started. Soon: launch # tor in parallel with everything else, make sure the TorManager @@ -172,15 +171,15 @@ class TwistedReceiver: return abs_destname def ask_permission(self): - _start = self.args.timing.add_event("permission", waiting="user") - while True and not self.args.accept_file: - ok = six.moves.input("ok? (y/n): ") - if ok.lower().startswith("y"): - break - print(u"transfer rejected", file=sys.stderr) - self.args.timing.finish_event(_start, answer="no") - raise RespondError("transfer rejected") - self.args.timing.finish_event(_start, answer="yes") + with self.args.timing.add("permission", waiting="user") as t: + while True and not self.args.accept_file: + ok = six.moves.input("ok? (y/n): ") + if ok.lower().startswith("y"): + break + print(u"transfer rejected", file=sys.stderr) + t.detail(answer="no") + raise RespondError("transfer rejected") + t.detail(answer="yes") @inlineCallbacks def establish_transit(self, w, them_d, tor_manager): @@ -207,20 +206,20 @@ class TwistedReceiver: transit_receiver.add_their_direct_hints(tdata["direct_connection_hints"]) transit_receiver.add_their_relay_hints(tdata["relay_connection_hints"]) record_pipe = yield transit_receiver.connect() + self.args.timing.add("transit connected") returnValue(record_pipe) @inlineCallbacks def transfer_data(self, record_pipe, f): self.msg(u"Receiving (%s).." % record_pipe.describe()) - _start = self.args.timing.add_event("rx file") - progress = tqdm(file=self.args.stdout, - disable=self.args.hide_progress, - unit="B", unit_scale=True, total=self.xfersize) - with progress: - received = yield record_pipe.writeToFile(f, self.xfersize, - progress.update) - self.args.timing.finish_event(_start) + with self.args.timing.add("rx file"): + progress = tqdm(file=self.args.stdout, + disable=self.args.hide_progress, + unit="B", unit_scale=True, total=self.xfersize) + with progress: + received = yield record_pipe.writeToFile(f, self.xfersize, + progress.update) # except TransitError if received < self.xfersize: @@ -239,21 +238,19 @@ class TwistedReceiver: def write_directory(self, f): self.msg(u"Unpacking zipfile..") - _start = self.args.timing.add_event("unpack zip") - with zipfile.ZipFile(f, "r", zipfile.ZIP_DEFLATED) as zf: - zf.extractall(path=self.abs_destname) - # extractall() appears to offer some protection against - # malicious pathnames. For example, "/tmp/oops" and - # "../tmp/oops" both do the same thing as the (safe) - # "tmp/oops". - self.msg(u"Received files written to %s/" % - os.path.basename(self.abs_destname)) - f.close() - self.args.timing.finish_event(_start) + with self.args.timing.add("unpack zip"): + with zipfile.ZipFile(f, "r", zipfile.ZIP_DEFLATED) as zf: + zf.extractall(path=self.abs_destname) + # extractall() appears to offer some protection against + # malicious pathnames. For example, "/tmp/oops" and + # "../tmp/oops" both do the same thing as the (safe) + # "tmp/oops". + self.msg(u"Received files written to %s/" % + os.path.basename(self.abs_destname)) + f.close() @inlineCallbacks def close_transit(self, record_pipe): - _start = self.args.timing.add_event("ack") - yield record_pipe.send_record(b"ok\n") - yield record_pipe.close() - self.args.timing.finish_event(_start) + with self.args.timing.add("send ack"): + yield record_pipe.send_record(b"ok\n") + yield record_pipe.close() diff --git a/src/wormhole/cli/cmd_send.py b/src/wormhole/cli/cmd_send.py index e3a5871..c4811af 100644 --- a/src/wormhole/cli/cmd_send.py +++ b/src/wormhole/cli/cmd_send.py @@ -210,6 +210,7 @@ def _send_file_twisted(tdata, transit_sender, fd_to_send, fd_to_send.seek(0,0) record_pipe = yield transit_sender.connect() + timing.add("transit connected") # record_pipe should implement IConsumer, chunks are just records print(u"Sending (%s).." % record_pipe.describe(), file=stdout) @@ -221,17 +222,17 @@ def _send_file_twisted(tdata, transit_sender, fd_to_send, return data fs = basic.FileSender() - _start = timing.add_event("tx file") - with progress: - yield fs.beginFileTransfer(fd_to_send, record_pipe, transform=_count) - timing.finish_event(_start) + with timing.add("tx file"): + with progress: + yield fs.beginFileTransfer(fd_to_send, record_pipe, + transform=_count) print(u"File sent.. waiting for confirmation", file=stdout) - _start = timing.add_event("get ack") - ack = yield record_pipe.receive_record() - record_pipe.close() - if ack != b"ok\n": - timing.finish_event(_start, ack="failed") - raise TransferError("Transfer failed (remote says: %r)" % ack) - print(u"Confirmation received. Transfer complete.", file=stdout) - timing.finish_event(_start, ack="ok") + with timing.add("get ack") as t: + ack = yield record_pipe.receive_record() + record_pipe.close() + if ack != b"ok\n": + t.detail(ack="failed") + raise TransferError("Transfer failed (remote says: %r)" % ack) + print(u"Confirmation received. Transfer complete.", file=stdout) + t.detail(ack="ok") diff --git a/src/wormhole/cli/runner.py b/src/wormhole/cli/runner.py index 1ed73d5..b77b5f2 100644 --- a/src/wormhole/cli/runner.py +++ b/src/wormhole/cli/runner.py @@ -1,19 +1,22 @@ from __future__ import print_function +import time +start = time.time() import os, sys from twisted.internet.defer import maybeDeferred from twisted.internet.task import react from ..errors import TransferError, WrongPasswordError, Timeout from ..timing import DebugTiming from .cli_args import parser +top_import_finish = time.time() def dispatch(args): # returns Deferred if args.func == "send/send": - from . import cmd_send + with args.timing.add("import", which="cmd_send"): + from . import cmd_send return cmd_send.send(args) if args.func == "receive/receive": - _start = args.timing.add_event("import c_r_t") - from . import cmd_receive - args.timing.finish_event(_start) + with args.timing.add("import", which="cmd_receive"): + from . import cmd_receive return cmd_receive.receive(args) raise ValueError("unknown args.func %s" % args.func) @@ -34,11 +37,12 @@ def run(reactor, argv, cwd, stdout, stderr, executable=None): args.stderr = stderr args.timing = timing = DebugTiming() - timing.add_event("command dispatch") + timing.add("command dispatch") + timing.add("import", when=start, which="top").finish(when=top_import_finish) # fires with None, or raises an error d = maybeDeferred(dispatch, args) def _maybe_dump_timing(res): - timing.add_event("exit") + timing.add("exit") if args.dump_timing: timing.write(args.dump_timing, stderr) return res diff --git a/src/wormhole/timing.py b/src/wormhole/timing.py index 3b67d1b..e4f884d 100644 --- a/src/wormhole/timing.py +++ b/src/wormhole/timing.py @@ -1,23 +1,57 @@ -from __future__ import print_function +from __future__ import print_function, absolute_import import json, time +class Event: + def __init__(self, name, when, **details): + # data fields that will be dumped to JSON later + self._start = time.time() if when is None else float(when) + self._server_sent = None + self._stop = None + self._name = name + self._details = details + + def server_sent(self, when): + self._server_sent = when + + def detail(self, **details): + self._details.update(details) + + def finish(self, server_sent=None, **details): + self._stop = time.time() + if server_sent: + self.server_sent(server_sent) + self.detail(**details) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + if exc_type: + # inlineCallbacks uses a special exception (defer._DefGen_Return) + # to deliver returnValue(), so if returnValue is used inside our + # with: block, we'll mistakenly think it means something broke. + # I've moved all returnValue() calls outside the 'with + # timing.add()' blocks to avoid this, but if a new one + # accidentally pops up, it'll get marked as an error. I used to + # catch-and-release _DefGen_Return to avoid this, but removed it + # because it requires referencing defer.py's private class + self.finish(exception=str(exc_type)) + else: + self.finish() + class DebugTiming: def __init__(self): - self.data = [] - def add_event(self, name, when=None, **details): - # [ start, [server_sent], [stop], name, start_details{}, stop_details{} ] - if when is None: - when = time.time() - when = float(when) - self.data.append( [when, None, None, name, details, {}] ) - return len(self.data)-1 - def finish_event(self, index, server_sent=None, **details): - if server_sent is not None: - self.data[index][1] = float(server_sent) - self.data[index][2] = time.time() - self.data[index][5] = details + self._events = [] + + def add(self, name, when=None, **details): + ev = Event(name, when, **details) + self._events.append(ev) + return ev + def write(self, fn, stderr): with open(fn, "wb") as f: - json.dump(self.data, f) + data = [ [e._start, e._server_sent, e._stop, e._name, e._details] + for e in self._events ] + json.dump(data, f) f.write("\n") print("Timing data written to %s" % fn, file=stderr) diff --git a/src/wormhole/twisted/tor_manager.py b/src/wormhole/twisted/tor_manager.py index b487276..80a72c1 100644 --- a/src/wormhole/twisted/tor_manager.py +++ b/src/wormhole/twisted/tor_manager.py @@ -44,31 +44,29 @@ class TorManager: self._can_run_service = False returnValue(True) - _start_find = self._timing.add_event("find tor") + _start_find = self._timing.add("find tor") # try port 9051, then try /var/run/tor/control . Throws on failure. state = None - _start_tcp = self._timing.add_event("tor localhost") - try: - connection = (self._reactor, "127.0.0.1", self._tor_control_port) - state = yield txtorcon.build_tor_connection(connection) - self._tor_protocol = state.protocol - except ConnectError: - print("unable to reach Tor on %d" % self._tor_control_port) - pass - self._timing.finish_event(_start_tcp) - - if not state: - _start_unix = self._timing.add_event("tor unix") + with self._timing.add("tor localhost"): try: - connection = (self._reactor, "/var/run/tor/control") - # add build_state=False to get back a Protocol object instead - # of a State object + connection = (self._reactor, "127.0.0.1", self._tor_control_port) state = yield txtorcon.build_tor_connection(connection) self._tor_protocol = state.protocol - except (ValueError, ConnectError): - print("unable to reach Tor on /var/run/tor/control") + except ConnectError: + print("unable to reach Tor on %d" % self._tor_control_port) pass - self._timing.finish_event(_start_unix) + + if not state: + with self._timing.add("tor unix"): + try: + connection = (self._reactor, "/var/run/tor/control") + # add build_state=False to get back a Protocol object + # instead of a State object + state = yield txtorcon.build_tor_connection(connection) + self._tor_protocol = state.protocol + except (ValueError, ConnectError): + print("unable to reach Tor on /var/run/tor/control") + pass if state: print("connected to pre-existing Tor process") @@ -78,37 +76,36 @@ class TorManager: yield self._create_my_own_tor() # that sets self._tor_socks_port and self._tor_protocol - self._timing.finish_event(_start_find) + _start_find.finish() self._can_run_service = True returnValue(True) @inlineCallbacks def _create_my_own_tor(self): - _start_launch = self._timing.add_event("launch tor") - start = time.time() - config = self.config = txtorcon.TorConfig() - if 0: - # The default is for launch_tor to create a tempdir itself, and - # delete it when done. We only need to set a DataDirectory if we - # want it to be persistent. - import tempfile - datadir = tempfile.mkdtemp() - config.DataDirectory = datadir + with self._timing.add("launch tor"): + start = time.time() + config = self.config = txtorcon.TorConfig() + if 0: + # The default is for launch_tor to create a tempdir itself, + # and delete it when done. We only need to set a + # DataDirectory if we want it to be persistent. + import tempfile + datadir = tempfile.mkdtemp() + config.DataDirectory = datadir - #config.ControlPort = allocate_tcp_port() # defaults to 9052 - #print("setting config.ControlPort to", config.ControlPort) - config.SocksPort = allocate_tcp_port() - self._tor_socks_port = config.SocksPort - print("setting config.SocksPort to", config.SocksPort) + #config.ControlPort = allocate_tcp_port() # defaults to 9052 + #print("setting config.ControlPort to", config.ControlPort) + config.SocksPort = allocate_tcp_port() + self._tor_socks_port = config.SocksPort + print("setting config.SocksPort to", config.SocksPort) - tpp = yield txtorcon.launch_tor(config, self._reactor, - #tor_binary= - ) - # gives a TorProcessProtocol with .tor_protocol - self._tor_protocol = tpp.tor_protocol - print("tp:", self._tor_protocol) - print("elapsed:", time.time() - start) - self._timing.finish_event(_start_launch) + tpp = yield txtorcon.launch_tor(config, self._reactor, + #tor_binary= + ) + # gives a TorProcessProtocol with .tor_protocol + self._tor_protocol = tpp.tor_protocol + print("tp:", self._tor_protocol) + print("elapsed:", time.time() - start) returnValue(True) def is_non_public_numeric_address(self, host): diff --git a/src/wormhole/twisted/transcribe.py b/src/wormhole/twisted/transcribe.py index b026e2b..4edeffa 100644 --- a/src/wormhole/twisted/transcribe.py +++ b/src/wormhole/twisted/transcribe.py @@ -77,14 +77,16 @@ class Wormhole: self._sent_messages = set() # (phase, body_bytes) self._delivered_messages = set() # (phase, body_bytes) self._received_messages = {} # phase -> body_bytes + self._received_messages_sent = {} # phase -> server timestamp self._sent_phases = set() # phases, to prohibit double-send self._got_phases = set() # phases, to prohibit double-read self._sleepers = [] self._confirmation_failed = False self._closed = False self._deallocated_status = None - self._timing_started = self._timing.add_event("wormhole") + self._timing_started = self._timing.add("wormhole") self._ws = None + self._ws_t = None # timing Event self._ws_channel_claimed = False self._error = None @@ -112,6 +114,7 @@ class Wormhole: ep = self._make_endpoint(p.hostname, p.port or 80) # .connect errbacks if the TCP connection fails self._ws = yield ep.connect(f) + self._ws_t = self._timing.add("websocket") # f.d is errbacked if WebSocket negotiation fails yield f.d # WebSocket drops data sent before onOpen() fires self._ws_send(u"bind", appid=self._appid, side=self._side) @@ -134,6 +137,7 @@ class Wormhole: return meth(msg) def _ws_handle_welcome(self, msg): + self._timing.add("welcome").server_sent(msg["sent"]) welcome = msg["welcome"] if ("motd" in welcome and not self.motd_displayed): @@ -184,6 +188,7 @@ class Wormhole: self._wakeup() def _ws_handle_error(self, msg): + self._timing.add("error").server_sent(msg["sent"]) err = ServerError("%s: %s" % (msg["error"], msg["orig"]), self._ws_url) return self._signal_error(err) @@ -203,18 +208,20 @@ class Wormhole: if self._code is not None: raise UsageError if self._started_get_code: raise UsageError self._started_get_code = True - _sent = self._timing.add_event("allocate") - yield self._ws_send(u"allocate") - while self._channelid is None: - yield self._sleep() - self._timing.finish_event(_sent) - code = codes.make_code(self._channelid, code_length) - assert isinstance(code, type(u"")), type(code) - self._set_code(code) - self._start() + with self._timing.add("API get_code"): + with self._timing.add("allocate") as t: + self._allocate_t = t + yield self._ws_send(u"allocate") + while self._channelid is None: + yield self._sleep() + code = codes.make_code(self._channelid, code_length) + assert isinstance(code, type(u"")), type(code) + self._set_code(code) + self._start() returnValue(code) def _ws_handle_allocated(self, msg): + self._allocate_t.server_sent(msg["sent"]) if self._channelid is not None: return self._signal_error("got duplicate channelid") self._channelid = msg["channelid"] @@ -222,9 +229,10 @@ class Wormhole: def _start(self): # allocate the rest now too, so it can be serialized - self._sp = SPAKE2_Symmetric(to_bytes(self._code), - idSymmetric=to_bytes(self._appid)) - self._msg1 = self._sp.start() + with self._timing.add("pake1", waiting="crypto"): + self._sp = SPAKE2_Symmetric(to_bytes(self._code), + idSymmetric=to_bytes(self._appid)) + self._msg1 = self._sp.start() # entry point 2a: interactively type in a code, with completion @inlineCallbacks @@ -238,16 +246,16 @@ class Wormhole: # TODO: send the request early, show the prompt right away, hide the # latency in the user's indecision and slow typing. If we're lucky # the answer will come back before they hit TAB. - initial_channelids = yield self._list_channels() - _start = self._timing.add_event("input code", waiting="user") - t = self._reactor.addSystemEventTrigger("before", "shutdown", - self._warn_readline) - code = yield deferToThread(codes.input_code_with_completion, - prompt, - initial_channelids, _lister, - code_length) - self._reactor.removeSystemEventTrigger(t) - self._timing.finish_event(_start) + with self._timing.add("API input_code"): + initial_channelids = yield self._list_channels() + with self._timing.add("input code", waiting="user"): + t = self._reactor.addSystemEventTrigger("before", "shutdown", + self._warn_readline) + code = yield deferToThread(codes.input_code_with_completion, + prompt, + initial_channelids, _lister, + code_length) + self._reactor.removeSystemEventTrigger(t) returnValue(code) # application will give this to set_code() def _warn_readline(self): @@ -286,12 +294,11 @@ class Wormhole: @inlineCallbacks def _list_channels(self): - _sent = self._timing.add_event("list") - self._latest_channelids = None - yield self._ws_send(u"list") - while self._latest_channelids is None: - yield self._sleep() - self._timing.finish_event(_sent) + with self._timing.add("list"): + self._latest_channelids = None + yield self._ws_send(u"list") + while self._latest_channelids is None: + yield self._sleep() returnValue(self._latest_channelids) def _ws_handle_channelids(self, msg): @@ -305,13 +312,14 @@ class Wormhole: mo = re.search(r'^(\d+)-', code) if not mo: raise ValueError("code (%s) must start with NN-" % code) - self._channelid = int(mo.group(1)) - self._set_code(code) - self._start() + with self._timing.add("API set_code"): + self._channelid = int(mo.group(1)) + self._set_code(code) + self._start() def _set_code(self, code): if self._code is not None: raise UsageError - self._timing.add_event("code established") + self._timing.add("code established") self._code = code def serialize(self): @@ -349,16 +357,17 @@ class Wormhole: def get_verifier(self): if self._closed: raise UsageError if self._code is None: raise UsageError - yield self._get_master_key() - # If the caller cares about the verifier, then they'll probably also - # willing to wait a moment to see the _confirm message. Each side - # sends this as soon as it sees the other's PAKE message. So the - # sender should see this hot on the heels of the inbound PAKE message - # (a moment after _get_master_key() returns). The receiver will see - # this a round-trip after they send their PAKE (because the sender is - # using wait=True inside _get_master_key, below: otherwise the sender - # might go do some blocking call). - yield self._msg_get(u"_confirm") + with self._timing.add("API get_verifier"): + yield self._get_master_key() + # If the caller cares about the verifier, then they'll probably + # also willing to wait a moment to see the _confirm message. Each + # side sends this as soon as it sees the other's PAKE message. So + # the sender should see this hot on the heels of the inbound PAKE + # message (a moment after _get_master_key() returns). The + # receiver will see this a round-trip after they send their PAKE + # (because the sender is using wait=True inside _get_master_key, + # below: otherwise the sender might go do some blocking call). + yield self._msg_get(u"_confirm") returnValue(self._verifier) @inlineCallbacks @@ -369,9 +378,10 @@ class Wormhole: yield self._msg_send(u"pake", self._msg1) pake_msg = yield self._msg_get(u"pake") - self._key = self._sp.finish(pake_msg) + with self._timing.add("pake2", waiting="crypto"): + self._key = self._sp.finish(pake_msg) self._verifier = self.derive_key(u"wormhole:verifier") - self._timing.add_event("key established") + self._timing.add("key established") if self._send_confirm: # both sides send different (random) confirmation messages @@ -385,11 +395,13 @@ class Wormhole: self._sent_messages.add( (phase, body) ) # TODO: retry on failure, with exponential backoff. We're guarding # against the rendezvous server being temporarily offline. + t = self._timing.add("add", phase=phase, wait=wait) yield self._ws_send(u"add", phase=phase, body=hexlify(body).decode("ascii")) if wait: while (phase, body) not in self._delivered_messages: yield self._sleep() + t.finish() def _ws_handle_message(self, msg): m = msg["message"] @@ -404,6 +416,7 @@ class Wormhole: err = ServerError("got duplicate phase %s" % phase, self._ws_url) return self._signal_error(err) self._received_messages[phase] = body + self._received_messages_sent[phase] = msg.get(u"sent") if phase == u"_confirm": # TODO: we might not have a master key yet, if the caller wasn't # waiting in _get_master_key() when a back-to-back pake+_confirm @@ -418,12 +431,15 @@ class Wormhole: @inlineCallbacks def _msg_get(self, phase): - _start = self._timing.add_event("get(%s)" % phase) - while phase not in self._received_messages: - yield self._sleep() # we can wait a long time here - # that will throw an error if something goes wrong - self._timing.finish_event(_start) - returnValue(self._received_messages[phase]) + with self._timing.add("get", phase=phase) as t: + while phase not in self._received_messages: + yield self._sleep() # we can wait a long time here + # that will throw an error if something goes wrong + msg = self._received_messages[phase] + sent = self._received_messages_sent[phase] + if sent: + t.server_sent(sent) + returnValue(msg) def derive_key(self, purpose, length=SecretBox.KEY_SIZE): if not isinstance(purpose, type(u"")): raise TypeError(type(purpose)) @@ -459,16 +475,15 @@ class Wormhole: if phase.startswith(u"_"): raise UsageError # reserved for internals if phase in self._sent_phases: raise UsageError # only call this once self._sent_phases.add(phase) - _sent = self._timing.add_event("API send data", phase=phase, wait=wait) - # Without predefined roles, we can't derive predictably unique keys - # for each side, so we use the same key for both. We use random - # nonces to keep the messages distinct, and we automatically ignore - # reflections. - yield self._get_master_key() - data_key = self.derive_key(u"wormhole:phase:%s" % phase) - outbound_encrypted = self._encrypt_data(data_key, outbound_data) - yield self._msg_send(phase, outbound_encrypted, wait) - self._timing.finish_event(_sent) + with self._timing.add("API send_data", phase=phase, wait=wait): + # Without predefined roles, we can't derive predictably unique + # keys for each side, so we use the same key for both. We use + # random nonces to keep the messages distinct, and we + # automatically ignore reflections. + yield self._get_master_key() + data_key = self.derive_key(u"wormhole:phase:%s" % phase) + outbound_encrypted = self._encrypt_data(data_key, outbound_data) + yield self._msg_send(phase, outbound_encrypted, wait) @inlineCallbacks def get_data(self, phase=u"data"): @@ -478,10 +493,9 @@ class Wormhole: if phase.startswith(u"_"): raise UsageError # reserved for internals if phase in self._got_phases: raise UsageError # only call this once self._got_phases.add(phase) - _sent = self._timing.add_event("API get data", phase=phase) - yield self._get_master_key() - body = yield self._msg_get(phase) # we can wait a long time here - self._timing.finish_event(_sent) + with self._timing.add("API get_data", phase=phase): + yield self._get_master_key() + body = yield self._msg_get(phase) # we can wait a long time here try: data_key = self.derive_key(u"wormhole:phase:%s" % phase) inbound_data = self._decrypt_data(data_key, body) @@ -491,6 +505,7 @@ class Wormhole: def _ws_closed(self, wasClean, code, reason): self._ws = None + self._ws_t.finish() # TODO: schedule reconnect, unless we're done @inlineCallbacks @@ -517,20 +532,21 @@ class Wormhole: if not isinstance(mood, (type(None), type(u""))): raise TypeError(type(mood)) - self._timing.finish_event(self._timing_started, mood=mood) - yield self._deallocate(mood) - # TODO: mark WebSocket as don't-reconnect - self._ws.transport.loseConnection() # probably flushes - del self._ws + with self._timing.add("API close"): + yield self._deallocate(mood) + # TODO: mark WebSocket as don't-reconnect + self._ws.transport.loseConnection() # probably flushes + del self._ws + self._ws_t.finish() + self._timing_started.finish(mood=mood) returnValue(f) @inlineCallbacks def _deallocate(self, mood): - _sent = self._timing.add_event("close") - yield self._ws_send(u"deallocate", mood=mood) - while self._deallocated_status is None: - yield self._sleep(wake_on_error=False) - self._timing.finish_event(_sent) + with self._timing.add("deallocate"): + yield self._ws_send(u"deallocate", mood=mood) + while self._deallocated_status is None: + yield self._sleep(wake_on_error=False) # TODO: set a timeout, don't wait forever for an ack # TODO: if the connection is lost, let it go returnValue(self._deallocated_status) diff --git a/src/wormhole/twisted/transit.py b/src/wormhole/twisted/transit.py index 7987962..d0eef5e 100644 --- a/src/wormhole/twisted/transit.py +++ b/src/wormhole/twisted/transit.py @@ -565,7 +565,7 @@ class Common: self._winner = None self._reactor = reactor self._timing = timing or DebugTiming() - self._timing_started = self._timing.add_event("transit") + self._timing.add("transit") def _build_listener(self): if self._no_listen or self._tor_manager: @@ -690,13 +690,12 @@ class Common: @inlineCallbacks def connect(self): - _start = self._timing.add_event("transit connect") - yield self._get_transit_key() - # we want to have the transit key before starting any outbound - # connections, so those connections will know what to say when they - # connect - winner = yield self._connect() - self._timing.finish_event(_start) + with self._timing.add("transit connect"): + yield self._get_transit_key() + # we want to have the transit key before starting any outbound + # connections, so those connections will know what to say when + # they connect + winner = yield self._connect() returnValue(winner) def _connect(self):