manager: clean up versions, merge state machines
This commit is contained in:
parent
d4a551c6b8
commit
7e168b819e
|
@ -1,4 +1,5 @@
|
|||
from __future__ import print_function, unicode_literals
|
||||
import os
|
||||
from collections import deque
|
||||
from attr import attrs, attrib
|
||||
from attr.validators import provides, instance_of, optional
|
||||
|
@ -7,7 +8,7 @@ from zope.interface import implementer
|
|||
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
|
||||
from twisted.python import log
|
||||
from .._interfaces import IDilator, IDilationManager, ISend
|
||||
from ..util import dict_to_bytes, bytes_to_dict
|
||||
from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr
|
||||
from ..observer import OneShotObserver
|
||||
from .._key import derive_key
|
||||
from .encode import to_be4
|
||||
|
@ -21,6 +22,10 @@ from .inbound import Inbound
|
|||
from .outbound import Outbound
|
||||
|
||||
|
||||
# exported to Wormhole() for inclusion in versions message
|
||||
DILATION_VERSIONS = ["1"]
|
||||
|
||||
|
||||
class OldPeerCannotDilateError(Exception):
|
||||
pass
|
||||
|
||||
|
@ -33,9 +38,45 @@ class ReceivedHintsTooEarly(Exception):
|
|||
pass
|
||||
|
||||
|
||||
# new scheme:
|
||||
# * both sides send PLEASE as soon as they have an unverified key and
|
||||
# w.dilate has been called,
|
||||
# * PLEASE includes a dilation-specific "side" (independent of the "side"
|
||||
# used by mailbox messages)
|
||||
# * higher "side" is Leader, lower is Follower
|
||||
# * PLEASE includes can-dilate list of version integers, requires overlap
|
||||
# "1" is current
|
||||
|
||||
# * we start dilation after both w.dilate() and receiving VERSION, putting us
|
||||
# in WANTING, then we process all previously-queued inbound DILATE-n
|
||||
# messages. When PLEASE arrives, we move to CONNECTING
|
||||
# * HINTS sent after dilation starts
|
||||
# * only Leader sends RECONNECT, only Follower sends RECONNECTING. This
|
||||
# is the only difference between the two sides, and is not enforced
|
||||
# by the protocol (i.e. if the Follower sends RECONNECT to the Leader,
|
||||
# the Leader will obey, although TODO how confusing will this get?)
|
||||
# * upon receiving RECONNECT: drop Connector, start new Connector, send
|
||||
# RECONNECTING, start sending HINTS
|
||||
# * upon sending RECONNECT: go into FLUSHING state and ignore all HINTS until
|
||||
# RECONNECTING received. The new Connector can be spun up earlier, and it
|
||||
# can send HINTS, but it must not be given any HINTS that arrive before
|
||||
# RECONNECTING (since they're probably stale)
|
||||
|
||||
# * after VERSIONS(KCM) received, we might learn that they other side cannot
|
||||
# dilate. w.dilate errbacks at this point
|
||||
|
||||
# * maybe signal warning if we stay in a "want" state for too long
|
||||
# * nobody sends HINTS until they're ready to receive
|
||||
# * nobody sends HINTS unless they've called w.dilate() and received PLEASE
|
||||
# * nobody connects to inbound hints unless they've called w.dilate()
|
||||
# * if leader calls w.dilate() but not follower, leader waits forever in
|
||||
# "want" (doesn't send anything)
|
||||
# * if follower calls w.dilate() but not leader, follower waits forever
|
||||
# in "want", leader waits forever in "wanted"
|
||||
|
||||
@attrs
|
||||
@implementer(IDilationManager)
|
||||
class _ManagerBase(object):
|
||||
class Manager(object):
|
||||
_S = attrib(validator=provides(ISend))
|
||||
_my_side = attrib(validator=instance_of(type(u"")))
|
||||
_transit_key = attrib(validator=instance_of(bytes))
|
||||
|
@ -46,6 +87,10 @@ class _ManagerBase(object):
|
|||
_no_listen = False # TODO
|
||||
_tor = None # TODO
|
||||
_timing = None # TODO
|
||||
_next_subchannel_id = None # initialized in choose_role
|
||||
|
||||
m = MethodicalMachine()
|
||||
set_trace = getattr(m, "_setTrace", lambda self, f: None)
|
||||
|
||||
def __attrs_post_init__(self):
|
||||
self._got_versions_d = Deferred()
|
||||
|
@ -59,8 +104,6 @@ class _ManagerBase(object):
|
|||
|
||||
self._next_dilation_phase = 0
|
||||
|
||||
self._next_subchannel_id = 0 # increments by 2
|
||||
|
||||
# I kept getting confused about which methods were for inbound data
|
||||
# (and thus flow-control methods go "out") and which were for
|
||||
# outbound data (with flow-control going "in"), so I split them up
|
||||
|
@ -127,18 +170,6 @@ class _ManagerBase(object):
|
|||
self._inbound.subchannel_closed(scid, sc)
|
||||
self._outbound.subchannel_closed(scid, sc)
|
||||
|
||||
def _start_connecting(self, role):
|
||||
assert self._my_role is not None
|
||||
self._connector = Connector(self._transit_key,
|
||||
self._transit_relay_location,
|
||||
self,
|
||||
self._reactor, self._eventual_queue,
|
||||
self._no_listen, self._tor,
|
||||
self._timing,
|
||||
self._side, # needed for relay handshake
|
||||
self._my_role)
|
||||
self._connector.start()
|
||||
|
||||
# our Connector calls these
|
||||
|
||||
def connector_connection_made(self, c):
|
||||
|
@ -209,59 +240,22 @@ class _ManagerBase(object):
|
|||
|
||||
# subchannel maintenance
|
||||
def allocate_subchannel_id(self):
|
||||
raise NotImplementedError # subclass knows if we're leader or follower
|
||||
scid_num = self._next_subchannel_id
|
||||
self._next_subchannel_id += 2
|
||||
return to_be4(scid_num)
|
||||
|
||||
# new scheme:
|
||||
# * both sides send PLEASE as soon as they have an unverified key and
|
||||
# w.dilate has been called,
|
||||
# * PLEASE includes a dilation-specific "side" (independent of the "side"
|
||||
# used by mailbox messages)
|
||||
# * higher "side" is Leader, lower is Follower
|
||||
# * PLEASE includes can-dilate list of version integers, requires overlap
|
||||
# "1" is current
|
||||
# * dilation starts as soon as we've sent PLEASE and received PLEASE
|
||||
# (four-state two-variable IDLE/WANTING/WANTED/STARTED diamond FSM)
|
||||
# * HINTS sent after dilation starts
|
||||
# * only Leader sends RECONNECT, only Follower sends RECONNECTING. This
|
||||
# is the only difference between the two sides, and is not enforced
|
||||
# by the protocol (i.e. if the Follower sends RECONNECT to the Leader,
|
||||
# the Leader will obey, although TODO how confusing will this get?)
|
||||
# * upon receiving RECONNECT: drop Connector, start new Connector, send
|
||||
# RECONNECTING, start sending HINTS
|
||||
# * upon sending CONNECT: go into FLUSHING state and ignore all HINTS until
|
||||
# RECONNECTING received. The new Connector can be spun up earlier, and it
|
||||
# can send HINTS, but it must not be given any HINTS that arrive before
|
||||
# RECONNECTING (since they're probably stale)
|
||||
# state machine
|
||||
|
||||
# * after VERSIONS(KCM) received, we might learn that they other side cannot
|
||||
# dilate. w.dilate errbacks at this point
|
||||
# We are born WANTING after the local app calls w.dilate(). We start
|
||||
# CONNECTING when we receive PLEASE from the remote side
|
||||
|
||||
# * maybe signal warning if we stay in a "want" state for too long
|
||||
# * nobody sends HINTS until they're ready to receive
|
||||
# * nobody sends HINTS unless they've called w.dilate() and received PLEASE
|
||||
# * nobody connects to inbound hints unless they've called w.dilate()
|
||||
# * if leader calls w.dilate() but not follower, leader waits forever in
|
||||
# "want" (doesn't send anything)
|
||||
# * if follower calls w.dilate() but not leader, follower waits forever
|
||||
# in "want", leader waits forever in "wanted"
|
||||
|
||||
|
||||
class ManagerShared(_ManagerBase):
|
||||
m = MethodicalMachine()
|
||||
set_trace = getattr(m, "_setTrace", lambda self, f: None)
|
||||
def start(self):
|
||||
self.send_please()
|
||||
|
||||
@m.state(initial=True)
|
||||
def IDLE(self):
|
||||
pass # pragma: no cover
|
||||
|
||||
@m.state()
|
||||
def WANTING(self):
|
||||
pass # pragma: no cover
|
||||
|
||||
@m.state()
|
||||
def WANTED(self):
|
||||
pass # pragma: no cover
|
||||
|
||||
@m.state()
|
||||
def CONNECTING(self):
|
||||
pass # pragma: no cover
|
||||
|
@ -290,10 +284,6 @@ class ManagerShared(_ManagerBase):
|
|||
def STOPPED(self):
|
||||
pass # pragma: no cover
|
||||
|
||||
@m.input()
|
||||
def start(self):
|
||||
pass # pragma: no cover
|
||||
|
||||
@m.input()
|
||||
def rx_PLEASE(self, message):
|
||||
pass # pragma: no cover
|
||||
|
@ -332,9 +322,19 @@ class ManagerShared(_ManagerBase):
|
|||
pass # pragma: no cover
|
||||
|
||||
@m.output()
|
||||
def stash_side(self, message):
|
||||
def choose_role(self, message):
|
||||
their_side = message["side"]
|
||||
self.my_role = LEADER if self._my_side > their_side else FOLLOWER
|
||||
if self._my_side > their_side:
|
||||
self._my_role = LEADER
|
||||
# scid 0 is reserved for the control channel. the leader uses odd
|
||||
# numbers starting with 1
|
||||
self._next_subchannel_id = 1
|
||||
elif their_side > self._my_side:
|
||||
self._my_role = FOLLOWER
|
||||
# the follower uses even numbers starting with 2
|
||||
self._next_subchannel_id = 2
|
||||
else:
|
||||
raise ValueError("their side shouldn't be equal: reflection?")
|
||||
|
||||
# these Outputs behave differently for the Leader vs the Follower
|
||||
@m.output()
|
||||
|
@ -342,12 +342,22 @@ class ManagerShared(_ManagerBase):
|
|||
self.send_dilation_phase(type="please", side=self._my_side)
|
||||
|
||||
@m.output()
|
||||
def start_connecting(self):
|
||||
self._start_connecting() # TODO: merge
|
||||
def start_connecting_ignore_message(self, message):
|
||||
del message # ignored
|
||||
return self.start_connecting()
|
||||
|
||||
@m.output()
|
||||
def ignore_message_start_connecting(self, message):
|
||||
self.start_connecting()
|
||||
def start_connecting(self):
|
||||
assert self._my_role is not None
|
||||
self._connector = Connector(self._transit_key,
|
||||
self._transit_relay_location,
|
||||
self,
|
||||
self._reactor, self._eventual_queue,
|
||||
self._no_listen, self._tor,
|
||||
self._timing,
|
||||
self._my_side, # needed for relay handshake
|
||||
self._my_role)
|
||||
self._connector.start()
|
||||
|
||||
@m.output()
|
||||
def send_reconnect(self):
|
||||
|
@ -374,14 +384,9 @@ class ManagerShared(_ManagerBase):
|
|||
# been told to shut down.
|
||||
self._connection.disconnect() # let connection_lost do cleanup
|
||||
|
||||
# we don't start CONNECTING until a local start() plus rx_PLEASE
|
||||
IDLE.upon(rx_PLEASE, enter=WANTED, outputs=[stash_side])
|
||||
IDLE.upon(start, enter=WANTING, outputs=[send_please])
|
||||
WANTED.upon(start, enter=CONNECTING, outputs=[
|
||||
send_please, start_connecting])
|
||||
# we start CONNECTING when we get rx_PLEASE
|
||||
WANTING.upon(rx_PLEASE, enter=CONNECTING,
|
||||
outputs=[stash_side,
|
||||
ignore_message_start_connecting])
|
||||
outputs=[choose_role, start_connecting_ignore_message])
|
||||
|
||||
CONNECTING.upon(connection_made, enter=CONNECTED, outputs=[])
|
||||
|
||||
|
@ -394,11 +399,11 @@ class ManagerShared(_ManagerBase):
|
|||
# Follower
|
||||
# if we notice a lost connection, just wait for the Leader to notice too
|
||||
CONNECTED.upon(connection_lost_follower, enter=LONELY, outputs=[])
|
||||
LONELY.upon(rx_RECONNECT, enter=CONNECTING, outputs=[start_connecting])
|
||||
LONELY.upon(rx_RECONNECT, enter=CONNECTING,
|
||||
outputs=[send_reconnecting, start_connecting])
|
||||
# but if they notice it first, abandon our (seemingly functional)
|
||||
# connection, then tell them that we're ready to try again
|
||||
CONNECTED.upon(rx_RECONNECT, enter=ABANDONING, # they noticed loss
|
||||
outputs=[abandon_connection])
|
||||
CONNECTED.upon(rx_RECONNECT, enter=ABANDONING, outputs=[abandon_connection])
|
||||
ABANDONING.upon(connection_lost_follower, enter=CONNECTING,
|
||||
outputs=[send_reconnecting, start_connecting])
|
||||
# and if they notice a problem while we're still connecting, abandon our
|
||||
|
@ -410,8 +415,6 @@ class ManagerShared(_ManagerBase):
|
|||
start_connecting])
|
||||
|
||||
# rx_HINTS never changes state, they're just accepted or ignored
|
||||
IDLE.upon(rx_HINTS, enter=IDLE, outputs=[]) # too early
|
||||
WANTED.upon(rx_HINTS, enter=WANTED, outputs=[]) # too early
|
||||
WANTING.upon(rx_HINTS, enter=WANTING, outputs=[]) # too early
|
||||
CONNECTING.upon(rx_HINTS, enter=CONNECTING, outputs=[use_hints])
|
||||
CONNECTED.upon(rx_HINTS, enter=CONNECTED, outputs=[]) # too late, ignore
|
||||
|
@ -420,24 +423,15 @@ class ManagerShared(_ManagerBase):
|
|||
ABANDONING.upon(rx_HINTS, enter=ABANDONING, outputs=[]) # shouldn't happen
|
||||
STOPPING.upon(rx_HINTS, enter=STOPPING, outputs=[])
|
||||
|
||||
IDLE.upon(stop, enter=STOPPED, outputs=[])
|
||||
WANTED.upon(stop, enter=STOPPED, outputs=[])
|
||||
WANTING.upon(stop, enter=STOPPED, outputs=[])
|
||||
CONNECTING.upon(stop, enter=STOPPED, outputs=[stop_connecting])
|
||||
CONNECTED.upon(stop, enter=STOPPING, outputs=[abandon_connection])
|
||||
ABANDONING.upon(stop, enter=STOPPING, outputs=[])
|
||||
FLUSHING.upon(stop, enter=STOPPED, outputs=[stop_connecting])
|
||||
FLUSHING.upon(stop, enter=STOPPED, outputs=[])
|
||||
LONELY.upon(stop, enter=STOPPED, outputs=[])
|
||||
STOPPING.upon(connection_lost_leader, enter=STOPPED, outputs=[])
|
||||
STOPPING.upon(connection_lost_follower, enter=STOPPED, outputs=[])
|
||||
|
||||
def allocate_subchannel_id(self):
|
||||
# scid 0 is reserved for the control channel. the leader uses odd
|
||||
# numbers starting with 1
|
||||
scid_num = self._next_outbound_seqnum + 1
|
||||
self._next_outbound_seqnum += 2
|
||||
return to_be4(scid_num)
|
||||
|
||||
|
||||
@attrs
|
||||
@implementer(IDilator)
|
||||
|
@ -477,15 +471,23 @@ class Dilator(object):
|
|||
@inlineCallbacks
|
||||
def _start(self):
|
||||
# first, we wait until we hear the VERSION message, which tells us 1:
|
||||
# the PAKE key works, so we can talk securely, 2: their side, so we
|
||||
# know who will lead, and 3: that they can do dilation at all
|
||||
# the PAKE key works, so we can talk securely, 2: that they can do
|
||||
# dilation at all (if they can't then w.dilate() errbacks)
|
||||
|
||||
dilation_version = yield self._got_versions_d
|
||||
|
||||
if not dilation_version: # 1 or None
|
||||
# TODO: we could probably return the endpoints earlier, if we flunk
|
||||
# any connection/listen attempts upon OldPeerCannotDilateError, or
|
||||
# if/when we give up on the initial connection
|
||||
|
||||
if not dilation_version: # "1" or None
|
||||
# TODO: be more specific about the error. dilation_version==None
|
||||
# means we had no version in common with them, which could either
|
||||
# be because they're so old they don't dilate at all, or because
|
||||
# they're so new that they no longer accomodate our old version
|
||||
raise OldPeerCannotDilateError()
|
||||
|
||||
my_dilation_side = TODO # random
|
||||
my_dilation_side = bytes_to_hexstr(os.urandom(6))
|
||||
self._manager = Manager(self._S, my_dilation_side,
|
||||
self._transit_key,
|
||||
self._transit_relay_location,
|
||||
|
@ -497,8 +499,8 @@ class Dilator(object):
|
|||
plaintext = self._pending_inbound_dilate_messages.popleft()
|
||||
self.received_dilate(plaintext)
|
||||
|
||||
# we could probably return the endpoints earlier
|
||||
yield self._manager.when_first_connected()
|
||||
|
||||
# we can open subchannels as soon as we get our first connection
|
||||
scid0 = b"\x00\x00\x00\x00"
|
||||
self._host_addr = _WormholeAddress() # TODO: share with Manager
|
||||
|
@ -508,8 +510,7 @@ class Dilator(object):
|
|||
control_ep._subchannel_zero_opened(sc0)
|
||||
self._manager.set_subchannel_zero(scid0, sc0)
|
||||
|
||||
connect_ep = SubchannelConnectorEndpoint(
|
||||
self._manager, self._host_addr)
|
||||
connect_ep = SubchannelConnectorEndpoint(self._manager, self._host_addr)
|
||||
|
||||
listen_ep = SubchannelListenerEndpoint(self._manager, self._host_addr)
|
||||
self._manager.set_listener_endpoint(listen_ep)
|
||||
|
@ -526,16 +527,14 @@ class Dilator(object):
|
|||
LENGTH = 32 # TODO: whatever Noise wants, I guess
|
||||
self._transit_key = derive_key(key, purpose, LENGTH)
|
||||
|
||||
def got_wormhole_versions(self, our_side, their_side,
|
||||
their_wormhole_versions):
|
||||
# TODO: remove our_side, their_side
|
||||
assert isinstance(our_side, str), str
|
||||
assert isinstance(their_side, str), str
|
||||
def got_wormhole_versions(self, their_wormhole_versions):
|
||||
# this always happens before received_dilate
|
||||
dilation_version = None
|
||||
their_dilation_versions = their_wormhole_versions.get("can-dilate", [])
|
||||
if 1 in their_dilation_versions:
|
||||
dilation_version = 1
|
||||
their_dilation_versions = set(their_wormhole_versions.get("can-dilate", []))
|
||||
my_versions = set(DILATION_VERSIONS)
|
||||
shared_versions = my_versions.intersection(their_dilation_versions)
|
||||
if "1" in shared_versions:
|
||||
dilation_version = "1"
|
||||
self._got_versions_d.callback(dilation_version)
|
||||
|
||||
def received_dilate(self, plaintext):
|
||||
|
|
|
@ -9,6 +9,7 @@ from twisted.internet.task import Cooperator
|
|||
from zope.interface import implementer
|
||||
|
||||
from ._boss import Boss
|
||||
from ._dilation.manager import DILATION_VERSIONS
|
||||
from ._dilation.connector import Connector
|
||||
from ._interfaces import IDeferredWormhole, IWormhole
|
||||
from ._key import derive_key
|
||||
|
@ -271,7 +272,7 @@ def create(
|
|||
w = _DeferredWormhole(reactor, eq)
|
||||
# this indicates Wormhole capabilities
|
||||
wormhole_versions = {
|
||||
"can-dilate": [1],
|
||||
"can-dilate": DILATION_VERSIONS,
|
||||
"dilation-abilities": Connector.get_connection_abilities(),
|
||||
}
|
||||
wormhole_versions["app_versions"] = versions # app-specific capabilities
|
||||
|
|
Loading…
Reference in New Issue
Block a user