diff --git a/src/wormhole/_dilation/connector.py b/src/wormhole/_dilation/connector.py index 92888f9..545d9de 100644 --- a/src/wormhole/_dilation/connector.py +++ b/src/wormhole/_dilation/connector.py @@ -116,10 +116,9 @@ class Connector(object): pass # pragma: no cover # TODO: unify the tense of these method-name verbs - @m.input() - def listener_ready(self, hint_objs): - pass + # add_relay() and got_hints() are called by the Manager as it receives + # messages from our peer. stop() is called when the Manager shuts down @m.input() def add_relay(self, hint_objs): pass @@ -129,16 +128,25 @@ class Connector(object): pass @m.input() - def add_candidate(self, c): # called by DilatedConnectionProtocol + def stop(self): pass + # called by ourselves, when _start_listener() is ready + @m.input() + def listener_ready(self, hint_objs): + pass + + # called when DilatedConnectionProtocol submits itself, after KCM + # received + @m.input() + def add_candidate(self, c): + pass + + # called by ourselves, via consider() @m.input() def accept(self, c): pass - @m.input() - def stop(self): - pass @m.output() def use_hints(self, hint_objs): @@ -199,17 +207,12 @@ class Connector(object): [c.loseConnection() for c in self._pending_connections] return d - def stop_winner(self): - d = self._winner.when_disconnected() - self._winner.disconnect() - return d - def break_cycles(self): # help GC by forgetting references to things that reference us self._listeners.clear() self._pending_connectors.clear() self._pending_connections.clear() - self._winner = None + self._winning_connection = None connecting.upon(listener_ready, enter=connecting, outputs=[publish_hints]) connecting.upon(add_relay, enter=connecting, outputs=[use_hints, @@ -224,6 +227,8 @@ class Connector(object): connected.upon(listener_ready, enter=connected, outputs=[]) connected.upon(add_relay, enter=connected, outputs=[]) connected.upon(got_hints, enter=connected, outputs=[]) + # TODO: tell them to disconnect? will they hang out forever? I *think* + # they'll drop this once they get a KCM on the winning connection. connected.upon(add_candidate, enter=connected, outputs=[]) connected.upon(accept, enter=connected, outputs=[]) connected.upon(stop, enter=stopped, outputs=[stop_everything]) @@ -232,20 +237,23 @@ class Connector(object): # maybe add_candidate, accept def start(self): - self._start_listener() + if not self._no_listen and not self._tor: + addresses = self._get_listener_addresses() + self._start_listener(addresses) if self._transit_relays: self._publish_hints(self._transit_relays) self._use_hints(self._transit_relays) - def _start_listener(self): - if self._no_listen or self._tor: - return + def _get_listener_addresses(self): addresses = ipaddrs.find_addresses() non_loopback_addresses = [a for a in addresses if a != "127.0.0.1"] if non_loopback_addresses: # some test hosts, including the appveyor VMs, *only* have # 127.0.0.1, and the tests will hang badly if we remove it. addresses = non_loopback_addresses + return addresses + + def _start_listener(self, addresses): # TODO: listen on a fixed port, if possible, for NAT/p2p benefits, also # to make firewall configs easier # TODO: retain listening port between connection generations? @@ -263,6 +271,14 @@ class Connector(object): d.addCallback(_listening) d.addErrback(log.err) + def _schedule_connection(self, delay, h, is_relay): + ep = endpoint_from_hint_obj(h, self._tor, self._reactor) + desc = describe_hint_obj(h, is_relay, self._tor) + d = deferLater(self._reactor, delay, + self._connect, ep, desc, is_relay) + d.addErrback(log.err) + self._pending_connectors.add(d) + def _use_hints(self, hints): # first, pull out all the relays, we'll connect to them later relays = [] @@ -279,12 +295,7 @@ class Connector(object): for h in direct[p]: if isinstance(h, TorTCPV1Hint) and not self._tor: continue - ep = endpoint_from_hint_obj(h, self._tor, self._reactor) - desc = describe_hint_obj(h, False, self._tor) - d = deferLater(self._reactor, delay, - self._connect, ep, desc, is_relay=False) - d.addErrback(log.err) - self._pending_connectors.add(d) + self._schedule_connection(delay, h, is_relay=False) made_direct = True # Make all direct connections immediately. Later, we'll change # the add_candidate() function to look at the priority when @@ -314,12 +325,7 @@ class Connector(object): # quickly or hang for a long time. for r in relays: for h in r.hints: - ep = endpoint_from_hint_obj(h, self._tor, self._reactor) - desc = describe_hint_obj(h, True, self._tor) - d = deferLater(self._reactor, delay, - self._connect, ep, desc, is_relay=True) - d.addErrback(log.err) - self._pending_connectors.add(d) + self._schedule_connection(delay, h, is_relay=True) # TODO: # if not contenders: # raise TransitError("No contenders for connection") diff --git a/src/wormhole/test/dilate/test_connector.py b/src/wormhole/test/dilate/test_connector.py index 8d05f65..64291e5 100644 --- a/src/wormhole/test/dilate/test_connector.py +++ b/src/wormhole/test/dilate/test_connector.py @@ -5,20 +5,19 @@ from zope.interface import alsoProvides from twisted.trial import unittest from twisted.internet.task import Clock from twisted.internet.defer import Deferred -#from twisted.internet import endpoints from ...eventual import EventualQueue from ..._interfaces import IDilationManager, IDilationConnector from ..._dilation import roles from ..._hints import DirectTCPV1Hint, RelayV1Hint, TorTCPV1Hint -from ..._dilation.connector import (#describe_hint_obj, parse_hint_argv, - #parse_tcp_v1_hint, parse_hint, encode_hint, - Connector, - build_sided_relay_handshake, - build_noise, - OutboundConnectionFactory, - InboundConnectionFactory, - PROLOGUE_LEADER, PROLOGUE_FOLLOWER, - ) +from ..._dilation.connection import KCM +from ..._dilation.connector import (Connector, + build_sided_relay_handshake, + build_noise, + OutboundConnectionFactory, + InboundConnectionFactory, + PROLOGUE_LEADER, PROLOGUE_FOLLOWER, + ) +from .common import clear_mock_calls class Handshake(unittest.TestCase): def test_build(self): @@ -149,67 +148,125 @@ class TestConnector(unittest.TestCase): c.stop() # we stop while we're connecting, so no connections must be stopped - def test_basic(self): + def test_empty(self): c, h = make_connector(listen=False, relay=None, role=roles.LEADER) + c._schedule_connection = mock.Mock() c.start() # no relays, so it publishes no hints self.assertEqual(h.manager.mock_calls, []) # and no listener, so nothing happens until we provide a hint + self.assertEqual(c._schedule_connection.mock_calls, []) + c.stop() - ep0 = mock.Mock() - ep0_connect_d = Deferred() - ep0.connect = mock.Mock(return_value=ep0_connect_d) - efho = mock.Mock(side_effect=[ep0]) - hint0 = DirectTCPV1Hint("foo", 55, 0.0) - dho = mock.Mock(side_effect=["desc0"]) + def test_basic(self): + c, h = make_connector(listen=False, relay=None, role=roles.LEADER) + c._schedule_connection = mock.Mock() + c.start() + # no relays, so it publishes no hints + self.assertEqual(h.manager.mock_calls, []) + # and no listener, so nothing happens until we provide a hint + self.assertEqual(c._schedule_connection.mock_calls, []) + + hint = DirectTCPV1Hint("foo", 55, 0.0) + c.got_hints([hint]) + + # received hints don't get published + self.assertEqual(h.manager.mock_calls, []) + # they just schedule a connection + self.assertEqual(c._schedule_connection.mock_calls, + [mock.call(0.0, DirectTCPV1Hint("foo", 55, 0.0), + is_relay=False)]) + + def test_listen_addresses(self): + c, h = make_connector(listen=True, role=roles.LEADER) + with mock.patch("wormhole.ipaddrs.find_addresses", + return_value=["127.0.0.1", "1.2.3.4", "5.6.7.8"]): + self.assertEqual(c._get_listener_addresses(), + ["1.2.3.4", "5.6.7.8"]) + with mock.patch("wormhole.ipaddrs.find_addresses", + return_value=["127.0.0.1"]): + # some test hosts, including the appveyor VMs, *only* have + # 127.0.0.1, and the tests will hang badly if we remove it. + self.assertEqual(c._get_listener_addresses(), ["127.0.0.1"]) + + def test_listen(self): + c, h = make_connector(listen=True, role=roles.LEADER) + c._start_listener = mock.Mock() + with mock.patch("wormhole.ipaddrs.find_addresses", + return_value=["127.0.0.1", "1.2.3.4", "5.6.7.8"]): + c.start() + self.assertEqual(c._start_listener.mock_calls, + [mock.call(["1.2.3.4", "5.6.7.8"])]) + + def test_start_listen(self): + c, h = make_connector(listen=True, role=roles.LEADER) + ep = mock.Mock() + d = Deferred() + ep.listen = mock.Mock(return_value=d) + with mock.patch("wormhole._dilation.connector.serverFromString", + return_value=ep) as sfs: + c._start_listener(["1.2.3.4", "5.6.7.8"]) + self.assertEqual(sfs.mock_calls, [mock.call(h.reactor, "tcp:0")]) + lp = mock.Mock() + host = mock.Mock() + host.port = 66 + lp.getHost = mock.Mock(return_value=host) + d.callback(lp) + self.assertEqual(h.manager.mock_calls, + [mock.call.send_hints([{"type": "direct-tcp-v1", + "hostname": "1.2.3.4", + "port": 66, + "priority": 0.0 + }, + {"type": "direct-tcp-v1", + "hostname": "5.6.7.8", + "port": 66, + "priority": 0.0 + }, + ])]) + + def test_schedule_connection_no_relay(self): + c, h = make_connector(listen=True, role=roles.LEADER) + hint = DirectTCPV1Hint("foo", 55, 0.0) + ep = mock.Mock() with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj", - efho): - with mock.patch("wormhole._dilation.connector.describe_hint_obj", dho): - c.got_hints([hint0]) - self.assertEqual(efho.mock_calls, [mock.call(hint0, h.tor, h.reactor)]) - self.assertEqual(dho.mock_calls, [mock.call(hint0, False, h.tor)]) - f0 = mock.Mock() + side_effect=[ep]) as efho: + c._schedule_connection(0.0, hint, False) + self.assertEqual(efho.mock_calls, [mock.call(hint, h.tor, h.reactor)]) + self.assertEqual(ep.mock_calls, []) + d = Deferred() + ep.connect = mock.Mock(side_effect=[d]) + # direct hints are scheduled for T+0.0 + f = mock.Mock() with mock.patch("wormhole._dilation.connector.OutboundConnectionFactory", - return_value=f0) as ocf: - h.clock.advance(c.RELAY_DELAY / 2 + 0.01) + return_value=f) as ocf: + h.clock.advance(1.0) self.assertEqual(ocf.mock_calls, [mock.call(c, None)]) - self.assertEqual(ep0.connect.mock_calls, [mock.call(f0)]) - + self.assertEqual(ep.connect.mock_calls, [mock.call(f)]) p = mock.Mock() - ep0_connect_d.callback(p) + d.callback(p) self.assertEqual(p.mock_calls, [mock.call.when_disconnected(), mock.call.when_disconnected().addCallback(c._pending_connections.discard)]) - def test_listen(self): + def test_schedule_connection_relay(self): c, h = make_connector(listen=True, role=roles.LEADER) - d = Deferred() + hint = DirectTCPV1Hint("foo", 55, 0.0) ep = mock.Mock() - ep.listen = mock.Mock(return_value=d) + with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj", + side_effect=[ep]) as efho: + c._schedule_connection(0.0, hint, True) + self.assertEqual(efho.mock_calls, [mock.call(hint, h.tor, h.reactor)]) + self.assertEqual(ep.mock_calls, []) + d = Deferred() + ep.connect = mock.Mock(side_effect=[d]) + # direct hints are scheduled for T+0.0 f = mock.Mock() - with mock.patch("wormhole.ipaddrs.find_addresses", - return_value=["127.0.0.1", "1.2.3.4", "5.6.7.8"]): - with mock.patch("wormhole._dilation.connector.serverFromString", - side_effect=[ep]): - with mock.patch("wormhole._dilation.connector.InboundConnectionFactory", - return_value=f): - c.start() - # no relays and the listener isn't ready yet, so no hints yet - self.assertEqual(h.manager.mock_calls, []) - # but a listener was started - self.assertEqual(ep.mock_calls, [mock.call.listen(f)]) - lp = mock.Mock() - host = mock.Mock() - host.port = 2345 - lp.getHost = mock.Mock(return_value=host) - d.callback(lp) - self.assertEqual(h.manager.mock_calls, - [mock.call.send_hints( - [{"type": "direct-tcp-v1", "hostname": "1.2.3.4", - "port": 2345, "priority": 0.0}, - {"type": "direct-tcp-v1", "hostname": "5.6.7.8", - "port": 2345, "priority": 0.0}, - ])]) + with mock.patch("wormhole._dilation.connector.OutboundConnectionFactory", + return_value=f) as ocf: + h.clock.advance(1.0) + handshake = build_sided_relay_handshake(h.dilation_key, h.side) + self.assertEqual(ocf.mock_calls, [mock.call(c, handshake)]) def test_listen_but_tor(self): c, h = make_connector(listen=True, tor=True, role=roles.LEADER) @@ -221,67 +278,36 @@ class TestConnector(unittest.TestCase): # no relays and the listener isn't ready yet, so no hints yet self.assertEqual(h.manager.mock_calls, []) - def test_listen_only_loopback(self): - # some test hosts, including the appveyor VMs, *only* have - # 127.0.0.1, and the tests will hang badly if we remove it. - c, h = make_connector(listen=True, role=roles.LEADER) - d = Deferred() - ep = mock.Mock() - ep.listen = mock.Mock(return_value=d) - f = mock.Mock() - with mock.patch("wormhole.ipaddrs.find_addresses", return_value=["127.0.0.1"]): - with mock.patch("wormhole._dilation.connector.serverFromString", - side_effect=[ep]): - with mock.patch("wormhole._dilation.connector.InboundConnectionFactory", - return_value=f): - c.start() - # no relays and the listener isn't ready yet, so no hints yet + def test_no_listen(self): + c, h = make_connector(listen=False, tor=False, role=roles.LEADER) + with mock.patch("wormhole.ipaddrs.find_addresses", + return_value=["127.0.0.1", "1.2.3.4", "5.6.7.8"]) as fa: + c.start() + # don't even look up addresses + self.assertEqual(fa.mock_calls, []) self.assertEqual(h.manager.mock_calls, []) - # but a listener was started - self.assertEqual(ep.mock_calls, [mock.call.listen(f)]) - lp = mock.Mock() - host = mock.Mock() - host.port = 2345 - lp.getHost = mock.Mock(return_value=host) - d.callback(lp) - self.assertEqual(h.manager.mock_calls, - [mock.call.send_hints( - [{"type": "direct-tcp-v1", "hostname": "127.0.0.1", - "port": 2345, "priority": 0.0}, - ])]) - def OFFtest_relay_delay(self): + def test_relay_delay(self): # given a direct connection and a relay, we should see the direct # connection initiated at T+0 seconds, and the relay at T+RELAY_DELAY - c, h = make_connector(listen=False, relay="tcp:foo:55", role=roles.LEADER) + c, h = make_connector(listen=True, relay=None, role=roles.LEADER) + c._schedule_connection = mock.Mock() + c._start_listener = mock.Mock() c.start() hint1 = DirectTCPV1Hint("foo", 55, 0.0) hint2 = DirectTCPV1Hint("bar", 55, 0.0) hint3 = RelayV1Hint([DirectTCPV1Hint("relay", 55, 0.0)]) - ep1, ep2, ep3 = mock.Mock(), mock.Mock(), mock.Mock() - with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj", - side_effect=[ep1, ep2, ep3]): - c.got_hints([hint1, hint2, hint3]) - self.assertEqual(ep1.mock_calls, []) - self.assertEqual(ep2.mock_calls, []) - self.assertEqual(ep3.mock_calls, []) - - h.clock.advance(c.RELAY_DELAY / 2 + 0.01) - self.assertEqual(len(ep1.mock_calls), 2) - self.assertEqual(len(ep2.mock_calls), 2) - self.assertEqual(ep3.mock_calls, []) - - h.clock.advance(c.RELAY_DELAY) - self.assertEqual(len(ep1.mock_calls), 2) - self.assertEqual(len(ep2.mock_calls), 2) - self.assertEqual(len(ep3.mock_calls), 2) + c.got_hints([hint1, hint2, hint3]) + self.assertEqual(c._schedule_connection.mock_calls, + [mock.call(0.0, hint1, is_relay=False), + mock.call(0.0, hint2, is_relay=False), + mock.call(c.RELAY_DELAY, hint3.hints[0], is_relay=True), + ]) def test_initial_relay(self): c, h = make_connector(listen=False, relay="tcp:foo:55", role=roles.LEADER) - ep = mock.Mock() - with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj", - side_effect=[ep]) as efho: - c.start() + c._schedule_connection = mock.Mock() + c.start() self.assertEqual(h.manager.mock_calls, [mock.call.send_hints([{"type": "relay-v1", "hints": [ @@ -292,38 +318,143 @@ class TestConnector(unittest.TestCase): }, ], }])]) - self.assertEqual(len(efho.mock_calls), 1) + self.assertEqual(c._schedule_connection.mock_calls, + [mock.call(0.0, DirectTCPV1Hint("foo", 55, 0.0), + is_relay=True)]) + + def test_add_relay(self): + c, h = make_connector(listen=False, relay=None, role=roles.LEADER) + c._schedule_connection = mock.Mock() + c.start() + self.assertEqual(h.manager.mock_calls, []) + self.assertEqual(c._schedule_connection.mock_calls, []) + hint = RelayV1Hint([DirectTCPV1Hint("foo", 55, 0.0)]) + c.add_relay([hint]) + self.assertEqual(h.manager.mock_calls, + [mock.call.send_hints([{"type": "relay-v1", + "hints": [ + {"type": "direct-tcp-v1", + "hostname": "foo", + "port": 55, + "priority": 0.0 + }, + ], + }])]) + self.assertEqual(c._schedule_connection.mock_calls, + [mock.call(0.0, DirectTCPV1Hint("foo", 55, 0.0), + is_relay=True)]) def test_tor_no_manager(self): # tor hints should be ignored if we don't have a Tor manager to use them c, h = make_connector(listen=False, role=roles.LEADER) + c._schedule_connection = mock.Mock() c.start() hint = TorTCPV1Hint("foo", 55, 0.0) - ep = mock.Mock() - with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj", - side_effect=[ep]): - c.got_hints([hint]) - self.assertEqual(ep.mock_calls, []) - - h.clock.advance(c.RELAY_DELAY * 2) - self.assertEqual(ep.mock_calls, []) + c.got_hints([hint]) + self.assertEqual(h.manager.mock_calls, []) + self.assertEqual(c._schedule_connection.mock_calls, []) def test_tor_with_manager(self): # tor hints should be processed if we do have a Tor manager c, h = make_connector(listen=False, tor=True, role=roles.LEADER) + c._schedule_connection = mock.Mock() c.start() hint = TorTCPV1Hint("foo", 55, 0.0) - ep = mock.Mock() - with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj", - side_effect=[ep]): - c.got_hints([hint]) - self.assertEqual(ep.mock_calls, []) - - h.clock.advance(c.RELAY_DELAY * 2) - self.assertEqual(len(ep.mock_calls), 2) - + c.got_hints([hint]) + self.assertEqual(c._schedule_connection.mock_calls, + [mock.call(0.0, hint, is_relay=False)]) def test_priorities(self): # given two hints with different priorities, we should somehow prefer # one. This is a placeholder to fill in once we implement priorities. pass + + +class Race(unittest.TestCase): + def test_one_leader(self): + c, h = make_connector(listen=True, role=roles.LEADER) + lp = mock.Mock() + def start_listener(addresses): + c._listeners.add(lp) + c._start_listener = start_listener + c._schedule_connection = mock.Mock() + c.start() + self.assertEqual(c._listeners, set([lp])) + + p1 = mock.Mock() # DilatedConnectionProtocol instance + c.add_candidate(p1) + self.assertEqual(h.manager.mock_calls, []) + h.eq.flush_sync() + self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + self.assertEqual(p1.mock_calls, + [mock.call.select(h.manager), + mock.call.send_record(KCM())]) + self.assertEqual(lp.mock_calls[0], mock.call.stopListening()) + # stop_listeners() uses a DeferredList, so we ignore the second call + + def test_one_follower(self): + c, h = make_connector(listen=True, role=roles.FOLLOWER) + lp = mock.Mock() + def start_listener(addresses): + c._listeners.add(lp) + c._start_listener = start_listener + c._schedule_connection = mock.Mock() + c.start() + self.assertEqual(c._listeners, set([lp])) + + p1 = mock.Mock() # DilatedConnectionProtocol instance + c.add_candidate(p1) + self.assertEqual(h.manager.mock_calls, []) + h.eq.flush_sync() + self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + # just like LEADER, but follower doesn't send KCM now (it sent one + # earlier, to tell the leader that this connection looks viable) + self.assertEqual(p1.mock_calls, + [mock.call.select(h.manager)]) + self.assertEqual(lp.mock_calls[0], mock.call.stopListening()) + # stop_listeners() uses a DeferredList, so we ignore the second call + + # TODO: make sure a pending connection is abandoned when the listener + # answers successfully + + # TODO: make sure a second pending connection is abandoned when the first + # connection succeeds + + def test_late(self): + c, h = make_connector(listen=False, role=roles.LEADER) + c._schedule_connection = mock.Mock() + c.start() + + p1 = mock.Mock() # DilatedConnectionProtocol instance + c.add_candidate(p1) + self.assertEqual(h.manager.mock_calls, []) + h.eq.flush_sync() + self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + clear_mock_calls(h.manager) + self.assertEqual(p1.mock_calls, + [mock.call.select(h.manager), + mock.call.send_record(KCM())]) + + # late connection is ignored + p2 = mock.Mock() + c.add_candidate(p2) + self.assertEqual(h.manager.mock_calls, []) + + + # make sure an established connection is dropped when stop() is called + def test_stop(self): + c, h = make_connector(listen=False, role=roles.LEADER) + c._schedule_connection = mock.Mock() + c.start() + + p1 = mock.Mock() # DilatedConnectionProtocol instance + c.add_candidate(p1) + self.assertEqual(h.manager.mock_calls, []) + h.eq.flush_sync() + self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + self.assertEqual(p1.mock_calls, + [mock.call.select(h.manager), + mock.call.send_record(KCM())]) + + c.stop() +