Merge branch 'dilation-fixes'
This commit is contained in:
		
						commit
						293893ec01
					
				|  | @ -105,11 +105,15 @@ resumed or reestablished. | ||||||
| 
 | 
 | ||||||
| Dilation is triggered by calling the `w.dilate()` API. This returns a | Dilation is triggered by calling the `w.dilate()` API. This returns a | ||||||
| Deferred that will fire once the first L3 connection is established. It fires | Deferred that will fire once the first L3 connection is established. It fires | ||||||
| with a 3-tuple of endpoints that can be used to establish subchannels. | with a 3-tuple of endpoints that can be used to establish subchannels, or an | ||||||
|  | error if dilation is not possible. If the other side's `versions` message | ||||||
|  | indicates that it does not support dilation, the Deferred will errback with | ||||||
|  | an `OldPeerCannotDilateError`. | ||||||
| 
 | 
 | ||||||
| For dilation to succeed, both sides must call `w.dilate()`, since the | For dilation to succeed, both sides must call `w.dilate()`, since the | ||||||
| resulting endpoints are the only way to access the subchannels. If the other | resulting endpoints are the only way to access the subchannels. If the other | ||||||
| side never calls `w.dilate()`, the Deferred will never fire. | side is capable of dilation, but never calls `w.dilate()`, the Deferred will | ||||||
|  | never fire. | ||||||
| 
 | 
 | ||||||
| The L1 (mailbox) path is used to deliver dilation requests and connection | The L1 (mailbox) path is used to deliver dilation requests and connection | ||||||
| hints. The current mailbox protocol uses named "phases" to distinguish | hints. The current mailbox protocol uses named "phases" to distinguish | ||||||
|  | @ -260,7 +264,7 @@ trigger an immediate error for most non-magic-wormhole listeners (e.g. HTTP | ||||||
| servers that were contacted by accident). If the wrong handshake is received, | servers that were contacted by accident). If the wrong handshake is received, | ||||||
| the connection will be dropped. For debugging purposes, the node might want | the connection will be dropped. For debugging purposes, the node might want | ||||||
| to keep looking at data beyond the first incorrect character and log | to keep looking at data beyond the first incorrect character and log | ||||||
| everything until the first newline. | a few hundred characters until the first newline. | ||||||
| 
 | 
 | ||||||
| Everything beyond that point is a Noise protocol message, which consists of a | Everything beyond that point is a Noise protocol message, which consists of a | ||||||
| 4-byte big-endian length field, followed by the indicated number of bytes. | 4-byte big-endian length field, followed by the indicated number of bytes. | ||||||
|  | @ -271,29 +275,44 @@ master PAKE key using HKDF. Each L2 connection uses the same dilation key, | ||||||
| but different ephemeral keys, so each gets a different session key. | but different ephemeral keys, so each gets a different session key. | ||||||
| 
 | 
 | ||||||
| The Leader sends the first message, which is a psk-encrypted ephemeral key. | The Leader sends the first message, which is a psk-encrypted ephemeral key. | ||||||
| The Follower sends the next message, its own psk-encrypted ephemeral key. The | The Follower sends the next message, its own psk-encrypted ephemeral key. | ||||||
| Follower then sends an empty packet as the "key confirmation message", which | These two messages are known as "handshake messages" in the Noise protocol, | ||||||
| will be encrypted by the shared key. | and must be processed in a specific order (the Leader must not accept the | ||||||
|  | Follower's message until it has generated its own). Noise allows handshake | ||||||
|  | messages to include a payload, but we do not use this feature. | ||||||
| 
 | 
 | ||||||
| The Leader sees the KCM and knows the connection is viable. It delivers the | All subsequent messages as known as "Noise transport messages", and use | ||||||
| protocol object to the L3 manager, which will decide which connection to | independent channels for each direction, so they no longer have ordering | ||||||
| select. When the L2 connection is selected to be the new L3, it will send an | dependencies. Transport messages are encrypted by the shared key, in a form | ||||||
| empty KCM of its own, to let the Follower know the connection being selected. | that evolves as more messages are sent. | ||||||
| All other L2 connections (either viable or still in handshake) are dropped, |  | ||||||
| all other connection attempts are cancelled. All listening sockets may or may |  | ||||||
| not be shut down (TODO: think about it). |  | ||||||
| 
 | 
 | ||||||
| The Follower will wait for either an empty KCM (at which point the L2 | The Follower's first transport message is an empty packet, which we use as a | ||||||
| connection is delivered to the Dilation manager as the new L3), a | "key confirmation message" (KCM). | ||||||
| disconnection, or an invalid message (which causes the connection to be | 
 | ||||||
| dropped). Other connections and/or listening sockets are stopped. | The Leader doesn't send a transport message right away: it waits to see the | ||||||
|  | Follower's KCM, which indicates this connection is viable (i.e. the Follower | ||||||
|  | used the same dilation key as the Leader, which means they both used the same | ||||||
|  | wormhole code). | ||||||
|  | 
 | ||||||
