break out more message components, use SidedMessage

This commit is contained in:
Brian Warner 2016-05-20 16:39:59 -07:00
parent 05aa5ca76e
commit 181ef04a91
3 changed files with 65 additions and 44 deletions

View File

@ -52,6 +52,9 @@ TransitUsage = namedtuple("TransitUsage",
["started", "waiting_time", "total_time",
"total_bytes", "result"])
SidedMessage = namedtuple("SidedMessage", ["side", "phase", "body",
"server_rx", "msg_id"])
class Mailbox:
def __init__(self, app, db, blur_usage, log_requests, app_id, mailbox_id):
self._app = app
@ -93,8 +96,10 @@ class Mailbox:
" WHERE `app_id`=? AND `mailbox_id`=?"
" ORDER BY `server_rx` ASC",
(self._app_id, self._mailbox_id)).fetchall():
messages.append({"phase": row["phase"], "body": row["body"],
"server_rx": row["server_rx"], "id": row["msg_id"]})
sm = SidedMessage(side=row["side"], phase=row["phase"],
body=row["body"], server_rx=row["server_rx"],
msg_id=row["msg_id"])
messages.append(sm)
return messages
def add_listener(self, handle, send_f, stop_f):
@ -104,25 +109,23 @@ class Mailbox:
def remove_listener(self, handle):
self._listeners.pop(handle)
def broadcast_message(self, phase, body, server_rx, msgid):
def broadcast_message(self, sm):
for (send_f, stop_f) in self._listeners.values():
send_f({"phase": phase, "body": body,
"server_rx": server_rx, "id": msgid})
send_f(sm)
def _add_message(self, side, phase, body, server_rx, msgid):
db = self._db
db.execute("INSERT INTO `messages`"
" (`app_id`, `mailbox_id`, `side`, `phase`, `body`,"
" `server_rx`, `msg_id`)"
" VALUES (?,?,?,?,?, ?,?)",
(self._app_id, self._mailbox_id, side, phase, body,
server_rx, msgid))
db.commit()
def _add_message(self, sm):
self._db.execute("INSERT INTO `messages`"
" (`app_id`, `mailbox_id`, `side`, `phase`, `body`,"
" `server_rx`, `msg_id`)"
" VALUES (?,?,?,?,?, ?,?)",
(self._app_id, self._mailbox_id, sm.side,
sm.phase, sm.body, sm.server_rx, sm.msg_id))
self._db.commit()
def add_message(self, side, phase, body, server_rx, msgid):
self._add_message(side, phase, body, server_rx, msgid)
self.broadcast_message(phase, body, server_rx, msgid)
return self.get_messages() # for rendezvous_web.py POST /add
def add_message(self, sm):
assert isinstance(sm, SidedMessage)
self._add_message(sm)
self.broadcast_message(sm)
def close(self, side, mood, when):
assert isinstance(side, type(u"")), type(side)

View File

@ -2,7 +2,7 @@ import json, time
from twisted.internet import reactor
from twisted.python import log
from autobahn.twisted import websocket
from .rendezvous import CrowdedError
from .rendezvous import CrowdedError, SidedMessage
# The WebSocket allows the client to send "commands" to the server, and the
# server to send "responses" to the client. Note that commands and responses
@ -63,7 +63,7 @@ from .rendezvous import CrowdedError
#
# -> {type: "open", mailbox: str} -> message
# sends old messages now, and subscribes to deliver future messages
# <- {type: "message", message: {phase:, body:}} # body is hex
# <- {type: "message", side:, phase:, body:, msg_id:}} # body is hex
# -> {type: "add", phase: str, body: hex} # will send echo in a "message"
#
# -> {type: "close", mood: str} -> closed
@ -191,12 +191,13 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
assert isinstance(mailbox_id, type(u""))
self._mailbox = self._app.open_mailbox(mailbox_id, self._side,
server_rx)
def _send(event):
self.send("message", message=event)
def _send(sm):
self.send("message", side=sm.side, phase=sm.phase,
body=sm.body, server_rx=sm.server_rx, id=sm.msg_id)
def _stop():
pass
for old_message in self._mailbox.add_listener(self, _send, _stop):
_send(old_message)
for old_sm in self._mailbox.add_listener(self, _send, _stop):
_send(old_sm)
def handle_add(self, msg, server_rx):
if not self._mailbox:
@ -206,8 +207,10 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
if "body" not in msg:
raise Error("missing 'body'")
msgid = msg.get("id") # optional
self._mailbox.add_message(self._side, msg["phase"], msg["body"],
server_rx, msgid)
sm = SidedMessage(side=self._side, phase=msg["phase"],
body=msg["body"], server_rx=server_rx,
msg_id=msgid)
self._mailbox.add_message(sm)
def handle_close(self, msg, server_rx):
if not self._mailbox:

View File

