finish test coverage/fixes for connector.py

This commit is contained in:
Brian Warner 2018-12-23 15:01:16 -05:00
parent 6ad6f8f40f
commit a458fe9ab9
2 changed files with 288 additions and 151 deletions

View File

@ -116,10 +116,9 @@ class Connector(object):
pass # pragma: no cover
# TODO: unify the tense of these method-name verbs
@m.input()
def listener_ready(self, hint_objs):
pass
# add_relay() and got_hints() are called by the Manager as it receives
# messages from our peer. stop() is called when the Manager shuts down
@m.input()
def add_relay(self, hint_objs):
pass
@ -129,16 +128,25 @@ class Connector(object):
pass
@m.input()
def add_candidate(self, c): # called by DilatedConnectionProtocol
def stop(self):
pass
# called by ourselves, when _start_listener() is ready
@m.input()
def listener_ready(self, hint_objs):
pass
# called when DilatedConnectionProtocol submits itself, after KCM
# received
@m.input()
def add_candidate(self, c):
pass
# called by ourselves, via consider()
@m.input()
def accept(self, c):
pass
@m.input()
def stop(self):
pass
@m.output()
def use_hints(self, hint_objs):
@ -199,17 +207,12 @@ class Connector(object):
[c.loseConnection() for c in self._pending_connections]
return d
def stop_winner(self):
d = self._winner.when_disconnected()
self._winner.disconnect()
return d
def break_cycles(self):
# help GC by forgetting references to things that reference us
self._listeners.clear()
self._pending_connectors.clear()
self._pending_connections.clear()
self._winner = None
self._winning_connection = None
connecting.upon(listener_ready, enter=connecting, outputs=[publish_hints])
connecting.upon(add_relay, enter=connecting, outputs=[use_hints,
@ -224,6 +227,8 @@ class Connector(object):
connected.upon(listener_ready, enter=connected, outputs=[])
connected.upon(add_relay, enter=connected, outputs=[])
connected.upon(got_hints, enter=connected, outputs=[])
# TODO: tell them to disconnect? will they hang out forever? I *think*
# they'll drop this once they get a KCM on the winning connection.
connected.upon(add_candidate, enter=connected, outputs=[])
connected.upon(accept, enter=connected, outputs=[])
connected.upon(stop, enter=stopped, outputs=[stop_everything])
@ -232,20 +237,23 @@ class Connector(object):
# maybe add_candidate, accept
def start(self):
self._start_listener()
if not self._no_listen and not self._tor:
addresses = self._get_listener_addresses()
self._start_listener(addresses)
if self._transit_relays:
self._publish_hints(self._transit_relays)
self._use_hints(self._transit_relays)
def _start_listener(self):
if self._no_listen or self._tor:
return
def _get_listener_addresses(self):
addresses = ipaddrs.find_addresses()
non_loopback_addresses = [a for a in addresses if a != "127.0.0.1"]
if non_loopback_addresses:
# some test hosts, including the appveyor VMs, *only* have
# 127.0.0.1, and the tests will hang badly if we remove it.
addresses = non_loopback_addresses
return addresses
def _start_listener(self, addresses):
# TODO: listen on a fixed port, if possible, for NAT/p2p benefits, also
# to make firewall configs easier
# TODO: retain listening port between connection generations?
@ -263,6 +271,14 @@ class Connector(object):
d.addCallback(_listening)
d.addErrback(log.err)
def _schedule_connection(self, delay, h, is_relay):
ep = endpoint_from_hint_obj(h, self._tor, self._reactor)
desc = describe_hint_obj(h, is_relay, self._tor)
d = deferLater(self._reactor, delay,
self._connect, ep, desc, is_relay)
d.addErrback(log.err)
self._pending_connectors.add(d)
def _use_hints(self, hints):
# first, pull out all the relays, we'll connect to them later
relays = []
@ -279,12 +295,7 @@ class Connector(object):
for h in direct[p]:
if isinstance(h, TorTCPV1Hint) and not self._tor:
continue
ep = endpoint_from_hint_obj(h, self._tor, self._reactor)
desc = describe_hint_obj(h, False, self._tor)
d = deferLater(self._reactor, delay,
self._connect, ep, desc, is_relay=False)
d.addErrback(log.err)
self._pending_connectors.add(d)
self._schedule_connection(delay, h, is_relay=False)
made_direct = True
# Make all direct connections immediately. Later, we'll change
# the add_candidate() function to look at the priority when
@ -314,12 +325,7 @@ class Connector(object):
# quickly or hang for a long time.
for r in relays:
for h in r.hints:
ep = endpoint_from_hint_obj(h, self._tor, self._reactor)
desc = describe_hint_obj(h, True, self._tor)
d = deferLater(self._reactor, delay,
self._connect, ep, desc, is_relay=True)
d.addErrback(log.err)
self._pending_connectors.add(d)
self._schedule_connection(delay, h, is_relay=True)
# TODO:
# if not contenders:
# raise TransitError("No contenders for connection")

View File

@ -5,20 +5,19 @@ from zope.interface import alsoProvides
from twisted.trial import unittest
from twisted.internet.task import Clock
from twisted.internet.defer import Deferred
#from twisted.internet import endpoints
from ...eventual import EventualQueue
from ..._interfaces import IDilationManager, IDilationConnector
from ..._dilation import roles
from ..._hints import DirectTCPV1Hint, RelayV1Hint, TorTCPV1Hint
from ..._dilation.connector import (#describe_hint_obj, parse_hint_argv,
#parse_tcp_v1_hint, parse_hint, encode_hint,
Connector,
build_sided_relay_handshake,
build_noise,
OutboundConnectionFactory,
InboundConnectionFactory,
PROLOGUE_LEADER, PROLOGUE_FOLLOWER,
)
from ..._dilation.connection import KCM
from ..._dilation.connector import (Connector,
build_sided_relay_handshake,
build_noise,
OutboundConnectionFactory,
InboundConnectionFactory,
PROLOGUE_LEADER, PROLOGUE_FOLLOWER,
)
from .common import clear_mock_calls
class Handshake(unittest.TestCase):
def test_build(self):
@ -149,67 +148,125 @@ class TestConnector(unittest.TestCase):
c.stop()
# we stop while we're connecting, so no connections must be stopped
def test_basic(self):
def test_empty(self):
c, h = make_connector(listen=False, relay=None, role=roles.LEADER)
c._schedule_connection = mock.Mock()
c.start()
# no relays, so it publishes no hints
self.assertEqual(h.manager.mock_calls, [])
# and no listener, so nothing happens until we provide a hint
self.assertEqual(c._schedule_connection.mock_calls, [])
c.stop()
ep0 = mock.Mock()
ep0_connect_d = Deferred()
ep0.connect = mock.Mock(return_value=ep0_connect_d)
efho = mock.Mock(side_effect=[ep0])
hint0 = DirectTCPV1Hint("foo", 55, 0.0)
dho = mock.Mock(side_effect=["desc0"])
def test_basic(self):
c, h = make_connector(listen=False, relay=None, role=roles.LEADER)
c._schedule_connection = mock.Mock()
c.start()
# no relays, so it publishes no hints
self.assertEqual(h.manager.mock_calls, [])
# and no listener, so nothing happens until we provide a hint
self.assertEqual(c._schedule_connection.mock_calls, [])
hint = DirectTCPV1Hint("foo", 55, 0.0)
c.got_hints([hint])
# received hints don't get published
self.assertEqual(h.manager.mock_calls, [])
# they just schedule a connection
self.assertEqual(c._schedule_connection.mock_calls,
[mock.call(0.0, DirectTCPV1Hint("foo", 55, 0.0),
is_relay=False)])
def test_listen_addresses(self):
c, h = make_connector(listen=True, role=roles.LEADER)
with mock.patch("wormhole.ipaddrs.find_addresses",
return_value=["127.0.0.1", "1.2.3.4", "5.6.7.8"]):
self.assertEqual(c._get_listener_addresses(),
["1.2.3.4", "5.6.7.8"])
with mock.patch("wormhole.ipaddrs.find_addresses",
return_value=["127.0.0.1"]):
# some test hosts, including the appveyor VMs, *only* have
# 127.0.0.1, and the tests will hang badly if we remove it.
self.assertEqual(c._get_listener_addresses(), ["127.0.0.1"])
def test_listen(self):
c, h = make_connector(listen=True, role=roles.LEADER)
c._start_listener = mock.Mock()
with mock.patch("wormhole.ipaddrs.find_addresses",
return_value=["127.0.0.1", "1.2.3.4", "5.6.7.8"]):
c.start()
self.assertEqual(c._start_listener.mock_calls,
[mock.call(["1.2.3.4", "5.6.7.8"])])
def test_start_listen(self):
c, h = make_connector(listen=True, role=roles.LEADER)
ep = mock.Mock()
d = Deferred()
ep.listen = mock.Mock(return_value=d)
with mock.patch("wormhole._dilation.connector.serverFromString",
return_value=ep) as sfs:
c._start_listener(["1.2.3.4", "5.6.7.8"])
self.assertEqual(sfs.mock_calls, [mock.call(h.reactor, "tcp:0")])
lp = mock.Mock()
host = mock.Mock()
host.port = 66
lp.getHost = mock.Mock(return_value=host)
d.callback(lp)
self.assertEqual(h.manager.mock_calls,
[mock.call.send_hints([{"type": "direct-tcp-v1",
"hostname": "1.2.3.4",
"port": 66,
"priority": 0.0
},
{"type": "direct-tcp-v1",
"hostname": "5.6.7.8",
"port": 66,
"priority": 0.0
},
])])
def test_schedule_connection_no_relay(self):
c, h = make_connector(listen=True, role=roles.LEADER)
hint = DirectTCPV1Hint("foo", 55, 0.0)
ep = mock.Mock()
with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj",
efho):
with mock.patch("wormhole._dilation.connector.describe_hint_obj", dho):
c.got_hints([hint0])
self.assertEqual(efho.mock_calls, [mock.call(hint0, h.tor, h.reactor)])
self.assertEqual(dho.mock_calls, [mock.call(hint0, False, h.tor)])
f0 = mock.Mock()
side_effect=[ep]) as efho:
c._schedule_connection(0.0, hint, False)
self.assertEqual(efho.mock_calls, [mock.call(hint, h.tor, h.reactor)])
self.assertEqual(ep.mock_calls, [])
d = Deferred()
ep.connect = mock.Mock(side_effect=[d])
# direct hints are scheduled for T+0.0
f = mock.Mock()
with mock.patch("wormhole._dilation.connector.OutboundConnectionFactory",
return_value=f0) as ocf:
h.clock.advance(c.RELAY_DELAY / 2 + 0.01)
return_value=f) as ocf:
h.clock.advance(1.0)
self.assertEqual(ocf.mock_calls, [mock.call(c, None)])
self.assertEqual(ep0.connect.mock_calls, [mock.call(f0)])
self.assertEqual(ep.connect.mock_calls, [mock.call(f)])
p = mock.Mock()
ep0_connect_d.callback(p)
d.callback(p)
self.assertEqual(p.mock_calls,
[mock.call.when_disconnected(),
mock.call.when_disconnected().addCallback(c._pending_connections.discard)])
def test_listen(self):
def test_schedule_connection_relay(self):
c, h = make_connector(listen=True, role=roles.LEADER)
d = Deferred()
hint = DirectTCPV1Hint("foo", 55, 0.0)
ep = mock.Mock()
ep.listen = mock.Mock(return_value=d)
with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj",
side_effect=[ep]) as efho:
c._schedule_connection(0.0, hint, True)
self.assertEqual(efho.mock_calls, [mock.call(hint, h.tor, h.reactor)])
self.assertEqual(ep.mock_calls, [])
d = Deferred()
ep.connect = mock.Mock(side_effect=[d])
# direct hints are scheduled for T+0.0
f = mock.Mock()
with mock.patch("wormhole.ipaddrs.find_addresses",
return_value=["127.0.0.1", "1.2.3.4", "5.6.7.8"]):
with mock.patch("wormhole._dilation.connector.serverFromString",
side_effect=[ep]):
with mock.patch("wormhole._dilation.connector.InboundConnectionFactory",
return_value=f):
c.start()
# no relays and the listener isn't ready yet, so no hints yet
self.assertEqual(h.manager.mock_calls, [])
# but a listener was started
self.assertEqual(ep.mock_calls, [mock.call.listen(f)])
lp = mock.Mock()
host = mock.Mock()
host.port = 2345
lp.getHost = mock.Mock(return_value=host)
d.callback(lp)
self.assertEqual(h.manager.mock_calls,
[mock.call.send_hints(
[{"type": "direct-tcp-v1", "hostname": "1.2.3.4",
"port": 2345, "priority": 0.0},
{"type": "direct-tcp-v1", "hostname": "5.6.7.8",
"port": 2345, "priority": 0.0},
])])
with mock.patch("wormhole._dilation.connector.OutboundConnectionFactory",
return_value=f) as ocf:
h.clock.advance(1.0)
handshake = build_sided_relay_handshake(h.dilation_key, h.side)
self.assertEqual(ocf.mock_calls, [mock.call(c, handshake)])
def test_listen_but_tor(self):
c, h = make_connector(listen=True, tor=True, role=roles.LEADER)
@ -221,67 +278,36 @@ class TestConnector(unittest.TestCase):
# no relays and the listener isn't ready yet, so no hints yet
self.assertEqual(h.manager.mock_calls, [])
def test_listen_only_loopback(self):
# some test hosts, including the appveyor VMs, *only* have
# 127.0.0.1, and the tests will hang badly if we remove it.
c, h = make_connector(listen=True, role=roles.LEADER)
d = Deferred()
ep = mock.Mock()
ep.listen = mock.Mock(return_value=d)
f = mock.Mock()
with mock.patch("wormhole.ipaddrs.find_addresses", return_value=["127.0.0.1"]):
with mock.patch("wormhole._dilation.connector.serverFromString",
side_effect=[ep]):
with mock.patch("wormhole._dilation.connector.InboundConnectionFactory",
return_value=f):
c.start()
# no relays and the listener isn't ready yet, so no hints yet
def test_no_listen(self):
c, h = make_connector(listen=False, tor=False, role=roles.LEADER)
with mock.patch("wormhole.ipaddrs.find_addresses",
return_value=["127.0.0.1", "1.2.3.4", "5.6.7.8"]) as fa:
c.start()
# don't even look up addresses
self.assertEqual(fa.mock_calls, [])
self.assertEqual(h.manager.mock_calls, [])
# but a listener was started
self.assertEqual(ep.mock_calls, [mock.call.listen(f)])
lp = mock.Mock()
host = mock.Mock()
host.port = 2345
lp.getHost = mock.Mock(return_value=host)
d.callback(lp)
self.assertEqual(h.manager.mock_calls,
[mock.call.send_hints(
[{"type": "direct-tcp-v1", "hostname": "127.0.0.1",
"port": 2345, "priority": 0.0},
])])
def OFFtest_relay_delay(self):
def test_relay_delay(self):
# given a direct connection and a relay, we should see the direct
# connection initiated at T+0 seconds, and the relay at T+RELAY_DELAY
c, h = make_connector(listen=False, relay="tcp:foo:55", role=roles.LEADER)
c, h = make_connector(listen=True, relay=None, role=roles.LEADER)
c._schedule_connection = mock.Mock()
c._start_listener = mock.Mock()
c.start()
hint1 = DirectTCPV1Hint("foo", 55, 0.0)
hint2 = DirectTCPV1Hint("bar", 55, 0.0)
hint3 = RelayV1Hint([DirectTCPV1Hint("relay", 55, 0.0)])
ep1, ep2, ep3 = mock.Mock(), mock.Mock(), mock.Mock()
with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj",
side_effect=[ep1, ep2, ep3]):
c.got_hints([hint1, hint2, hint3])
self.assertEqual(ep1.mock_calls, [])
self.assertEqual(ep2.mock_calls, [])
self.assertEqual(ep3.mock_calls, [])
h.clock.advance(c.RELAY_DELAY / 2 + 0.01)
self.assertEqual(len(ep1.mock_calls), 2)
self.assertEqual(len(ep2.mock_calls), 2)
self.assertEqual(ep3.mock_calls, [])
h.clock.advance(c.RELAY_DELAY)
self.assertEqual(len(ep1.mock_calls), 2)
self.assertEqual(len(ep2.mock_calls), 2)
self.assertEqual(len(ep3.mock_calls), 2)
c.got_hints([hint1, hint2, hint3])
self.assertEqual(c._schedule_connection.mock_calls,
[mock.call(0.0, hint1, is_relay=False),
mock.call(0.0, hint2, is_relay=False),
mock.call(c.RELAY_DELAY, hint3.hints[0], is_relay=True),
])
def test_initial_relay(self):
c, h = make_connector(listen=False, relay="tcp:foo:55", role=roles.LEADER)
ep = mock.Mock()
with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj",
side_effect=[ep]) as efho:
c.start()
c._schedule_connection = mock.Mock()
c.start()
self.assertEqual(h.manager.mock_calls,
[mock.call.send_hints([{"type": "relay-v1",
"hints": [
@ -292,38 +318,143 @@ class TestConnector(unittest.TestCase):
},
],
}])])
self.assertEqual(len(efho.mock_calls), 1)
self.assertEqual(c._schedule_connection.mock_calls,
[mock.call(0.0, DirectTCPV1Hint("foo", 55, 0.0),
is_relay=True)])
def test_add_relay(self):
c, h = make_connector(listen=False, relay=None, role=roles.LEADER)
c._schedule_connection = mock.Mock()
c.start()
self.assertEqual(h.manager.mock_calls, [])
self.assertEqual(c._schedule_connection.mock_calls, [])
hint = RelayV1Hint([DirectTCPV1Hint("foo", 55, 0.0)])
c.add_relay([hint])
self.assertEqual(h.manager.mock_calls,
[mock.call.send_hints([{"type": "relay-v1",
"hints": [
{"type": "direct-tcp-v1",
"hostname": "foo",
"port": 55,
"priority": 0.0
},
],
}])])
self.assertEqual(c._schedule_connection.mock_calls,
[mock.call(0.0, DirectTCPV1Hint("foo", 55, 0.0),
is_relay=True)])
def test_tor_no_manager(self):
# tor hints should be ignored if we don't have a Tor manager to use them
c, h = make_connector(listen=False, role=roles.LEADER)
c._schedule_connection = mock.Mock()
c.start()
hint = TorTCPV1Hint("foo", 55, 0.0)
ep = mock.Mock()
with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj",
side_effect=[ep]):
c.got_hints([hint])
self.assertEqual(ep.mock_calls, [])
h.clock.advance(c.RELAY_DELAY * 2)
self.assertEqual(ep.mock_calls, [])
c.got_hints([hint])
self.assertEqual(h.manager.mock_calls, [])
self.assertEqual(c._schedule_connection.mock_calls, [])
def test_tor_with_manager(self):
# tor hints should be processed if we do have a Tor manager
c, h = make_connector(listen=False, tor=True, role=roles.LEADER)
c._schedule_connection = mock.Mock()
c.start()
hint = TorTCPV1Hint("foo", 55, 0.0)
ep = mock.Mock()
with mock.patch("wormhole._dilation.connector.endpoint_from_hint_obj",
side_effect=[ep]):
c.got_hints([hint])
self.assertEqual(ep.mock_calls, [])
h.clock.advance(c.RELAY_DELAY * 2)
self.assertEqual(len(ep.mock_calls), 2)
c.got_hints([hint])
self.assertEqual(c._schedule_connection.mock_calls,
[mock.call(0.0, hint, is_relay=False)])
def test_priorities(self):
# given two hints with different priorities, we should somehow prefer
# one. This is a placeholder to fill in once we implement priorities.
pass
class Race(unittest.TestCase):
def test_one_leader(self):
c, h = make_connector(listen=True, role=roles.LEADER)
lp = mock.Mock()
def start_listener(addresses):
c._listeners.add(lp)
c._start_listener = start_listener
c._schedule_connection = mock.Mock()
c.start()
self.assertEqual(c._listeners, set([lp]))
p1 = mock.Mock() # DilatedConnectionProtocol instance
c.add_candidate(p1)
self.assertEqual(h.manager.mock_calls, [])
h.eq.flush_sync()
self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)])
self.assertEqual(p1.mock_calls,
[mock.call.select(h.manager),
mock.call.send_record(KCM())])
self.assertEqual(lp.mock_calls[0], mock.call.stopListening())
# stop_listeners() uses a DeferredList, so we ignore the second call
def test_one_follower(self):
c, h = make_connector(listen=True, role=roles.FOLLOWER)
lp = mock.Mock()
def start_listener(addresses):
c._listeners.add(lp)
c._start_listener = start_listener
c._schedule_connection = mock.Mock()
c.start()
self.assertEqual(c._listeners, set([lp]))
p1 = mock.Mock() # DilatedConnectionProtocol instance
c.add_candidate(p1)
self.assertEqual(h.manager.mock_calls, [])
h.eq.flush_sync()
self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)])
# just like LEADER, but follower doesn't send KCM now (it sent one
# earlier, to tell the leader that this connection looks viable)
self.assertEqual(p1.mock_calls,
[mock.call.select(h.manager)])
self.assertEqual(lp.mock_calls[0], mock.call.stopListening())
# stop_listeners() uses a DeferredList, so we ignore the second call
# TODO: make sure a pending connection is abandoned when the listener
# answers successfully
# TODO: make sure a second pending connection is abandoned when the first
# connection succeeds
def test_late(self):
c, h = make_connector(listen=False, role=roles.LEADER)
c._schedule_connection = mock.Mock()
c.start()
p1 = mock.Mock() # DilatedConnectionProtocol instance
c.add_candidate(p1)
self.assertEqual(h.manager.mock_calls, [])
h.eq.flush_sync()
self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)])
clear_mock_calls(h.manager)
self.assertEqual(p1.mock_calls,
[mock.call.select(h.manager),
mock.call.send_record(KCM())])
# late connection is ignored
p2 = mock.Mock()
c.add_candidate(p2)
self.assertEqual(h.manager.mock_calls, [])
# make sure an established connection is dropped when stop() is called
def test_stop(self):
c, h = make_connector(listen=False, role=roles.LEADER)
c._schedule_connection = mock.Mock()
c.start()
p1 = mock.Mock() # DilatedConnectionProtocol instance
c.add_candidate(p1)
self.assertEqual(h.manager.mock_calls, [])
h.eq.flush_sync()
self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)])
self.assertEqual(p1.mock_calls,
[mock.call.select(h.manager),
mock.call.send_record(KCM())])
c.stop()