From 181ef04a91672a1e833bc23e7450d3a674934b96 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Fri, 20 May 2016 16:39:59 -0700 Subject: [PATCH] break out more message components, use SidedMessage --- src/wormhole/server/rendezvous.py | 39 ++++++++-------- src/wormhole/server/rendezvous_websocket.py | 19 ++++---- src/wormhole/test/test_server.py | 51 +++++++++++++-------- 3 files changed, 65 insertions(+), 44 deletions(-) diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 018b8e2..d439fbb 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -52,6 +52,9 @@ TransitUsage = namedtuple("TransitUsage", ["started", "waiting_time", "total_time", "total_bytes", "result"]) +SidedMessage = namedtuple("SidedMessage", ["side", "phase", "body", + "server_rx", "msg_id"]) + class Mailbox: def __init__(self, app, db, blur_usage, log_requests, app_id, mailbox_id): self._app = app @@ -93,8 +96,10 @@ class Mailbox: " WHERE `app_id`=? AND `mailbox_id`=?" " ORDER BY `server_rx` ASC", (self._app_id, self._mailbox_id)).fetchall(): - messages.append({"phase": row["phase"], "body": row["body"], - "server_rx": row["server_rx"], "id": row["msg_id"]}) + sm = SidedMessage(side=row["side"], phase=row["phase"], + body=row["body"], server_rx=row["server_rx"], + msg_id=row["msg_id"]) + messages.append(sm) return messages def add_listener(self, handle, send_f, stop_f): @@ -104,25 +109,23 @@ class Mailbox: def remove_listener(self, handle): self._listeners.pop(handle) - def broadcast_message(self, phase, body, server_rx, msgid): + def broadcast_message(self, sm): for (send_f, stop_f) in self._listeners.values(): - send_f({"phase": phase, "body": body, - "server_rx": server_rx, "id": msgid}) + send_f(sm) - def _add_message(self, side, phase, body, server_rx, msgid): - db = self._db - db.execute("INSERT INTO `messages`" - " (`app_id`, `mailbox_id`, `side`, `phase`, `body`," - " `server_rx`, `msg_id`)" - " VALUES (?,?,?,?,?, ?,?)", - (self._app_id, self._mailbox_id, side, phase, body, - server_rx, msgid)) - db.commit() + def _add_message(self, sm): + self._db.execute("INSERT INTO `messages`" + " (`app_id`, `mailbox_id`, `side`, `phase`, `body`," + " `server_rx`, `msg_id`)" + " VALUES (?,?,?,?,?, ?,?)", + (self._app_id, self._mailbox_id, sm.side, + sm.phase, sm.body, sm.server_rx, sm.msg_id)) + self._db.commit() - def add_message(self, side, phase, body, server_rx, msgid): - self._add_message(side, phase, body, server_rx, msgid) - self.broadcast_message(phase, body, server_rx, msgid) - return self.get_messages() # for rendezvous_web.py POST /add + def add_message(self, sm): + assert isinstance(sm, SidedMessage) + self._add_message(sm) + self.broadcast_message(sm) def close(self, side, mood, when): assert isinstance(side, type(u"")), type(side) diff --git a/src/wormhole/server/rendezvous_websocket.py b/src/wormhole/server/rendezvous_websocket.py index 4129400..2a7e82f 100644 --- a/src/wormhole/server/rendezvous_websocket.py +++ b/src/wormhole/server/rendezvous_websocket.py @@ -2,7 +2,7 @@ import json, time from twisted.internet import reactor from twisted.python import log from autobahn.twisted import websocket -from .rendezvous import CrowdedError +from .rendezvous import CrowdedError, SidedMessage # The WebSocket allows the client to send "commands" to the server, and the # server to send "responses" to the client. Note that commands and responses @@ -63,7 +63,7 @@ from .rendezvous import CrowdedError # # -> {type: "open", mailbox: str} -> message # sends old messages now, and subscribes to deliver future messages -# <- {type: "message", message: {phase:, body:}} # body is hex +# <- {type: "message", side:, phase:, body:, msg_id:}} # body is hex # -> {type: "add", phase: str, body: hex} # will send echo in a "message" # # -> {type: "close", mood: str} -> closed @@ -191,12 +191,13 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): assert isinstance(mailbox_id, type(u"")) self._mailbox = self._app.open_mailbox(mailbox_id, self._side, server_rx) - def _send(event): - self.send("message", message=event) + def _send(sm): + self.send("message", side=sm.side, phase=sm.phase, + body=sm.body, server_rx=sm.server_rx, id=sm.msg_id) def _stop(): pass - for old_message in self._mailbox.add_listener(self, _send, _stop): - _send(old_message) + for old_sm in self._mailbox.add_listener(self, _send, _stop): + _send(old_sm) def handle_add(self, msg, server_rx): if not self._mailbox: @@ -206,8 +207,10 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): if "body" not in msg: raise Error("missing 'body'") msgid = msg.get("id") # optional - self._mailbox.add_message(self._side, msg["phase"], msg["body"], - server_rx, msgid) + sm = SidedMessage(side=self._side, phase=msg["phase"], + body=msg["body"], server_rx=server_rx, + msg_id=msgid) + self._mailbox.add_message(sm) def handle_close(self, msg, server_rx): if not self._mailbox: diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index b654a24..4d78bf4 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -10,7 +10,7 @@ from autobahn.twisted import websocket from .. import __version__ from .common import ServerBase from ..server import rendezvous, transit_server -from ..server.rendezvous import Usage +from ..server.rendezvous import Usage, SidedMessage class Reachable(ServerBase, unittest.TestCase): @@ -254,7 +254,9 @@ class Server(ServerBase, unittest.TestCase): app = self._rendezvous.get_app(u"appid") mailbox_id = u"mid" m1 = app.open_mailbox(mailbox_id, u"side1", 0) - m1.add_message(u"side1", u"phase", u"body", 1, u"msgid") + m1.add_message(SidedMessage(side=u"side1", phase=u"phase", + body=u"body", server_rx=1, + msg_id=u"msgid")) msgs = self._messages(app) self.assertEqual(len(msgs), 1) self.assertEqual(msgs[0]["body"], u"body") @@ -263,34 +265,43 @@ class Server(ServerBase, unittest.TestCase): l2 = []; stop2 = []; stop2_f = lambda: stop2.append(True) old = m1.add_listener("handle1", l1.append, stop1_f) self.assertEqual(len(old), 1) - self.assertEqual(old[0]["body"], u"body") + self.assertEqual(old[0].side, u"side1") + self.assertEqual(old[0].body, u"body") - m1.add_message(u"side1", u"phase2", u"body2", 1, u"msgid") + m1.add_message(SidedMessage(side=u"side1", phase=u"phase2", + body=u"body2", server_rx=1, + msg_id=u"msgid")) self.assertEqual(len(l1), 1) - self.assertEqual(l1[0]["body"], u"body2") + self.assertEqual(l1[0].body, u"body2") old = m1.add_listener("handle2", l2.append, stop2_f) self.assertEqual(len(old), 2) - m1.add_message(u"side1", u"phase3", u"body3", 1, u"msgid") + m1.add_message(SidedMessage(side=u"side1", phase=u"phase3", + body=u"body3", server_rx=1, + msg_id=u"msgid")) self.assertEqual(len(l1), 2) - self.assertEqual(l1[-1]["body"], u"body3") + self.assertEqual(l1[-1].body, u"body3") self.assertEqual(len(l2), 1) - self.assertEqual(l2[-1]["body"], u"body3") + self.assertEqual(l2[-1].body, u"body3") m1.remove_listener("handle1") - m1.add_message(u"side1", u"phase4", u"body4", 1, u"msgid") + m1.add_message(SidedMessage(side=u"side1", phase=u"phase4", + body=u"body4", server_rx=1, + msg_id=u"msgid")) self.assertEqual(len(l1), 2) - self.assertEqual(l1[-1]["body"], u"body3") + self.assertEqual(l1[-1].body, u"body3") self.assertEqual(len(l2), 2) - self.assertEqual(l2[-1]["body"], u"body4") + self.assertEqual(l2[-1].body, u"body4") m1._shutdown() self.assertEqual(stop1, []) self.assertEqual(stop2, [True]) # message adds are not idempotent: clients filter duplicates - m1.add_message(u"side1", u"phase", u"body", 1, u"msgid") + m1.add_message(SidedMessage(side=u"side1", phase=u"phase", + body=u"body", server_rx=1, + msg_id=u"msgid")) msgs = self._messages(app) self.assertEqual(len(msgs), 5) self.assertEqual(msgs[-1]["body"], u"body") @@ -654,17 +665,21 @@ class WebSocketAPI(ServerBase, unittest.TestCase): self.assertEqual(err[u"error"], u"open requires 'mailbox'") mb1 = app.open_mailbox(u"mb1", u"side2", 0) - mb1.add_message(u"side2", u"phase", u"body", 0, u"msgid") + mb1.add_message(SidedMessage(side=u"side2", phase=u"phase", + body=u"body", server_rx=0, + msg_id=u"msgid")) c1.send(u"open", mailbox=u"mb1") m = yield c1.next_non_ack() self.assertEqual(m[u"type"], u"message") - self.assertEqual(m[u"message"][u"body"], u"body") + self.assertEqual(m[u"body"], u"body") - mb1.add_message(u"side2", u"phase2", u"body2", 0, u"msgid") + mb1.add_message(SidedMessage(side=u"side2", phase=u"phase2", + body=u"body2", server_rx=0, + msg_id=u"msgid")) m = yield c1.next_non_ack() self.assertEqual(m[u"type"], u"message") - self.assertEqual(m[u"message"][u"body"], u"body2") + self.assertEqual(m[u"body"], u"body2") c1.send(u"open", mailbox=u"mb1") err = yield c1.next_non_ack() @@ -701,10 +716,10 @@ class WebSocketAPI(ServerBase, unittest.TestCase): c1.send(u"add", phase=u"phase", body=u"body") m = yield c1.next_non_ack() # echoed back self.assertEqual(m[u"type"], u"message") - self.assertEqual(m[u"message"][u"body"], u"body") + self.assertEqual(m[u"body"], u"body") self.assertEqual(len(l1), 1) - self.assertEqual(l1[0][u"body"], u"body") + self.assertEqual(l1[0].body, u"body") @inlineCallbacks def test_close(self):