rendezvous: change add_listener signature
Pass in a handle and a pair of functions, rather than an object with two well-known methods. This should make it easier to subscribe to multiple channels in the future.
This commit is contained in:
parent
a34fb2a98b
commit
bdc9066c23
|
@ -24,9 +24,9 @@ class Channel:
|
||||||
self._log_requests = log_requests
|
self._log_requests = log_requests
|
||||||
self._appid = appid
|
self._appid = appid
|
||||||
self._channelid = channelid
|
self._channelid = channelid
|
||||||
self._listeners = set() # instances with .send_rendezvous_event (that
|
self._listeners = {} # handle -> (send_f, stop_f)
|
||||||
# takes a JSONable object) and
|
# "handle" is a hashable object, for deregistration
|
||||||
# .stop_rendezvous_watcher()
|
# send_f() takes a JSONable object, stop_f() has no args
|
||||||
|
|
||||||
def get_channelid(self):
|
def get_channelid(self):
|
||||||
return self._channelid
|
return self._channelid
|
||||||
|
@ -44,17 +44,17 @@ class Channel:
|
||||||
"server_rx": row["server_rx"], "id": row["msgid"]})
|
"server_rx": row["server_rx"], "id": row["msgid"]})
|
||||||
return messages
|
return messages
|
||||||
|
|
||||||
def add_listener(self, ep):
|
def add_listener(self, handle, send_f, stop_f):
|
||||||
self._listeners.add(ep)
|
self._listeners[handle] = (send_f, stop_f)
|
||||||
return self.get_messages()
|
return self.get_messages()
|
||||||
|
|
||||||
def remove_listener(self, ep):
|
def remove_listener(self, handle):
|
||||||
self._listeners.discard(ep)
|
self._listeners.pop(handle)
|
||||||
|
|
||||||
def broadcast_message(self, phase, body, server_rx, msgid):
|
def broadcast_message(self, phase, body, server_rx, msgid):
|
||||||
for ep in self._listeners:
|
for (send_f, stop_f) in self._listeners.values():
|
||||||
ep.send_rendezvous_event({"phase": phase, "body": body,
|
send_f({"phase": phase, "body": body,
|
||||||
"server_rx": server_rx, "id": msgid})
|
"server_rx": server_rx, "id": msgid})
|
||||||
|
|
||||||
def _add_message(self, side, phase, body, server_rx, msgid):
|
def _add_message(self, side, phase, body, server_rx, msgid):
|
||||||
db = self._db
|
db = self._db
|
||||||
|
@ -183,15 +183,15 @@ class Channel:
|
||||||
|
|
||||||
# Shut down any listeners, just in case they're still lingering
|
# Shut down any listeners, just in case they're still lingering
|
||||||
# around.
|
# around.
|
||||||
for ep in self._listeners:
|
for (send_f, stop_f) in self._listeners.values():
|
||||||
ep.stop_rendezvous_watcher()
|
stop_f()
|
||||||
|
|
||||||
self._app.free_channel(self._channelid)
|
self._app.free_channel(self._channelid)
|
||||||
|
|
||||||
def _shutdown(self):
|
def _shutdown(self):
|
||||||
# used at test shutdown to accelerate client disconnects
|
# used at test shutdown to accelerate client disconnects
|
||||||
for ep in self._listeners:
|
for (send_f, stop_f) in self._listeners.values():
|
||||||
ep.stop_rendezvous_watcher()
|
stop_f()
|
||||||
|
|
||||||
class AppNamespace:
|
class AppNamespace:
|
||||||
def __init__(self, db, welcome, blur_usage, log_requests, appid):
|
def __init__(self, db, welcome, blur_usage, log_requests, appid):
|
||||||
|
|
|
@ -150,8 +150,12 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
|
||||||
if self._watching:
|
if self._watching:
|
||||||
raise Error("already watching")
|
raise Error("already watching")
|
||||||
self._watching = True
|
self._watching = True
|
||||||
for old_message in channel.add_listener(self):
|
def _send(event):
|
||||||
self.send_rendezvous_event(old_message)
|
self.send_rendezvous_event(event)
|
||||||
|
def _stop():
|
||||||
|
self.stop_rendezvous_watcher()
|
||||||
|
for old_message in channel.add_listener(self, _send, _stop):
|
||||||
|
_send(old_message)
|
||||||
|
|
||||||
def handle_add(self, channel, msg, server_rx):
|
def handle_add(self, channel, msg, server_rx):
|
||||||
if "phase" not in msg:
|
if "phase" not in msg:
|
||||||
|
|
Loading…
Reference in New Issue
Block a user