clients: use new send-multiple API

This commit is contained in:
Brian Warner 2015-11-22 18:16:43 -08:00
parent 56b88d0b40
commit 969619fff5
2 changed files with 41 additions and 20 deletions

View File

@ -52,22 +52,31 @@ class Channel:
return (phase, body)
return None
def send(self, phase, msg):
def send(self, phase, body):
return self.send_many([(phase, body)])
def send_many(self, messages):
# TODO: retry on failure, with exponential backoff. We're guarding
# against the rendezvous server being temporarily offline.
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
if not isinstance(msg, type(b"")): raise TypeError(type(msg))
self._sent_messages.add( (phase,msg) )
payload_messages = []
for (phase, body) in messages:
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
if not isinstance(body, type(b"")): raise TypeError(type(body))
self._sent_messages.add( (phase,body) )
payload_messages.append({"phase": phase,
"body": hexlify(body).decode("ascii")})
payload = {"appid": self._appid,
"channelid": self._channelid,
"side": self._side,
"phase": phase,
"body": hexlify(msg).decode("ascii")}
"messages": payload_messages,
}
data = json.dumps(payload).encode("utf-8")
r = requests.post(self._relay_url+"add", data=data,
r = requests.post(self._relay_url+"add_messages", data=data,
timeout=self._timeout)
r.raise_for_status()
resp = r.json()
if "welcome" in resp:
self._handle_welcome(resp["welcome"])
self._add_inbound_messages(resp["messages"])
def get_first_of(self, phases):
@ -90,15 +99,15 @@ class Channel:
raise Timeout
queryargs = urlencode([("appid", self._appid),
("channelid", self._channelid)])
f = EventSourceFollower(self._relay_url+"get?%s" % queryargs,
remaining)
url = self._relay_url + "watch_messages?%s" % queryargs
f = EventSourceFollower(url, remaining)
# we loop here until the connection is lost, or we see the
# message we want
for (eventtype, data) in f.iter_events():
if eventtype == "welcome":
self._handle_welcome(json.loads(data))
if eventtype == "message":
self._add_inbound_messages([json.loads(data)])
self._add_inbound_messages(json.loads(data))
phase_and_body = self._find_inbound_message(phases)
if phase_and_body:
f.close()

View File

@ -94,18 +94,30 @@ class Channel:
return (phase, body)
return None
def send(self, phase, msg):
def send(self, phase, body):
return self.send_many([(phase, body)])
def send_many(self, messages):
# TODO: retry on failure, with exponential backoff. We're guarding
# against the rendezvous server being temporarily offline.
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
if not isinstance(msg, type(b"")): raise TypeError(type(msg))
self._sent_messages.add( (phase,msg) )
payload_messages = []
for (phase, body) in messages:
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
if not isinstance(body, type(b"")): raise TypeError(type(body))
self._sent_messages.add( (phase,body) )
payload_messages.append({"phase": phase,
"body": hexlify(body).decode("ascii")})
payload = {"appid": self._appid,
"channelid": self._channelid,
"side": self._side,
"phase": phase,
"body": hexlify(msg).decode("ascii")}
d = post_json(self._agent, self._relay_url+"add", payload)
"messages": payload_messages,
}
d = post_json(self._agent, self._relay_url+"add_messages", payload)
def _maybe_handle_welcome(resp):
if "welcome" in resp:
self._handle_welcome(resp["welcome"])
return resp
d.addCallback(_maybe_handle_welcome)
d.addCallback(lambda resp: self._add_inbound_messages(resp["messages"]))
return d
@ -127,7 +139,7 @@ class Channel:
if name == "welcome":
self._handle_welcome(json.loads(data))
if name == "message":
self._add_inbound_messages([json.loads(data)])
self._add_inbound_messages(json.loads(data))
phase_and_body = self._find_inbound_message(phases)
if phase_and_body is not None and not msgs:
msgs.append(phase_and_body)
@ -135,8 +147,8 @@ class Channel:
# TODO: use agent=self._agent
queryargs = urlencode([("appid", self._appid),
("channelid", self._channelid)])
es = ReconnectingEventSource(self._relay_url+"get?%s" % queryargs,
_handle)
url = self._relay_url + "watch_messages?%s" % queryargs
es = ReconnectingEventSource(url, _handle)
es.startService() # TODO: .setServiceParent(self)
es.activate()
d.addCallback(lambda _: es.deactivate())