diff --git a/docs/dilation-protocol.md b/docs/dilation-protocol.md index 7625353..848922e 100644 --- a/docs/dilation-protocol.md +++ b/docs/dilation-protocol.md @@ -105,11 +105,15 @@ resumed or reestablished. Dilation is triggered by calling the `w.dilate()` API. This returns a Deferred that will fire once the first L3 connection is established. It fires -with a 3-tuple of endpoints that can be used to establish subchannels. +with a 3-tuple of endpoints that can be used to establish subchannels, or an +error if dilation is not possible. If the other side's `versions` message +indicates that it does not support dilation, the Deferred will errback with +an `OldPeerCannotDilateError`. For dilation to succeed, both sides must call `w.dilate()`, since the resulting endpoints are the only way to access the subchannels. If the other -side never calls `w.dilate()`, the Deferred will never fire. +side is capable of dilation, but never calls `w.dilate()`, the Deferred will +never fire. The L1 (mailbox) path is used to deliver dilation requests and connection hints. The current mailbox protocol uses named "phases" to distinguish @@ -260,7 +264,7 @@ trigger an immediate error for most non-magic-wormhole listeners (e.g. HTTP servers that were contacted by accident). If the wrong handshake is received, the connection will be dropped. For debugging purposes, the node might want to keep looking at data beyond the first incorrect character and log -everything until the first newline. +a few hundred characters until the first newline. Everything beyond that point is a Noise protocol message, which consists of a 4-byte big-endian length field, followed by the indicated number of bytes. @@ -271,29 +275,44 @@ master PAKE key using HKDF. Each L2 connection uses the same dilation key, but different ephemeral keys, so each gets a different session key. The Leader sends the first message, which is a psk-encrypted ephemeral key. -The Follower sends the next message, its own psk-encrypted ephemeral key. The -Follower then sends an empty packet as the "key confirmation message", which -will be encrypted by the shared key. +The Follower sends the next message, its own psk-encrypted ephemeral key. +These two messages are known as "handshake messages" in the Noise protocol, +and must be processed in a specific order (the Leader must not accept the +Follower's message until it has generated its own). Noise allows handshake +messages to include a payload, but we do not use this feature. -The Leader sees the KCM and knows the connection is viable. It delivers the -protocol object to the L3 manager, which will decide which connection to -select. When the L2 connection is selected to be the new L3, it will send an -empty KCM of its own, to let the Follower know the connection being selected. -All other L2 connections (either viable or still in handshake) are dropped, -all other connection attempts are cancelled. All listening sockets may or may -not be shut down (TODO: think about it). +All subsequent messages as known as "Noise transport messages", and use +independent channels for each direction, so they no longer have ordering +dependencies. Transport messages are encrypted by the shared key, in a form +that evolves as more messages are sent. -The Follower will wait for either an empty KCM (at which point the L2 -connection is delivered to the Dilation manager as the new L3), a -disconnection, or an invalid message (which causes the connection to be -dropped). Other connections and/or listening sockets are stopped. +The Follower's first transport message is an empty packet, which we use as a +"key confirmation message" (KCM). + +The Leader doesn't send a transport message right away: it waits to see the +Follower's KCM, which indicates this connection is viable (i.e. the Follower +used the same dilation key as the Leader, which means they both used the same +wormhole code). + +The Leader delivers the now-viable protocol object to the L3 manager, which +will decide which connection to select. When some L2 connection is selected +to be the new L3, the Leader finally sends an empty KCM of its own over that +L2, to let the Follower know which connection has been selected. All other L2 +connections (either viable or still in handshake) are dropped, and all other +connection attempts are cancelled. All listening sockets may or may not be +shut down (TODO: think about it). + +After sending their KCM, the Follower will wait for either an empty KCM (at +which point the L2 connection is delivered to the Dilation manager as the new +L3), a disconnection, or an invalid message (which causes the connection to +be dropped). Other connections and/or listening sockets are stopped. Internally, the L2Protocol object manages the Noise session itself. It knows (via a constructor argument) whether it is on the Leader or Follower side, which affects both the role is plays in the Noise pattern, and the reaction -to receiving the ephemeral key (for which only the Follower sends an empty -KCM message). After that, the L2Protocol notifies the L3 object in three -situations: +to receiving the handshake message / ephemeral key (for which only the +Follower sends an empty KCM message). After that, the L2Protocol notifies the +L3 object in three situations: * the Noise session produces a valid decrypted frame (for Leader, this includes the Follower's KCM, and thus indicates a viable candidate for diff --git a/src/wormhole/_boss.py b/src/wormhole/_boss.py index 226735b..ec6c845 100644 --- a/src/wormhole/_boss.py +++ b/src/wormhole/_boss.py @@ -205,8 +205,8 @@ class Boss(object): self._did_start_code = True self._C.set_code(code) - def dilate(self): - return self._D.dilate() # fires with endpoints + def dilate(self, no_listen=False): + return self._D.dilate(no_listen=no_listen) # fires with endpoints @m.input() def send(self, plaintext): diff --git a/src/wormhole/_dilation/connection.py b/src/wormhole/_dilation/connection.py index 8bb82aa..7152e78 100644 --- a/src/wormhole/_dilation/connection.py +++ b/src/wormhole/_dilation/connection.py @@ -69,9 +69,13 @@ class Disconnect(Exception): # (everything past this point is a Frame, with be4 length prefix. Frames are # either noise handshake or an encrypted message) # 4: if LEADER, send noise handshake string. if FOLLOWER, wait for it +# LEADER: m=n.write_message(), FOLLOWER: n.read_message(m) # 5: if FOLLOWER, send noise response string. if LEADER, wait for it -# 6: ... - +# FOLLOWER: m=n.write_message(), LEADER: n.read_message(m) +# 6: if FOLLOWER: send KCM (m=n.encrypt('')), wait for KCM (n.decrypt(m)) +# if LEADER: wait for KCM, gather viable connections, select +# send KCM over selected connection, drop the rest +# 7: both: send Ping/Pong/Open/Data/Close/Ack records (n.encrypt(rec)) RelayOK = namedtuple("RelayOk", []) @@ -491,6 +495,7 @@ class DilatedConnectionProtocol(Protocol, object): self._manager = None # set if/when we are selected self._disconnected = OneShotObserver(self._eventual_queue) self._can_send_records = False + self._inbound_record_queue = [] @m.state(initial=True) def unselected(self): @@ -520,6 +525,18 @@ class DilatedConnectionProtocol(Protocol, object): def add_candidate(self): self._connector.add_candidate(self) + @m.output() + def queue_inbound_record(self, record): + # the Follower will see a dataReceived chunk containing both the KCM + # (leader says we've been picked) and the first record. + # Connector.consider takes an eventual-turn to decide to accept this + # connection, which means the record will arrive before we get + # .select() and move to the 'selected' state where we can + # deliver_record. So we need to queue the record for a turn. TODO: + # when we move to the sans-io event-driven scheme, this queue + # shouldn't be necessary + self._inbound_record_queue.append(record) + @m.output() def set_manager(self, manager): self._manager = manager @@ -530,12 +547,21 @@ class DilatedConnectionProtocol(Protocol, object): def can_send_records(self, manager): self._can_send_records = True + @m.output() + def process_inbound_queue(self, manager): + while self._inbound_record_queue: + r = self._inbound_record_queue.pop(0) + self._manager.got_record(r) + @m.output() def deliver_record(self, record): self._manager.got_record(record) unselected.upon(got_kcm, outputs=[add_candidate], enter=selecting) - selecting.upon(select, outputs=[set_manager, can_send_records], enter=selected) + selecting.upon(got_record, outputs=[queue_inbound_record], enter=selecting) + selecting.upon(select, + outputs=[set_manager, can_send_records, process_inbound_queue], + enter=selected) selected.upon(got_record, outputs=[deliver_record], enter=selected) # called by Connector diff --git a/src/wormhole/_dilation/manager.py b/src/wormhole/_dilation/manager.py index f5724be..574c7b2 100644 --- a/src/wormhole/_dilation/manager.py +++ b/src/wormhole/_dilation/manager.py @@ -6,6 +6,7 @@ from attr.validators import provides, instance_of, optional from automat import MethodicalMachine from zope.interface import implementer from twisted.internet.defer import Deferred, inlineCallbacks, returnValue +from twisted.internet.interfaces import IAddress from twisted.python import log from .._interfaces import IDilator, IDilationManager, ISend, ITerminator from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr @@ -97,6 +98,7 @@ class Manager(object): _reactor = attrib(repr=False) _eventual_queue = attrib(repr=False) _cooperator = attrib(repr=False) + _host_addr = attrib(validator=provides(IAddress)) _no_listen = attrib(default=False) _tor = None # TODO _timing = None # TODO @@ -114,7 +116,6 @@ class Manager(object): self._made_first_connection = False self._first_connected = OneShotObserver(self._eventual_queue) self._stopped = OneShotObserver(self._eventual_queue) - self._host_addr = _WormholeAddress() self._next_dilation_phase = 0 @@ -477,7 +478,6 @@ class Dilator(object): _reactor = attrib() _eventual_queue = attrib() _cooperator = attrib() - _no_listen = attrib(default=False) def __attrs_post_init__(self): self._got_versions_d = Deferred() @@ -485,21 +485,22 @@ class Dilator(object): self._endpoints = OneShotObserver(self._eventual_queue) self._pending_inbound_dilate_messages = deque() self._manager = None + self._host_addr = _WormholeAddress() def wire(self, sender, terminator): self._S = ISend(sender) self._T = ITerminator(terminator) # this is the primary entry point, called when w.dilate() is invoked - def dilate(self, transit_relay_location=None): + def dilate(self, transit_relay_location=None, no_listen=False): self._transit_relay_location = transit_relay_location if not self._started: self._started = True - self._start().addBoth(self._endpoints.fire) + self._start(no_listen).addBoth(self._endpoints.fire) return self._endpoints.when_fired() @inlineCallbacks - def _start(self): + def _start(self, no_listen): # first, we wait until we hear the VERSION message, which tells us 1: # the PAKE key works, so we can talk securely, 2: that they can do # dilation at all (if they can't then w.dilate() errbacks) @@ -522,7 +523,16 @@ class Dilator(object): self._transit_key, self._transit_relay_location, self._reactor, self._eventual_queue, - self._cooperator, no_listen=self._no_listen) + self._cooperator, self._host_addr, no_listen) + # We must open subchannel0 early, since messages may arrive very + # quickly once the connection is established. This subchannel may or + # may not ever get revealed to the caller, since the peer might not + # even be capable of dilation. + scid0 = to_be4(0) + peer_addr0 = _SubchannelAddress(scid0) + sc0 = SubChannel(scid0, self._manager, self._host_addr, peer_addr0) + self._manager.set_subchannel_zero(scid0, sc0) + self._manager.start() while self._pending_inbound_dilate_messages: @@ -531,15 +541,10 @@ class Dilator(object): yield self._manager.when_first_connected() - # we can open subchannels as soon as we get our first connection - scid0 = to_be4(0) - self._host_addr = _WormholeAddress() # TODO: share with Manager - peer_addr0 = _SubchannelAddress(scid0) + # we can open non-zero subchannels as soon as we get our first + # connection control_ep = ControlEndpoint(peer_addr0) - sc0 = SubChannel(scid0, self._manager, self._host_addr, peer_addr0) control_ep._subchannel_zero_opened(sc0) - self._manager.set_subchannel_zero(scid0, sc0) - connect_ep = SubchannelConnectorEndpoint(self._manager, self._host_addr) listen_ep = SubchannelListenerEndpoint(self._manager, self._host_addr) diff --git a/src/wormhole/test/dilate/test_connect.py b/src/wormhole/test/dilate/test_connect.py index 7a60400..b5d575e 100644 --- a/src/wormhole/test/dilate/test_connect.py +++ b/src/wormhole/test/dilate/test_connect.py @@ -52,7 +52,7 @@ class Connect(unittest.TestCase): t_left = FakeTerminator() t_right = FakeTerminator() - d_left = manager.Dilator(reactor, eq, cooperator, no_listen=True) + d_left = manager.Dilator(reactor, eq, cooperator) d_left.wire(send_left, t_left) d_left.got_key(key) d_left.got_wormhole_versions({"can-dilate": ["1"]}) @@ -66,7 +66,7 @@ class Connect(unittest.TestCase): with mock.patch("wormhole._dilation.connector.ipaddrs.find_addresses", return_value=["127.0.0.1"]): - eps_left_d = d_left.dilate() + eps_left_d = d_left.dilate(no_listen=True) eps_right_d = d_right.dilate() eps_left = yield eps_left_d diff --git a/src/wormhole/test/dilate/test_connection.py b/src/wormhole/test/dilate/test_connection.py index 345f18b..deb2876 100644 --- a/src/wormhole/test/dilate/test_connection.py +++ b/src/wormhole/test/dilate/test_connection.py @@ -233,3 +233,77 @@ class Connection(unittest.TestCase): self.assertEqual(connector.mock_calls, []) self.assertEqual(t.mock_calls, [mock.call.loseConnection()]) clear_mock_calls(n, connector, t) + + def test_follower_combined(self): + c, n, connector, t, eq = make_con(FOLLOWER) + t_kcm = KCM() + t_open = Open(seqnum=1, scid=to_be4(0x11223344)) + n.decrypt = mock.Mock(side_effect=[ + encode_record(t_kcm), + encode_record(t_open), + ]) + exp_kcm = b"\x00\x00\x00\x03kcm" + n.encrypt = mock.Mock(side_effect=[b"kcm", b"ack1"]) + m = mock.Mock() # Manager + + c.makeConnection(t) + self.assertEqual(n.mock_calls, [mock.call.start_handshake()]) + self.assertEqual(connector.mock_calls, []) + self.assertEqual(t.mock_calls, [mock.call.write(b"outbound_prologue\n")]) + clear_mock_calls(n, connector, t, m) + + c.dataReceived(b"inbound_prologue\n") + + exp_handshake = b"\x00\x00\x00\x09handshake" + # however the FOLLOWER waits until receiving the leader's + # handshake before sending their own + self.assertEqual(n.mock_calls, []) + self.assertEqual(t.mock_calls, []) + self.assertEqual(connector.mock_calls, []) + + clear_mock_calls(n, connector, t, m) + + c.dataReceived(b"\x00\x00\x00\x0Ahandshake2") + # we're the follower, so we send our Noise handshake, then + # encrypt and send the KCM immediately + self.assertEqual(n.mock_calls, [ + mock.call.read_message(b"handshake2"), + mock.call.write_message(), + mock.call.encrypt(encode_record(t_kcm)), + ]) + self.assertEqual(connector.mock_calls, []) + self.assertEqual(t.mock_calls, [ + mock.call.write(exp_handshake), + mock.call.write(exp_kcm)]) + self.assertEqual(c._manager, None) + clear_mock_calls(n, connector, t, m) + + # the leader will select a connection, send the KCM, and then + # immediately send some more data + + kcm_and_msg1 = (b"\x00\x00\x00\x03KCM" + + b"\x00\x00\x00\x04msg1") + c.dataReceived(kcm_and_msg1) + + # follower: inbound KCM means we've been selected. + # in both cases we notify Connector.add_candidate(), and the Connector + # decides if/when to call .select() + + self.assertEqual(n.mock_calls, [mock.call.decrypt(b"KCM"), + mock.call.decrypt(b"msg1")]) + self.assertEqual(connector.mock_calls, [mock.call.add_candidate(c)]) + self.assertEqual(t.mock_calls, []) + clear_mock_calls(n, connector, t, m) + + # now pretend this connection wins (either the Leader decides to use + # this one among all the candiates, or we're the Follower and the + # Connector is reacting to add_candidate() by recognizing we're the + # only candidate there is) + c.select(m) + self.assertIdentical(c._manager, m) + # follower: we already sent the KCM, do nothing + self.assertEqual(n.mock_calls, []) + self.assertEqual(connector.mock_calls, []) + self.assertEqual(t.mock_calls, []) + self.assertEqual(m.mock_calls, [mock.call.got_record(t_open)]) + clear_mock_calls(n, connector, t, m) diff --git a/src/wormhole/test/dilate/test_manager.py b/src/wormhole/test/dilate/test_manager.py index b8258f1..5a86d3f 100644 --- a/src/wormhole/test/dilate/test_manager.py +++ b/src/wormhole/test/dilate/test_manager.py @@ -3,6 +3,7 @@ from zope.interface import alsoProvides from twisted.trial import unittest from twisted.internet.defer import Deferred from twisted.internet.task import Clock, Cooperator +from twisted.internet.interfaces import IAddress import mock from ...eventual import EventualQueue from ..._interfaces import ISend, IDilationManager, ITerminator @@ -14,7 +15,6 @@ from ..._dilation.manager import (Dilator, Manager, make_side, UnknownDilationMessageType, UnexpectedKCM, UnknownMessageType) -from ..._dilation.subchannel import _WormholeAddress from ..._dilation.connection import Open, Data, Close, Ack, KCM, Ping, Pong from .common import clear_mock_calls @@ -56,6 +56,16 @@ class TestDilator(unittest.TestCase): self.assertNoResult(d1) self.assertNoResult(d2) + host_addr = dil._host_addr + + peer_addr = object() + m_sca = mock.patch("wormhole._dilation.manager._SubchannelAddress", + return_value=peer_addr) + sc = mock.Mock() + m_sc = mock.patch("wormhole._dilation.manager.SubChannel", + return_value=sc) + scid0 = b"\x00\x00\x00\x00" + m = mock.Mock() alsoProvides(m, IDilationManager) m.when_first_connected.return_value = wfc_d = Deferred() @@ -63,47 +73,38 @@ class TestDilator(unittest.TestCase): return_value=m) as ml: with mock.patch("wormhole._dilation.manager.make_side", return_value="us"): - dil.got_wormhole_versions({"can-dilate": ["1"]}) + with m_sca, m_sc as m_sc_m: + dil.got_wormhole_versions({"can-dilate": ["1"]}) # that should create the Manager self.assertEqual(ml.mock_calls, [mock.call(send, "us", transit_key, - None, reactor, eq, coop, no_listen=False)]) + None, reactor, eq, coop, host_addr, False)]) + # and create subchannel0 + self.assertEqual(m_sc_m.mock_calls, + [mock.call(scid0, m, host_addr, peer_addr)]) # and tell it to start, and get wait-for-it-to-connect Deferred - self.assertEqual(m.mock_calls, [mock.call.start(), + self.assertEqual(m.mock_calls, [mock.call.set_subchannel_zero(scid0, sc), + mock.call.start(), mock.call.when_first_connected(), ]) clear_mock_calls(m) self.assertNoResult(d1) self.assertNoResult(d2) - host_addr = _WormholeAddress() - m_wa = mock.patch("wormhole._dilation.manager._WormholeAddress", - return_value=host_addr) - peer_addr = object() - m_sca = mock.patch("wormhole._dilation.manager._SubchannelAddress", - return_value=peer_addr) ce = mock.Mock() m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint", return_value=ce) - sc = mock.Mock() - m_sc = mock.patch("wormhole._dilation.manager.SubChannel", - return_value=sc) - lep = object() m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint", return_value=lep) - with m_wa, m_sca, m_ce as m_ce_m, m_sc as m_sc_m, m_sle as m_sle_m: + with m_ce as m_ce_m, m_sle as m_sle_m: wfc_d.callback(None) eq.flush_sync() - scid0 = b"\x00\x00\x00\x00" self.assertEqual(m_ce_m.mock_calls, [mock.call(peer_addr)]) - self.assertEqual(m_sc_m.mock_calls, - [mock.call(scid0, m, host_addr, peer_addr)]) self.assertEqual(ce.mock_calls, [mock.call._subchannel_zero_opened(sc)]) self.assertEqual(m_sle_m.mock_calls, [mock.call(m, host_addr)]) self.assertEqual(m.mock_calls, - [mock.call.set_subchannel_zero(scid0, sc), - mock.call.set_listener_endpoint(lep), + [mock.call.set_listener_endpoint(lep), ]) clear_mock_calls(m) @@ -166,6 +167,7 @@ class TestDilator(unittest.TestCase): dil, send, reactor, eq, clock, coop = make_dilator() dil._transit_key = b"key" d1 = dil.dilate() + host_addr = dil._host_addr self.assertNoResult(d1) pleasemsg = dict(type="please", side="them") dil.received_dilate(dict_to_bytes(pleasemsg)) @@ -176,14 +178,21 @@ class TestDilator(unittest.TestCase): alsoProvides(m, IDilationManager) m.when_first_connected.return_value = Deferred() + scid0 = b"\x00\x00\x00\x00" + sc = mock.Mock() + m_sc = mock.patch("wormhole._dilation.manager.SubChannel", + return_value=sc) + with mock.patch("wormhole._dilation.manager.Manager", return_value=m) as ml: with mock.patch("wormhole._dilation.manager.make_side", return_value="us"): - dil.got_wormhole_versions({"can-dilate": ["1"]}) + with m_sc: + dil.got_wormhole_versions({"can-dilate": ["1"]}) self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key", - None, reactor, eq, coop, no_listen=False)]) - self.assertEqual(m.mock_calls, [mock.call.start(), + None, reactor, eq, coop, host_addr, False)]) + self.assertEqual(m.mock_calls, [mock.call.set_subchannel_zero(scid0, sc), + mock.call.start(), mock.call.rx_PLEASE(pleasemsg), mock.call.rx_HINTS(hintmsg), mock.call.when_first_connected()]) @@ -191,16 +200,24 @@ class TestDilator(unittest.TestCase): def test_transit_relay(self): dil, send, reactor, eq, clock, coop = make_dilator() dil._transit_key = b"key" + host_addr = dil._host_addr relay = object() d1 = dil.dilate(transit_relay_location=relay) self.assertNoResult(d1) + scid0 = b"\x00\x00\x00\x00" + sc = mock.Mock() + m_sc = mock.patch("wormhole._dilation.manager.SubChannel", + return_value=sc) + with mock.patch("wormhole._dilation.manager.Manager") as ml: with mock.patch("wormhole._dilation.manager.make_side", return_value="us"): - dil.got_wormhole_versions({"can-dilate": ["1"]}) + with m_sc: + dil.got_wormhole_versions({"can-dilate": ["1"]}) self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key", - relay, reactor, eq, coop, no_listen=False), + relay, reactor, eq, coop, host_addr, False), + mock.call().set_subchannel_zero(scid0, sc), mock.call().start(), mock.call().when_first_connected()]) @@ -234,12 +251,11 @@ def make_manager(leader=True): h.Inbound = mock.Mock(return_value=h.inbound) h.outbound = mock.Mock() h.Outbound = mock.Mock(return_value=h.outbound) - h.hostaddr = object() + h.hostaddr = mock.Mock() + alsoProvides(h.hostaddr, IAddress) with mock.patch("wormhole._dilation.manager.Inbound", h.Inbound): with mock.patch("wormhole._dilation.manager.Outbound", h.Outbound): - with mock.patch("wormhole._dilation.manager._WormholeAddress", - return_value=h.hostaddr): - m = Manager(h.send, side, h.key, h.relay, h.reactor, h.eq, h.coop) + m = Manager(h.send, side, h.key, h.relay, h.reactor, h.eq, h.coop, h.hostaddr) return m, h diff --git a/src/wormhole/wormhole.py b/src/wormhole/wormhole.py index 7d1ef24..1e5509d 100644 --- a/src/wormhole/wormhole.py +++ b/src/wormhole/wormhole.py @@ -193,10 +193,10 @@ class _DeferredWormhole(object): raise NoKeyError() return derive_key(self._key, to_bytes(purpose), length) - def dilate(self): + def dilate(self, no_listen=False): if not self._enable_dilate: raise NotImplementedError - return self._boss.dilate() # fires with (endpoints) + return self._boss.dilate(no_listen) # fires with (endpoints) def close(self): # fails with WormholeError unless we established a connection