manager: call inbound.set_listener_endpoint() before start()

This should fix the immediate issue of the remote side opening a
subchannel (and sending data on it) before the local side even sees the
Endpoints, so before it can register a listening factory to receive the OPEN.

We were already buffering early OPENs in the SubchannelListenerEndpoint, but
this makes sure that endpoint is available (for the manager's Inbound half to
deliver) them as soon as the dilation connection is established.

The downside to buffering OPENs (and all data written to inbound subchannels)
is that the application has no way to reject or pause them, until it
registers the listening factory. If the application never calls
`listen_ep.listen()`, we'll buffer this data forever (or until the wormhole
is closed). The upside is that we don't lose a roundtrip waiting for an ack
on the OPEN. See ticket #335 for more details.

refs #335
This commit is contained in:
Brian Warner 2019-07-08 01:02:15 -07:00
parent 575cf5d789
commit 443d248972
2 changed files with 61 additions and 35 deletions

View File

@ -542,6 +542,15 @@ class Dilator(object):
sc0 = SubChannel(scid0, self._manager, self._host_addr, peer_addr0)
self._manager.set_subchannel_zero(scid0, sc0)
# we can open non-zero subchannels as soon as we get our first
# connection, and we can make the Endpoints even earlier
control_ep = ControlEndpoint(peer_addr0)
control_ep._subchannel_zero_opened(sc0)
connect_ep = SubchannelConnectorEndpoint(self._manager, self._host_addr)
listen_ep = SubchannelListenerEndpoint(self._manager, self._host_addr)
self._manager.set_listener_endpoint(listen_ep)
self._manager.start()
while self._pending_inbound_dilate_messages:
@ -550,15 +559,6 @@ class Dilator(object):
yield self._manager.when_first_connected()
# we can open non-zero subchannels as soon as we get our first
# connection
control_ep = ControlEndpoint(peer_addr0)
control_ep._subchannel_zero_opened(sc0)
connect_ep = SubchannelConnectorEndpoint(self._manager, self._host_addr)
listen_ep = SubchannelListenerEndpoint(self._manager, self._host_addr)
self._manager.set_listener_endpoint(listen_ep)
endpoints = (control_ep, connect_ep, listen_ep)
returnValue(endpoints)

View File

