From efc23bd07806864a6d56f3087fe228be40a07b2a Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 17 Apr 2016 16:22:27 -0700 Subject: [PATCH] Rendezvous: change internal API Deliver not-yet-JSONed objects to listeners (both in broadcast_message and as the "catch-up" responses to add_listener). Also make the (web) frontend responsible for adding "sent" timestamps. This all makes rendezvous.py less web-centric. --- src/wormhole_server/rendezvous.py | 20 ++++++++++---------- src/wormhole_server/rendezvous_web.py | 11 +++++++++-- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/wormhole_server/rendezvous.py b/src/wormhole_server/rendezvous.py index 8e25f4a..73be8dd 100644 --- a/src/wormhole_server/rendezvous.py +++ b/src/wormhole_server/rendezvous.py @@ -1,5 +1,5 @@ from __future__ import print_function -import json, time, random +import time, random from twisted.python import log from twisted.application import service, internet @@ -24,8 +24,9 @@ class Channel: self._log_requests = log_requests self._appid = appid self._channelid = channelid - self._listeners = set() # EventsProtocol instances, with a .sendEvent - # that takes a JSONable object + self._listeners = set() # instances with .send_rendezvous_event (that + # takes a JSONable object) and + # .stop_rendezvous_watcher() def get_messages(self): messages = [] @@ -48,14 +49,13 @@ class Channel: (self._appid, self._channelid)).fetchall(): if row["phase"] in (u"_allocate", u"_deallocate"): continue - yield json.dumps({"phase": row["phase"], "body": row["body"]}) + yield {"phase": row["phase"], "body": row["body"]} def remove_listener(self, ep): self._listeners.discard(ep) def broadcast_message(self, phase, body): - data = json.dumps({"phase": phase, "body": body, "sent": time.time()}) for ep in self._listeners: - ep.sendEvent(data) + ep.send_rendezvous_event({"phase": phase, "body": body}) def _add_message(self, side, phase, body): db = self._db @@ -181,17 +181,17 @@ class Channel: (self._appid, self._channelid)) db.commit() - # Shut down any EventSource listeners, just in case they're still - # lingering around. + # Shut down any listeners, just in case they're still lingering + # around. for ep in self._listeners: - ep.stop() + ep.stop_rendezvous_watcher() self._app.free_channel(self._channelid) def _shutdown(self): # used at test shutdown to accelerate client disconnects for ep in self._listeners: - ep.stop() + ep.stop_rendezvous_watcher() class AppNamespace: def __init__(self, db, welcome, blur_usage, log_requests, appid): diff --git a/src/wormhole_server/rendezvous_web.py b/src/wormhole_server/rendezvous_web.py index 63de23f..9854e27 100644 --- a/src/wormhole_server/rendezvous_web.py +++ b/src/wormhole_server/rendezvous_web.py @@ -34,6 +34,13 @@ class EventsProtocol: def stop(self): self.request.finish() + def send_rendezvous_event(self, data): + data = data.copy() + data["sent"] = time.time() + self.sendEvent(json.dumps(data)) + def stop_rendezvous_watcher(self): + self.stop() + # note: no versions of IE (including the current IE11) support EventSource # relay URLs are as follows: (MESSAGES=[{phase:,body:}..]) @@ -151,7 +158,7 @@ class GetterOrWatcher(RelayResource): request.notifyFinish().addErrback(lambda f: channel.remove_listener(ep)) for old_event in old_events: - ep.sendEvent(old_event) + ep.send_rendezvous_event(old_event) return server.NOT_DONE_YET class Watcher(RelayResource): @@ -170,7 +177,7 @@ class Watcher(RelayResource): request.notifyFinish().addErrback(lambda f: channel.remove_listener(ep)) for old_event in old_events: - ep.sendEvent(old_event) + ep.send_rendezvous_event(old_event) return server.NOT_DONE_YET class Deallocator(RelayResource):