rdv_ws: send acks for each message

but only if the client is modern enough to include "id" in the message,
which lets us avoid sending acks to an 0.7.5 client (which would cause
them to abort, they don't like unrecognized server messages).

The acks let the client learn the server_rx time of messages that
terminate on the server, like "allocate" and "claim".
This commit is contained in:
Brian Warner 2016-05-05 19:02:52 -07:00
parent 644c7c6840
commit 5530c33185
3 changed files with 55 additions and 35 deletions

View File

@ -71,6 +71,11 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
try: try:
if "type" not in msg: if "type" not in msg:
raise Error("missing 'type'") raise Error("missing 'type'")
if "id" in msg:
# Only ack clients modern enough to include [id]. Older ones
# won't recognize the message, then they'll abort.
self.send("ack", id=msg["id"])
mtype = msg["type"] mtype = msg["type"]
if mtype == "ping": if mtype == "ping":
return self.handle_ping(msg) return self.handle_ping(msg)

View File

@ -410,6 +410,16 @@ class WSClient(websocket.WebSocketClientProtocol):
self.d = defer.Deferred() self.d = defer.Deferred()
return self.d return self.d
@inlineCallbacks
def next_non_ack(self):
while True:
m = yield self.next_event()
if m["type"] != "ack":
returnValue(m)
def strip_acks(self):
self.events = [e for e in self.events if e["type"] != u"ack"]
def send(self, mtype, **kwargs): def send(self, mtype, **kwargs):
kwargs["type"] = mtype kwargs["type"] = mtype
payload = json.dumps(kwargs).encode("utf-8") payload = json.dumps(kwargs).encode("utf-8")
@ -462,9 +472,9 @@ class WSClientSync(unittest.TestCase):
d.addBoth(sunc.append) d.addBoth(sunc.append)
add("pong", pong=1) add("pong", pong=1)
yield d yield d
m = yield c.next_event() m = yield c.next_non_ack()
self.assertEqual(m["type"], "one") self.assertEqual(m["type"], "one")
m = yield c.next_event() m = yield c.next_non_ack()
self.assertEqual(m["type"], "two") self.assertEqual(m["type"], "two")
self.assertEqual(c.events, []) self.assertEqual(c.events, [])
@ -476,9 +486,9 @@ class WSClientSync(unittest.TestCase):
add("two", two=2) add("two", two=2)
add("pong", pong=2) add("pong", pong=2)
yield d yield d
m = yield c.next_event() m = yield c.next_non_ack()
self.assertEqual(m["type"], "one") self.assertEqual(m["type"], "one")
m = yield c.next_event() m = yield c.next_non_ack()
self.assertEqual(m["type"], "two") self.assertEqual(m["type"], "two")
self.assertEqual(c.events, []) self.assertEqual(c.events, [])
@ -490,9 +500,9 @@ class WSClientSync(unittest.TestCase):
add("two", two=2) add("two", two=2)
add("pong", pong=3) add("pong", pong=3)
yield d yield d
m = yield c.next_event() m = yield c.next_non_ack()
self.assertEqual(m["type"], "one") self.assertEqual(m["type"], "one")
m = yield c.next_event() m = yield c.next_non_ack()
self.assertEqual(m["type"], "two") self.assertEqual(m["type"], "two")
self.assertEqual(c.events, []) self.assertEqual(c.events, [])
@ -524,14 +534,14 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
@inlineCallbacks @inlineCallbacks
def test_welcome(self): def test_welcome(self):
c1 = yield self.make_client() c1 = yield self.make_client()
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.check_welcome(msg) self.check_welcome(msg)
self.assertEqual(self._rendezvous._apps, {}) self.assertEqual(self._rendezvous._apps, {})
@inlineCallbacks @inlineCallbacks
def test_allocate_1(self): def test_allocate_1(self):
c1 = yield self.make_client() c1 = yield self.make_client()
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.check_welcome(msg) self.check_welcome(msg)
c1.send(u"bind", appid=u"appid", side=u"side") c1.send(u"bind", appid=u"appid", side=u"side")
yield c1.sync() yield c1.sync()
@ -539,12 +549,12 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
app = self._rendezvous.get_app(u"appid") app = self._rendezvous.get_app(u"appid")
self.assertEqual(app.get_allocated(), set()) self.assertEqual(app.get_allocated(), set())
c1.send(u"list") c1.send(u"list")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"channelids") self.assertEqual(msg["type"], u"channelids")
self.assertEqual(msg["channelids"], []) self.assertEqual(msg["channelids"], [])
c1.send(u"allocate") c1.send(u"allocate")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"allocated") self.assertEqual(msg["type"], u"allocated")
cid = msg["channelid"] cid = msg["channelid"]
self.failUnlessIsInstance(cid, int) self.failUnlessIsInstance(cid, int)
@ -553,32 +563,32 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
self.assertEqual(channel.get_messages(), []) self.assertEqual(channel.get_messages(), [])
c1.send(u"list") c1.send(u"list")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"channelids") self.assertEqual(msg["type"], u"channelids")
self.assertEqual(msg["channelids"], [cid]) self.assertEqual(msg["channelids"], [cid])
c1.send(u"deallocate") c1.send(u"deallocate")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"deallocated") self.assertEqual(msg["type"], u"deallocated")
self.assertEqual(msg["status"], u"deleted") self.assertEqual(msg["status"], u"deleted")
self.assertEqual(app.get_allocated(), set()) self.assertEqual(app.get_allocated(), set())
c1.send(u"list") c1.send(u"list")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"channelids") self.assertEqual(msg["type"], u"channelids")
self.assertEqual(msg["channelids"], []) self.assertEqual(msg["channelids"], [])
@inlineCallbacks @inlineCallbacks
def test_allocate_2(self): def test_allocate_2(self):
c1 = yield self.make_client() c1 = yield self.make_client()
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.check_welcome(msg) self.check_welcome(msg)
c1.send(u"bind", appid=u"appid", side=u"side") c1.send(u"bind", appid=u"appid", side=u"side")
yield c1.sync() yield c1.sync()
app = self._rendezvous.get_app(u"appid") app = self._rendezvous.get_app(u"appid")
self.assertEqual(app.get_allocated(), set()) self.assertEqual(app.get_allocated(), set())
c1.send(u"allocate") c1.send(u"allocate")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"allocated") self.assertEqual(msg["type"], u"allocated")
cid = msg["channelid"] cid = msg["channelid"]
self.failUnlessIsInstance(cid, int) self.failUnlessIsInstance(cid, int)
@ -588,7 +598,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
# second caller increases the number of known sides to 2 # second caller increases the number of known sides to 2
c2 = yield self.make_client() c2 = yield self.make_client()
msg = yield c2.next_event() msg = yield c2.next_non_ack()
self.check_welcome(msg) self.check_welcome(msg)
c2.send(u"bind", appid=u"appid", side=u"side-2") c2.send(u"bind", appid=u"appid", side=u"side-2")
c2.send(u"claim", channelid=cid) c2.send(u"claim", channelid=cid)
@ -600,38 +610,38 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
[{"phase": "1", "body": ""}]) [{"phase": "1", "body": ""}])
c1.send(u"list") c1.send(u"list")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"channelids") self.assertEqual(msg["type"], u"channelids")
self.assertEqual(msg["channelids"], [cid]) self.assertEqual(msg["channelids"], [cid])
c2.send(u"list") c2.send(u"list")
msg = yield c2.next_event() msg = yield c2.next_non_ack()
self.assertEqual(msg["type"], u"channelids") self.assertEqual(msg["type"], u"channelids")
self.assertEqual(msg["channelids"], [cid]) self.assertEqual(msg["channelids"], [cid])
c1.send(u"deallocate") c1.send(u"deallocate")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"deallocated") self.assertEqual(msg["type"], u"deallocated")
self.assertEqual(msg["status"], u"waiting") self.assertEqual(msg["status"], u"waiting")
c2.send(u"deallocate") c2.send(u"deallocate")
msg = yield c2.next_event() msg = yield c2.next_non_ack()
self.assertEqual(msg["type"], u"deallocated") self.assertEqual(msg["type"], u"deallocated")
self.assertEqual(msg["status"], u"deleted") self.assertEqual(msg["status"], u"deleted")
c2.send(u"list") c2.send(u"list")
msg = yield c2.next_event() msg = yield c2.next_non_ack()
self.assertEqual(msg["type"], u"channelids") self.assertEqual(msg["type"], u"channelids")
self.assertEqual(msg["channelids"], []) self.assertEqual(msg["channelids"], [])
@inlineCallbacks @inlineCallbacks
def test_allocate_and_claim(self): def test_allocate_and_claim(self):
c1 = yield self.make_client() c1 = yield self.make_client()
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.check_welcome(msg) self.check_welcome(msg)
c1.send(u"bind", appid=u"appid", side=u"side") c1.send(u"bind", appid=u"appid", side=u"side")
c1.send(u"allocate") c1.send(u"allocate")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"allocated") self.assertEqual(msg["type"], u"allocated")
cid = msg["channelid"] cid = msg["channelid"]
c1.send(u"claim", channelid=cid) c1.send(u"claim", channelid=cid)
@ -642,11 +652,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
@inlineCallbacks @inlineCallbacks
def test_allocate_and_claim_different(self): def test_allocate_and_claim_different(self):
c1 = yield self.make_client() c1 = yield self.make_client()
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.check_welcome(msg) self.check_welcome(msg)
c1.send(u"bind", appid=u"appid", side=u"side") c1.send(u"bind", appid=u"appid", side=u"side")
c1.send(u"allocate") c1.send(u"allocate")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"allocated") self.assertEqual(msg["type"], u"allocated")
cid = msg["channelid"] cid = msg["channelid"]
c1.send(u"claim", channelid=cid+1) c1.send(u"claim", channelid=cid+1)
@ -661,11 +671,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
@inlineCallbacks @inlineCallbacks
def test_message(self): def test_message(self):
c1 = yield self.make_client() c1 = yield self.make_client()
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.check_welcome(msg) self.check_welcome(msg)
c1.send(u"bind", appid=u"appid", side=u"side") c1.send(u"bind", appid=u"appid", side=u"side")
c1.send(u"allocate") c1.send(u"allocate")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], u"allocated") self.assertEqual(msg["type"], u"allocated")
cid = msg["channelid"] cid = msg["channelid"]
app = self._rendezvous.get_app(u"appid") app = self._rendezvous.get_app(u"appid")
@ -675,14 +685,16 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
c1.send(u"watch") c1.send(u"watch")
yield c1.sync() yield c1.sync()
self.assertEqual(len(channel._listeners), 1) self.assertEqual(len(channel._listeners), 1)
c1.strip_acks()
self.assertEqual(c1.events, []) self.assertEqual(c1.events, [])
c1.send(u"add", phase="1", body="msg1A") c1.send(u"add", phase="1", body="msg1A")
yield c1.sync() yield c1.sync()
c1.strip_acks()
self.assertEqual(strip_messages(channel.get_messages()), self.assertEqual(strip_messages(channel.get_messages()),
[{"phase": "1", "body": "msg1A"}]) [{"phase": "1", "body": "msg1A"}])
self.assertEqual(len(c1.events), 1) # echo should be sent right away self.assertEqual(len(c1.events), 1) # echo should be sent right away
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], "message") self.assertEqual(msg["type"], "message")
self.assertEqual(strip_message(msg["message"]), self.assertEqual(strip_message(msg["message"]),
{"phase": "1", "body": "msg1A"}) {"phase": "1", "body": "msg1A"})
@ -692,12 +704,12 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
c1.send(u"add", phase="1", body="msg1B") c1.send(u"add", phase="1", body="msg1B")
c1.send(u"add", phase="2", body="msg2A") c1.send(u"add", phase="2", body="msg2A")
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], "message") self.assertEqual(msg["type"], "message")
self.assertEqual(strip_message(msg["message"]), self.assertEqual(strip_message(msg["message"]),
{"phase": "1", "body": "msg1B"}) {"phase": "1", "body": "msg1B"})
msg = yield c1.next_event() msg = yield c1.next_non_ack()
self.assertEqual(msg["type"], "message") self.assertEqual(msg["type"], "message")
self.assertEqual(strip_message(msg["message"]), self.assertEqual(strip_message(msg["message"]),
{"phase": "2", "body": "msg2A"}) {"phase": "2", "body": "msg2A"})
@ -710,24 +722,24 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
# second client should see everything # second client should see everything
c2 = yield self.make_client() c2 = yield self.make_client()
msg = yield c2.next_event() msg = yield c2.next_non_ack()
self.check_welcome(msg) self.check_welcome(msg)
c2.send(u"bind", appid=u"appid", side=u"side") c2.send(u"bind", appid=u"appid", side=u"side")
c2.send(u"claim", channelid=cid) c2.send(u"claim", channelid=cid)
# 'watch' triggers delivery of old messages, in temporal order # 'watch' triggers delivery of old messages, in temporal order
c2.send(u"watch") c2.send(u"watch")
msg = yield c2.next_event() msg = yield c2.next_non_ack()
self.assertEqual(msg["type"], "message") self.assertEqual(msg["type"], "message")
self.assertEqual(strip_message(msg["message"]), self.assertEqual(strip_message(msg["message"]),
{"phase": "1", "body": "msg1A"}) {"phase": "1", "body": "msg1A"})
msg = yield c2.next_event() msg = yield c2.next_non_ack()
self.assertEqual(msg["type"], "message") self.assertEqual(msg["type"], "message")
self.assertEqual(strip_message(msg["message"]), self.assertEqual(strip_message(msg["message"]),
{"phase": "1", "body": "msg1B"}) {"phase": "1", "body": "msg1B"})
msg = yield c2.next_event() msg = yield c2.next_non_ack()
self.assertEqual(msg["type"], "message") self.assertEqual(msg["type"], "message")
self.assertEqual(strip_message(msg["message"]), self.assertEqual(strip_message(msg["message"]),
{"phase": "2", "body": "msg2A"}) {"phase": "2", "body": "msg2A"})
@ -736,7 +748,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
c1.send(u"add", phase="2", body="msg2A") c1.send(u"add", phase="2", body="msg2A")
# the duplicate message *does* get stored, and delivered # the duplicate message *does* get stored, and delivered
msg = yield c2.next_event() msg = yield c2.next_non_ack()
self.assertEqual(msg["type"], "message") self.assertEqual(msg["type"], "message")
self.assertEqual(strip_message(msg["message"]), self.assertEqual(strip_message(msg["message"]),
{"phase": "2", "body": "msg2A"}) {"phase": "2", "body": "msg2A"})

View File

@ -139,6 +139,9 @@ class Wormhole:
return return
return meth(msg) return meth(msg)
def _ws_handle_ack(self, msg):
pass
def _ws_handle_welcome(self, msg): def _ws_handle_welcome(self, msg):
self._timing.add("welcome").server_sent(msg["server_tx"]) self._timing.add("welcome").server_sent(msg["server_tx"])
welcome = msg["welcome"] welcome = msg["welcome"]