|  | The Leader delivers the now-viable protocol object to the L3 manager, which | ||||||
|  | will decide which connection to select. When some L2 connection is selected | ||||||
|  | to be the new L3, the Leader finally sends an empty KCM of its own over that | ||||||
|  | L2, to let the Follower know which connection has been selected. All other L2 | ||||||
|  | connections (either viable or still in handshake) are dropped, and all other | ||||||
|  | connection attempts are cancelled. All listening sockets may or may not be | ||||||
|  | shut down (TODO: think about it). | ||||||
|  | 
 | ||||||
|  | After sending their KCM, the Follower will wait for either an empty KCM (at | ||||||
|  | which point the L2 connection is delivered to the Dilation manager as the new | ||||||
|  | L3), a disconnection, or an invalid message (which causes the connection to | ||||||
|  | be dropped). Other connections and/or listening sockets are stopped. | ||||||
| 
 | 
 | ||||||
| Internally, the L2Protocol object manages the Noise session itself. It knows | Internally, the L2Protocol object manages the Noise session itself. It knows | ||||||
| (via a constructor argument) whether it is on the Leader or Follower side, | (via a constructor argument) whether it is on the Leader or Follower side, | ||||||
| which affects both the role is plays in the Noise pattern, and the reaction | which affects both the role is plays in the Noise pattern, and the reaction | ||||||
| to receiving the ephemeral key (for which only the Follower sends an empty | to receiving the handshake message / ephemeral key (for which only the | ||||||
| KCM message). After that, the L2Protocol notifies the L3 object in three | Follower sends an empty KCM message). After that, the L2Protocol notifies the | ||||||
| situations: | L3 object in three situations: | ||||||
| 
 | 
 | ||||||
