Merge branch '118-release'

This improves the websocket-based Rendezvous Server protocol, making it
possible for a future client to maintain a Wormhole connection despite the
individual websocket connection being temporarily dropped, or the server
being restarted.

closes #118
This commit is contained in:
Brian Warner 2016-12-25 23:25:39 -05:00
commit 8d3832aa3e
3 changed files with 308 additions and 15 deletions

View File

@ -9,6 +9,8 @@ def generate_mailbox_id():
class CrowdedError(Exception): class CrowdedError(Exception):
pass pass
class ReclaimedError(Exception):
pass
Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"]) Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"])
TransitUsage = namedtuple("TransitUsage", TransitUsage = namedtuple("TransitUsage",
@ -41,6 +43,20 @@ class Mailbox:
" (`mailbox_id`, `opened`, `side`, `added`)" " (`mailbox_id`, `opened`, `side`, `added`)"
" VALUES(?,?,?,?)", " VALUES(?,?,?,?)",
(self._mailbox_id, True, side, when)) (self._mailbox_id, True, side, when))
# We accept re-opening a mailbox which a side previously closed,
# unlike claim_nameplate(), which forbids any side from re-claiming a
# nameplate which they previously released. (Nameplates forbid this
# because the act of claiming a nameplate for the first time causes a
# new mailbox to be created, which should only happen once).
# Mailboxes have their own distinct objects (to manage
# subscriptions), so closing one which was already closed requires
# making a new object, which works by calling open() just before
# close(). We really do want to support re-closing closed mailboxes,
# because this enables intermittently-connected clients, who remember
# sending a 'close' but aren't sure whether it was received or not,
# then get shut down. Those clients will wake up and re-send the
# 'close', until they receive the 'closed' ack message.
self._touch(when) self._touch(when)
db.commit() # XXX: reconcile the need for this with the comment above db.commit() # XXX: reconcile the need for this with the comment above
@ -219,6 +235,10 @@ class AppNamespace:
" (`nameplates_id`, `claimed`, `side`, `added`)" " (`nameplates_id`, `claimed`, `side`, `added`)"
" VALUES(?,?,?,?)", " VALUES(?,?,?,?)",
(npid, True, side, when)) (npid, True, side, when))
else:
if not row["claimed"]:
raise ReclaimedError("you cannot re-claim a nameplate that your side previously released")
# since that might cause a new mailbox to be allocated
db.commit() db.commit()
self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError

View File

