diff --git a/src/wormhole/_dilation/connection.py b/src/wormhole/_dilation/connection.py index 4db1545..7152e78 100644 --- a/src/wormhole/_dilation/connection.py +++ b/src/wormhole/_dilation/connection.py @@ -495,6 +495,7 @@ class DilatedConnectionProtocol(Protocol, object): self._manager = None # set if/when we are selected self._disconnected = OneShotObserver(self._eventual_queue) self._can_send_records = False + self._inbound_record_queue = [] @m.state(initial=True) def unselected(self): @@ -524,6 +525,18 @@ class DilatedConnectionProtocol(Protocol, object): def add_candidate(self): self._connector.add_candidate(self) + @m.output() + def queue_inbound_record(self, record): + # the Follower will see a dataReceived chunk containing both the KCM + # (leader says we've been picked) and the first record. + # Connector.consider takes an eventual-turn to decide to accept this + # connection, which means the record will arrive before we get + # .select() and move to the 'selected' state where we can + # deliver_record. So we need to queue the record for a turn. TODO: + # when we move to the sans-io event-driven scheme, this queue + # shouldn't be necessary + self._inbound_record_queue.append(record) + @m.output() def set_manager(self, manager): self._manager = manager @@ -534,12 +547,21 @@ class DilatedConnectionProtocol(Protocol, object): def can_send_records(self, manager): self._can_send_records = True + @m.output() + def process_inbound_queue(self, manager): + while self._inbound_record_queue: + r = self._inbound_record_queue.pop(0) + self._manager.got_record(r) + @m.output() def deliver_record(self, record): self._manager.got_record(record) unselected.upon(got_kcm, outputs=[add_candidate], enter=selecting) - selecting.upon(select, outputs=[set_manager, can_send_records], enter=selected) + selecting.upon(got_record, outputs=[queue_inbound_record], enter=selecting) + selecting.upon(select, + outputs=[set_manager, can_send_records, process_inbound_queue], + enter=selected) selected.upon(got_record, outputs=[deliver_record], enter=selected) # called by Connector diff --git a/src/wormhole/test/dilate/test_connection.py b/src/wormhole/test/dilate/test_connection.py index 345f18b..deb2876 100644 --- a/src/wormhole/test/dilate/test_connection.py +++ b/src/wormhole/test/dilate/test_connection.py @@ -233,3 +233,77 @@ class Connection(unittest.TestCase): self.assertEqual(connector.mock_calls, []) self.assertEqual(t.mock_calls, [mock.call.loseConnection()]) clear_mock_calls(n, connector, t) + + def test_follower_combined(self): + c, n, connector, t, eq = make_con(FOLLOWER) + t_kcm = KCM() + t_open = Open(seqnum=1, scid=to_be4(0x11223344)) + n.decrypt = mock.Mock(side_effect=[ + encode_record(t_kcm), + encode_record(t_open), + ]) + exp_kcm = b"\x00\x00\x00\x03kcm" + n.encrypt = mock.Mock(side_effect=[b"kcm", b"ack1"]) + m = mock.Mock() # Manager + + c.makeConnection(t) + self.assertEqual(n.mock_calls, [mock.call.start_handshake()]) + self.assertEqual(connector.mock_calls, []) + self.assertEqual(t.mock_calls, [mock.call.write(b"outbound_prologue\n")]) + clear_mock_calls(n, connector, t, m) + + c.dataReceived(b"inbound_prologue\n") + + exp_handshake = b"\x00\x00\x00\x09handshake" + # however the FOLLOWER waits until receiving the leader's + # handshake before sending their own + self.assertEqual(n.mock_calls, []) + self.assertEqual(t.mock_calls, []) + self.assertEqual(connector.mock_calls, []) + + clear_mock_calls(n, connector, t, m) + + c.dataReceived(b"\x00\x00\x00\x0Ahandshake2") + # we're the follower, so we send our Noise handshake, then + # encrypt and send the KCM immediately + self.assertEqual(n.mock_calls, [ + mock.call.read_message(b"handshake2"), + mock.call.write_message(), + mock.call.encrypt(encode_record(t_kcm)), + ]) + self.assertEqual(connector.mock_calls, []) + self.assertEqual(t.mock_calls, [ + mock.call.write(exp_handshake), + mock.call.write(exp_kcm)]) + self.assertEqual(c._manager, None) + clear_mock_calls(n, connector, t, m) + + # the leader will select a connection, send the KCM, and then + # immediately send some more data + + kcm_and_msg1 = (b"\x00\x00\x00\x03KCM" + + b"\x00\x00\x00\x04msg1") + c.dataReceived(kcm_and_msg1) + + # follower: inbound KCM means we've been selected. + # in both cases we notify Connector.add_candidate(), and the Connector + # decides if/when to call .select() + + self.assertEqual(n.mock_calls, [mock.call.decrypt(b"KCM"), + mock.call.decrypt(b"msg1")]) + self.assertEqual(connector.mock_calls, [mock.call.add_candidate(c)]) + self.assertEqual(t.mock_calls, []) + clear_mock_calls(n, connector, t, m) + + # now pretend this connection wins (either the Leader decides to use + # this one among all the candiates, or we're the Follower and the + # Connector is reacting to add_candidate() by recognizing we're the + # only candidate there is) + c.select(m) + self.assertIdentical(c._manager, m) + # follower: we already sent the KCM, do nothing + self.assertEqual(n.mock_calls, []) + self.assertEqual(connector.mock_calls, []) + self.assertEqual(t.mock_calls, []) + self.assertEqual(m.mock_calls, [mock.call.got_record(t_open)]) + clear_mock_calls(n, connector, t, m)