@ -10,7 +10,7 @@ from autobahn.twisted import websocket
from .. import __version__
from .common import ServerBase
from ..server import rendezvous, transit_server
from ..server.rendezvous import Usage
from ..server.rendezvous import Usage, SidedMessage
class Reachable(ServerBase, unittest.TestCase):
@ -254,7 +254,9 @@ class Server(ServerBase, unittest.TestCase):
app = self._rendezvous.get_app(u"appid")
mailbox_id = u"mid"
m1 = app.open_mailbox(mailbox_id, u"side1", 0)
m1.add_message(u"side1", u"phase", u"body", 1, u"msgid")
m1.add_message(SidedMessage(side=u"side1", phase=u"phase",
body=u"body", server_rx=1,
msg_id=u"msgid"))
msgs = self._messages(app)
self.assertEqual(len(msgs), 1)
self.assertEqual(msgs[0]["body"], u"body")
@ -263,34 +265,43 @@ class Server(ServerBase, unittest.TestCase):
l2 = []; stop2 = []; stop2_f = lambda: stop2.append(True)
old = m1.add_listener("handle1", l1.append, stop1_f)
self.assertEqual(len(old), 1)
self.assertEqual(old[0]["body"], u"body")
self.assertEqual(old[0].side, u"side1")
self.assertEqual(old[0].body, u"body")
m1.add_message(u"side1", u"phase2", u"body2", 1, u"msgid")
m1.add_message(SidedMessage(side=u"side1", phase=u"phase2",
body=u"body2", server_rx=1,
msg_id=u"msgid"))
self.assertEqual(len(l1), 1)
self.assertEqual(l1[0]["body"], u"body2")
self.assertEqual(l1[0].body, u"body2")
old = m1.add_listener("handle2", l2.append, stop2_f)
self.assertEqual(len(old), 2)
m1.add_message(u"side1", u"phase3", u"body3", 1, u"msgid")
m1.add_message(SidedMessage(side=u"side1", phase=u"phase3",
body=u"body3", server_rx=1,
msg_id=u"msgid"))
self.assertEqual(len(l1), 2)
self.assertEqual(l1[-1]["body"], u"body3")
self.assertEqual(l1[-1].body, u"body3")
self.assertEqual(len(l2), 1)
self.assertEqual(l2[-1]["body"], u"body3")
self.assertEqual(l2[-1].body, u"body3")
m1.remove_listener("handle1")
m1.add_message(u"side1", u"phase4", u"body4", 1, u"msgid")
m1.add_message(SidedMessage(side=u"side1", phase=u"phase4",
body=u"body4", server_rx=1,
msg_id=u"msgid"))
self.assertEqual(len(l1), 2)
self.assertEqual(l1[-1]["body"], u"body3")
self.assertEqual(l1[-1].body, u"body3")
self.assertEqual(len(l2), 2)
self.assertEqual(l2[-1]["body"], u"body4")
self.assertEqual(l2[-1].body, u"body4")
m1._shutdown()
self.assertEqual(stop1, [])
self.assertEqual(stop2, [True])
# message adds are not idempotent: clients filter duplicates
m1.add_message(u"side1", u"phase", u"body", 1, u"msgid")
m1.add_message(SidedMessage(side=u"side1", phase=u"phase",
body=u"body", server_rx=1,
msg_id=u"msgid"))
msgs = self._messages(app)
self.assertEqual(len(msgs), 5)
self.assertEqual(msgs[-1]["body"], u"body")
@ -654,17 +665,21 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
self.assertEqual(err[u"error"], u"open requires 'mailbox'")
mb1 = app.open_mailbox(u"mb1", u"side2", 0)
mb1.add_message(u"side2", u"phase", u"body", 0, u"msgid")
mb1.add_message(SidedMessage(side=u"side2", phase=u"phase",
body=u"body", server_rx=0,
msg_id=u"msgid"))
c1.send(u"open", mailbox=u"mb1")
m = yield c1.next_non_ack()
self.assertEqual(m[u"type"], u"message")
self.assertEqual(m[u"message"][u"body"], u"body")
self.assertEqual(m[u"body"], u"body")
mb1.add_message(u"side2", u"phase2", u"body2", 0, u"msgid")
mb1.add_message(SidedMessage(side=u"side2", phase=u"phase2",
body=u"body2", server_rx=0,
msg_id=u"msgid"))
m = yield c1.next_non_ack()
self.assertEqual(m[u"type"], u"message")
self.assertEqual(m[u"message"][u"body"], u"body2")
self.assertEqual(m[u"body"], u"body2")
c1.send(u"open", mailbox=u"mb1")
err = yield c1.next_non_ack()
@ -701,10 +716,10 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
c1.send(u"add", phase=u"phase", body=u"body")
m = yield c1.next_non_ack() # echoed back
self.assertEqual(m[u"type"], u"message")
self.assertEqual(m[u"message"][u"body"], u"body")
self.assertEqual(m[u"body"], u"body")
self.assertEqual(len(l1), 1)
self.assertEqual(l1[0][u"body"], u"body")
self.assertEqual(l1[0].body, u"body")
@inlineCallbacks
def test_close(self):