DB schema change: add/store/return msgid
This enables dump-timing to correlate sender logs with receiver logs.
This commit is contained in:
parent
8a80242532
commit
644c7c6840
|
@ -15,7 +15,8 @@ CREATE TABLE `messages`
|
||||||
`phase` VARCHAR, -- not numeric, more of a PAKE-phase indicator string
|
`phase` VARCHAR, -- not numeric, more of a PAKE-phase indicator string
|
||||||
-- phase="_allocate" and "_deallocate" are used internally
|
-- phase="_allocate" and "_deallocate" are used internally
|
||||||
`body` VARCHAR,
|
`body` VARCHAR,
|
||||||
`server_rx` INTEGER
|
`server_rx` INTEGER,
|
||||||
|
`msgid` VARCHAR
|
||||||
);
|
);
|
||||||
CREATE INDEX `messages_idx` ON `messages` (`appid`, `channelid`);
|
CREATE INDEX `messages_idx` ON `messages` (`appid`, `channelid`);
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,7 @@ class Channel:
|
||||||
if row["phase"] in (u"_allocate", u"_deallocate"):
|
if row["phase"] in (u"_allocate", u"_deallocate"):
|
||||||
continue
|
continue
|
||||||
messages.append({"phase": row["phase"], "body": row["body"],
|
messages.append({"phase": row["phase"], "body": row["body"],
|
||||||
"server_rx": row["server_rx"]})
|
"server_rx": row["server_rx"], "id": row["msgid"]})
|
||||||
return messages
|
return messages
|
||||||
|
|
||||||
def add_listener(self, ep):
|
def add_listener(self, ep):
|
||||||
|
@ -51,30 +51,31 @@ class Channel:
|
||||||
def remove_listener(self, ep):
|
def remove_listener(self, ep):
|
||||||
self._listeners.discard(ep)
|
self._listeners.discard(ep)
|
||||||
|
|
||||||
def broadcast_message(self, phase, body, server_rx):
|
def broadcast_message(self, phase, body, server_rx, msgid):
|
||||||
for ep in self._listeners:
|
for ep in self._listeners:
|
||||||
ep.send_rendezvous_event({"phase": phase, "body": body,
|
ep.send_rendezvous_event({"phase": phase, "body": body,
|
||||||
"server_rx": server_rx})
|
"server_rx": server_rx, "id": msgid})
|
||||||
|
|
||||||
def _add_message(self, side, phase, body, server_rx):
|
def _add_message(self, side, phase, body, server_rx, msgid):
|
||||||
db = self._db
|
db = self._db
|
||||||
db.execute("INSERT INTO `messages`"
|
db.execute("INSERT INTO `messages`"
|
||||||
" (`appid`, `channelid`, `side`, `phase`, `body`, `server_rx`)"
|
" (`appid`, `channelid`, `side`, `phase`, `body`,"
|
||||||
" VALUES (?,?,?,?, ?,?)",
|
" `server_rx`, `msgid`)"
|
||||||
(self._appid, self._channelid, side, phase,
|
" VALUES (?,?,?,?,?, ?,?)",
|
||||||
body, server_rx))
|
(self._appid, self._channelid, side, phase, body,
|
||||||
|
server_rx, msgid))
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
def allocate(self, side):
|
def allocate(self, side):
|
||||||
self._add_message(side, ALLOCATE, None, time.time())
|
self._add_message(side, ALLOCATE, None, time.time(), None)
|
||||||
|
|
||||||
def add_message(self, side, phase, body, server_rx):
|
def add_message(self, side, phase, body, server_rx, msgid):
|
||||||
self._add_message(side, phase, body, server_rx)
|
self._add_message(side, phase, body, server_rx, msgid)
|
||||||
self.broadcast_message(phase, body, server_rx)
|
self.broadcast_message(phase, body, server_rx, msgid)
|
||||||
return self.get_messages() # for rendezvous_web.py POST /add
|
return self.get_messages() # for rendezvous_web.py POST /add
|
||||||
|
|
||||||
def deallocate(self, side, mood):
|
def deallocate(self, side, mood):
|
||||||
self._add_message(side, DEALLOCATE, mood, time.time())
|
self._add_message(side, DEALLOCATE, mood, time.time(), None)
|
||||||
db = self._db
|
db = self._db
|
||||||
seen = set([row["side"] for row in
|
seen = set([row["side"] for row in
|
||||||
db.execute("SELECT `side` FROM `messages`"
|
db.execute("SELECT `side` FROM `messages`"
|
||||||
|
|
|
@ -132,7 +132,7 @@ class Adder(RelayResource):
|
||||||
|
|
||||||
app = self._rendezvous.get_app(appid)
|
app = self._rendezvous.get_app(appid)
|
||||||
channel = app.get_channel(channelid)
|
channel = app.get_channel(channelid)
|
||||||
messages = channel.add_message(side, phase, body, time.time())
|
messages = channel.add_message(side, phase, body, time.time(), None)
|
||||||
response = {"welcome": self._welcome, "messages": messages,
|
response = {"welcome": self._welcome, "messages": messages,
|
||||||
"sent": time.time()}
|
"sent": time.time()}
|
||||||
return json_response(request, response)
|
return json_response(request, response)
|
||||||
|
|
|
@ -153,7 +153,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
|
||||||
raise Error("missing 'phase'")
|
raise Error("missing 'phase'")
|
||||||
if "body" not in msg:
|
if "body" not in msg:
|
||||||
raise Error("missing 'body'")
|
raise Error("missing 'body'")
|
||||||
channel.add_message(self._side, msg["phase"], msg["body"], server_rx)
|
msgid = msg.get("id") # optional
|
||||||
|
channel.add_message(self._side, msg["phase"], msg["body"],
|
||||||
|
server_rx, msgid)
|
||||||
|
|
||||||
def handle_deallocate(self, channel, msg):
|
def handle_deallocate(self, channel, msg):
|
||||||
deleted = channel.deallocate(self._side, msg.get("mood"))
|
deleted = channel.deallocate(self._side, msg.get("mood"))
|
||||||
|
|
|
@ -57,6 +57,7 @@ def unjson(data):
|
||||||
|
|
||||||
def strip_message(msg):
|
def strip_message(msg):
|
||||||
m2 = msg.copy()
|
m2 = msg.copy()
|
||||||
|
m2.pop("id", None)
|
||||||
m2.pop("server_rx", None)
|
m2.pop("server_rx", None)
|
||||||
return m2
|
return m2
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user