Add Dilator.stop(), fix delivery of connection made/lost messages

Dilator.stop() now shuts everything down, and returns a Deferred when it all
stops moving. This needed some Manager state machine changes (to notify
Dilator when it enters the STOPPED state). This also revealed problems in the
delivery of connector_connection_made() (which was misnamed) and
connector_connection_lost() (which wasn't being called at all).
This commit is contained in:
Brian Warner 2019-02-04 11:38:07 -08:00
parent 39fed50071
commit 059338a257
4 changed files with 61 additions and 14 deletions

View File

@ -519,6 +519,8 @@ class DilatedConnectionProtocol(Protocol, object):
@m.output()
def set_manager(self, manager):
self._manager = manager
self.when_disconnected().addCallback(lambda c:
manager.connector_connection_lost())
@m.output()
def can_send_records(self, manager):

View File

@ -42,6 +42,33 @@ def build_noise():
@attrs(hash=True)
@implementer(IDilationConnector)
class Connector(object):
"""I manage a single generation of connection.
The Manager creates one of me at a time, whenever it wants a connection
(which is always, once w.dilate() has been called and we know the remote
end can dilate, and is expressed by the Manager calling my .start()
method). I am discarded when my established connection is lost (and if we
still want to be connected, a new generation is started and a new
Connector is created). I am also discarded if we stop wanting to be
connected (which the Manager expresses by calling my .stop() method).
I manage the race between multiple connections for a specific generation
of the dilated connection.
I send connection hints when my InboundConnectionFactory yields addresses
(self.listener_ready), and I initiate outbond connections (with
OutboundConnectionFactory) as I receive connection hints from my peer
(self.got_hints). Both factories use my build_protocol() method to create
connection.DilatedConnectionProtocol instances. I track these protocol
instances until one finishes negotiation and wins the race. I then shut
down the others, remember the winner as self._winning_connection, and
deliver the winner to manager.connector_connection_made(c).
When an active connection is lost, we call manager.connector_connection_lost,
allowing the manager to decide whether it wants to start a new generation
or not.
"""
_dilation_key = attrib(validator=instance_of(type(b"")))
_transit_relay_location = attrib(validator=optional(instance_of(type(u""))))
_manager = attrib(validator=provides(IDilationManager))
@ -181,10 +208,13 @@ class Connector(object):
self.stop_pending_connections()
c.select(self._manager) # subsequent frames go directly to the manager
# c.select also wires up when_disconnected() to fire
# manager.connector_connection_lost(). TODO: rename this, since the
# Connector is no longer the one calling it
if self._role is LEADER:
# TODO: this should live in Connection
c.send_record(KCM()) # leader sends KCM now
self._manager.use_connection(c) # manager sends frames to Connection
self._manager.connector_connection_made(c) # manager sends frames to Connection
@m.output()
def stop_everything(self):
@ -199,7 +229,8 @@ class Connector(object):
return d # synchronization for tests
def stop_pending_connectors(self):
return DeferredList([d.cancel() for d in self._pending_connectors])
for d in self._pending_connectors:
d.cancel()
def stop_pending_connections(self):
d = self._pending_connections.when_next_empty()

View File

@ -5,7 +5,7 @@ from attr import attrs, attrib
from attr.validators import provides, instance_of, optional
from automat import MethodicalMachine
from zope.interface import implementer
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue, succeed
from twisted.python import log
from .._interfaces import IDilator, IDilationManager, ISend
from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr
@ -113,6 +113,7 @@ class Manager(object):
self._connection = None
self._made_first_connection = False
self._first_connected = OneShotObserver(self._eventual_queue)
self._stopped = OneShotObserver(self._eventual_queue)
self._host_addr = _WormholeAddress()
self._next_dilation_phase = 0
@ -133,6 +134,9 @@ class Manager(object):
def when_first_connected(self):
return self._first_connected.when_fired()
def when_stopped(self):
return self._stopped.when_fired()
def send_dilation_phase(self, **fields):
dilation_phase = self._next_dilation_phase
self._next_dilation_phase += 1
@ -401,6 +405,10 @@ class Manager(object):
# been told to shut down.
self._connection.disconnect() # let connection_lost do cleanup
@m.output()
def notify_stopped(self):
self._stopped.fire(None)
# we start CONNECTING when we get rx_PLEASE
WANTING.upon(rx_PLEASE, enter=CONNECTING,
outputs=[choose_role, start_connecting_ignore_message])
@ -440,14 +448,14 @@ class Manager(object):
ABANDONING.upon(rx_HINTS, enter=ABANDONING, outputs=[]) # shouldn't happen
STOPPING.upon(rx_HINTS, enter=STOPPING, outputs=[])
WANTING.upon(stop, enter=STOPPED, outputs=[])
CONNECTING.upon(stop, enter=STOPPED, outputs=[stop_connecting])
WANTING.upon(stop, enter=STOPPED, outputs=[notify_stopped])
CONNECTING.upon(stop, enter=STOPPED, outputs=[stop_connecting, notify_stopped])
CONNECTED.upon(stop, enter=STOPPING, outputs=[abandon_connection])
ABANDONING.upon(stop, enter=STOPPING, outputs=[])
FLUSHING.upon(stop, enter=STOPPED, outputs=[])
LONELY.upon(stop, enter=STOPPED, outputs=[])
STOPPING.upon(connection_lost_leader, enter=STOPPED, outputs=[])
STOPPING.upon(connection_lost_follower, enter=STOPPED, outputs=[])
FLUSHING.upon(stop, enter=STOPPED, outputs=[notify_stopped])
LONELY.upon(stop, enter=STOPPED, outputs=[notify_stopped])
STOPPING.upon(connection_lost_leader, enter=STOPPED, outputs=[notify_stopped])
STOPPING.upon(connection_lost_follower, enter=STOPPED, outputs=[notify_stopped])
@attrs
@ -536,6 +544,13 @@ class Dilator(object):
endpoints = (control_ep, connect_ep, listen_ep)
returnValue(endpoints)
def stop(self):
if not self._started:
return succeed(None)
if self._started:
self._manager.stop()
return self._manager.when_stopped()
# from Boss
def got_key(self, key):

View File

@ -388,7 +388,7 @@ class Race(unittest.TestCase):
c.add_candidate(p1)
self.assertEqual(h.manager.mock_calls, [])
h.eq.flush_sync()
self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)])
self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)])
self.assertEqual(p1.mock_calls,
[mock.call.select(h.manager),
mock.call.send_record(KCM())])
@ -409,7 +409,7 @@ class Race(unittest.TestCase):
c.add_candidate(p1)
self.assertEqual(h.manager.mock_calls, [])
h.eq.flush_sync()
self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)])
self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)])
# just like LEADER, but follower doesn't send KCM now (it sent one
# earlier, to tell the leader that this connection looks viable)
self.assertEqual(p1.mock_calls,
@ -432,7 +432,7 @@ class Race(unittest.TestCase):
c.add_candidate(p1)
self.assertEqual(h.manager.mock_calls, [])
h.eq.flush_sync()
self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)])
self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)])
clear_mock_calls(h.manager)
self.assertEqual(p1.mock_calls,
[mock.call.select(h.manager),
@ -454,10 +454,9 @@ class Race(unittest.TestCase):
c.add_candidate(p1)
self.assertEqual(h.manager.mock_calls, [])
h.eq.flush_sync()
self.assertEqual(h.manager.mock_calls, [mock.call.use_connection(p1)])
self.assertEqual(p1.mock_calls,
[mock.call.select(h.manager),
mock.call.send_record(KCM())])
self.assertEqual(h.manager.mock_calls, [mock.call.connector_connection_made(p1)])
c.stop()