diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 71cdcc5..8ad206a 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -9,6 +9,8 @@ def generate_mailbox_id(): class CrowdedError(Exception): pass +class ReclaimedError(Exception): + pass Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"]) TransitUsage = namedtuple("TransitUsage", @@ -41,6 +43,20 @@ class Mailbox: " (`mailbox_id`, `opened`, `side`, `added`)" " VALUES(?,?,?,?)", (self._mailbox_id, True, side, when)) + # We accept re-opening a mailbox which a side previously closed, + # unlike claim_nameplate(), which forbids any side from re-claiming a + # nameplate which they previously released. (Nameplates forbid this + # because the act of claiming a nameplate for the first time causes a + # new mailbox to be created, which should only happen once). + # Mailboxes have their own distinct objects (to manage + # subscriptions), so closing one which was already closed requires + # making a new object, which works by calling open() just before + # close(). We really do want to support re-closing closed mailboxes, + # because this enables intermittently-connected clients, who remember + # sending a 'close' but aren't sure whether it was received or not, + # then get shut down. Those clients will wake up and re-send the + # 'close', until they receive the 'closed' ack message. + self._touch(when) db.commit() # XXX: reconcile the need for this with the comment above @@ -219,6 +235,10 @@ class AppNamespace: " (`nameplates_id`, `claimed`, `side`, `added`)" " VALUES(?,?,?,?)", (npid, True, side, when)) + else: + if not row["claimed"]: + raise ReclaimedError("you cannot re-claim a nameplate that your side previously released") + # since that might cause a new mailbox to be allocated db.commit() self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError diff --git a/src/wormhole/server/rendezvous_websocket.py b/src/wormhole/server/rendezvous_websocket.py index 32519b0..67e46f4 100644 --- a/src/wormhole/server/rendezvous_websocket.py +++ b/src/wormhole/server/rendezvous_websocket.py @@ -3,7 +3,7 @@ import time from twisted.internet import reactor from twisted.python import log from autobahn.twisted import websocket -from .rendezvous import CrowdedError, SidedMessage +from .rendezvous import CrowdedError, ReclaimedError, SidedMessage from ..util import dict_to_bytes, bytes_to_dict # The WebSocket allows the client to send "commands" to the server, and the @@ -62,6 +62,7 @@ from ..util import dict_to_bytes, bytes_to_dict # -> {type: "claim", nameplate: str} -> mailbox # <- {type: "claimed", mailbox: str} # -> {type: "release"} +# .nameplate is optional, but must match previous claim() # <- {type: "released"} # # -> {type: "open", mailbox: str} -> message @@ -70,6 +71,7 @@ from ..util import dict_to_bytes, bytes_to_dict # -> {type: "add", phase: str, body: hex} # will send echo in a "message" # # -> {type: "close", mood: str} -> closed +# .mailbox is optional, but must match previous open() # <- {type: "closed"} # # <- {type: "error", error: str, orig: {}} # in response to malformed msgs @@ -89,8 +91,13 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): self._side = None self._did_allocate = False # only one allocate() per websocket self._listening = False + self._did_claim = False self._nameplate_id = None + self._did_release = False + self._did_open = False self._mailbox = None + self._mailbox_id = None + self._did_close = False def onConnect(self, request): rv = self.factory.rendezvous @@ -125,7 +132,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): if mtype == "claim": return self.handle_claim(msg, server_rx) if mtype == "release": - return self.handle_release(server_rx) + return self.handle_release(msg, server_rx) if mtype == "open": return self.handle_open(msg, server_rx) @@ -172,6 +179,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): def handle_claim(self, msg, server_rx): if "nameplate" not in msg: raise Error("claim requires 'nameplate'") + if self._did_claim: + raise Error("only one claim per connection") + self._did_claim = True nameplate_id = msg["nameplate"] assert isinstance(nameplate_id, type("")), type(nameplate_id) self._nameplate_id = nameplate_id @@ -180,23 +190,36 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): server_rx) except CrowdedError: raise Error("crowded") + except ReclaimedError: + raise Error("reclaimed") self.send("claimed", mailbox=mailbox_id) - def handle_release(self, server_rx): - if not self._nameplate_id: - raise Error("must claim a nameplate before releasing it") - self._app.release_nameplate(self._nameplate_id, self._side, server_rx) - self._nameplate_id = None + def handle_release(self, msg, server_rx): + if self._did_release: + raise Error("only one release per connection") + if "nameplate" in msg: + if self._nameplate_id is not None: + if msg["nameplate"] != self._nameplate_id: + raise Error("release and claim must use same nameplate") + nameplate_id = msg["nameplate"] + else: + if self._nameplate_id is None: + raise Error("release without nameplate must follow claim") + nameplate_id = self._nameplate_id + assert nameplate_id is not None + self._did_release = True + self._app.release_nameplate(nameplate_id, self._side, server_rx) self.send("released") def handle_open(self, msg, server_rx): if self._mailbox: - raise Error("you already have a mailbox open") + raise Error("only one open per connection") if "mailbox" not in msg: raise Error("open requires 'mailbox'") mailbox_id = msg["mailbox"] assert isinstance(mailbox_id, type("")) + self._mailbox_id = mailbox_id self._mailbox = self._app.open_mailbox(mailbox_id, self._side, server_rx) def _send(sm): @@ -222,11 +245,24 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): self._mailbox.add_message(sm) def handle_close(self, msg, server_rx): + if self._did_close: + raise Error("only one close per connection") + if "mailbox" in msg: + if self._mailbox_id is not None: + if msg["mailbox"] != self._mailbox_id: + raise Error("open and close must use same mailbox") + mailbox_id = msg["mailbox"] + else: + if self._mailbox_id is None: + raise Error("close without mailbox must follow open") + mailbox_id = self._mailbox_id if not self._mailbox: - raise Error("must open mailbox before closing") + self._mailbox = self._app.open_mailbox(mailbox_id, self._side, + server_rx) if self._listening: self._mailbox.remove_listener(self) self._listening = False + self._did_close = True self._mailbox.close(self._side, msg.get("mood"), server_rx) self._mailbox = None self.send("closed") diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index d2aaa83..06daff3 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -754,6 +754,11 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase): mailbox_id = m["mailbox"] self.assertEqual(type(mailbox_id), type("")) + c1.send("claim", nameplate="np1") + err = yield c1.next_non_ack() + self.assertEqual(err["type"], "error", err) + self.assertEqual(err["error"], "only one claim per connection") + nids = app.get_nameplate_ids() self.assertEqual(len(nids), 1) self.assertEqual("np1", list(nids)[0]) @@ -796,14 +801,14 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase): err = yield c1.next_non_ack() self.assertEqual(err["type"], "error") self.assertEqual(err["error"], - "must claim a nameplate before releasing it") + "release without nameplate must follow claim") c1.send("claim", nameplate="np1") yield c1.next_non_ack() c1.send("release") m = yield c1.next_non_ack() - self.assertEqual(m["type"], "released") + self.assertEqual(m["type"], "released", m) np_row, side_rows = self._nameplate(app, "np1") claims = [(row["side"], row["claimed"]) for row in side_rows] @@ -813,8 +818,45 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase): c1.send("release") # no longer claimed err = yield c1.next_non_ack() self.assertEqual(err["type"], "error") + self.assertEqual(err["error"], "only one release per connection") + + @inlineCallbacks + def test_release_named(self): + c1 = yield self.make_client() + yield c1.next_non_ack() + c1.send("bind", appid="appid", side="side") + + c1.send("claim", nameplate="np1") + yield c1.next_non_ack() + + c1.send("release", nameplate="np1") + m = yield c1.next_non_ack() + self.assertEqual(m["type"], "released", m) + + @inlineCallbacks + def test_release_named_ignored(self): + c1 = yield self.make_client() + yield c1.next_non_ack() + c1.send("bind", appid="appid", side="side") + + c1.send("release", nameplate="np1") # didn't do claim first, ignored + m = yield c1.next_non_ack() + self.assertEqual(m["type"], "released", m) + + @inlineCallbacks + def test_release_named_mismatch(self): + c1 = yield self.make_client() + yield c1.next_non_ack() + c1.send("bind", appid="appid", side="side") + + c1.send("claim", nameplate="np1") + yield c1.next_non_ack() + + c1.send("release", nameplate="np2") # mismatching nameplate + err = yield c1.next_non_ack() + self.assertEqual(err["type"], "error") self.assertEqual(err["error"], - "must claim a nameplate before releasing it") + "release and claim must use same nameplate") @inlineCallbacks def test_open(self): @@ -849,7 +891,7 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase): c1.send("open", mailbox="mb1") err = yield c1.next_non_ack() self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "you already have a mailbox open") + self.assertEqual(err["error"], "only one open per connection") @inlineCallbacks def test_add(self): @@ -896,7 +938,7 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase): c1.send("close", mood="mood") # must open first err = yield c1.next_non_ack() self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "must open mailbox before closing") + self.assertEqual(err["error"], "close without mailbox must follow open") c1.send("open", mailbox="mb1") yield c1.sync() @@ -910,8 +952,46 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase): c1.send("close", mood="mood") # already closed err = yield c1.next_non_ack() + self.assertEqual(err["type"], "error", m) + self.assertEqual(err["error"], "only one close per connection") + + @inlineCallbacks + def test_close_named(self): + c1 = yield self.make_client() + yield c1.next_non_ack() + c1.send("bind", appid="appid", side="side") + + c1.send("open", mailbox="mb1") + yield c1.sync() + + c1.send("close", mailbox="mb1", mood="mood") + m = yield c1.next_non_ack() + self.assertEqual(m["type"], "closed") + + @inlineCallbacks + def test_close_named_ignored(self): + c1 = yield self.make_client() + yield c1.next_non_ack() + c1.send("bind", appid="appid", side="side") + + c1.send("close", mailbox="mb1", mood="mood") # no open first, ignored + m = yield c1.next_non_ack() + self.assertEqual(m["type"], "closed") + + @inlineCallbacks + def test_close_named_mismatch(self): + c1 = yield self.make_client() + yield c1.next_non_ack() + c1.send("bind", appid="appid", side="side") + + c1.send("open", mailbox="mb1") + yield c1.sync() + + c1.send("close", mailbox="mb2", mood="mood") + err = yield c1.next_non_ack() self.assertEqual(err["type"], "error") - self.assertEqual(err["error"], "must open mailbox before closing") + self.assertEqual(err["error"], "open and close must use same mailbox") + @inlineCallbacks def test_disconnect(self): @@ -934,6 +1014,163 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase): yield d self.assertFalse(mb1.has_listeners()) + @inlineCallbacks + def test_interrupted_client_nameplate(self): + # a client's interactions with the server might be split over + # multiple sequential WebSocket connections, e.g. when the server is + # bounced and the client reconnects, or vice versa + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + app = self._rendezvous.get_app("appid") + + c.send("claim", nameplate="np1") + m = yield c.next_non_ack() + self.assertEqual(m["type"], "claimed") + mailbox_id = m["mailbox"] + self.assertEqual(type(mailbox_id), type("")) + np_row, side_rows = self._nameplate(app, "np1") + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertEqual(claims, [("side", True)]) + c.close() + yield c.d + + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + c.send("claim", nameplate="np1") # idempotent + m = yield c.next_non_ack() + self.assertEqual(m["type"], "claimed") + self.assertEqual(m["mailbox"], mailbox_id) # mailbox id is stable + np_row, side_rows = self._nameplate(app, "np1") + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertEqual(claims, [("side", True)]) + c.close() + yield c.d + + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + # we haven't done a claim with this particular connection, but we can + # still send a release as long as we include the nameplate + c.send("release", nameplate="np1") # release-without-claim + m = yield c.next_non_ack() + self.assertEqual(m["type"], "released") + np_row, side_rows = self._nameplate(app, "np1") + self.assertEqual(np_row, None) + c.close() + yield c.d + + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + # and the release is idempotent, when done on separate connections + c.send("release", nameplate="np1") + m = yield c.next_non_ack() + self.assertEqual(m["type"], "released") + np_row, side_rows = self._nameplate(app, "np1") + self.assertEqual(np_row, None) + c.close() + yield c.d + + + @inlineCallbacks + def test_interrupted_client_nameplate_reclaimed(self): + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + app = self._rendezvous.get_app("appid") + + # a new claim on a previously-closed nameplate is forbidden. We make + # a new nameplate here and manually open a second claim on it, so the + # nameplate stays alive long enough for the code check to happen. + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + c.send("claim", nameplate="np2") + m = yield c.next_non_ack() + self.assertEqual(m["type"], "claimed") + app.claim_nameplate("np2", "side2", 0) + c.send("release", nameplate="np2") + m = yield c.next_non_ack() + self.assertEqual(m["type"], "released") + np_row, side_rows = self._nameplate(app, "np2") + claims = sorted([(row["side"], row["claimed"]) for row in side_rows]) + self.assertEqual(claims, [("side", 0), ("side2", 1)]) + c.close() + yield c.d + + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + c.send("claim", nameplate="np2") # new claim is forbidden + err = yield c.next_non_ack() + self.assertEqual(err["type"], "error") + self.assertEqual(err["error"], "reclaimed") + + np_row, side_rows = self._nameplate(app, "np2") + claims = sorted([(row["side"], row["claimed"]) for row in side_rows]) + self.assertEqual(claims, [("side", 0), ("side2", 1)]) + c.close() + yield c.d + + @inlineCallbacks + def test_interrupted_client_mailbox(self): + # a client's interactions with the server might be split over + # multiple sequential WebSocket connections, e.g. when the server is + # bounced and the client reconnects, or vice versa + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + app = self._rendezvous.get_app("appid") + mb1 = app.open_mailbox("mb1", "side2", 0) + mb1.add_message(SidedMessage(side="side2", phase="phase", + body="body", server_rx=0, + msg_id="msgid")) + + c.send("open", mailbox="mb1") + m = yield c.next_non_ack() + self.assertEqual(m["type"], "message") + self.assertEqual(m["body"], "body") + self.assertTrue(mb1.has_listeners()) + c.close() + yield c.d + + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + # open should be idempotent + c.send("open", mailbox="mb1") + m = yield c.next_non_ack() + self.assertEqual(m["type"], "message") + self.assertEqual(m["body"], "body") + mb_row, side_rows = self._mailbox(app, "mb1") + openeds = [(row["side"], row["opened"]) for row in side_rows] + self.assertIn(("side", 1), openeds) # TODO: why 1, and not True? + + # close on the same connection as open is ok + c.send("close", mailbox="mb1", mood="mood") + m = yield c.next_non_ack() + self.assertEqual(m["type"], "closed", m) + mb_row, side_rows = self._mailbox(app, "mb1") + openeds = [(row["side"], row["opened"]) for row in side_rows] + self.assertIn(("side", 0), openeds) + c.close() + yield c.d + + # close (on a separate connection) is idempotent + c = yield self.make_client() + yield c.next_non_ack() + c.send("bind", appid="appid", side="side") + c.send("close", mailbox="mb1", mood="mood") + m = yield c.next_non_ack() + self.assertEqual(m["type"], "closed", m) + mb_row, side_rows = self._mailbox(app, "mb1") + openeds = [(row["side"], row["opened"]) for row in side_rows] + self.assertIn(("side", 0), openeds) + c.close() + yield c.d + class Summary(unittest.TestCase): def test_mailbox(self):