| * the Noise session produces a valid decrypted frame (for Leader, this | * the Noise session produces a valid decrypted frame (for Leader, this | ||||||
|   includes the Follower's KCM, and thus indicates a viable candidate for |   includes the Follower's KCM, and thus indicates a viable candidate for | ||||||
|  |  | ||||||
|  | @ -205,8 +205,8 @@ class Boss(object): | ||||||
|         self._did_start_code = True |         self._did_start_code = True | ||||||
|         self._C.set_code(code) |         self._C.set_code(code) | ||||||
| 
 | 
 | ||||||
|     def dilate(self): |     def dilate(self, no_listen=False): | ||||||
|         return self._D.dilate()  # fires with endpoints |         return self._D.dilate(no_listen=no_listen)  # fires with endpoints | ||||||
| 
 | 
 | ||||||
|     @m.input() |     @m.input() | ||||||
|     def send(self, plaintext): |     def send(self, plaintext): | ||||||
|  |  | ||||||
|  | @ -69,9 +69,13 @@ class Disconnect(Exception): | ||||||
| # (everything past this point is a Frame, with be4 length prefix. Frames are | # (everything past this point is a Frame, with be4 length prefix. Frames are | ||||||
| #  either noise handshake or an encrypted message) | #  either noise handshake or an encrypted message) | ||||||
| # 4: if LEADER, send noise handshake string. if FOLLOWER, wait for it | # 4: if LEADER, send noise handshake string. if FOLLOWER, wait for it | ||||||
|  | #    LEADER: m=n.write_message(), FOLLOWER: n.read_message(m) | ||||||
| # 5: if FOLLOWER, send noise response string. if LEADER, wait for it | # 5: if FOLLOWER, send noise response string. if LEADER, wait for it | ||||||
| # 6: ... | #    FOLLOWER: m=n.write_message(), LEADER: n.read_message(m) | ||||||
| 
 | # 6: if FOLLOWER: send KCM (m=n.encrypt('')), wait for KCM (n.decrypt(m)) | ||||||
|  | #    if LEADER: wait for KCM, gather viable connections, select | ||||||
|  | #               send KCM over selected connection, drop the rest | ||||||
|  | # 7: both: send Ping/Pong/Open/Data/Close/Ack records (n.encrypt(rec)) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| RelayOK = namedtuple("RelayOk", []) | RelayOK = namedtuple("RelayOk", []) | ||||||
|  | @ -491,6 +495,7 @@ class DilatedConnectionProtocol(Protocol, object): | ||||||
|         self._manager = None  # set if/when we are selected |         self._manager = None  # set if/when we are selected | ||||||
|         self._disconnected = OneShotObserver(self._eventual_queue) |         self._disconnected = OneShotObserver(self._eventual_queue) | ||||||
|         self._can_send_records = False |         self._can_send_records = False | ||||||
|  |         self._inbound_record_queue = [] | ||||||
| 
 | 
 | ||||||
|     @m.state(initial=True) |     @m.state(initial=True) | ||||||
|     def unselected(self): |     def unselected(self): | ||||||
|  | @ -520,6 +525,18 @@ class DilatedConnectionProtocol(Protocol, object): | ||||||
|     def add_candidate(self): |     def add_candidate(self): | ||||||
|         self._connector.add_candidate(self) |         self._connector.add_candidate(self) | ||||||
| 
 | 
 | ||||||
|  |     @m.output() | ||||||
|  |     def queue_inbound_record(self, record): | ||||||
|  |         # the Follower will see a dataReceived chunk containing both the KCM | ||||||
|  |         # (leader says we've been picked) and the first record. | ||||||
|  |         # Connector.consider takes an eventual-turn to decide to accept this | ||||||
|  |         # connection, which means the record will arrive before we get | ||||||
|  |         # .select() and move to the 'selected' state where we can | ||||||
|  |         # deliver_record. So we need to queue the record for a turn. TODO: | ||||||
|  |         # when we move to the sans-io event-driven scheme, this queue | ||||||
|  |         # shouldn't be necessary | ||||||
|  |         self._inbound_record_queue.append(record) | ||||||
|  | 
 | ||||||
|     @m.output() |     @m.output() | ||||||
|     def set_manager(self, manager): |     def set_manager(self, manager): | ||||||
|         self._manager = manager |         self._manager = manager | ||||||
|  | @ -530,12 +547,21 @@ class DilatedConnectionProtocol(Protocol, object): | ||||||
|     def can_send_records(self, manager): |     def can_send_records(self, manager): | ||||||
|         self._can_send_records = True |         self._can_send_records = True | ||||||
| 
 | 
 | ||||||
|  |     @m.output() | ||||||
|  |     def process_inbound_queue(self, manager): | ||||||
|  |         while self._inbound_record_queue: | ||||||
|  |             r = self._inbound_record_queue.pop(0) | ||||||
|  |             self._manager.got_record(r) | ||||||
|  | 
 | ||||||
|     @m.output() |     @m.output() | ||||||
|     def deliver_record(self, record): |     def deliver_record(self, record): | ||||||
|         self._manager.got_record(record) |         self._manager.got_record(record) | ||||||
| 
 | 
 | ||||||
|     unselected.upon(got_kcm, outputs=[add_candidate], enter=selecting) |     unselected.upon(got_kcm, outputs=[add_candidate], enter=selecting) | ||||||
|     selecting.upon(select, outputs=[set_manager, can_send_records], enter=selected) |     selecting.upon(got_record, outputs=[queue_inbound_record], enter=selecting) | ||||||
|  |     selecting.upon(select, | ||||||
|  |                    outputs=[set_manager, can_send_records, process_inbound_queue], | ||||||
|  |                    enter=selected) | ||||||
|     selected.upon(got_record, outputs=[deliver_record], enter=selected) |     selected.upon(got_record, outputs=[deliver_record], enter=selected) | ||||||
| 
 | 
 | ||||||
|     # called by Connector |     # called by Connector | ||||||
|  |  | ||||||
|  | @ -6,6 +6,7 @@ from attr.validators import provides, instance_of, optional | ||||||
| from automat import MethodicalMachine | from automat import MethodicalMachine | ||||||
| from zope.interface import implementer | from zope.interface import implementer | ||||||
| from twisted.internet.defer import Deferred, inlineCallbacks, returnValue | from twisted.internet.defer import Deferred, inlineCallbacks, returnValue | ||||||
|  | from twisted.internet.interfaces import IAddress | ||||||
| from twisted.python import log | from twisted.python import log | ||||||
| from .._interfaces import IDilator, IDilationManager, ISend, ITerminator | from .._interfaces import IDilator, IDilationManager, ISend, ITerminator | ||||||
| from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr | from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr | ||||||
|  | @ -97,6 +98,7 @@ class Manager(object): | ||||||
|     _reactor = attrib(repr=False) |     _reactor = attrib(repr=False) | ||||||
|     _eventual_queue = attrib(repr=False) |     _eventual_queue = attrib(repr=False) | ||||||
|     _cooperator = attrib(repr=False) |     _cooperator = attrib(repr=False) | ||||||
|  |     _host_addr = attrib(validator=provides(IAddress)) | ||||||
|     _no_listen = attrib(default=False) |     _no_listen = attrib(default=False) | ||||||
|     _tor = None  # TODO |     _tor = None  # TODO | ||||||
|     _timing = None  # TODO |     _timing = None  # TODO | ||||||
|  | @ -114,7 +116,6 @@ class Manager(object): | ||||||
|         self._made_first_connection = False |         self._made_first_connection = False | ||||||
|         self._first_connected = OneShotObserver(self._eventual_queue) |         self._first_connected = OneShotObserver(self._eventual_queue) | ||||||
|         self._stopped = OneShotObserver(self._eventual_queue) |         self._stopped = OneShotObserver(self._eventual_queue) | ||||||
|         self._host_addr = _WormholeAddress() |  | ||||||
| 
 | 
 | ||||||
|         self._next_dilation_phase = 0 |         self._next_dilation_phase = 0 | ||||||
| 
 | 
 | ||||||
|  | @ -477,7 +478,6 @@ class Dilator(object): | ||||||
|     _reactor = attrib() |     _reactor = attrib() | ||||||
|     _eventual_queue = attrib() |     _eventual_queue = attrib() | ||||||
|     _cooperator = attrib() |     _cooperator = attrib() | ||||||
|     _no_listen = attrib(default=False) |  | ||||||
| 
 | 
 | ||||||
|     def __attrs_post_init__(self): |     def __attrs_post_init__(self): | ||||||
|         self._got_versions_d = Deferred() |         self._got_versions_d = Deferred() | ||||||
|  | @ -485,21 +485,22 @@ class Dilator(object): | ||||||
|         self._endpoints = OneShotObserver(self._eventual_queue) |         self._endpoints = OneShotObserver(self._eventual_queue) | ||||||
|         self._pending_inbound_dilate_messages = deque() |         self._pending_inbound_dilate_messages = deque() | ||||||
|         self._manager = None |         self._manager = None | ||||||
|  |         self._host_addr = _WormholeAddress() | ||||||
| 
 | 
 | ||||||
|     def wire(self, sender, terminator): |     def wire(self, sender, terminator): | ||||||
|         self._S = ISend(sender) |         self._S = ISend(sender) | ||||||
|         self._T = ITerminator(terminator) |         self._T = ITerminator(terminator) | ||||||
| 
 | 
 | ||||||
|     # this is the primary entry point, called when w.dilate() is invoked |     # this is the primary entry point, called when w.dilate() is invoked | ||||||
|     def dilate(self, transit_relay_location=None): |     def dilate(self, transit_relay_location=None, no_listen=False): | ||||||
|         self._transit_relay_location = transit_relay_location |         self._transit_relay_location = transit_relay_location | ||||||
|         if not self._started: |         if not self._started: | ||||||
|             self._started = True |             self._started = True | ||||||
|             self._start().addBoth(self._endpoints.fire) |             self._start(no_listen).addBoth(self._endpoints.fire) | ||||||
|         return self._endpoints.when_fired() |         return self._endpoints.when_fired() | ||||||
| 
 | 
 | ||||||
|     @inlineCallbacks |     @inlineCallbacks | ||||||
|     def _start(self): |     def _start(self, no_listen): | ||||||
|         # first, we wait until we hear the VERSION message, which tells us 1: |         # first, we wait until we hear the VERSION message, which tells us 1: | ||||||
|         # the PAKE key works, so we can talk securely, 2: that they can do |         # the PAKE key works, so we can talk securely, 2: that they can do | ||||||
|         # dilation at all (if they can't then w.dilate() errbacks) |         # dilation at all (if they can't then w.dilate() errbacks) | ||||||
|  | @ -522,7 +523,16 @@ class Dilator(object): | ||||||
|                                 self._transit_key, |                                 self._transit_key, | ||||||
|                                 self._transit_relay_location, |                                 self._transit_relay_location, | ||||||
|                                 self._reactor, self._eventual_queue, |                                 self._reactor, self._eventual_queue, | ||||||
|                                 self._cooperator, no_listen=self._no_listen) |                                 self._cooperator, self._host_addr, no_listen) | ||||||
|  |         # We must open subchannel0 early, since messages may arrive very | ||||||
|  |         # quickly once the connection is established. This subchannel may or | ||||||
|  |         # may not ever get revealed to the caller, since the peer might not | ||||||
|  |         # even be capable of dilation. | ||||||
|  |         scid0 = to_be4(0) | ||||||
|  |         peer_addr0 = _SubchannelAddress(scid0) | ||||||
|  |         sc0 = SubChannel(scid0, self._manager, self._host_addr, peer_addr0) | ||||||
|  |         self._manager.set_subchannel_zero(scid0, sc0) | ||||||
|  | 
 | ||||||
