diff --git a/src/wormhole/blocking/transcribe.py b/src/wormhole/blocking/transcribe.py index 7dd6e17..80dce5b 100644 --- a/src/wormhole/blocking/transcribe.py +++ b/src/wormhole/blocking/transcribe.py @@ -1,3 +1,4 @@ +from __future__ import print_function import time, requests, json, textwrap from binascii import hexlify, unhexlify from spake2 import SPAKE2_A, SPAKE2_B @@ -38,12 +39,53 @@ class ReceiverWrongPasswordError(WrongPasswordError): """ # POST /allocate -> {channel-id: INT} -# POST /CHANNEL-ID/SIDE/pake/post {message: STR} -> {messages: [STR..]} -# POST /CHANNEL-ID/SIDE/pake/poll -> {messages: [STR..]} -# POST /CHANNEL-ID/SIDE/data/post {message: STR} -> {messages: [STR..]} -# POST /CHANNEL-ID/SIDE/data/poll -> {messages: [STR..]} +# these return all messages for CHANNEL-ID= and WHICH= but SIDE!= +# WHICH=(pake,data) +# POST /CHANNEL-ID/SIDE/WHICH/post {message: STR} -> {messages: [STR..]} +# POST /CHANNEL-ID/SIDE/WHICH/poll -> {messages: [STR..]} +# GET /CHANNEL-ID/SIDE/WHICH/poll (eventsource) -> STR, STR, .. +# # POST /CHANNEL-ID/SIDE/deallocate -> waiting | deleted +class EventSourceFollower: + def __init__(self, url, timeout): + self.resp = requests.get(url, + headers={"accept": "text/event-stream"}, + stream=True, + timeout=timeout) + self.resp.raise_for_status() + + def close(self): + self.resp.close() + + def _get_fields(self, lines): + while True: + first_line = lines.next() # raises StopIteration when closed + fieldname, data = first_line.split(": ", 1) + data_lines = [data] + while True: + next_line = lines.next() + if not next_line: # empty string, original was "\n" + yield (fieldname, "\n".join(data_lines)) + break + data_lines.append(next_line) + + def iter_events(self): + # I think Request.iter_lines and .iter_content use chunk_size= in a + # funny way, and nothing happens until at least that much data has + # arrived. So unless we set chunk_size=1, we won't hear about lines + # for a long time. I'd prefer that chunk_size behaved like + # read(size), and gave you 1<=x<=size bytes in response. + lines_iter = self.resp.iter_lines(chunk_size=1) + for (fieldname, data) in self._get_fields(lines_iter): + if fieldname == "data": + yield data + else: + print("weird fieldname", fieldname, data) + + def get_message(self): + return self.iter_events().next() + class Common: def url(self, suffix): return "%s%d/%s/%s" % (self.relay, self.channel_id, self.side, suffix) @@ -63,10 +105,10 @@ class Common: remaining = self.started + self.timeout - time.time() if remaining < 0: raise Timeout - time.sleep(self.wait) - r = requests.post(self.url(url_suffix)) - r.raise_for_status() - msgs = r.json()["messages"] + #time.sleep(self.wait) + f = EventSourceFollower(self.url(url_suffix), remaining) + msgs = [json.loads(f.get_message())["message"]] + f.close() return msgs def _allocate(self): diff --git a/src/wormhole/servers/relay.py b/src/wormhole/servers/relay.py index c7afe7f..3f96f94 100644 --- a/src/wormhole/servers/relay.py +++ b/src/wormhole/servers/relay.py @@ -12,6 +12,36 @@ MB = 1000*1000 CHANNEL_EXPIRATION_TIME = 1*HOUR +class EventsProtocol: + def __init__(self, request): + self.request = request + + def sendComment(self, comment): + # this is ignored by clients, but can keep the connection open in the + # face of firewall/NAT timeouts. It also helps unit tests, since + # apparently twisted.web.client.Agent doesn't consider the connection + # to be established until it sees the first byte of the reponse body. + self.request.write(": %s\n\n" % comment) + + def sendEvent(self, data, name=None, id=None, retry=None): + if name: + self.request.write("event: %s\n" % name.encode("utf-8")) + # e.g. if name=foo, then the client web page should do: + # (new EventSource(url)).addEventListener("foo", handlerfunc) + # Note that this basically defaults to "message". + if id: + self.request.write("id: %s\n" % id.encode("utf-8")) + if retry: + self.request.write("retry: %d\n" % retry) # milliseconds + for line in data.splitlines(): + self.request.write("data: %s\n" % line.encode("utf-8")) + self.request.write("\n") + + def stop(self): + self.request.finish() + +# note: no versions of IE (including the current IE11) support EventSource + class Channel(resource.Resource): isLeaf = True # I handle /CHANNEL-ID/* @@ -21,6 +51,7 @@ class Channel(resource.Resource): # these return all messages for CHANNEL-ID= and WHICH= but SIDE!= # POST /CHANNEL-ID/SIDE/WHICH/post {message: STR} -> {messages: [STR..]} # POST /CHANNEL-ID/SIDE/WHICH/poll -> {messages: [STR..]} + # GET /CHANNEL-ID/SIDE/WHICH/poll (eventsource) -> STR, STR, .. # # POST /CHANNEL-ID/SIDE/deallocate -> waiting | deleted @@ -31,6 +62,37 @@ class Channel(resource.Resource): self.expire_at = time.time() + CHANNEL_EXPIRATION_TIME self.sides = set() self.messages = [] # (side, which, str) + self.event_channels = set() # (side, which, ep) + + + def render_GET(self, request): + # rest of URL is: SIDE/WHICH/(post|poll) + their_side = request.postpath[0] + their_which = request.postpath[1] + if "text/event-stream" not in (request.getHeader("accept") or ""): + request.setResponseCode(http.BAD_REQUEST, "Must use EventSource") + return "Must use EventSource (Content-Type: text/event-stream)" + request.setHeader("content-type", "text/event-stream") + ep = EventsProtocol(request) + handle = (their_side, their_which, ep) + self.event_channels.add(handle) + request.notifyFinish().addErrback(self._shutdown, handle) + for (msg_side, msg_which, msg_str) in self.messages: + self.message_added(msg_side, msg_which, msg_str, channels=[handle]) + return server.NOT_DONE_YET + + def _shutdown(self, _, handle): + self.event_channels.discard(handle) + + + def message_added(self, msg_side, msg_which, msg_str, channels=None): + if channels is None: + channels = self.event_channels + for (their_side, their_which, their_ep) in channels: + if msg_side != their_side and msg_which == their_which: + data = json.dumps({ "side": msg_side, "message": msg_str }) + their_ep.sendEvent(data) + def render_POST(self, request): # rest of URL is: SIDE/WHICH/(post|poll) @@ -62,6 +124,7 @@ class Channel(resource.Resource): if verb == "post": data = json.load(request.content) self.messages.append( (side, which, data["message"]) ) + self.message_added(side, which, data["message"]) request.setHeader("content-type", "application/json; charset=utf-8") return json.dumps({"messages": other_messages})+"\n"