clients: use new send-multiple API
This commit is contained in:
parent
56b88d0b40
commit
969619fff5
|
@ -52,22 +52,31 @@ class Channel:
|
||||||
return (phase, body)
|
return (phase, body)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def send(self, phase, msg):
|
def send(self, phase, body):
|
||||||
|
return self.send_many([(phase, body)])
|
||||||
|
|
||||||
|
def send_many(self, messages):
|
||||||
# TODO: retry on failure, with exponential backoff. We're guarding
|
# TODO: retry on failure, with exponential backoff. We're guarding
|
||||||
# against the rendezvous server being temporarily offline.
|
# against the rendezvous server being temporarily offline.
|
||||||
|
payload_messages = []
|
||||||
|
for (phase, body) in messages:
|
||||||
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
|
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
|
||||||
if not isinstance(msg, type(b"")): raise TypeError(type(msg))
|
if not isinstance(body, type(b"")): raise TypeError(type(body))
|
||||||
self._sent_messages.add( (phase,msg) )
|
self._sent_messages.add( (phase,body) )
|
||||||
|
payload_messages.append({"phase": phase,
|
||||||
|
"body": hexlify(body).decode("ascii")})
|
||||||
payload = {"appid": self._appid,
|
payload = {"appid": self._appid,
|
||||||
"channelid": self._channelid,
|
"channelid": self._channelid,
|
||||||
"side": self._side,
|
"side": self._side,
|
||||||
"phase": phase,
|
"messages": payload_messages,
|
||||||
"body": hexlify(msg).decode("ascii")}
|
}
|
||||||
data = json.dumps(payload).encode("utf-8")
|
data = json.dumps(payload).encode("utf-8")
|
||||||
r = requests.post(self._relay_url+"add", data=data,
|
r = requests.post(self._relay_url+"add_messages", data=data,
|
||||||
timeout=self._timeout)
|
timeout=self._timeout)
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
resp = r.json()
|
resp = r.json()
|
||||||
|
if "welcome" in resp:
|
||||||
|
self._handle_welcome(resp["welcome"])
|
||||||
self._add_inbound_messages(resp["messages"])
|
self._add_inbound_messages(resp["messages"])
|
||||||
|
|
||||||
def get_first_of(self, phases):
|
def get_first_of(self, phases):
|
||||||
|
@ -90,15 +99,15 @@ class Channel:
|
||||||
raise Timeout
|
raise Timeout
|
||||||
queryargs = urlencode([("appid", self._appid),
|
queryargs = urlencode([("appid", self._appid),
|
||||||
("channelid", self._channelid)])
|
("channelid", self._channelid)])
|
||||||
f = EventSourceFollower(self._relay_url+"get?%s" % queryargs,
|
url = self._relay_url + "watch_messages?%s" % queryargs
|
||||||
remaining)
|
f = EventSourceFollower(url, remaining)
|
||||||
# we loop here until the connection is lost, or we see the
|
# we loop here until the connection is lost, or we see the
|
||||||
# message we want
|
# message we want
|
||||||
for (eventtype, data) in f.iter_events():
|
for (eventtype, data) in f.iter_events():
|
||||||
if eventtype == "welcome":
|
if eventtype == "welcome":
|
||||||
self._handle_welcome(json.loads(data))
|
self._handle_welcome(json.loads(data))
|
||||||
if eventtype == "message":
|
if eventtype == "message":
|
||||||
self._add_inbound_messages([json.loads(data)])
|
self._add_inbound_messages(json.loads(data))
|
||||||
phase_and_body = self._find_inbound_message(phases)
|
phase_and_body = self._find_inbound_message(phases)
|
||||||
if phase_and_body:
|
if phase_and_body:
|
||||||
f.close()
|
f.close()
|
||||||
|
|
|
@ -94,18 +94,30 @@ class Channel:
|
||||||
return (phase, body)
|
return (phase, body)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def send(self, phase, msg):
|
def send(self, phase, body):
|
||||||
|
return self.send_many([(phase, body)])
|
||||||
|
|
||||||
|
def send_many(self, messages):
|
||||||
# TODO: retry on failure, with exponential backoff. We're guarding
|
# TODO: retry on failure, with exponential backoff. We're guarding
|
||||||
# against the rendezvous server being temporarily offline.
|
# against the rendezvous server being temporarily offline.
|
||||||
|
payload_messages = []
|
||||||
|
for (phase, body) in messages:
|
||||||
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
|
if not isinstance(phase, type(u"")): raise TypeError(type(phase))
|
||||||
if not isinstance(msg, type(b"")): raise TypeError(type(msg))
|
if not isinstance(body, type(b"")): raise TypeError(type(body))
|
||||||
self._sent_messages.add( (phase,msg) )
|
self._sent_messages.add( (phase,body) )
|
||||||
|
payload_messages.append({"phase": phase,
|
||||||
|
"body": hexlify(body).decode("ascii")})
|
||||||
payload = {"appid": self._appid,
|
payload = {"appid": self._appid,
|
||||||
"channelid": self._channelid,
|
"channelid": self._channelid,
|
||||||
"side": self._side,
|
"side": self._side,
|
||||||
"phase": phase,
|
"messages": payload_messages,
|
||||||
"body": hexlify(msg).decode("ascii")}
|
}
|
||||||
d = post_json(self._agent, self._relay_url+"add", payload)
|
d = post_json(self._agent, self._relay_url+"add_messages", payload)
|
||||||
|
def _maybe_handle_welcome(resp):
|
||||||
|
if "welcome" in resp:
|
||||||
|
self._handle_welcome(resp["welcome"])
|
||||||
|
return resp
|
||||||
|
d.addCallback(_maybe_handle_welcome)
|
||||||
d.addCallback(lambda resp: self._add_inbound_messages(resp["messages"]))
|
d.addCallback(lambda resp: self._add_inbound_messages(resp["messages"]))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@ -127,7 +139,7 @@ class Channel:
|
||||||
if name == "welcome":
|
if name == "welcome":
|
||||||
self._handle_welcome(json.loads(data))
|
self._handle_welcome(json.loads(data))
|
||||||
if name == "message":
|
if name == "message":
|
||||||
self._add_inbound_messages([json.loads(data)])
|
self._add_inbound_messages(json.loads(data))
|
||||||
phase_and_body = self._find_inbound_message(phases)
|
phase_and_body = self._find_inbound_message(phases)
|
||||||
if phase_and_body is not None and not msgs:
|
if phase_and_body is not None and not msgs:
|
||||||
msgs.append(phase_and_body)
|
msgs.append(phase_and_body)
|
||||||
|
@ -135,8 +147,8 @@ class Channel:
|
||||||
# TODO: use agent=self._agent
|
# TODO: use agent=self._agent
|
||||||
queryargs = urlencode([("appid", self._appid),
|
queryargs = urlencode([("appid", self._appid),
|
||||||
("channelid", self._channelid)])
|
("channelid", self._channelid)])
|
||||||
es = ReconnectingEventSource(self._relay_url+"get?%s" % queryargs,
|
url = self._relay_url + "watch_messages?%s" % queryargs
|
||||||
_handle)
|
es = ReconnectingEventSource(url, _handle)
|
||||||
es.startService() # TODO: .setServiceParent(self)
|
es.startService() # TODO: .setServiceParent(self)
|
||||||
es.activate()
|
es.activate()
|
||||||
d.addCallback(lambda _: es.deactivate())
|
d.addCallback(lambda _: es.deactivate())
|
||||||
|
|
Loading…
Reference in New Issue
Block a user