|         self._manager.start() |         self._manager.start() | ||||||
| 
 | 
 | ||||||
|         while self._pending_inbound_dilate_messages: |         while self._pending_inbound_dilate_messages: | ||||||
|  | @ -531,15 +541,10 @@ class Dilator(object): | ||||||
| 
 | 
 | ||||||
|         yield self._manager.when_first_connected() |         yield self._manager.when_first_connected() | ||||||
| 
 | 
 | ||||||
|         # we can open subchannels as soon as we get our first connection |         # we can open non-zero subchannels as soon as we get our first | ||||||
|         scid0 = to_be4(0) |         # connection | ||||||
|         self._host_addr = _WormholeAddress()  # TODO: share with Manager |  | ||||||
|         peer_addr0 = _SubchannelAddress(scid0) |  | ||||||
|         control_ep = ControlEndpoint(peer_addr0) |         control_ep = ControlEndpoint(peer_addr0) | ||||||
|         sc0 = SubChannel(scid0, self._manager, self._host_addr, peer_addr0) |  | ||||||
|         control_ep._subchannel_zero_opened(sc0) |         control_ep._subchannel_zero_opened(sc0) | ||||||
|         self._manager.set_subchannel_zero(scid0, sc0) |  | ||||||
| 
 |  | ||||||
|         connect_ep = SubchannelConnectorEndpoint(self._manager, self._host_addr) |         connect_ep = SubchannelConnectorEndpoint(self._manager, self._host_addr) | ||||||
| 
 | 
 | ||||||
|         listen_ep = SubchannelListenerEndpoint(self._manager, self._host_addr) |         listen_ep = SubchannelListenerEndpoint(self._manager, self._host_addr) | ||||||
|  |  | ||||||
|  | @ -52,7 +52,7 @@ class Connect(unittest.TestCase): | ||||||
|         t_left = FakeTerminator() |         t_left = FakeTerminator() | ||||||
|         t_right = FakeTerminator() |         t_right = FakeTerminator() | ||||||
| 
 | 
 | ||||||
|         d_left = manager.Dilator(reactor, eq, cooperator, no_listen=True) |         d_left = manager.Dilator(reactor, eq, cooperator) | ||||||
|         d_left.wire(send_left, t_left) |         d_left.wire(send_left, t_left) | ||||||
|         d_left.got_key(key) |         d_left.got_key(key) | ||||||
|         d_left.got_wormhole_versions({"can-dilate": ["1"]}) |         d_left.got_wormhole_versions({"can-dilate": ["1"]}) | ||||||
|  | @ -66,7 +66,7 @@ class Connect(unittest.TestCase): | ||||||
| 
 | 
 | ||||||