@ -3,7 +3,7 @@ import time
from twisted.internet import reactor from twisted.internet import reactor
from twisted.python import log from twisted.python import log
from autobahn.twisted import websocket from autobahn.twisted import websocket
from .rendezvous import CrowdedError, SidedMessage from .rendezvous import CrowdedError, ReclaimedError, SidedMessage
from ..util import dict_to_bytes, bytes_to_dict from ..util import dict_to_bytes, bytes_to_dict
# The WebSocket allows the client to send "commands" to the server, and the # The WebSocket allows the client to send "commands" to the server, and the
@ -62,6 +62,7 @@ from ..util import dict_to_bytes, bytes_to_dict
# -> {type: "claim", nameplate: str} -> mailbox # -> {type: "claim", nameplate: str} -> mailbox
# <- {type: "claimed", mailbox: str} # <- {type: "claimed", mailbox: str}
# -> {type: "release"} # -> {type: "release"}
# .nameplate is optional, but must match previous claim()
# <- {type: "released"} # <- {type: "released"}
# #
# -> {type: "open", mailbox: str} -> message # -> {type: "open", mailbox: str} -> message
@ -70,6 +71,7 @@ from ..util import dict_to_bytes, bytes_to_dict
# -> {type: "add", phase: str, body: hex} # will send echo in a "message" # -> {type: "add", phase: str, body: hex} # will send echo in a "message"
# #
# -> {type: "close", mood: str} -> closed # -> {type: "close", mood: str} -> closed
# .mailbox is optional, but must match previous open()
# <- {type: "closed"} # <- {type: "closed"}
# #
# <- {type: "error", error: str, orig: {}} # in response to malformed msgs # <- {type: "error", error: str, orig: {}} # in response to malformed msgs
@ -89,8 +91,13 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
self._side = None self._side = None
self._did_allocate = False # only one allocate() per websocket self._did_allocate = False # only one allocate() per websocket
self._listening = False self._listening = False
self._did_claim = False
self._nameplate_id = None self._nameplate_id = None
self._did_release = False
self._did_open = False
self._mailbox = None self._mailbox = None
self._mailbox_id = None
self._did_close = False
def onConnect(self, request): def onConnect(self, request):
rv = self.factory.rendezvous rv = self.factory.rendezvous
@ -125,7 +132,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
if mtype == "claim": if mtype == "claim":
return self.handle_claim(msg, server_rx) return self.handle_claim(msg, server_rx)
if mtype == "release": if mtype == "release":
return self.handle_release(server_rx) return self.handle_release(msg, server_rx)
if mtype == "open": if mtype == "open":
return self.handle_open(msg, server_rx) return self.handle_open(msg, server_rx)
@ -172,6 +179,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
def handle_claim(self, msg, server_rx): def handle_claim(self, msg, server_rx):
if "nameplate" not in msg: if "nameplate" not in msg:
raise Error("claim requires 'nameplate'") raise Error("claim requires 'nameplate'")
if self._did_claim:
raise Error("only one claim per connection")
self._did_claim = True
nameplate_id = msg["nameplate"] nameplate_id = msg["nameplate"]
assert isinstance(nameplate_id, type("")), type(nameplate_id) assert isinstance(nameplate_id, type("")), type(nameplate_id)
self._nameplate_id = nameplate_id self._nameplate_id = nameplate_id
@ -180,23 +190,36 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
server_rx) server_rx)
except CrowdedError: except CrowdedError:
raise Error("crowded") raise Error("crowded")
except ReclaimedError:
raise Error("reclaimed")
self.send("claimed", mailbox=mailbox_id) self.send("claimed", mailbox=mailbox_id)
def handle_release(self, server_rx): def handle_release(self, msg, server_rx):
if not self._nameplate_id: if self._did_release:
raise Error("must claim a nameplate before releasing it") raise Error("only one release per connection")
self._app.release_nameplate(self._nameplate_id, self._side, server_rx) if "nameplate" in msg:
self._nameplate_id = None if self._nameplate_id is not None:
if msg["nameplate"] != self._nameplate_id:
raise Error("release and claim must use same nameplate")
nameplate_id = msg["nameplate"]
else:
if self._nameplate_id is None:
raise Error("release without nameplate must follow claim")
nameplate_id = self._nameplate_id
assert nameplate_id is not None
self._did_release = True
self._app.release_nameplate(nameplate_id, self._side, server_rx)
self.send("released") self.send("released")
def handle_open(self, msg, server_rx): def handle_open(self, msg, server_rx):
if self._mailbox: if self._mailbox:
raise Error("you already have a mailbox open") raise Error("only one open per connection")
if "mailbox" not in msg: if "mailbox" not in msg:
raise Error("open requires 'mailbox'") raise Error("open requires 'mailbox'")
mailbox_id = msg["mailbox"] mailbox_id = msg["mailbox"]
assert isinstance(mailbox_id, type("")) assert isinstance(mailbox_id, type(""))
self._mailbox_id = mailbox_id
self._mailbox = self._app.open_mailbox(mailbox_id, self._side, self._mailbox = self._app.open_mailbox(mailbox_id, self._side,
server_rx) server_rx)
def _send(sm): def _send(sm):
@ -222,11 +245,24 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
self._mailbox.add_message(sm) self._mailbox.add_message(sm)
def handle_close(self, msg, server_rx): def handle_close(self, msg, server_rx):
if self._did_close:
raise Error("only one close per connection")
if "mailbox" in msg:
if self._mailbox_id is not None:
if msg["mailbox"] != self._mailbox_id:
raise Error("open and close must use same mailbox")
mailbox_id = msg["mailbox"]
else:
if self._mailbox_id is None:
raise Error("close without mailbox must follow open")
mailbox_id = self._mailbox_id
if not self._mailbox: if not self._mailbox:
raise Error("must open mailbox before closing") self._mailbox = self._app.open_mailbox(mailbox_id, self._side,
server_rx)
if self._listening: if self._listening:
self._mailbox.remove_listener(self) self._mailbox.remove_listener(self)
self._listening = False self._listening = False
self._did_close = True
self._mailbox.close(self._side, msg.get("mood"), server_rx) self._mailbox.close(self._side, msg.get("mood"), server_rx)
self._mailbox = None self._mailbox = None
self.send("closed") self.send("closed")

