From 644c7c684066e8047f0f4a61503878c7a34c0ebf Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Thu, 5 May 2016 18:46:11 -0700 Subject: [PATCH] DB schema change: add/store/return msgid This enables dump-timing to correlate sender logs with receiver logs. --- src/wormhole/server/db-schemas/v1.sql | 3 ++- src/wormhole/server/rendezvous.py | 27 +++++++++++---------- src/wormhole/server/rendezvous_web.py | 2 +- src/wormhole/server/rendezvous_websocket.py | 4 ++- src/wormhole/test/test_server.py | 1 + 5 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/wormhole/server/db-schemas/v1.sql b/src/wormhole/server/db-schemas/v1.sql index cdfe4f9..7d7115a 100644 --- a/src/wormhole/server/db-schemas/v1.sql +++ b/src/wormhole/server/db-schemas/v1.sql @@ -15,7 +15,8 @@ CREATE TABLE `messages` `phase` VARCHAR, -- not numeric, more of a PAKE-phase indicator string -- phase="_allocate" and "_deallocate" are used internally `body` VARCHAR, - `server_rx` INTEGER + `server_rx` INTEGER, + `msgid` VARCHAR ); CREATE INDEX `messages_idx` ON `messages` (`appid`, `channelid`); diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index c94649a..122f780 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -41,7 +41,7 @@ class Channel: if row["phase"] in (u"_allocate", u"_deallocate"): continue messages.append({"phase": row["phase"], "body": row["body"], - "server_rx": row["server_rx"]}) + "server_rx": row["server_rx"], "id": row["msgid"]}) return messages def add_listener(self, ep): @@ -51,30 +51,31 @@ class Channel: def remove_listener(self, ep): self._listeners.discard(ep) - def broadcast_message(self, phase, body, server_rx): + def broadcast_message(self, phase, body, server_rx, msgid): for ep in self._listeners: ep.send_rendezvous_event({"phase": phase, "body": body, - "server_rx": server_rx}) + "server_rx": server_rx, "id": msgid}) - def _add_message(self, side, phase, body, server_rx): + def _add_message(self, side, phase, body, server_rx, msgid): db = self._db db.execute("INSERT INTO `messages`" - " (`appid`, `channelid`, `side`, `phase`, `body`, `server_rx`)" - " VALUES (?,?,?,?, ?,?)", - (self._appid, self._channelid, side, phase, - body, server_rx)) + " (`appid`, `channelid`, `side`, `phase`, `body`," + " `server_rx`, `msgid`)" + " VALUES (?,?,?,?,?, ?,?)", + (self._appid, self._channelid, side, phase, body, + server_rx, msgid)) db.commit() def allocate(self, side): - self._add_message(side, ALLOCATE, None, time.time()) + self._add_message(side, ALLOCATE, None, time.time(), None) - def add_message(self, side, phase, body, server_rx): - self._add_message(side, phase, body, server_rx) - self.broadcast_message(phase, body, server_rx) + def add_message(self, side, phase, body, server_rx, msgid): + self._add_message(side, phase, body, server_rx, msgid) + self.broadcast_message(phase, body, server_rx, msgid) return self.get_messages() # for rendezvous_web.py POST /add def deallocate(self, side, mood): - self._add_message(side, DEALLOCATE, mood, time.time()) + self._add_message(side, DEALLOCATE, mood, time.time(), None) db = self._db seen = set([row["side"] for row in db.execute("SELECT `side` FROM `messages`" diff --git a/src/wormhole/server/rendezvous_web.py b/src/wormhole/server/rendezvous_web.py index 10b7b97..c89654e 100644 --- a/src/wormhole/server/rendezvous_web.py +++ b/src/wormhole/server/rendezvous_web.py @@ -132,7 +132,7 @@ class Adder(RelayResource): app = self._rendezvous.get_app(appid) channel = app.get_channel(channelid) - messages = channel.add_message(side, phase, body, time.time()) + messages = channel.add_message(side, phase, body, time.time(), None) response = {"welcome": self._welcome, "messages": messages, "sent": time.time()} return json_response(request, response) diff --git a/src/wormhole/server/rendezvous_websocket.py b/src/wormhole/server/rendezvous_websocket.py index d6585bb..60e381a 100644 --- a/src/wormhole/server/rendezvous_websocket.py +++ b/src/wormhole/server/rendezvous_websocket.py @@ -153,7 +153,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): raise Error("missing 'phase'") if "body" not in msg: raise Error("missing 'body'") - channel.add_message(self._side, msg["phase"], msg["body"], server_rx) + msgid = msg.get("id") # optional + channel.add_message(self._side, msg["phase"], msg["body"], + server_rx, msgid) def handle_deallocate(self, channel, msg): deleted = channel.deallocate(self._side, msg.get("mood")) diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index a9eb684..a33e7b8 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -57,6 +57,7 @@ def unjson(data): def strip_message(msg): m2 = msg.copy() + m2.pop("id", None) m2.pop("server_rx", None) return m2