|         with mock.patch("wormhole._dilation.connector.ipaddrs.find_addresses", |         with mock.patch("wormhole._dilation.connector.ipaddrs.find_addresses", | ||||||
|                         return_value=["127.0.0.1"]): |                         return_value=["127.0.0.1"]): | ||||||
|             eps_left_d = d_left.dilate() |             eps_left_d = d_left.dilate(no_listen=True) | ||||||
|             eps_right_d = d_right.dilate() |             eps_right_d = d_right.dilate() | ||||||
| 
 | 
 | ||||||
|         eps_left = yield eps_left_d |         eps_left = yield eps_left_d | ||||||
|  |  | ||||||
|  | @ -233,3 +233,77 @@ class Connection(unittest.TestCase): | ||||||
|         self.assertEqual(connector.mock_calls, []) |         self.assertEqual(connector.mock_calls, []) | ||||||
|         self.assertEqual(t.mock_calls, [mock.call.loseConnection()]) |         self.assertEqual(t.mock_calls, [mock.call.loseConnection()]) | ||||||
|         clear_mock_calls(n, connector, t) |         clear_mock_calls(n, connector, t) | ||||||
|  | 
 | ||||||
|  |     def test_follower_combined(self): | ||||||
|  |         c, n, connector, t, eq = make_con(FOLLOWER) | ||||||
|  |         t_kcm = KCM() | ||||||
|  |         t_open = Open(seqnum=1, scid=to_be4(0x11223344)) | ||||||
|  |         n.decrypt = mock.Mock(side_effect=[ | ||||||
|  |             encode_record(t_kcm), | ||||||
|  |             encode_record(t_open), | ||||||
|  |         ]) | ||||||
|  |         exp_kcm = b"\x00\x00\x00\x03kcm" | ||||||
|  |         n.encrypt = mock.Mock(side_effect=[b"kcm", b"ack1"]) | ||||||
|  |         m = mock.Mock()  # Manager | ||||||
|  | 
 | ||||||
|  |         c.makeConnection(t) | ||||||
|  |         self.assertEqual(n.mock_calls, [mock.call.start_handshake()]) | ||||||
|  |         self.assertEqual(connector.mock_calls, []) | ||||||
|  |         self.assertEqual(t.mock_calls, [mock.call.write(b"outbound_prologue\n")]) | ||||||
|  |         clear_mock_calls(n, connector, t, m) | ||||||
|  | 
 | ||||||
|  |         c.dataReceived(b"inbound_prologue\n") | ||||||
|  | 
 | ||||||
|  |         exp_handshake = b"\x00\x00\x00\x09handshake" | ||||||
|  |         # however the FOLLOWER waits until receiving the leader's | ||||||
|  |         # handshake before sending their own | ||||||
|  |         self.assertEqual(n.mock_calls, []) | ||||||
|  |         self.assertEqual(t.mock_calls, []) | ||||||
|  |         self.assertEqual(connector.mock_calls, []) | ||||||
|  | 
 | ||||||
|  |         clear_mock_calls(n, connector, t, m) | ||||||
|  | 
 | ||||||
|  |         c.dataReceived(b"\x00\x00\x00\x0Ahandshake2") | ||||||
|  |         # we're the follower, so we send our Noise handshake, then | ||||||
|  |         # encrypt and send the KCM immediately | ||||||
|  |         self.assertEqual(n.mock_calls, [ | ||||||
|  |             mock.call.read_message(b"handshake2"), | ||||||
|  |             mock.call.write_message(), | ||||||
|  |             mock.call.encrypt(encode_record(t_kcm)), | ||||||
|  |         ]) | ||||||
|  |         self.assertEqual(connector.mock_calls, []) | ||||||
|  |         self.assertEqual(t.mock_calls, [ | ||||||
|  |             mock.call.write(exp_handshake), | ||||||
|  |             mock.call.write(exp_kcm)]) | ||||||
|  |         self.assertEqual(c._manager, None) | ||||||
|  |         clear_mock_calls(n, connector, t, m) | ||||||
|  | 
 | ||||||
|  |         # the leader will select a connection, send the KCM, and then | ||||||
|  |         # immediately send some more data | ||||||
|  | 
 | ||||||
|  |         kcm_and_msg1 = (b"\x00\x00\x00\x03KCM" + | ||||||
|  |                         b"\x00\x00\x00\x04msg1") | ||||||
|  |         c.dataReceived(kcm_and_msg1) | ||||||
|  | 
 | ||||||
|  |         # follower: inbound KCM means we've been selected. | ||||||
|  |         # in both cases we notify Connector.add_candidate(), and the Connector | ||||||
|  |         # decides if/when to call .select() | ||||||
|  | 
 | ||||||
|  |         self.assertEqual(n.mock_calls, [mock.call.decrypt(b"KCM"), | ||||||
|  |                                         mock.call.decrypt(b"msg1")]) | ||||||
|  |         self.assertEqual(connector.mock_calls, [mock.call.add_candidate(c)]) | ||||||
|  |         self.assertEqual(t.mock_calls, []) | ||||||
|  |         clear_mock_calls(n, connector, t, m) | ||||||
|  | 
 | ||||||