@ -63,6 +63,12 @@ class TestDilator(unittest.TestCase):
sc = mock.Mock()
m_sc = mock.patch("wormhole._dilation.manager.SubChannel",
return_value=sc)
ce = mock.Mock()
m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint",
return_value=ce)
lep = object()
m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint",
return_value=lep)
scid0 = 0
m = mock.Mock()
@ -72,16 +78,21 @@ class TestDilator(unittest.TestCase):
return_value=m) as ml:
with mock.patch("wormhole._dilation.manager.make_side",
return_value="us"):
with m_sca, m_sc as m_sc_m:
dil.got_wormhole_versions({"can-dilate": ["1"]})
with m_sca, m_sc as m_sc_m, m_ce as m_ce_m, m_sle as m_sle_m:
dil.got_wormhole_versions({"can-dilate": ["1"]})
# that should create the Manager
self.assertEqual(ml.mock_calls, [mock.call(send, "us", transit_key,
None, reactor, eq, coop, host_addr, False)])
# and the three endpoints
self.assertEqual(m_ce_m.mock_calls, [mock.call(peer_addr)])
self.assertEqual(ce.mock_calls, [mock.call._subchannel_zero_opened(sc)])
self.assertEqual(m_sle_m.mock_calls, [mock.call(m, host_addr)])
# and create subchannel0
self.assertEqual(m_sc_m.mock_calls,
[mock.call(scid0, m, host_addr, peer_addr)])
# and tell it to start, and get wait-for-it-to-connect Deferred
self.assertEqual(m.mock_calls, [mock.call.set_subchannel_zero(scid0, sc),
mock.call.set_listener_endpoint(lep),
mock.call.start(),
mock.call.when_first_connected(),
])
@ -89,22 +100,9 @@ class TestDilator(unittest.TestCase):
self.assertNoResult(d1)
self.assertNoResult(d2)
ce = mock.Mock()
m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint",
return_value=ce)
lep = object()
m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint",
return_value=lep)
with m_ce as m_ce_m, m_sle as m_sle_m:
wfc_d.callback(None)
eq.flush_sync()
self.assertEqual(m_ce_m.mock_calls, [mock.call(peer_addr)])
self.assertEqual(ce.mock_calls, [mock.call._subchannel_zero_opened(sc)])
self.assertEqual(m_sle_m.mock_calls, [mock.call(m, host_addr)])
self.assertEqual(m.mock_calls,
[mock.call.set_listener_endpoint(lep),
])
wfc_d.callback(None)
eq.flush_sync()
self.assertEqual(m.mock_calls, [])
clear_mock_calls(m)
eps = self.successResultOf(d1)
@ -181,16 +179,26 @@ class TestDilator(unittest.TestCase):
sc = mock.Mock()
m_sc = mock.patch("wormhole._dilation.manager.SubChannel",
return_value=sc)
peer_addr = object()
m_sca = mock.patch("wormhole._dilation.manager._SubchannelAddress",
return_value=peer_addr)
ce = mock.Mock()
m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint",
return_value=ce)
lep = object()
m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint",
return_value=lep)
with mock.patch("wormhole._dilation.manager.Manager",
return_value=m) as ml:
with mock.patch("wormhole._dilation.manager.make_side",
return_value="us"):
with m_sc:
with m_sca, m_sc, m_ce, m_sle:
dil.got_wormhole_versions({"can-dilate": ["1"]})
self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key",
None, reactor, eq, coop, host_addr, False)])
self.assertEqual(m.mock_calls, [mock.call.set_subchannel_zero(scid0, sc),
mock.call.set_listener_endpoint(lep),
mock.call.start(),
mock.call.rx_PLEASE(pleasemsg),
mock.call.rx_HINTS(hintmsg),
@ -204,21 +212,39 @@ class TestDilator(unittest.TestCase):
d1 = dil.dilate(transit_relay_location=relay)
self.assertNoResult(d1)
m = mock.Mock()
alsoProvides(m, IDilationManager)
m.when_first_connected.return_value = Deferred()
scid0 = 0
sc = mock.Mock()
m_sc = mock.patch("wormhole._dilation.manager.SubChannel",
return_value=sc)
peer_addr = object()
m_sca = mock.patch("wormhole._dilation.manager._SubchannelAddress",
return_value=peer_addr)
ce = mock.Mock()
m_ce = mock.patch("wormhole._dilation.manager.ControlEndpoint",
return_value=ce)
lep = object()
m_sle = mock.patch("wormhole._dilation.manager.SubchannelListenerEndpoint",
return_value=lep)
with mock.patch("wormhole._dilation.manager.Manager") as ml:
with mock.patch("wormhole._dilation.manager.Manager",
return_value=m) as ml:
with mock.patch("wormhole._dilation.manager.make_side",
return_value="us"):
with m_sc:
with m_sca, m_sc, m_ce, m_sle:
dil.got_wormhole_versions({"can-dilate": ["1"]})
self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key",
relay, reactor, eq, coop, host_addr, False),
mock.call().set_subchannel_zero(scid0, sc),
mock.call().start(),
mock.call().when_first_connected()])
self.assertEqual(ml.mock_calls, [
mock.call(send, "us", b"key",
relay, reactor, eq, coop, host_addr, False)])
self.assertEqual(m.mock_calls, [
mock.call.set_subchannel_zero(scid0, sc),
mock.call.set_listener_endpoint(lep),
mock.call.start(),
mock.call.when_first_connected()])
LEADER = "ff3456abcdef"