diff --git a/src/wormhole/_dilation/connection.py b/src/wormhole/_dilation/connection.py index b11b7bc..84205aa 100644 --- a/src/wormhole/_dilation/connection.py +++ b/src/wormhole/_dilation/connection.py @@ -519,6 +519,8 @@ class DilatedConnectionProtocol(Protocol, object): @m.output() def set_manager(self, manager): self._manager = manager + self.when_disconnected().addCallback(lambda c: + manager.connector_connection_lost()) @m.output() def can_send_records(self, manager): diff --git a/src/wormhole/_dilation/connector.py b/src/wormhole/_dilation/connector.py index c9c6ee5..42b9bb5 100644 --- a/src/wormhole/_dilation/connector.py +++ b/src/wormhole/_dilation/connector.py @@ -42,6 +42,33 @@ def build_noise(): @attrs(hash=True) @implementer(IDilationConnector) class Connector(object): + """I manage a single generation of connection. + + The Manager creates one of me at a time, whenever it wants a connection + (which is always, once w.dilate() has been called and we know the remote + end can dilate, and is expressed by the Manager calling my .start() + method). I am discarded when my established connection is lost (and if we + still want to be connected, a new generation is started and a new + Connector is created). I am also discarded if we stop wanting to be + connected (which the Manager expresses by calling my .stop() method). + + I manage the race between multiple connections for a specific generation + of the dilated connection. + + I send connection hints when my InboundConnectionFactory yields addresses + (self.listener_ready), and I initiate outbond connections (with + OutboundConnectionFactory) as I receive connection hints from my peer + (self.got_hints). Both factories use my build_protocol() method to create + connection.DilatedConnectionProtocol instances. I track these protocol + instances until one finishes negotiation and wins the race. I then shut + down the others, remember the winner as self._winning_connection, and + deliver the winner to manager.connector_connection_made(c). + + When an active connection is lost, we call manager.connector_connection_lost, + allowing the manager to decide whether it wants to start a new generation + or not. + """ + _dilation_key = attrib(validator=instance_of(type(b""))) _transit_relay_location = attrib(validator=optional(instance_of(type(u"")))) _manager = attrib(validator=provides(IDilationManager)) @@ -181,10 +208,13 @@ class Connector(object): self.stop_pending_connections() c.select(self._manager) # subsequent frames go directly to the manager + # c.select also wires up when_disconnected() to fire + # manager.connector_connection_lost(). TODO: rename this, since the + # Connector is no longer the one calling it if self._role is LEADER: # TODO: this should live in Connection c.send_record(KCM()) # leader sends KCM now - self._manager.use_connection(c) # manager sends frames to Connection + self._manager.connector_connection_made(c) # manager sends frames to Connection @m.output() def stop_everything(self): @@ -199,7 +229,8 @@ class Connector(object): return d # synchronization for tests def stop_pending_connectors(self): - return DeferredList([d.cancel() for d in self._pending_connectors]) + for d in self._pending_connectors: + d.cancel() def stop_pending_connections(self): d = self._pending_connections.when_next_empty() diff --git a/src/wormhole/_dilation/manager.py b/src/wormhole/_dilation/manager.py index feee725..d85d31e 100644 --- a/src/wormhole/_dilation/manager.py +++ b/src/wormhole/_dilation/manager.py @@ -5,7 +5,7 @@ from attr import attrs, attrib from attr.validators import provides, instance_of, optional from automat import MethodicalMachine from zope.interface import implementer -from twisted.internet.defer import Deferred, inlineCallbacks, returnValue +from twisted.internet.defer import Deferred, inlineCallbacks, returnValue, succeed from twisted.python import log from .._interfaces import IDilator, IDilationManager, ISend from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr @@ -113,6 +113,7 @@ class Manager(object): self._connection = None self._made_first_connection = False self._first_connected = OneShotObserver(self._eventual_queue) + self._stopped = OneShotObserver(self._eventual_queue) self._host_addr = _WormholeAddress() self._next_dilation_phase = 0 @@ -133,6 +134,9 @@ class Manager(object): def when_first_connected(self): return self._first_connected.when_fired() + def when_stopped(self): + return self._stopped.when_fired() + def send_dilation_phase(self, **fields): dilation_phase = self._next_dilation_phase self._next_dilation_phase += 1 @@ -401,6 +405,10 @@ class Manager(object): # been told to shut down. self._connection.disconnect() # let connection_lost do cleanup + @m.output() + def notify_stopped(self): + self._stopped.fire(None) + # we start CONNECTING when we get rx_PLEASE WANTING.upon(rx_PLEASE, enter=CONNECTING, outputs=[choose_role, start_connecting_ignore_message]) @@ -440,14 +448,14 @@ class Manager(object): ABANDONING.upon(rx_HINTS, enter=ABANDONING, outputs=[]) # shouldn't happen STOPPING.upon(rx_HINTS, enter=STOPPING, outputs=[]) - WANTING.upon(stop, enter=STOPPED, outputs=[]) - CONNECTING.upon(stop, enter=STOPPED, outputs=[stop_connecting]) + WANTING.upon(stop, enter=STOPPED, outputs=[notify_stopped]) + CONNECTING.upon(stop, enter=STOPPED, outputs=[stop_connecting, notify_stopped]) CONNECTED.upon(stop, enter=STOPPING, outputs=[abandon_connection]) ABANDONING.upon(stop, enter=STOPPING, outputs=[]) - FLUSHING.upon(stop, enter=STOPPED, outputs=[]) - LONELY.upon(stop, enter=STOPPED, outputs=[]) - STOPPING.upon(connection_lost_leader, enter=STOPPED, outputs=[]) - STOPPING.upon(connection_lost_follower, enter=STOPPED, outputs=[]) + FLUSHING.upon(stop, enter=STOPPED, outputs=[notify_stopped]) + LONELY.upon(stop, enter=STOPPED, outputs=[notify_stopped]) + STOPPING.upon(connection_lost_leader, enter=STOPPED, outputs=[notify_stopped]) + STOPPING.upon(connection_lost_follower, enter=STOPPED, outputs=[notify_stopped]) @attrs @@ -536,6 +544,13 @@ class Dilator(object): endpoints = (control_ep, connect_ep, listen_ep) returnValue(endpoints) + def stop(self): + if not self._started: + return succeed(None) + if self._started: + self._manager.stop() + return self._manager.when_stopped() + # from Boss def got_key(self, key): diff --git a/src/wormhole/test/dilate/test_connector.py b/src/wormhole/test/dilate/test_connector.py index 2bb8809..c17e590 100644 --- a/src/wormhole/test/dilate/test_connector.py +++ b/src/wormhole/test/dilate/test_connector.py @@ -388,7 +388,7 @@ class Race(unittest.TestCase): c.add_candidate(p1) self.assertEqual(h.manager.mock_calls, []) h.eq.flush_sync() - self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)]) self.assertEqual(p1.mock_calls, [mock.call.select(h.manager), mock.call.send_record(KCM())]) @@ -409,7 +409,7 @@ class Race(unittest.TestCase): c.add_candidate(p1) self.assertEqual(h.manager.mock_calls, []) h.eq.flush_sync() - self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)]) # just like LEADER, but follower doesn't send KCM now (it sent one # earlier, to tell the leader that this connection looks viable) self.assertEqual(p1.mock_calls, @@ -432,7 +432,7 @@ class Race(unittest.TestCase): c.add_candidate(p1) self.assertEqual(h.manager.mock_calls, []) h.eq.flush_sync() - self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)]) clear_mock_calls(h.manager) self.assertEqual(p1.mock_calls, [mock.call.select(h.manager), @@ -454,10 +454,9 @@ class Race(unittest.TestCase): c.add_candidate(p1) self.assertEqual(h.manager.mock_calls, []) h.eq.flush_sync() - self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) self.assertEqual(p1.mock_calls, [mock.call.select(h.manager), mock.call.send_record(KCM())]) + self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)]) c.stop() -