From 443d2489722de30188e2b7f6173d28cbf6c2ef28 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 8 Jul 2019 01:02:15 -0700 Subject: [PATCH] manager: call inbound.set_listener_endpoint() before start() This should fix the immediate issue of the remote side opening a subchannel (and sending data on it) before the local side even sees the Endpoints, so before it can register a listening factory to receive the OPEN. We were already buffering early OPENs in the SubchannelListenerEndpoint, but this makes sure that endpoint is available (for the manager's Inbound half to deliver) them as soon as the dilation connection is established. The downside to buffering OPENs (and all data written to inbound subchannels) is that the application has no way to reject or pause them, until it registers the listening factory. If the application never calls `listen_ep.listen()`, we'll buffer this data forever (or until the wormhole is closed). The upside is that we don't lose a roundtrip waiting for an ack on the OPEN. See ticket #335 for more details. refs #335 --- src/wormhole/_dilation/manager.py | 18 +++--- src/wormhole/test/dilate/test_manager.py | 78 ++++++++++++++++-------- 2 files changed, 61 insertions(+), 35 deletions(-) diff --git a/src/wormhole/_dilation/manager.py b/src/wormhole/_dilation/manager.py index 5ea2ae6..14b0c89 100644 --- a/src/wormhole/_dilation/manager.py +++ b/src/wormhole/_dilation/manager.py @@ -542,6 +542,15 @@ class Dilator(object): sc0 = SubChannel(scid0, self._manager, self._host_addr, peer_addr0) self._manager.set_subchannel_zero(scid0, sc0) + # we can open non-zero subchannels as soon as we get our first + # connection, and we can make the Endpoints even earlier + control_ep = ControlEndpoint(peer_addr0) + control_ep._subchannel_zero_opened(sc0) + connect_ep = SubchannelConnectorEndpoint(self._manager, self._host_addr) + + listen_ep = SubchannelListenerEndpoint(self._manager, self._host_addr) + self._manager.set_listener_endpoint(listen_ep) + self._manager.start() while self._pending_inbound_dilate_messages: @@ -550,15 +559,6 @@ class Dilator(object): yield self._manager.when_first_connected() - # we can open non-zero subchannels as soon as we get our first - # connection - control_ep = ControlEndpoint(peer_addr0) - control_ep._subchannel_zero_opened(sc0) - connect_ep = SubchannelConnectorEndpoint(self._manager, self._host_addr) - - listen_ep = SubchannelListenerEndpoint(self._manager, self._host_addr) - self._manager.set_listener_endpoint(listen_ep) - endpoints = (control_ep, connect_ep, listen_ep) returnValue(endpoints) diff --git a/src/wormhole/test/dilate/test_manager.py b/src/wormhole/test/dilate/test_manager.py index e36e403..cea92de 100644 --- a/src/wormhole/test/dilate/test_manager.py +++ b/src/wormhole/test/dilate/test_manager.py @@ -63,6 +63,12 @@ class TestDilator(unittest.TestCase): sc = mock.Mock() m_sc = mock.patch("wormhole._dilation.manager.SubChannel", return_value=sc) + ce = mock.Mock() + m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint", + return_value=ce) + lep = object() + m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint", + return_value=lep) scid0 = 0 m = mock.Mock() @@ -72,16 +78,21 @@ class TestDilator(unittest.TestCase): return_value=m) as ml: with mock.patch("wormhole._dilation.manager.make_side", return_value="us"): - with m_sca, m_sc as m_sc_m: - dil.got_wormhole_versions({"can-dilate": ["1"]}) + with m_sca, m_sc as m_sc_m, m_ce as m_ce_m, m_sle as m_sle_m: + dil.got_wormhole_versions({"can-dilate": ["1"]}) # that should create the Manager self.assertEqual(ml.mock_calls, [mock.call(send, "us", transit_key, None, reactor, eq, coop, host_addr, False)]) + # and the three endpoints + self.assertEqual(m_ce_m.mock_calls, [mock.call(peer_addr)]) + self.assertEqual(ce.mock_calls, [mock.call._subchannel_zero_opened(sc)]) + self.assertEqual(m_sle_m.mock_calls, [mock.call(m, host_addr)]) # and create subchannel0 self.assertEqual(m_sc_m.mock_calls, [mock.call(scid0, m, host_addr, peer_addr)]) # and tell it to start, and get wait-for-it-to-connect Deferred self.assertEqual(m.mock_calls, [mock.call.set_subchannel_zero(scid0, sc), + mock.call.set_listener_endpoint(lep), mock.call.start(), mock.call.when_first_connected(), ]) @@ -89,22 +100,9 @@ class TestDilator(unittest.TestCase): self.assertNoResult(d1) self.assertNoResult(d2) - ce = mock.Mock() - m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint", - return_value=ce) - lep = object() - m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint", - return_value=lep) - - with m_ce as m_ce_m, m_sle as m_sle_m: - wfc_d.callback(None) - eq.flush_sync() - self.assertEqual(m_ce_m.mock_calls, [mock.call(peer_addr)]) - self.assertEqual(ce.mock_calls, [mock.call._subchannel_zero_opened(sc)]) - self.assertEqual(m_sle_m.mock_calls, [mock.call(m, host_addr)]) - self.assertEqual(m.mock_calls, - [mock.call.set_listener_endpoint(lep), - ]) + wfc_d.callback(None) + eq.flush_sync() + self.assertEqual(m.mock_calls, []) clear_mock_calls(m) eps = self.successResultOf(d1) @@ -181,16 +179,26 @@ class TestDilator(unittest.TestCase): sc = mock.Mock() m_sc = mock.patch("wormhole._dilation.manager.SubChannel", return_value=sc) + peer_addr = object() + m_sca = mock.patch("wormhole._dilation.manager._SubchannelAddress", + return_value=peer_addr) + ce = mock.Mock() + m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint", + return_value=ce) + lep = object() + m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint", + return_value=lep) with mock.patch("wormhole._dilation.manager.Manager", return_value=m) as ml: with mock.patch("wormhole._dilation.manager.make_side", return_value="us"): - with m_sc: + with m_sca, m_sc, m_ce, m_sle: dil.got_wormhole_versions({"can-dilate": ["1"]}) self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key", None, reactor, eq, coop, host_addr, False)]) self.assertEqual(m.mock_calls, [mock.call.set_subchannel_zero(scid0, sc), + mock.call.set_listener_endpoint(lep), mock.call.start(), mock.call.rx_PLEASE(pleasemsg), mock.call.rx_HINTS(hintmsg), @@ -204,21 +212,39 @@ class TestDilator(unittest.TestCase): d1 = dil.dilate(transit_relay_location=relay) self.assertNoResult(d1) + m = mock.Mock() + alsoProvides(m, IDilationManager) + m.when_first_connected.return_value = Deferred() + scid0 = 0 sc = mock.Mock() m_sc = mock.patch("wormhole._dilation.manager.SubChannel", return_value=sc) + peer_addr = object() + m_sca = mock.patch("wormhole._dilation.manager._SubchannelAddress", + return_value=peer_addr) + ce = mock.Mock() + m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint", + return_value=ce) + lep = object() + m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint", + return_value=lep) - with mock.patch("wormhole._dilation.manager.Manager") as ml: + with mock.patch("wormhole._dilation.manager.Manager", + return_value=m) as ml: with mock.patch("wormhole._dilation.manager.make_side", return_value="us"): - with m_sc: + with m_sca, m_sc, m_ce, m_sle: dil.got_wormhole_versions({"can-dilate": ["1"]}) - self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key", - relay, reactor, eq, coop, host_addr, False), - mock.call().set_subchannel_zero(scid0, sc), - mock.call().start(), - mock.call().when_first_connected()]) + + self.assertEqual(ml.mock_calls, [ + mock.call(send, "us", b"key", + relay, reactor, eq, coop, host_addr, False)]) + self.assertEqual(m.mock_calls, [ + mock.call.set_subchannel_zero(scid0, sc), + mock.call.set_listener_endpoint(lep), + mock.call.start(), + mock.call.when_first_connected()]) LEADER = "ff3456abcdef"