|  |         # now pretend this connection wins (either the Leader decides to use | ||||||
|  |         # this one among all the candiates, or we're the Follower and the | ||||||
|  |         # Connector is reacting to add_candidate() by recognizing we're the | ||||||
|  |         # only candidate there is) | ||||||
|  |         c.select(m) | ||||||
|  |         self.assertIdentical(c._manager, m) | ||||||
|  |         # follower: we already sent the KCM, do nothing | ||||||
|  |         self.assertEqual(n.mock_calls, []) | ||||||
|  |         self.assertEqual(connector.mock_calls, []) | ||||||
|  |         self.assertEqual(t.mock_calls, []) | ||||||
|  |         self.assertEqual(m.mock_calls, [mock.call.got_record(t_open)]) | ||||||
|  |         clear_mock_calls(n, connector, t, m) | ||||||
|  |  | ||||||
|  | @ -3,6 +3,7 @@ from zope.interface import alsoProvides | ||||||
| from twisted.trial import unittest | from twisted.trial import unittest | ||||||
| from twisted.internet.defer import Deferred | from twisted.internet.defer import Deferred | ||||||
| from twisted.internet.task import Clock, Cooperator | from twisted.internet.task import Clock, Cooperator | ||||||
|  | from twisted.internet.interfaces import IAddress | ||||||
| import mock | import mock | ||||||
| from ...eventual import EventualQueue | from ...eventual import EventualQueue | ||||||
| from ..._interfaces import ISend, IDilationManager, ITerminator | from ..._interfaces import ISend, IDilationManager, ITerminator | ||||||
|  | @ -14,7 +15,6 @@ from ..._dilation.manager import (Dilator, Manager, make_side, | ||||||
|                                   UnknownDilationMessageType, |                                   UnknownDilationMessageType, | ||||||
|                                   UnexpectedKCM, |                                   UnexpectedKCM, | ||||||
|                                   UnknownMessageType) |                                   UnknownMessageType) | ||||||
| from ..._dilation.subchannel import _WormholeAddress |  | ||||||
| from ..._dilation.connection import Open, Data, Close, Ack, KCM, Ping, Pong | from ..._dilation.connection import Open, Data, Close, Ack, KCM, Ping, Pong | ||||||
| from .common import clear_mock_calls | from .common import clear_mock_calls | ||||||
| 
 | 
 | ||||||
|  | @ -56,6 +56,16 @@ class TestDilator(unittest.TestCase): | ||||||
|         self.assertNoResult(d1) |         self.assertNoResult(d1) | ||||||
|         self.assertNoResult(d2) |         self.assertNoResult(d2) | ||||||
| 
 | 
 | ||||||
|  |         host_addr = dil._host_addr | ||||||
|  | 
 | ||||||
|  |         peer_addr = object() | ||||||
|  |         m_sca = mock.patch("wormhole._dilation.manager._SubchannelAddress", | ||||||
|  |                            return_value=peer_addr) | ||||||
|  |         sc = mock.Mock() | ||||||
|  |         m_sc = mock.patch("wormhole._dilation.manager.SubChannel", | ||||||
|  |                           return_value=sc) | ||||||
|  |         scid0 = b"\x00\x00\x00\x00" | ||||||
|  | 
 | ||||||
|         m = mock.Mock() |         m = mock.Mock() | ||||||
|         alsoProvides(m, IDilationManager) |         alsoProvides(m, IDilationManager) | ||||||
|         m.when_first_connected.return_value = wfc_d = Deferred() |         m.when_first_connected.return_value = wfc_d = Deferred() | ||||||
|  | @ -63,47 +73,38 @@ class TestDilator(unittest.TestCase): | ||||||
|                         return_value=m) as ml: |                         return_value=m) as ml: | ||||||
|             with mock.patch("wormhole._dilation.manager.make_side", |             with mock.patch("wormhole._dilation.manager.make_side", | ||||||
|                             return_value="us"): |                             return_value="us"): | ||||||
|                 dil.got_wormhole_versions({"can-dilate": ["1"]}) |                 with m_sca, m_sc as m_sc_m: | ||||||
|  |                     dil.got_wormhole_versions({"can-dilate": ["1"]}) | ||||||
|         # that should create the Manager |         # that should create the Manager | ||||||
|         self.assertEqual(ml.mock_calls, [mock.call(send, "us", transit_key, |         self.assertEqual(ml.mock_calls, [mock.call(send, "us", transit_key, | ||||||
|                                                    None, reactor, eq, coop, no_listen=False)]) |                                                    None, reactor, eq, coop, host_addr, False)]) | ||||||
|  |         # and create subchannel0 | ||||||
|  |         self.assertEqual(m_sc_m.mock_calls, | ||||||
|  |                          [mock.call(scid0, m, host_addr, peer_addr)]) | ||||||
|         # and tell it to start, and get wait-for-it-to-connect Deferred |         # and tell it to start, and get wait-for-it-to-connect Deferred | ||||||
|         self.assertEqual(m.mock_calls, [mock.call.start(), |         self.assertEqual(m.mock_calls, [mock.call.set_subchannel_zero(scid0, sc), | ||||||
|  |                                         mock.call.start(), | ||||||
|                                         mock.call.when_first_connected(), |                                         mock.call.when_first_connected(), | ||||||
|                                         ]) |                                         ]) | ||||||
|         clear_mock_calls(m) |         clear_mock_calls(m) | ||||||
|         self.assertNoResult(d1) |         self.assertNoResult(d1) | ||||||
|         self.assertNoResult(d2) |         self.assertNoResult(d2) | ||||||
| 
 | 
 | ||||||
