server: forbid reclaiming previously-closed nameplates
at least by the same side. This forces the contour of claims (by any given side) to be strictly unclaimed -> claimed -> released. The "claim" action (unclaimed -> claimed) is idempotent and can be repeated arbitrarily, as long as they happen on separate websocket connections. Likewise for the "release" action (unclaimed -> released). But once a side releases a nameplate, it should never roll so far back that it tries to claim it again, especially because the first claim causes a mailbox to be allocated, and if we manage to allocate two different mailboxes for a single nameplate, then we've thrown idempotency out the window.
This commit is contained in:
parent
3a4a3f544f
commit
7ddf0d3c2d
|
@ -9,6 +9,8 @@ def generate_mailbox_id():
|
||||||
|
|
||||||
class CrowdedError(Exception):
|
class CrowdedError(Exception):
|
||||||
pass
|
pass
|
||||||
|
class ReclaimedError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"])
|
Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"])
|
||||||
TransitUsage = namedtuple("TransitUsage",
|
TransitUsage = namedtuple("TransitUsage",
|
||||||
|
@ -41,6 +43,20 @@ class Mailbox:
|
||||||
" (`mailbox_id`, `opened`, `side`, `added`)"
|
" (`mailbox_id`, `opened`, `side`, `added`)"
|
||||||
" VALUES(?,?,?,?)",
|
" VALUES(?,?,?,?)",
|
||||||
(self._mailbox_id, True, side, when))
|
(self._mailbox_id, True, side, when))
|
||||||
|
# We accept re-opening a mailbox which a side previously closed,
|
||||||
|
# unlike claim_nameplate(), which forbids any side from re-claiming a
|
||||||
|
# nameplate which they previously released. (Nameplates forbid this
|
||||||
|
# because the act of claiming a nameplate for the first time causes a
|
||||||
|
# new mailbox to be created, which should only happen once).
|
||||||
|
# Mailboxes have their own distinct objects (to manage
|
||||||
|
# subscriptions), so closing one which was already closed requires
|
||||||
|
# making a new object, which works by calling open() just before
|
||||||
|
# close(). We really do want to support re-closing closed mailboxes,
|
||||||
|
# because this enables intermittently-connected clients, who remember
|
||||||
|
# sending a 'close' but aren't sure whether it was received or not,
|
||||||
|
# then get shut down. Those clients will wake up and re-send the
|
||||||
|
# 'close', until they receive the 'closed' ack message.
|
||||||
|
|
||||||
self._touch(when)
|
self._touch(when)
|
||||||
db.commit() # XXX: reconcile the need for this with the comment above
|
db.commit() # XXX: reconcile the need for this with the comment above
|
||||||
|
|
||||||
|
@ -219,6 +235,10 @@ class AppNamespace:
|
||||||
" (`nameplates_id`, `claimed`, `side`, `added`)"
|
" (`nameplates_id`, `claimed`, `side`, `added`)"
|
||||||
" VALUES(?,?,?,?)",
|
" VALUES(?,?,?,?)",
|
||||||
(npid, True, side, when))
|
(npid, True, side, when))
|
||||||
|
else:
|
||||||
|
if not row["claimed"]:
|
||||||
|
raise ReclaimedError("you cannot re-claim a nameplate that your side previously released")
|
||||||
|
# since that might cause a new mailbox to be allocated
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError
|
self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError
|
||||||
|
|
|
@ -3,7 +3,7 @@ import time
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
from twisted.python import log
|
from twisted.python import log
|
||||||
from autobahn.twisted import websocket
|
from autobahn.twisted import websocket
|
||||||
from .rendezvous import CrowdedError, SidedMessage
|
from .rendezvous import CrowdedError, ReclaimedError, SidedMessage
|
||||||
from ..util import dict_to_bytes, bytes_to_dict
|
from ..util import dict_to_bytes, bytes_to_dict
|
||||||
|
|
||||||
# The WebSocket allows the client to send "commands" to the server, and the
|
# The WebSocket allows the client to send "commands" to the server, and the
|
||||||
|
@ -190,6 +190,8 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
|
||||||
server_rx)
|
server_rx)
|
||||||
except CrowdedError:
|
except CrowdedError:
|
||||||
raise Error("crowded")
|
raise Error("crowded")
|
||||||
|
except ReclaimedError:
|
||||||
|
raise Error("reclaimed")
|
||||||
self.send("claimed", mailbox=mailbox_id)
|
self.send("claimed", mailbox=mailbox_id)
|
||||||
|
|
||||||
def handle_release(self, msg, server_rx):
|
def handle_release(self, msg, server_rx):
|
||||||
|
|
|
@ -1073,6 +1073,47 @@ class WebSocketAPI(_Util, ServerBase, unittest.TestCase):
|
||||||
c.close()
|
c.close()
|
||||||
yield c.d
|
yield c.d
|
||||||
|
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def test_interrupted_client_nameplate_reclaimed(self):
|
||||||
|
c = yield self.make_client()
|
||||||
|
yield c.next_non_ack()
|
||||||
|
c.send("bind", appid="appid", side="side")
|
||||||
|
app = self._rendezvous.get_app("appid")
|
||||||
|
|
||||||
|
# a new claim on a previously-closed nameplate is forbidden. We make
|
||||||
|
# a new nameplate here and manually open a second claim on it, so the
|
||||||
|
# nameplate stays alive long enough for the code check to happen.
|
||||||
|
c = yield self.make_client()
|
||||||
|
yield c.next_non_ack()
|
||||||
|
c.send("bind", appid="appid", side="side")
|
||||||
|
c.send("claim", nameplate="np2")
|
||||||
|
m = yield c.next_non_ack()
|
||||||
|
self.assertEqual(m["type"], "claimed")
|
||||||
|
app.claim_nameplate("np2", "side2", 0)
|
||||||
|
c.send("release", nameplate="np2")
|
||||||
|
m = yield c.next_non_ack()
|
||||||
|
self.assertEqual(m["type"], "released")
|
||||||
|
np_row, side_rows = self._nameplate(app, "np2")
|
||||||
|
claims = sorted([(row["side"], row["claimed"]) for row in side_rows])
|
||||||
|
self.assertEqual(claims, [("side", 0), ("side2", 1)])
|
||||||
|
c.close()
|
||||||
|
yield c.d
|
||||||
|
|
||||||
|
c = yield self.make_client()
|
||||||
|
yield c.next_non_ack()
|
||||||
|
c.send("bind", appid="appid", side="side")
|
||||||
|
c.send("claim", nameplate="np2") # new claim is forbidden
|
||||||
|
err = yield c.next_non_ack()
|
||||||
|
self.assertEqual(err["type"], "error")
|
||||||
|
self.assertEqual(err["error"], "reclaimed")
|
||||||
|
|
||||||
|
np_row, side_rows = self._nameplate(app, "np2")
|
||||||
|
claims = sorted([(row["side"], row["claimed"]) for row in side_rows])
|
||||||
|
self.assertEqual(claims, [("side", 0), ("side2", 1)])
|
||||||
|
c.close()
|
||||||
|
yield c.d
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def test_interrupted_client_mailbox(self):
|
def test_interrupted_client_mailbox(self):
|
||||||
# a client's interactions with the server might be split over
|
# a client's interactions with the server might be split over
|
||||||
|
|
Loading…
Reference in New Issue
Block a user