diff --git a/docs/state-machines/mailbox.dot b/docs/state-machines/mailbox.dot index 712fcd7..9bcd964 100644 --- a/docs/state-machines/mailbox.dot +++ b/docs/state-machines/mailbox.dot @@ -1,7 +1,7 @@ digraph { /* new idea */ - title [label="Message\nMachine" style="dotted"] + title [label="Mailbox\nMachine" style="dotted"] {rank=same; S0A S0B} S0A [label="S0A:\nunknown"] @@ -21,6 +21,9 @@ digraph { S0A -> S1A [label="got_mailbox"] S1A [label="S1A:\nknown"] S1A -> P_open [label="connected"] + S1A -> P1A_queue [label="add_message" style="dotted"] + P1A_queue [shape="box" label="queue" style="dotted"] + P1A_queue -> S1A [style="dotted"] S1A -> S2A [style="invis"] P_open -> P2_connected [style="invis"] @@ -85,7 +88,7 @@ digraph { S4B [label="S4B:\nclosed"] S4A -> S4B [label="connected"] S4B -> S4A [label="lost"] - S4B -> S4B [label="add_message\nrx_message\nclose"] + S4B -> S4B [label="add_message\nrx_message\nclose"] # is "close" needed? S0A -> P3A_done [label="close" color="red"] S0B -> P3B_done [label="close" color="red"] diff --git a/src/wormhole/_mailbox.py b/src/wormhole/_mailbox.py index 958d6f8..90f9d95 100644 --- a/src/wormhole/_mailbox.py +++ b/src/wormhole/_mailbox.py @@ -131,20 +131,16 @@ class Mailbox(object): @m.output() def N_release_and_accept(self, side, phase, body): self._N.release() - self._accept(side, phase, body) + if phase not in self._processed: + self._processed.add(phase) + self._O.got_message(side, phase, body) @m.output() def RC_tx_close(self): assert self._mood self._RC_tx_close() def _RC_tx_close(self): self._RC.tx_close(self._mailbox, self._mood) - @m.output() - def accept(self, side, phase, body): - self._accept(side, phase, body) - def _accept(self, side, phase, body): - if phase not in self._processed: - self._processed.add(phase) - self._O.got_message(side, phase, body) + @m.output() def dequeue(self, phase, body): self._pending_outbound.pop(phase, None) @@ -191,10 +187,12 @@ class Mailbox(object): S3B.upon(add_message, enter=S3B, outputs=[]) S3B.upon(rx_message_theirs, enter=S3B, outputs=[]) S3B.upon(rx_message_ours, enter=S3B, outputs=[]) + S3B.upon(close, enter=S3B, outputs=[]) S4A.upon(connected, enter=S4B, outputs=[]) S4B.upon(lost, enter=S4A, outputs=[]) S4.upon(add_message, enter=S4, outputs=[]) S4.upon(rx_message_theirs, enter=S4, outputs=[]) S4.upon(rx_message_ours, enter=S4, outputs=[]) + S4.upon(close, enter=S4, outputs=[]) diff --git a/src/wormhole/test/test_machines.py b/src/wormhole/test/test_machines.py index 55461c4..c569165 100644 --- a/src/wormhole/test/test_machines.py +++ b/src/wormhole/test/test_machines.py @@ -4,10 +4,10 @@ import mock from zope.interface import directlyProvides, implementer from twisted.trial import unittest from .. import (errors, timing, _order, _receive, _key, _code, _lister, - _input, _allocator, _send, _terminator) -from .._interfaces import (IKey, IReceive, IBoss, ISend, IMailbox, + _input, _allocator, _send, _terminator, _nameplate, _mailbox) +from .._interfaces import (IKey, IReceive, IBoss, ISend, IMailbox, IOrder, IRendezvousConnector, ILister, IInput, IAllocator, - INameplate, ICode, IWordlist) + INameplate, ICode, IWordlist, ITerminator) from .._key import derive_key, derive_phase_key, encrypt_data from ..util import dict_to_bytes, hexstr_to_bytes, bytes_to_hexstr, to_bytes from spake2 import SPAKE2_Symmetric @@ -506,6 +506,538 @@ class Allocator(unittest.TestCase): self.assertEqual(events, [("c.allocated", "1", "1-word-word"), ]) +class Nameplate(unittest.TestCase): + def build(self): + events = [] + n = _nameplate.Nameplate() + m = Dummy("m", events, IMailbox, "got_mailbox") + i = Dummy("i", events, IInput, "got_wordlist") + rc = Dummy("rc", events, IRendezvousConnector, "tx_claim", "tx_release") + t = Dummy("t", events, ITerminator, "nameplate_done") + n.wire(m, i, rc, t) + return n, m, i, rc, t, events + + def test_set_first(self): + # connection remains up throughout + n, m, i, rc, t, events = self.build() + n.set_nameplate("1") + self.assertEqual(events, []) + n.connected() + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.release() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_connect_first(self): + # connection remains up throughout + n, m, i, rc, t, events = self.build() + n.connected() + self.assertEqual(events, []) + + n.set_nameplate("1") + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.release() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_reconnect_while_claiming(self): + # connection bounced while waiting for rx_claimed + n, m, i, rc, t, events = self.build() + n.connected() + self.assertEqual(events, []) + + n.set_nameplate("1") + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + n.lost() + n.connected() + self.assertEqual(events, [("rc.tx_claim", "1")]) + + def test_reconnect_while_claimed(self): + # connection bounced while claimed: no retransmits should be sent + n, m, i, rc, t, events = self.build() + n.connected() + self.assertEqual(events, []) + + n.set_nameplate("1") + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.lost() + n.connected() + self.assertEqual(events, []) + + def test_reconnect_while_releasing(self): + # connection bounced while waiting for rx_released + n, m, i, rc, t, events = self.build() + n.connected() + self.assertEqual(events, []) + + n.set_nameplate("1") + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.release() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.lost() + n.connected() + self.assertEqual(events, [("rc.tx_release", "1")]) + + def test_reconnect_while_done(self): + # connection bounces after we're done + n, m, i, rc, t, events = self.build() + n.connected() + self.assertEqual(events, []) + + n.set_nameplate("1") + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.release() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + events[:] = [] + + n.lost() + n.connected() + self.assertEqual(events, []) + + def test_close_while_idle(self): + n, m, i, rc, t, events = self.build() + n.close() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_close_while_idle_connected(self): + n, m, i, rc, t, events = self.build() + n.connected() + self.assertEqual(events, []) + n.close() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_close_while_unclaimed(self): + n, m, i, rc, t, events = self.build() + n.set_nameplate("1") + n.close() # before ever being connected + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_close_while_claiming(self): + n, m, i, rc, t, events = self.build() + n.set_nameplate("1") + self.assertEqual(events, []) + n.connected() + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + n.close() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_close_while_claiming_but_disconnected(self): + n, m, i, rc, t, events = self.build() + n.set_nameplate("1") + self.assertEqual(events, []) + n.connected() + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + n.lost() + n.close() + self.assertEqual(events, []) + # we're now waiting for a connection, so we can release the nameplate + n.connected() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_close_while_claimed(self): + n, m, i, rc, t, events = self.build() + n.set_nameplate("1") + self.assertEqual(events, []) + n.connected() + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.close() + # this path behaves just like a deliberate release() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_close_while_claimed_but_disconnected(self): + n, m, i, rc, t, events = self.build() + n.set_nameplate("1") + self.assertEqual(events, []) + n.connected() + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.lost() + n.close() + # we're now waiting for a connection, so we can release the nameplate + n.connected() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_close_while_releasing(self): + n, m, i, rc, t, events = self.build() + n.set_nameplate("1") + self.assertEqual(events, []) + n.connected() + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.release() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.close() # ignored, we're already on our way out the door + self.assertEqual(events, []) + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_close_while_releasing_but_disconnecteda(self): + n, m, i, rc, t, events = self.build() + n.set_nameplate("1") + self.assertEqual(events, []) + n.connected() + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.release() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.lost() + n.close() + # we must retransmit the tx_release when we reconnect + self.assertEqual(events, []) + + n.connected() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + + def test_close_while_done(self): + # connection remains up throughout + n, m, i, rc, t, events = self.build() + n.connected() + self.assertEqual(events, []) + + n.set_nameplate("1") + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.release() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + events[:] = [] + + n.close() # NOP + self.assertEqual(events, []) + + def test_close_while_done_but_disconnected(self): + # connection remains up throughout + n, m, i, rc, t, events = self.build() + n.connected() + self.assertEqual(events, []) + + n.set_nameplate("1") + self.assertEqual(events, [("rc.tx_claim", "1")]) + events[:] = [] + + wl = object() + with mock.patch("wormhole._nameplate.PGPWordList", return_value=wl): + n.rx_claimed("mbox1") + self.assertEqual(events, [("i.got_wordlist", wl), + ("m.got_mailbox", "mbox1"), + ]) + events[:] = [] + + n.release() + self.assertEqual(events, [("rc.tx_release", "1")]) + events[:] = [] + + n.rx_released() + self.assertEqual(events, [("t.nameplate_done",)]) + events[:] = [] + + n.lost() + n.close() # NOP + self.assertEqual(events, []) + +class Mailbox(unittest.TestCase): + def build(self): + events = [] + m = _mailbox.Mailbox("side1") + n = Dummy("n", events, INameplate, "release") + rc = Dummy("rc", events, IRendezvousConnector, + "tx_add", "tx_open", "tx_close") + o = Dummy("o", events, IOrder, "got_message") + t = Dummy("t", events, ITerminator, "mailbox_done") + m.wire(n, rc, o, t) + return m, n, rc, o, t, events + + # TODO: test moods + + def assert_events(self, events, initial_events, tx_add_events): + self.assertEqual(len(events), len(initial_events)+len(tx_add_events), + events) + self.assertEqual(events[:len(initial_events)], initial_events) + self.assertEqual(set(events[len(initial_events):]), tx_add_events) + + def test_connect_first(self): # connect before got_mailbox + m, n, rc, o, t, events = self.build() + m.add_message("phase1", b"msg1") + self.assertEqual(events, []) + + m.connected() + self.assertEqual(events, []) + + m.got_mailbox("mbox1") + self.assertEqual(events, [("rc.tx_open", "mbox1"), + ("rc.tx_add", "phase1", b"msg1")]) + events[:] = [] + + m.add_message("phase2", b"msg2") + self.assertEqual(events, [("rc.tx_add", "phase2", b"msg2")]) + events[:] = [] + + # bouncing the connection should retransmit everything, even the open() + m.lost() + self.assertEqual(events, []) + # and messages sent while here should be queued + m.add_message("phase3", b"msg3") + self.assertEqual(events, []) + + m.connected() + # the other messages are allowed to be sent in any order + self.assert_events(events, [("rc.tx_open", "mbox1")], + { ("rc.tx_add", "phase1", b"msg1"), + ("rc.tx_add", "phase2", b"msg2"), + ("rc.tx_add", "phase3", b"msg3"), + }) + events[:] = [] + + m.rx_message("side1", "phase1", b"msg1") # echo of our message, dequeue + self.assertEqual(events, []) + + m.lost() + m.connected() + self.assert_events(events, [("rc.tx_open", "mbox1")], + {("rc.tx_add", "phase2", b"msg2"), + ("rc.tx_add", "phase3", b"msg3"), + }) + events[:] = [] + + # a new message from the peer gets delivered, and the Nameplate is + # released since the message proves that our peer opened the Mailbox + # and therefore no longer needs the Nameplate + m.rx_message("side2", "phase1", b"msg1them") # new message from peer + self.assertEqual(events, [("n.release",), + ("o.got_message", "side2", "phase1", b"msg1them"), + ]) + events[:] = [] + + # we de-duplicate peer messages, but still re-release the nameplate + # since Nameplate is smart enough to ignore that + m.rx_message("side2", "phase1", b"msg1them") + self.assertEqual(events, [("n.release",), + ]) + events[:] = [] + + m.close("happy") + self.assertEqual(events, [("rc.tx_close", "mbox1", "happy")]) + events[:] = [] + + # while closing, we ignore a lot + m.add_message("phase-late", b"late") + m.rx_message("side1", "phase2", b"msg2") + m.close("happy") + self.assertEqual(events, []) + + # bouncing the connection forces a retransmit of the tx_close + m.lost() + self.assertEqual(events, []) + m.connected() + self.assertEqual(events, [("rc.tx_close", "mbox1", "happy")]) + events[:] = [] + + m.rx_closed() + self.assertEqual(events, [("t.mailbox_done",)]) + events[:] = [] + + # while closed, we ignore everything + m.add_message("phase-late", b"late") + m.rx_message("side1", "phase2", b"msg2") + m.close("happy") + m.lost() + m.connected() + self.assertEqual(events, []) + + def test_mailbox_first(self): # got_mailbox before connect + m, n, rc, o, t, events = self.build() + m.add_message("phase1", b"msg1") + self.assertEqual(events, []) + + m.got_mailbox("mbox1") + m.add_message("phase2", b"msg2") + self.assertEqual(events, []) + + m.connected() + + self.assert_events(events, [("rc.tx_open", "mbox1")], + { ("rc.tx_add", "phase1", b"msg1"), + ("rc.tx_add", "phase2", b"msg2"), + }) + + def test_close_while_idle(self): + m, n, rc, o, t, events = self.build() + m.close("happy") + self.assertEqual(events, [("t.mailbox_done",)]) + + def test_close_while_idle_but_connected(self): + m, n, rc, o, t, events = self.build() + m.connected() + m.close("happy") + self.assertEqual(events, [("t.mailbox_done",)]) + + def test_close_while_mailbox_disconnected(self): + m, n, rc, o, t, events = self.build() + m.got_mailbox("mbox1") + m.close("happy") + self.assertEqual(events, [("t.mailbox_done",)]) + + def test_close_while_reconnecting(self): + m, n, rc, o, t, events = self.build() + m.got_mailbox("mbox1") + m.connected() + self.assertEqual(events, [("rc.tx_open", "mbox1")]) + events[:] = [] + + m.lost() + self.assertEqual(events, []) + m.close("happy") + self.assertEqual(events, []) + # we now wait to connect, so we can send the tx_close + + m.connected() + self.assertEqual(events, [("rc.tx_close", "mbox1", "happy")]) + events[:] = [] + + m.rx_closed() + self.assertEqual(events, [("t.mailbox_done",)]) + events[:] = [] class Terminator(unittest.TestCase): def build(self): @@ -562,14 +1094,15 @@ class Terminator(unittest.TestCase): self._do_test("close", "nameplate", "mailbox") self._do_test("close", "mailbox", "nameplate") - + # TODO: test moods # TODO # #Send -# Mailbox -# Nameplate +# #Mailbox +# #Nameplate # #Terminator # Boss # RendezvousConnector (not a state machine) # #Input: exercise helper methods # #wordlist +# test idempotency / at-most-once where applicable