From 059338a257fc5ab5223b491970ab52f511fcfbe7 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 4 Feb 2019 11:38:07 -0800 Subject: [PATCH] Add Dilator.stop(), fix delivery of connection made/lost messages Dilator.stop() now shuts everything down, and returns a Deferred when it all stops moving. This needed some Manager state machine changes (to notify Dilator when it enters the STOPPED state). This also revealed problems in the delivery of connector_connection_made() (which was misnamed) and connector_connection_lost() (which wasn't being called at all). --- src/wormhole/_dilation/connection.py | 2 ++ src/wormhole/_dilation/connector.py | 35 ++++++++++++++++++++-- src/wormhole/_dilation/manager.py | 29 +++++++++++++----- src/wormhole/test/dilate/test_connector.py | 9 +++--- 4 files changed, 61 insertions(+), 14 deletions(-) diff --git a/src/wormhole/_dilation/connection.py b/src/wormhole/_dilation/connection.py index b11b7bc..84205aa 100644 --- a/src/wormhole/_dilation/connection.py +++ b/src/wormhole/_dilation/connection.py @@ -519,6 +519,8 @@ class DilatedConnectionProtocol(Protocol, object): @m.output() def set_manager(self, manager): self._manager = manager + self.when_disconnected().addCallback(lambda c: + manager.connector_connection_lost()) @m.output() def can_send_records(self, manager): diff --git a/src/wormhole/_dilation/connector.py b/src/wormhole/_dilation/connector.py index c9c6ee5..42b9bb5 100644 --- a/src/wormhole/_dilation/connector.py +++ b/src/wormhole/_dilation/connector.py @@ -42,6 +42,33 @@ def build_noise(): @attrs(hash=True) @implementer(IDilationConnector) class Connector(object): + """I manage a single generation of connection. + + The Manager creates one of me at a time, whenever it wants a connection + (which is always, once w.dilate() has been called and we know the remote + end can dilate, and is expressed by the Manager calling my .start() + method). I am discarded when my established connection is lost (and if we + still want to be connected, a new generation is started and a new + Connector is created). I am also discarded if we stop wanting to be + connected (which the Manager expresses by calling my .stop() method). + + I manage the race between multiple connections for a specific generation + of the dilated connection. + + I send connection hints when my InboundConnectionFactory yields addresses + (self.listener_ready), and I initiate outbond connections (with + OutboundConnectionFactory) as I receive connection hints from my peer + (self.got_hints). Both factories use my build_protocol() method to create + connection.DilatedConnectionProtocol instances. I track these protocol + instances until one finishes negotiation and wins the race. I then shut + down the others, remember the winner as self._winning_connection, and + deliver the winner to manager.connector_connection_made(c). + + When an active connection is lost, we call manager.connector_connection_lost, + allowing the manager to decide whether it wants to start a new generation + or not. + """ + _dilation_key = attrib(validator=instance_of(type(b""))) _transit_relay_location = attrib(validator=optional(instance_of(type(u"")))) _manager = attrib(validator=provides(IDilationManager)) @@ -181,10 +208,13 @@ class Connector(object): self.stop_pending_connections() c.select(self._manager) # subsequent frames go directly to the manager + # c.select also wires up when_disconnected() to fire + # manager.connector_connection_lost(). TODO: rename this, since the + # Connector is no longer the one calling it if self._role is LEADER: # TODO: this should live in Connection c.send_record(KCM()) # leader sends KCM now - self._manager.use_connection(c) # manager sends frames to Connection + self._manager.connector_connection_made(c) # manager sends frames to Connection @m.output() def stop_everything(self): @@ -199,7 +229,8 @@ class Connector(object): return d # synchronization for tests def stop_pending_connectors(self): - return DeferredList([d.cancel() for d in self._pending_connectors]) + for d in self._pending_connectors: + d.cancel() def stop_pending_connections(self): d = self._pending_connections.when_next_empty() diff --git a/src/wormhole/_dilation/manager.py b/src/wormhole/_dilation/manager.py index feee725..d85d31e 100644 --- a/src/wormhole/_dilation/manager.py +++ b/src/wormhole/_dilation/manager.py @@ -5,7 +5,7 @@ from attr import attrs, attrib from attr.validators import provides, instance_of, optional from automat import MethodicalMachine from zope.interface import implementer -from twisted.internet.defer import Deferred, inlineCallbacks, returnValue +from twisted.internet.defer import Deferred, inlineCallbacks, returnValue, succeed from twisted.python import log from .._interfaces import IDilator, IDilationManager, ISend from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr @@ -113,6 +113,7 @@ class Manager(object): self._connection = None self._made_first_connection = False self._first_connected = OneShotObserver(self._eventual_queue) + self._stopped = OneShotObserver(self._eventual_queue) self._host_addr = _WormholeAddress() self._next_dilation_phase = 0 @@ -133,6 +134,9 @@ class Manager(object): def when_first_connected(self): return self._first_connected.when_fired() + def when_stopped(self): + return self._stopped.when_fired() + def send_dilation_phase(self, **fields): dilation_phase = self._next_dilation_phase self._next_dilation_phase += 1 @@ -401,6 +405,10 @@ class Manager(object): # been told to shut down. self._connection.disconnect() # let connection_lost do cleanup + @m.output() + def notify_stopped(self): + self._stopped.fire(None) + # we start CONNECTING when we get rx_PLEASE WANTING.upon(rx_PLEASE, enter=CONNECTING, outputs=[choose_role, start_connecting_ignore_message]) @@ -440,14 +448,14 @@ class Manager(object): ABANDONING.upon(rx_HINTS, enter=ABANDONING, outputs=[]) # shouldn't happen STOPPING.upon(rx_HINTS, enter=STOPPING, outputs=[]) - WANTING.upon(stop, enter=STOPPED, outputs=[]) - CONNECTING.upon(stop, enter=STOPPED, outputs=[stop_connecting]) + WANTING.upon(stop, enter=STOPPED, outputs=[notify_stopped]) + CONNECTING.upon(stop, enter=STOPPED, outputs=[stop_connecting, notify_stopped]) CONNECTED.upon(stop, enter=STOPPING, outputs=[abandon_connection]) ABANDONING.upon(stop, enter=STOPPING, outputs=[]) - FLUSHING.upon(stop, enter=STOPPED, outputs=[]) - LONELY.upon(stop, enter=STOPPED, outputs=[]) - STOPPING.upon(connection_lost_leader, enter=STOPPED, outputs=[]) - STOPPING.upon(connection_lost_follower, enter=STOPPED, outputs=[]) + FLUSHING.upon(stop, enter=STOPPED, outputs=[notify_stopped]) + LONELY.upon(stop, enter=STOPPED, outputs=[notify_stopped]) + STOPPING.upon(connection_lost_leader, enter=STOPPED, outputs=[notify_stopped]) + STOPPING.upon(connection_lost_follower, enter=STOPPED, outputs=[notify_stopped]) @attrs @@ -536,6 +544,13 @@ class Dilator(object): endpoints = (control_ep, connect_ep, listen_ep) returnValue(endpoints) + def stop(self): + if not self._started: + return succeed(None) + if self._started: + self._manager.stop() + return self._manager.when_stopped() + # from Boss def got_key(self, key): diff --git a/src/wormhole/test/dilate/test_connector.py b/src/wormhole/test/dilate/test_connector.py index 2bb8809..c17e590 100644 --- a/src/wormhole/test/dilate/test_connector.py +++ b/src/wormhole/test/dilate/test_connector.py @@ -388,7 +388,7 @@ class Race(unittest.TestCase): c.add_candidate(p1) self.assertEqual(h.manager.mock_calls, []) h.eq.flush_sync() - self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)]) self.assertEqual(p1.mock_calls, [mock.call.select(h.manager), mock.call.send_record(KCM())]) @@ -409,7 +409,7 @@ class Race(unittest.TestCase): c.add_candidate(p1) self.assertEqual(h.manager.mock_calls, []) h.eq.flush_sync() - self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)]) # just like LEADER, but follower doesn't send KCM now (it sent one # earlier, to tell the leader that this connection looks viable) self.assertEqual(p1.mock_calls, @@ -432,7 +432,7 @@ class Race(unittest.TestCase): c.add_candidate(p1) self.assertEqual(h.manager.mock_calls, []) h.eq.flush_sync() - self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) + self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)]) clear_mock_calls(h.manager) self.assertEqual(p1.mock_calls, [mock.call.select(h.manager), @@ -454,10 +454,9 @@ class Race(unittest.TestCase): c.add_candidate(p1) self.assertEqual(h.manager.mock_calls, []) h.eq.flush_sync() - self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)]) self.assertEqual(p1.mock_calls, [mock.call.select(h.manager), mock.call.send_record(KCM())]) + self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)]) c.stop() -