View File

@ -754,6 +754,11 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase):
mailbox_id = m["mailbox"] mailbox_id = m["mailbox"]
self.assertEqual(type(mailbox_id), type("")) self.assertEqual(type(mailbox_id), type(""))
c1.send("claim", nameplate="np1")
err = yield c1.next_non_ack()
self.assertEqual(err["type"], "error", err)
self.assertEqual(err["error"], "only one claim per connection")
nids = app.get_nameplate_ids() nids = app.get_nameplate_ids()
self.assertEqual(len(nids), 1) self.assertEqual(len(nids), 1)
self.assertEqual("np1", list(nids)[0]) self.assertEqual("np1", list(nids)[0])
@ -796,14 +801,14 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase):
err = yield c1.next_non_ack() err = yield c1.next_non_ack()
self.assertEqual(err["type"], "error") self.assertEqual(err["type"], "error")
self.assertEqual(err["error"], self.assertEqual(err["error"],
"must claim a nameplate before releasing it") "release without nameplate must follow claim")
c1.send("claim", nameplate="np1") c1.send("claim", nameplate="np1")
yield c1.next_non_ack() yield c1.next_non_ack()
c1.send("release") c1.send("release")
m = yield c1.next_non_ack() m = yield c1.next_non_ack()
self.assertEqual(m["type"], "released") self.assertEqual(m["type"], "released", m)
np_row, side_rows = self._nameplate(app, "np1") np_row, side_rows = self._nameplate(app, "np1")
claims = [(row["side"], row["claimed"]) for row in side_rows] claims = [(row["side"], row["claimed"]) for row in side_rows]
@ -813,8 +818,45 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase):
c1.send("release") # no longer claimed c1.send("release") # no longer claimed
err = yield c1.next_non_ack() err = yield c1.next_non_ack()
self.assertEqual(err["type"], "error") self.assertEqual(err["type"], "error")
self.assertEqual(err["error"], "only one release per connection")
@inlineCallbacks
def test_release_named(self):
c1 = yield self.make_client()
yield c1.next_non_ack()
c1.send("bind", appid="appid", side="side")
c1.send("claim", nameplate="np1")
yield c1.next_non_ack()
c1.send("release", nameplate="np1")
m = yield c1.next_non_ack()
self.assertEqual(m["type"], "released", m)
@inlineCallbacks
def test_release_named_ignored(self):
c1 = yield self.make_client()
yield c1.next_non_ack()
c1.send("bind", appid="appid", side="side")
c1.send("release", nameplate="np1") # didn't do claim first, ignored
m = yield c1.next_non_ack()
self.assertEqual(m["type"], "released", m)
@inlineCallbacks
def test_release_named_mismatch(self):
c1 = yield self.make_client()
yield c1.next_non_ack()
c1.send("bind", appid="appid", side="side")
c1.send("claim", nameplate="np1")
yield c1.next_non_ack()
c1.send("release", nameplate="np2") # mismatching nameplate
err = yield c1.next_non_ack()
self.assertEqual(err["type"], "error")
self.assertEqual(err["error"], self.assertEqual(err["error"],
"must claim a nameplate before releasing it") "release and claim must use same nameplate")
@inlineCallbacks @inlineCallbacks
def test_open(self): def test_open(self):
@ -849,7 +891,7 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase):
c1.send("open", mailbox="mb1") c1.send("open", mailbox="mb1")
err = yield c1.next_non_ack() err = yield c1.next_non_ack()
self.assertEqual(err["type"], "error") self.assertEqual(err["type"], "error")
self.assertEqual(err["error"], "you already have a mailbox open") self.assertEqual(err["error"], "only one open per connection")
@inlineCallbacks @inlineCallbacks
def test_add(self): def test_add(self):
@ -896,7 +938,7 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase):
c1.send("close", mood="mood") # must open first c1.send("close", mood="mood") # must open first
err = yield c1.next_non_ack() err = yield c1.next_non_ack()
self.assertEqual(err["type"], "error") self.assertEqual(err["type"], "error")
self.assertEqual(err["error"], "must open mailbox before closing") self.assertEqual(err["error"], "close without mailbox must follow open")
c1.send("open", mailbox="mb1") c1.send("open", mailbox="mb1")
yield c1.sync() yield c1.sync()
@ -910,8 +952,46 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase):
c1.send("close", mood="mood") # already closed c1.send("close", mood="mood") # already closed
err = yield c1.next_non_ack() err = yield c1.next_non_ack()
self.assertEqual(err["type"], "error", m)
self.assertEqual(err["error"], "only one close per connection")
@inlineCallbacks
def test_close_named(self):
c1 = yield self.make_client()
yield c1.next_non_ack()
c1.send("bind", appid="appid", side="side")
c1.send("open", mailbox="mb1")
yield c1.sync()
c1.send("close", mailbox="mb1", mood="mood")
m = yield c1.next_non_ack()
self.assertEqual(m["type"], "closed")
@inlineCallbacks
def test_close_named_ignored(self):
c1 = yield self.make_client()
yield c1.next_non_ack()
c1.send("bind", appid="appid", side="side")
c1.send("close", mailbox="mb1", mood="mood") # no open first, ignored
m = yield c1.next_non_ack()
self.assertEqual(m["type"], "closed")
@inlineCallbacks
def test_close_named_mismatch(self):
c1 = yield self.make_client()
yield c1.next_non_ack()
c1.send("bind", appid="appid", side="side")
c1.send("open", mailbox="mb1")
yield c1.sync()
c1.send("close", mailbox="mb2", mood="mood")
err = yield c1.next_non_ack()
self.assertEqual(err["type"], "error") self.assertEqual(err["type"], "error")
self.assertEqual(err["error"], "must open mailbox before closing") self.assertEqual(err["error"], "open and close must use same mailbox")
@inlineCallbacks @inlineCallbacks
def test_disconnect(self): def test_disconnect(self):
@ -934,6 +1014,163 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase):
yield d yield d
self.assertFalse(mb1.has_listeners()) self.assertFalse(mb1.has_listeners())
@inlineCallbacks
def test_interrupted_client_nameplate(self):
# a client's interactions with the server might be split over
# multiple sequential WebSocket connections, e.g. when the server is
# bounced and the client reconnects, or vice versa
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
app = self._rendezvous.get_app("appid")
c.send("claim", nameplate="np1")
m = yield c.next_non_ack()
self.assertEqual(m["type"], "claimed")
mailbox_id = m["mailbox"]
self.assertEqual(type(mailbox_id), type(""))
np_row, side_rows = self._nameplate(app, "np1")
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertEqual(claims, [("side", True)])
c.close()
yield c.d
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
c.send("claim", nameplate="np1") # idempotent
m = yield c.next_non_ack()
self.assertEqual(m["type"], "claimed")
self.assertEqual(m["mailbox"], mailbox_id) # mailbox id is stable
np_row, side_rows = self._nameplate(app, "np1")
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertEqual(claims, [("side", True)])
c.close()
yield c.d
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
# we haven't done a claim with this particular connection, but we can
# still send a release as long as we include the nameplate
c.send("release", nameplate="np1") # release-without-claim
m = yield c.next_non_ack()
self.assertEqual(m["type"], "released")
np_row, side_rows = self._nameplate(app, "np1")
self.assertEqual(np_row, None)
c.close()
yield c.d
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
# and the release is idempotent, when done on separate connections
c.send("release", nameplate="np1")
m = yield c.next_non_ack()
self.assertEqual(m["type"], "released")
np_row, side_rows = self._nameplate(app, "np1")
self.assertEqual(np_row, None)
c.close()
yield c.d
@inlineCallbacks
def test_interrupted_client_nameplate_reclaimed(self):
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
app = self._rendezvous.get_app("appid")
# a new claim on a previously-closed nameplate is forbidden. We make
# a new nameplate here and manually open a second claim on it, so the
# nameplate stays alive long enough for the code check to happen.
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
c.send("claim", nameplate="np2")
m = yield c.next_non_ack()
self.assertEqual(m["type"], "claimed")
app.claim_nameplate("np2", "side2", 0)
c.send("release", nameplate="np2")
m = yield c.next_non_ack()
self.assertEqual(m["type"], "released")
np_row, side_rows = self._nameplate(app, "np2")
claims = sorted([(row["side"], row["claimed"]) for row in side_rows])
self.assertEqual(claims, [("side", 0), ("side2", 1)])
c.close()
yield c.d
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
c.send("claim", nameplate="np2") # new claim is forbidden
err = yield c.next_non_ack()
self.assertEqual(err["type"], "error")
self.assertEqual(err["error"], "reclaimed")
np_row, side_rows = self._nameplate(app, "np2")
claims = sorted([(row["side"], row["claimed"]) for row in side_rows])
self.assertEqual(claims, [("side", 0), ("side2", 1)])
c.close()
yield c.d
@inlineCallbacks
def test_interrupted_client_mailbox(self):
# a client's interactions with the server might be split over
# multiple sequential WebSocket connections, e.g. when the server is
# bounced and the client reconnects, or vice versa
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
app = self._rendezvous.get_app("appid")
mb1 = app.open_mailbox("mb1", "side2", 0)
mb1.add_message(SidedMessage(side="side2", phase="phase",
body="body", server_rx=0,
msg_id="msgid"))
c.send("open", mailbox="mb1")
m = yield c.next_non_ack()
self.assertEqual(m["type"], "message")
self.assertEqual(m["body"], "body")
self.assertTrue(mb1.has_listeners())
c.close()
yield c.d
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
# open should be idempotent
c.send("open", mailbox="mb1")
m = yield c.next_non_ack()
self.assertEqual(m["type"], "message")
self.assertEqual(m["body"], "body")
mb_row, side_rows = self._mailbox(app, "mb1")
openeds = [(row["side"], row["opened"]) for row in side_rows]
self.assertIn(("side", 1), openeds) # TODO: why 1, and not True?
# close on the same connection as open is ok
c.send("close", mailbox="mb1", mood="mood")
m = yield c.next_non_ack()
self.assertEqual(m["type"], "closed", m)
mb_row, side_rows = self._mailbox(app, "mb1")
openeds = [(row["side"], row["opened"]) for row in side_rows]
self.assertIn(("side", 0), openeds)
c.close()
yield c.d
# close (on a separate connection) is idempotent
c = yield self.make_client()
yield c.next_non_ack()
c.send("bind", appid="appid", side="side")
c.send("close", mailbox="mb1", mood="mood")
m = yield c.next_non_ack()
self.assertEqual(m["type"], "closed", m)
mb_row, side_rows = self._mailbox(app, "mb1")
openeds = [(row["side"], row["opened"]) for row in side_rows]
self.assertIn(("side", 0), openeds)
c.close()
yield c.d
class Summary(unittest.TestCase): class Summary(unittest.TestCase):
def test_mailbox(self): def test_mailbox(self):