|         host_addr = _WormholeAddress() |  | ||||||
|         m_wa = mock.patch("wormhole._dilation.manager._WormholeAddress", |  | ||||||
|                           return_value=host_addr) |  | ||||||
|         peer_addr = object() |  | ||||||
|         m_sca = mock.patch("wormhole._dilation.manager._SubchannelAddress", |  | ||||||
|                            return_value=peer_addr) |  | ||||||
|         ce = mock.Mock() |         ce = mock.Mock() | ||||||
|         m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint", |         m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint", | ||||||
|                           return_value=ce) |                           return_value=ce) | ||||||
|         sc = mock.Mock() |  | ||||||
|         m_sc = mock.patch("wormhole._dilation.manager.SubChannel", |  | ||||||
|                           return_value=sc) |  | ||||||
| 
 |  | ||||||
|         lep = object() |         lep = object() | ||||||
|         m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint", |         m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint", | ||||||
|                            return_value=lep) |                            return_value=lep) | ||||||
| 
 | 
 | ||||||
|         with m_wa, m_sca, m_ce as m_ce_m, m_sc as m_sc_m, m_sle as m_sle_m: |         with m_ce as m_ce_m, m_sle as m_sle_m: | ||||||
|             wfc_d.callback(None) |             wfc_d.callback(None) | ||||||
|             eq.flush_sync() |             eq.flush_sync() | ||||||
|         scid0 = b"\x00\x00\x00\x00" |  | ||||||
|         self.assertEqual(m_ce_m.mock_calls, [mock.call(peer_addr)]) |         self.assertEqual(m_ce_m.mock_calls, [mock.call(peer_addr)]) | ||||||
|         self.assertEqual(m_sc_m.mock_calls, |  | ||||||
|                          [mock.call(scid0, m, host_addr, peer_addr)]) |  | ||||||
|         self.assertEqual(ce.mock_calls, [mock.call._subchannel_zero_opened(sc)]) |         self.assertEqual(ce.mock_calls, [mock.call._subchannel_zero_opened(sc)]) | ||||||
|         self.assertEqual(m_sle_m.mock_calls, [mock.call(m, host_addr)]) |         self.assertEqual(m_sle_m.mock_calls, [mock.call(m, host_addr)]) | ||||||
|         self.assertEqual(m.mock_calls, |         self.assertEqual(m.mock_calls, | ||||||
|                          [mock.call.set_subchannel_zero(scid0, sc), |                          [mock.call.set_listener_endpoint(lep), | ||||||
|                           mock.call.set_listener_endpoint(lep), |  | ||||||
|                           ]) |                           ]) | ||||||
|         clear_mock_calls(m) |         clear_mock_calls(m) | ||||||
| 
 | 
 | ||||||
|  | @ -166,6 +167,7 @@ class TestDilator(unittest.TestCase): | ||||||
|         dil, send, reactor, eq, clock, coop = make_dilator() |         dil, send, reactor, eq, clock, coop = make_dilator() | ||||||
|         dil._transit_key = b"key" |         dil._transit_key = b"key" | ||||||
|         d1 = dil.dilate() |         d1 = dil.dilate() | ||||||
|  |         host_addr = dil._host_addr | ||||||
|         self.assertNoResult(d1) |         self.assertNoResult(d1) | ||||||
|         pleasemsg = dict(type="please", side="them") |         pleasemsg = dict(type="please", side="them") | ||||||
|         dil.received_dilate(dict_to_bytes(pleasemsg)) |         dil.received_dilate(dict_to_bytes(pleasemsg)) | ||||||
|  | @ -176,14 +178,21 @@ class TestDilator(unittest.TestCase): | ||||||
|         alsoProvides(m, IDilationManager) |         alsoProvides(m, IDilationManager) | ||||||
|         m.when_first_connected.return_value = Deferred() |         m.when_first_connected.return_value = Deferred() | ||||||
| 
 | 
 | ||||||
|  |         scid0 = b"\x00\x00\x00\x00" | ||||||
|  |         sc = mock.Mock() | ||||||
|  |         m_sc = mock.patch("wormhole._dilation.manager.SubChannel", | ||||||
|  |                           return_value=sc) | ||||||
|  | 
 | ||||||
