clients: use new send-multiple API
This commit is contained in:
parent
56b88d0b40
commit
969619fff5
|
@ -52,22 +52,31 @@ class Channel:
|
|||
return (phase, body)
|
||||
return None
|
||||
|
||||
def send(self, phase, msg):
|
||||
def send(self, phase, body):
|
||||
return self.send_many([(phase, body)])
|
||||
|
||||
def send_many(self, messages):
|
||||
# TODO: retry on failure, with exponential backoff. We're guarding
|
||||
# against the rendezvous server being temporarily offline.
|
||||
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
|
||||
if not isinstance(msg, type(b"")): raise TypeError(type(msg))
|
||||
self._sent_messages.add( (phase,msg) )
|
||||
payload_messages = []
|
||||
for (phase, body) in messages:
|
||||
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
|
||||
if not isinstance(body, type(b"")): raise TypeError(type(body))
|
||||
self._sent_messages.add( (phase,body) )
|
||||
payload_messages.append({"phase": phase,
|
||||
"body": hexlify(body).decode("ascii")})
|
||||
payload = {"appid": self._appid,
|
||||
"channelid": self._channelid,
|
||||
"side": self._side,
|
||||
"phase": phase,
|
||||
"body": hexlify(msg).decode("ascii")}
|
||||
"messages": payload_messages,
|
||||
}
|
||||
data = json.dumps(payload).encode("utf-8")
|
||||
r = requests.post(self._relay_url+"add", data=data,
|
||||
r = requests.post(self._relay_url+"add_messages", data=data,
|
||||
timeout=self._timeout)
|
||||
r.raise_for_status()
|
||||
resp = r.json()
|
||||
if "welcome" in resp:
|
||||
self._handle_welcome(resp["welcome"])
|
||||
self._add_inbound_messages(resp["messages"])
|
||||
|
||||
def get_first_of(self, phases):
|
||||
|
@ -90,15 +99,15 @@ class Channel:
|
|||
raise Timeout
|
||||
queryargs = urlencode([("appid", self._appid),
|
||||
("channelid", self._channelid)])
|
||||
f = EventSourceFollower(self._relay_url+"get?%s" % queryargs,
|
||||
remaining)
|
||||
url = self._relay_url + "watch_messages?%s" % queryargs
|
||||
f = EventSourceFollower(url, remaining)
|
||||
# we loop here until the connection is lost, or we see the
|
||||
# message we want
|
||||
for (eventtype, data) in f.iter_events():
|
||||
if eventtype == "welcome":
|
||||
self._handle_welcome(json.loads(data))
|
||||
if eventtype == "message":
|
||||
self._add_inbound_messages([json.loads(data)])
|
||||
self._add_inbound_messages(json.loads(data))
|
||||
phase_and_body = self._find_inbound_message(phases)
|
||||
if phase_and_body:
|
||||
f.close()
|
||||
|
|
|
@ -94,18 +94,30 @@ class Channel:
|
|||
return (phase, body)
|
||||
return None
|
||||
|
||||
def send(self, phase, msg):
|
||||
def send(self, phase, body):
|
||||
return self.send_many([(phase, body)])
|
||||
|
||||
def send_many(self, messages):
|
||||
# TODO: retry on failure, with exponential backoff. We're guarding
|
||||
# against the rendezvous server being temporarily offline.
|
||||
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
|
||||
if not isinstance(msg, type(b"")): raise TypeError(type(msg))
|
||||
self._sent_messages.add( (phase,msg) )
|
||||
payload_messages = []
|
||||
for (phase, body) in messages:
|
||||
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
|
||||
if not isinstance(body, type(b"")): raise TypeError(type(body))
|
||||
self._sent_messages.add( (phase,body) )
|
||||
payload_messages.append({"phase": phase,
|
||||
"body": hexlify(body).decode("ascii")})
|
||||
payload = {"appid": self._appid,
|
||||
"channelid": self._channelid,
|
||||
"side": self._side,
|
||||
"phase": phase,
|
||||
"body": hexlify(msg).decode("ascii")}
|
||||
d = post_json(self._agent, self._relay_url+"add", payload)
|
||||
"messages": payload_messages,
|
||||
}
|
||||
d = post_json(self._agent, self._relay_url+"add_messages", payload)
|
||||
def _maybe_handle_welcome(resp):
|
||||
if "welcome" in resp:
|
||||
self._handle_welcome(resp["welcome"])
|
||||
return resp
|
||||
d.addCallback(_maybe_handle_welcome)
|
||||
d.addCallback(lambda resp: self._add_inbound_messages(resp["messages"]))
|
||||
return d
|
||||
|
||||
|
@ -127,7 +139,7 @@ class Channel:
|
|||
if name == "welcome":
|
||||
self._handle_welcome(json.loads(data))
|
||||
if name == "message":
|
||||
self._add_inbound_messages([json.loads(data)])
|
||||
self._add_inbound_messages(json.loads(data))
|
||||
phase_and_body = self._find_inbound_message(phases)
|
||||
if phase_and_body is not None and not msgs:
|
||||
msgs.append(phase_and_body)
|
||||
|
@ -135,8 +147,8 @@ class Channel:
|
|||
# TODO: use agent=self._agent
|
||||
queryargs = urlencode([("appid", self._appid),
|
||||
("channelid", self._channelid)])
|
||||
es = ReconnectingEventSource(self._relay_url+"get?%s" % queryargs,
|
||||
_handle)
|
||||
url = self._relay_url + "watch_messages?%s" % queryargs
|
||||
es = ReconnectingEventSource(url, _handle)
|
||||
es.startService() # TODO: .setServiceParent(self)
|
||||
es.activate()
|
||||
d.addCallback(lambda _: es.deactivate())
|
||||
|
|
Loading…
Reference in New Issue
Block a user