diff --git a/src/wormhole/blocking/transcribe.py b/src/wormhole/blocking/transcribe.py index 2e88c5b..afd144a 100644 --- a/src/wormhole/blocking/transcribe.py +++ b/src/wormhole/blocking/transcribe.py @@ -52,22 +52,31 @@ class Channel: return (phase, body) return None - def send(self, phase, msg): + def send(self, phase, body): + return self.send_many([(phase, body)]) + + def send_many(self, messages): # TODO: retry on failure, with exponential backoff. We're guarding # against the rendezvous server being temporarily offline. - if not isinstance(phase, type(u"")): raise TypeError(type(phase)) - if not isinstance(msg, type(b"")): raise TypeError(type(msg)) - self._sent_messages.add( (phase,msg) ) + payload_messages = [] + for (phase, body) in messages: + if not isinstance(phase, type(u"")): raise TypeError(type(phase)) + if not isinstance(body, type(b"")): raise TypeError(type(body)) + self._sent_messages.add( (phase,body) ) + payload_messages.append({"phase": phase, + "body": hexlify(body).decode("ascii")}) payload = {"appid": self._appid, "channelid": self._channelid, "side": self._side, - "phase": phase, - "body": hexlify(msg).decode("ascii")} + "messages": payload_messages, + } data = json.dumps(payload).encode("utf-8") - r = requests.post(self._relay_url+"add", data=data, + r = requests.post(self._relay_url+"add_messages", data=data, timeout=self._timeout) r.raise_for_status() resp = r.json() + if "welcome" in resp: + self._handle_welcome(resp["welcome"]) self._add_inbound_messages(resp["messages"]) def get_first_of(self, phases): @@ -90,15 +99,15 @@ class Channel: raise Timeout queryargs = urlencode([("appid", self._appid), ("channelid", self._channelid)]) - f = EventSourceFollower(self._relay_url+"get?%s" % queryargs, - remaining) + url = self._relay_url + "watch_messages?%s" % queryargs + f = EventSourceFollower(url, remaining) # we loop here until the connection is lost, or we see the # message we want for (eventtype, data) in f.iter_events(): if eventtype == "welcome": self._handle_welcome(json.loads(data)) if eventtype == "message": - self._add_inbound_messages([json.loads(data)]) + self._add_inbound_messages(json.loads(data)) phase_and_body = self._find_inbound_message(phases) if phase_and_body: f.close() diff --git a/src/wormhole/twisted/transcribe.py b/src/wormhole/twisted/transcribe.py index 6ea9e41..4d8e902 100644 --- a/src/wormhole/twisted/transcribe.py +++ b/src/wormhole/twisted/transcribe.py @@ -94,18 +94,30 @@ class Channel: return (phase, body) return None - def send(self, phase, msg): + def send(self, phase, body): + return self.send_many([(phase, body)]) + + def send_many(self, messages): # TODO: retry on failure, with exponential backoff. We're guarding # against the rendezvous server being temporarily offline. - if not isinstance(phase, type(u"")): raise TypeError(type(phase)) - if not isinstance(msg, type(b"")): raise TypeError(type(msg)) - self._sent_messages.add( (phase,msg) ) + payload_messages = [] + for (phase, body) in messages: + if not isinstance(phase, type(u"")): raise TypeError(type(phase)) + if not isinstance(body, type(b"")): raise TypeError(type(body)) + self._sent_messages.add( (phase,body) ) + payload_messages.append({"phase": phase, + "body": hexlify(body).decode("ascii")}) payload = {"appid": self._appid, "channelid": self._channelid, "side": self._side, - "phase": phase, - "body": hexlify(msg).decode("ascii")} - d = post_json(self._agent, self._relay_url+"add", payload) + "messages": payload_messages, + } + d = post_json(self._agent, self._relay_url+"add_messages", payload) + def _maybe_handle_welcome(resp): + if "welcome" in resp: + self._handle_welcome(resp["welcome"]) + return resp + d.addCallback(_maybe_handle_welcome) d.addCallback(lambda resp: self._add_inbound_messages(resp["messages"])) return d @@ -127,7 +139,7 @@ class Channel: if name == "welcome": self._handle_welcome(json.loads(data)) if name == "message": - self._add_inbound_messages([json.loads(data)]) + self._add_inbound_messages(json.loads(data)) phase_and_body = self._find_inbound_message(phases) if phase_and_body is not None and not msgs: msgs.append(phase_and_body) @@ -135,8 +147,8 @@ class Channel: # TODO: use agent=self._agent queryargs = urlencode([("appid", self._appid), ("channelid", self._channelid)]) - es = ReconnectingEventSource(self._relay_url+"get?%s" % queryargs, - _handle) + url = self._relay_url + "watch_messages?%s" % queryargs + es = ReconnectingEventSource(url, _handle) es.startService() # TODO: .setServiceParent(self) es.activate() d.addCallback(lambda _: es.deactivate())