|         with mock.patch("wormhole._dilation.manager.Manager", |         with mock.patch("wormhole._dilation.manager.Manager", | ||||||
|                         return_value=m) as ml: |                         return_value=m) as ml: | ||||||
|             with mock.patch("wormhole._dilation.manager.make_side", |             with mock.patch("wormhole._dilation.manager.make_side", | ||||||
|                             return_value="us"): |                             return_value="us"): | ||||||
|                 dil.got_wormhole_versions({"can-dilate": ["1"]}) |                 with m_sc: | ||||||
|  |                     dil.got_wormhole_versions({"can-dilate": ["1"]}) | ||||||
|         self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key", |         self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key", | ||||||
|                                                    None, reactor, eq, coop, no_listen=False)]) |                                                    None, reactor, eq, coop, host_addr, False)]) | ||||||
|         self.assertEqual(m.mock_calls, [mock.call.start(), |         self.assertEqual(m.mock_calls, [mock.call.set_subchannel_zero(scid0, sc), | ||||||
|  |                                         mock.call.start(), | ||||||
|                                         mock.call.rx_PLEASE(pleasemsg), |                                         mock.call.rx_PLEASE(pleasemsg), | ||||||
|                                         mock.call.rx_HINTS(hintmsg), |                                         mock.call.rx_HINTS(hintmsg), | ||||||
|                                         mock.call.when_first_connected()]) |                                         mock.call.when_first_connected()]) | ||||||
|  | @ -191,16 +200,24 @@ class TestDilator(unittest.TestCase): | ||||||
|     def test_transit_relay(self): |     def test_transit_relay(self): | ||||||
|         dil, send, reactor, eq, clock, coop = make_dilator() |         dil, send, reactor, eq, clock, coop = make_dilator() | ||||||
|         dil._transit_key = b"key" |         dil._transit_key = b"key" | ||||||
|  |         host_addr = dil._host_addr | ||||||
|         relay = object() |         relay = object() | ||||||
|         d1 = dil.dilate(transit_relay_location=relay) |         d1 = dil.dilate(transit_relay_location=relay) | ||||||
|         self.assertNoResult(d1) |         self.assertNoResult(d1) | ||||||
| 
 | 
 | ||||||
|  |         scid0 = b"\x00\x00\x00\x00" | ||||||
|  |         sc = mock.Mock() | ||||||
|  |         m_sc = mock.patch("wormhole._dilation.manager.SubChannel", | ||||||
|  |                           return_value=sc) | ||||||
|  | 
 | ||||||
|         with mock.patch("wormhole._dilation.manager.Manager") as ml: |         with mock.patch("wormhole._dilation.manager.Manager") as ml: | ||||||
|             with mock.patch("wormhole._dilation.manager.make_side", |             with mock.patch("wormhole._dilation.manager.make_side", | ||||||
|                             return_value="us"): |                             return_value="us"): | ||||||
|                 dil.got_wormhole_versions({"can-dilate": ["1"]}) |                 with m_sc: | ||||||
|  |                     dil.got_wormhole_versions({"can-dilate": ["1"]}) | ||||||
|         self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key", |         self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key", | ||||||
|                                                    relay, reactor, eq, coop, no_listen=False), |                                                    relay, reactor, eq, coop, host_addr, False), | ||||||
|  |                                          mock.call().set_subchannel_zero(scid0, sc), | ||||||
|                                          mock.call().start(), |                                          mock.call().start(), | ||||||
|                                          mock.call().when_first_connected()]) |                                          mock.call().when_first_connected()]) | ||||||
| 
 | 
 | ||||||
|  | @ -234,12 +251,11 @@ def make_manager(leader=True): | ||||||
|     h.Inbound = mock.Mock(return_value=h.inbound) |     h.Inbound = mock.Mock(return_value=h.inbound) | ||||||
|     h.outbound = mock.Mock() |     h.outbound = mock.Mock() | ||||||
|     h.Outbound = mock.Mock(return_value=h.outbound) |     h.Outbound = mock.Mock(return_value=h.outbound) | ||||||
|     h.hostaddr = object() |     h.hostaddr = mock.Mock() | ||||||
|  |     alsoProvides(h.hostaddr, IAddress) | ||||||
|     with mock.patch("wormhole._dilation.manager.Inbound", h.Inbound): |     with mock.patch("wormhole._dilation.manager.Inbound", h.Inbound): | ||||||
|         with mock.patch("wormhole._dilation.manager.Outbound", h.Outbound): |         with mock.patch("wormhole._dilation.manager.Outbound", h.Outbound): | ||||||
|             with mock.patch("wormhole._dilation.manager._WormholeAddress", |             m = Manager(h.send, side, h.key, h.relay, h.reactor, h.eq, h.coop, h.hostaddr) | ||||||
|                             return_value=h.hostaddr): |  | ||||||
|                 m = Manager(h.send, side, h.key, h.relay, h.reactor, h.eq, h.coop) |  | ||||||
|     return m, h |     return m, h | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -193,10 +193,10 @@ class _DeferredWormhole(object): | ||||||
|             raise NoKeyError() |             raise NoKeyError() | ||||||
|         return derive_key(self._key, to_bytes(purpose), length) |         return derive_key(self._key, to_bytes(purpose), length) | ||||||
| 
 | 
 | ||||||
|     def dilate(self): |     def dilate(self, no_listen=False): | ||||||
|         if not self._enable_dilate: |         if not self._enable_dilate: | ||||||
|             raise NotImplementedError |             raise NotImplementedError | ||||||
|         return self._boss.dilate()  # fires with (endpoints) |         return self._boss.dilate(no_listen)  # fires with (endpoints) | ||||||
| 
 | 
 | ||||||
|     def close(self): |     def close(self): | ||||||
|         # fails with WormholeError unless we established a connection |         # fails with WormholeError unless we established a connection | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue
	
	Block a user