connection: tolerate merged KCM and first record
When the follower's connection is accepted, they'll observe a single dataReceived chunk containing both the leader's KCM and the leader's first actual data record. The state machine considers the KCM for an eventual-turn before selecting the connection, so the data record will arrive while the connection isn't quite ready for it (if consider() were immediate, this wouldn't be a problem, but Automat doesn't deal with reentrant calls very well). So we queue any records that arrive before we're selected.
This commit is contained in:
parent
55056bd324
commit
d612b58dd8
|
@ -495,6 +495,7 @@ class DilatedConnectionProtocol(Protocol, object):
|
|||
self._manager = None # set if/when we are selected
|
||||
self._disconnected = OneShotObserver(self._eventual_queue)
|
||||
self._can_send_records = False
|
||||
self._inbound_record_queue = []
|
||||
|
||||
@m.state(initial=True)
|
||||
def unselected(self):
|
||||
|
@ -524,6 +525,18 @@ class DilatedConnectionProtocol(Protocol, object):
|
|||
def add_candidate(self):
|
||||
self._connector.add_candidate(self)
|
||||
|
||||
@m.output()
|
||||
def queue_inbound_record(self, record):
|
||||
# the Follower will see a dataReceived chunk containing both the KCM
|
||||
# (leader says we've been picked) and the first record.
|
||||
# Connector.consider takes an eventual-turn to decide to accept this
|
||||
# connection, which means the record will arrive before we get
|
||||
# .select() and move to the 'selected' state where we can
|
||||
# deliver_record. So we need to queue the record for a turn. TODO:
|
||||
# when we move to the sans-io event-driven scheme, this queue
|
||||
# shouldn't be necessary
|
||||
self._inbound_record_queue.append(record)
|
||||
|
||||
@m.output()
|
||||
def set_manager(self, manager):
|
||||
self._manager = manager
|
||||
|
@ -534,12 +547,21 @@ class DilatedConnectionProtocol(Protocol, object):
|
|||
def can_send_records(self, manager):
|
||||
self._can_send_records = True
|
||||
|
||||
@m.output()
|
||||
def process_inbound_queue(self, manager):
|
||||
while self._inbound_record_queue:
|
||||
r = self._inbound_record_queue.pop(0)
|
||||
self._manager.got_record(r)
|
||||
|
||||
@m.output()
|
||||
def deliver_record(self, record):
|
||||
self._manager.got_record(record)
|
||||
|
||||
unselected.upon(got_kcm, outputs=[add_candidate], enter=selecting)
|
||||
selecting.upon(select, outputs=[set_manager, can_send_records], enter=selected)
|
||||
selecting.upon(got_record, outputs=[queue_inbound_record], enter=selecting)
|
||||
selecting.upon(select,
|
||||
outputs=[set_manager, can_send_records, process_inbound_queue],
|
||||
enter=selected)
|
||||
selected.upon(got_record, outputs=[deliver_record], enter=selected)
|
||||
|
||||
# called by Connector
|
||||
|
|
|
@ -233,3 +233,77 @@ class Connection(unittest.TestCase):
|
|||
self.assertEqual(connector.mock_calls, [])
|
||||
self.assertEqual(t.mock_calls, [mock.call.loseConnection()])
|
||||
clear_mock_calls(n, connector, t)
|
||||
|
||||
def test_follower_combined(self):
|
||||
c, n, connector, t, eq = make_con(FOLLOWER)
|
||||
t_kcm = KCM()
|
||||
t_open = Open(seqnum=1, scid=to_be4(0x11223344))
|
||||
n.decrypt = mock.Mock(side_effect=[
|
||||
encode_record(t_kcm),
|
||||
encode_record(t_open),
|
||||
])
|
||||
exp_kcm = b"\x00\x00\x00\x03kcm"
|
||||
n.encrypt = mock.Mock(side_effect=[b"kcm", b"ack1"])
|
||||
m = mock.Mock() # Manager
|
||||
|
||||
c.makeConnection(t)
|
||||
self.assertEqual(n.mock_calls, [mock.call.start_handshake()])
|
||||
self.assertEqual(connector.mock_calls, [])
|
||||
self.assertEqual(t.mock_calls, [mock.call.write(b"outbound_prologue\n")])
|
||||
clear_mock_calls(n, connector, t, m)
|
||||
|
||||
c.dataReceived(b"inbound_prologue\n")
|
||||
|
||||
exp_handshake = b"\x00\x00\x00\x09handshake"
|
||||
# however the FOLLOWER waits until receiving the leader's
|
||||
# handshake before sending their own
|
||||
self.assertEqual(n.mock_calls, [])
|
||||
self.assertEqual(t.mock_calls, [])
|
||||
self.assertEqual(connector.mock_calls, [])
|
||||
|
||||
clear_mock_calls(n, connector, t, m)
|
||||
|
||||
c.dataReceived(b"\x00\x00\x00\x0Ahandshake2")
|
||||
# we're the follower, so we send our Noise handshake, then
|
||||
# encrypt and send the KCM immediately
|
||||
self.assertEqual(n.mock_calls, [
|
||||
mock.call.read_message(b"handshake2"),
|
||||
mock.call.write_message(),
|
||||
mock.call.encrypt(encode_record(t_kcm)),
|
||||
])
|
||||
self.assertEqual(connector.mock_calls, [])
|
||||
self.assertEqual(t.mock_calls, [
|
||||
mock.call.write(exp_handshake),
|
||||
mock.call.write(exp_kcm)])
|
||||
self.assertEqual(c._manager, None)
|
||||
clear_mock_calls(n, connector, t, m)
|
||||
|
||||
# the leader will select a connection, send the KCM, and then
|
||||
# immediately send some more data
|
||||
|
||||
kcm_and_msg1 = (b"\x00\x00\x00\x03KCM" +
|
||||
b"\x00\x00\x00\x04msg1")
|
||||
c.dataReceived(kcm_and_msg1)
|
||||
|
||||
# follower: inbound KCM means we've been selected.
|
||||
# in both cases we notify Connector.add_candidate(), and the Connector
|
||||
# decides if/when to call .select()
|
||||
|
||||
self.assertEqual(n.mock_calls, [mock.call.decrypt(b"KCM"),
|
||||
mock.call.decrypt(b"msg1")])
|
||||
self.assertEqual(connector.mock_calls, [mock.call.add_candidate(c)])
|
||||
self.assertEqual(t.mock_calls, [])
|
||||
clear_mock_calls(n, connector, t, m)
|
||||
|
||||
# now pretend this connection wins (either the Leader decides to use
|
||||
# this one among all the candiates, or we're the Follower and the
|
||||
# Connector is reacting to add_candidate() by recognizing we're the
|
||||
# only candidate there is)
|
||||
c.select(m)
|
||||
self.assertIdentical(c._manager, m)
|
||||
# follower: we already sent the KCM, do nothing
|
||||
self.assertEqual(n.mock_calls, [])
|
||||
self.assertEqual(connector.mock_calls, [])
|
||||
self.assertEqual(t.mock_calls, [])
|
||||
self.assertEqual(m.mock_calls, [mock.call.got_record(t_open)])
|
||||
clear_mock_calls(n, connector, t, m)
|
||||
|
|
Loading…
Reference in New Issue
Block a user