Rendezvous: change internal API

Deliver not-yet-JSONed objects to listeners (both in broadcast_message
and as the "catch-up" responses to add_listener). Also make the (web)
frontend responsible for adding "sent" timestamps. This all makes
rendezvous.py less web-centric.
This commit is contained in:
Brian Warner 2016-04-17 16:22:27 -07:00
parent c3bd9e936e
commit efc23bd078
2 changed files with 19 additions and 12 deletions

View File

@ -1,5 +1,5 @@
from __future__ import print_function from __future__ import print_function
import json, time, random import time, random
from twisted.python import log from twisted.python import log
from twisted.application import service, internet from twisted.application import service, internet
@ -24,8 +24,9 @@ class Channel:
self._log_requests = log_requests self._log_requests = log_requests
self._appid = appid self._appid = appid
self._channelid = channelid self._channelid = channelid
self._listeners = set() # EventsProtocol instances, with a .sendEvent self._listeners = set() # instances with .send_rendezvous_event (that
# that takes a JSONable object # takes a JSONable object) and
# .stop_rendezvous_watcher()
def get_messages(self): def get_messages(self):
messages = [] messages = []
@ -48,14 +49,13 @@ class Channel:
(self._appid, self._channelid)).fetchall(): (self._appid, self._channelid)).fetchall():
if row["phase"] in (u"_allocate", u"_deallocate"): if row["phase"] in (u"_allocate", u"_deallocate"):
continue continue
yield json.dumps({"phase": row["phase"], "body": row["body"]}) yield {"phase": row["phase"], "body": row["body"]}
def remove_listener(self, ep): def remove_listener(self, ep):
self._listeners.discard(ep) self._listeners.discard(ep)
def broadcast_message(self, phase, body): def broadcast_message(self, phase, body):
data = json.dumps({"phase": phase, "body": body, "sent": time.time()})
for ep in self._listeners: for ep in self._listeners:
ep.sendEvent(data) ep.send_rendezvous_event({"phase": phase, "body": body})
def _add_message(self, side, phase, body): def _add_message(self, side, phase, body):
db = self._db db = self._db
@ -181,17 +181,17 @@ class Channel:
(self._appid, self._channelid)) (self._appid, self._channelid))
db.commit() db.commit()
# Shut down any EventSource listeners, just in case they're still # Shut down any listeners, just in case they're still lingering
# lingering around. # around.
for ep in self._listeners: for ep in self._listeners:
ep.stop() ep.stop_rendezvous_watcher()
self._app.free_channel(self._channelid) self._app.free_channel(self._channelid)
def _shutdown(self): def _shutdown(self):
# used at test shutdown to accelerate client disconnects # used at test shutdown to accelerate client disconnects
for ep in self._listeners: for ep in self._listeners:
ep.stop() ep.stop_rendezvous_watcher()
class AppNamespace: class AppNamespace:
def __init__(self, db, welcome, blur_usage, log_requests, appid): def __init__(self, db, welcome, blur_usage, log_requests, appid):

View File

@ -34,6 +34,13 @@ class EventsProtocol:
def stop(self): def stop(self):
self.request.finish() self.request.finish()
def send_rendezvous_event(self, data):
data = data.copy()
data["sent"] = time.time()
self.sendEvent(json.dumps(data))
def stop_rendezvous_watcher(self):
self.stop()
# note: no versions of IE (including the current IE11) support EventSource # note: no versions of IE (including the current IE11) support EventSource
# relay URLs are as follows: (MESSAGES=[{phase:,body:}..]) # relay URLs are as follows: (MESSAGES=[{phase:,body:}..])
@ -151,7 +158,7 @@ class GetterOrWatcher(RelayResource):
request.notifyFinish().addErrback(lambda f: request.notifyFinish().addErrback(lambda f:
channel.remove_listener(ep)) channel.remove_listener(ep))
for old_event in old_events: for old_event in old_events:
ep.sendEvent(old_event) ep.send_rendezvous_event(old_event)
return server.NOT_DONE_YET return server.NOT_DONE_YET
class Watcher(RelayResource): class Watcher(RelayResource):
@ -170,7 +177,7 @@ class Watcher(RelayResource):
request.notifyFinish().addErrback(lambda f: request.notifyFinish().addErrback(lambda f:
channel.remove_listener(ep)) channel.remove_listener(ep))
for old_event in old_events: for old_event in old_events:
ep.sendEvent(old_event) ep.send_rendezvous_event(old_event)
return server.NOT_DONE_YET return server.NOT_DONE_YET
class Deallocator(RelayResource): class Deallocator(RelayResource):