clients: use new send-multiple API

This commit is contained in:
Brian Warner 2015-11-22 18:16:43 -08:00
parent 56b88d0b40
commit 969619fff5
2 changed files with 41 additions and 20 deletions

View File

@ -52,22 +52,31 @@ class Channel:
return (phase, body) return (phase, body)
return None return None
def send(self, phase, msg): def send(self, phase, body):
return self.send_many([(phase, body)])
def send_many(self, messages):
# TODO: retry on failure, with exponential backoff. We're guarding # TODO: retry on failure, with exponential backoff. We're guarding
# against the rendezvous server being temporarily offline. # against the rendezvous server being temporarily offline.
if not isinstance(phase, type(u"")): raise TypeError(type(phase)) payload_messages = []
if not isinstance(msg, type(b"")): raise TypeError(type(msg)) for (phase, body) in messages:
self._sent_messages.add( (phase,msg) ) if not isinstance(phase, type(u"")): raise TypeError(type(phase))
if not isinstance(body, type(b"")): raise TypeError(type(body))
self._sent_messages.add( (phase,body) )
payload_messages.append({"phase": phase,
"body": hexlify(body).decode("ascii")})
payload = {"appid": self._appid, payload = {"appid": self._appid,
"channelid": self._channelid, "channelid": self._channelid,
"side": self._side, "side": self._side,
"phase": phase, "messages": payload_messages,
"body": hexlify(msg).decode("ascii")} }
data = json.dumps(payload).encode("utf-8") data = json.dumps(payload).encode("utf-8")
r = requests.post(self._relay_url+"add", data=data, r = requests.post(self._relay_url+"add_messages", data=data,
timeout=self._timeout) timeout=self._timeout)
r.raise_for_status() r.raise_for_status()
resp = r.json() resp = r.json()
if "welcome" in resp:
self._handle_welcome(resp["welcome"])
self._add_inbound_messages(resp["messages"]) self._add_inbound_messages(resp["messages"])
def get_first_of(self, phases): def get_first_of(self, phases):
@ -90,15 +99,15 @@ class Channel:
raise Timeout raise Timeout
queryargs = urlencode([("appid", self._appid), queryargs = urlencode([("appid", self._appid),
("channelid", self._channelid)]) ("channelid", self._channelid)])
f = EventSourceFollower(self._relay_url+"get?%s" % queryargs, url = self._relay_url + "watch_messages?%s" % queryargs
remaining) f = EventSourceFollower(url, remaining)
# we loop here until the connection is lost, or we see the # we loop here until the connection is lost, or we see the
# message we want # message we want
for (eventtype, data) in f.iter_events(): for (eventtype, data) in f.iter_events():
if eventtype == "welcome": if eventtype == "welcome":
self._handle_welcome(json.loads(data)) self._handle_welcome(json.loads(data))
if eventtype == "message": if eventtype == "message":
self._add_inbound_messages([json.loads(data)]) self._add_inbound_messages(json.loads(data))
phase_and_body = self._find_inbound_message(phases) phase_and_body = self._find_inbound_message(phases)
if phase_and_body: if phase_and_body:
f.close() f.close()

View File

@ -94,18 +94,30 @@ class Channel:
return (phase, body) return (phase, body)
return None return None
def send(self, phase, msg): def send(self, phase, body):
return self.send_many([(phase, body)])
def send_many(self, messages):
# TODO: retry on failure, with exponential backoff. We're guarding # TODO: retry on failure, with exponential backoff. We're guarding
# against the rendezvous server being temporarily offline. # against the rendezvous server being temporarily offline.
if not isinstance(phase, type(u"")): raise TypeError(type(phase)) payload_messages = []
if not isinstance(msg, type(b"")): raise TypeError(type(msg)) for (phase, body) in messages:
self._sent_messages.add( (phase,msg) ) if not isinstance(phase, type(u"")): raise TypeError(type(phase))
if not isinstance(body, type(b"")): raise TypeError(type(body))
self._sent_messages.add( (phase,body) )
payload_messages.append({"phase": phase,
"body": hexlify(body).decode("ascii")})
payload = {"appid": self._appid, payload = {"appid": self._appid,
"channelid": self._channelid, "channelid": self._channelid,
"side": self._side, "side": self._side,
"phase": phase, "messages": payload_messages,
"body": hexlify(msg).decode("ascii")} }
d = post_json(self._agent, self._relay_url+"add", payload) d = post_json(self._agent, self._relay_url+"add_messages", payload)
def _maybe_handle_welcome(resp):
if "welcome" in resp:
self._handle_welcome(resp["welcome"])
return resp
d.addCallback(_maybe_handle_welcome)
d.addCallback(lambda resp: self._add_inbound_messages(resp["messages"])) d.addCallback(lambda resp: self._add_inbound_messages(resp["messages"]))
return d return d
@ -127,7 +139,7 @@ class Channel:
if name == "welcome": if name == "welcome":
self._handle_welcome(json.loads(data)) self._handle_welcome(json.loads(data))
if name == "message": if name == "message":
self._add_inbound_messages([json.loads(data)]) self._add_inbound_messages(json.loads(data))
phase_and_body = self._find_inbound_message(phases) phase_and_body = self._find_inbound_message(phases)
if phase_and_body is not None and not msgs: if phase_and_body is not None and not msgs:
msgs.append(phase_and_body) msgs.append(phase_and_body)
@ -135,8 +147,8 @@ class Channel:
# TODO: use agent=self._agent # TODO: use agent=self._agent
queryargs = urlencode([("appid", self._appid), queryargs = urlencode([("appid", self._appid),
("channelid", self._channelid)]) ("channelid", self._channelid)])
es = ReconnectingEventSource(self._relay_url+"get?%s" % queryargs, url = self._relay_url + "watch_messages?%s" % queryargs
_handle) es = ReconnectingEventSource(url, _handle)
es.startService() # TODO: .setServiceParent(self) es.startService() # TODO: .setServiceParent(self)
es.activate() es.activate()
d.addCallback(lambda _: es.deactivate()) d.addCallback(lambda _: es.deactivate())