diff --git a/src/wormhole/_dilation/manager.py b/src/wormhole/_dilation/manager.py index b16d198..bae38d0 100644 --- a/src/wormhole/_dilation/manager.py +++ b/src/wormhole/_dilation/manager.py @@ -1,4 +1,5 @@ from __future__ import print_function, unicode_literals +import os from collections import deque from attr import attrs, attrib from attr.validators import provides, instance_of, optional @@ -7,7 +8,7 @@ from zope.interface import implementer from twisted.internet.defer import Deferred, inlineCallbacks, returnValue from twisted.python import log from .._interfaces import IDilator, IDilationManager, ISend -from ..util import dict_to_bytes, bytes_to_dict +from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr from ..observer import OneShotObserver from .._key import derive_key from .encode import to_be4 @@ -21,6 +22,10 @@ from .inbound import Inbound from .outbound import Outbound +# exported to Wormhole() for inclusion in versions message +DILATION_VERSIONS = ["1"] + + class OldPeerCannotDilateError(Exception): pass @@ -33,9 +38,45 @@ class ReceivedHintsTooEarly(Exception): pass +# new scheme: +# * both sides send PLEASE as soon as they have an unverified key and +# w.dilate has been called, +# * PLEASE includes a dilation-specific "side" (independent of the "side" +# used by mailbox messages) +# * higher "side" is Leader, lower is Follower +# * PLEASE includes can-dilate list of version integers, requires overlap +# "1" is current + +# * we start dilation after both w.dilate() and receiving VERSION, putting us +# in WANTING, then we process all previously-queued inbound DILATE-n +# messages. When PLEASE arrives, we move to CONNECTING +# * HINTS sent after dilation starts +# * only Leader sends RECONNECT, only Follower sends RECONNECTING. This +# is the only difference between the two sides, and is not enforced +# by the protocol (i.e. if the Follower sends RECONNECT to the Leader, +# the Leader will obey, although TODO how confusing will this get?) +# * upon receiving RECONNECT: drop Connector, start new Connector, send +# RECONNECTING, start sending HINTS +# * upon sending RECONNECT: go into FLUSHING state and ignore all HINTS until +# RECONNECTING received. The new Connector can be spun up earlier, and it +# can send HINTS, but it must not be given any HINTS that arrive before +# RECONNECTING (since they're probably stale) + +# * after VERSIONS(KCM) received, we might learn that they other side cannot +# dilate. w.dilate errbacks at this point + +# * maybe signal warning if we stay in a "want" state for too long +# * nobody sends HINTS until they're ready to receive +# * nobody sends HINTS unless they've called w.dilate() and received PLEASE +# * nobody connects to inbound hints unless they've called w.dilate() +# * if leader calls w.dilate() but not follower, leader waits forever in +# "want" (doesn't send anything) +# * if follower calls w.dilate() but not leader, follower waits forever +# in "want", leader waits forever in "wanted" + @attrs @implementer(IDilationManager) -class _ManagerBase(object): +class Manager(object): _S = attrib(validator=provides(ISend)) _my_side = attrib(validator=instance_of(type(u""))) _transit_key = attrib(validator=instance_of(bytes)) @@ -46,6 +87,10 @@ class _ManagerBase(object): _no_listen = False # TODO _tor = None # TODO _timing = None # TODO + _next_subchannel_id = None # initialized in choose_role + + m = MethodicalMachine() + set_trace = getattr(m, "_setTrace", lambda self, f: None) def __attrs_post_init__(self): self._got_versions_d = Deferred() @@ -59,8 +104,6 @@ class _ManagerBase(object): self._next_dilation_phase = 0 - self._next_subchannel_id = 0 # increments by 2 - # I kept getting confused about which methods were for inbound data # (and thus flow-control methods go "out") and which were for # outbound data (with flow-control going "in"), so I split them up @@ -127,18 +170,6 @@ class _ManagerBase(object): self._inbound.subchannel_closed(scid, sc) self._outbound.subchannel_closed(scid, sc) - def _start_connecting(self, role): - assert self._my_role is not None - self._connector = Connector(self._transit_key, - self._transit_relay_location, - self, - self._reactor, self._eventual_queue, - self._no_listen, self._tor, - self._timing, - self._side, # needed for relay handshake - self._my_role) - self._connector.start() - # our Connector calls these def connector_connection_made(self, c): @@ -209,59 +240,22 @@ class _ManagerBase(object): # subchannel maintenance def allocate_subchannel_id(self): - raise NotImplementedError # subclass knows if we're leader or follower + scid_num = self._next_subchannel_id + self._next_subchannel_id += 2 + return to_be4(scid_num) -# new scheme: -# * both sides send PLEASE as soon as they have an unverified key and -# w.dilate has been called, -# * PLEASE includes a dilation-specific "side" (independent of the "side" -# used by mailbox messages) -# * higher "side" is Leader, lower is Follower -# * PLEASE includes can-dilate list of version integers, requires overlap -# "1" is current -# * dilation starts as soon as we've sent PLEASE and received PLEASE -# (four-state two-variable IDLE/WANTING/WANTED/STARTED diamond FSM) -# * HINTS sent after dilation starts -# * only Leader sends RECONNECT, only Follower sends RECONNECTING. This -# is the only difference between the two sides, and is not enforced -# by the protocol (i.e. if the Follower sends RECONNECT to the Leader, -# the Leader will obey, although TODO how confusing will this get?) -# * upon receiving RECONNECT: drop Connector, start new Connector, send -# RECONNECTING, start sending HINTS -# * upon sending CONNECT: go into FLUSHING state and ignore all HINTS until -# RECONNECTING received. The new Connector can be spun up earlier, and it -# can send HINTS, but it must not be given any HINTS that arrive before -# RECONNECTING (since they're probably stale) + # state machine -# * after VERSIONS(KCM) received, we might learn that they other side cannot -# dilate. w.dilate errbacks at this point + # We are born WANTING after the local app calls w.dilate(). We start + # CONNECTING when we receive PLEASE from the remote side -# * maybe signal warning if we stay in a "want" state for too long -# * nobody sends HINTS until they're ready to receive -# * nobody sends HINTS unless they've called w.dilate() and received PLEASE -# * nobody connects to inbound hints unless they've called w.dilate() -# * if leader calls w.dilate() but not follower, leader waits forever in -# "want" (doesn't send anything) -# * if follower calls w.dilate() but not leader, follower waits forever -# in "want", leader waits forever in "wanted" - - -class ManagerShared(_ManagerBase): - m = MethodicalMachine() - set_trace = getattr(m, "_setTrace", lambda self, f: None) + def start(self): + self.send_please() @m.state(initial=True) - def IDLE(self): - pass # pragma: no cover - - @m.state() def WANTING(self): pass # pragma: no cover - @m.state() - def WANTED(self): - pass # pragma: no cover - @m.state() def CONNECTING(self): pass # pragma: no cover @@ -290,10 +284,6 @@ class ManagerShared(_ManagerBase): def STOPPED(self): pass # pragma: no cover - @m.input() - def start(self): - pass # pragma: no cover - @m.input() def rx_PLEASE(self, message): pass # pragma: no cover @@ -332,9 +322,19 @@ class ManagerShared(_ManagerBase): pass # pragma: no cover @m.output() - def stash_side(self, message): + def choose_role(self, message): their_side = message["side"] - self.my_role = LEADER if self._my_side > their_side else FOLLOWER + if self._my_side > their_side: + self._my_role = LEADER + # scid 0 is reserved for the control channel. the leader uses odd + # numbers starting with 1 + self._next_subchannel_id = 1 + elif their_side > self._my_side: + self._my_role = FOLLOWER + # the follower uses even numbers starting with 2 + self._next_subchannel_id = 2 + else: + raise ValueError("their side shouldn't be equal: reflection?") # these Outputs behave differently for the Leader vs the Follower @m.output() @@ -342,12 +342,22 @@ class ManagerShared(_ManagerBase): self.send_dilation_phase(type="please", side=self._my_side) @m.output() - def start_connecting(self): - self._start_connecting() # TODO: merge + def start_connecting_ignore_message(self, message): + del message # ignored + return self.start_connecting() @m.output() - def ignore_message_start_connecting(self, message): - self.start_connecting() + def start_connecting(self): + assert self._my_role is not None + self._connector = Connector(self._transit_key, + self._transit_relay_location, + self, + self._reactor, self._eventual_queue, + self._no_listen, self._tor, + self._timing, + self._my_side, # needed for relay handshake + self._my_role) + self._connector.start() @m.output() def send_reconnect(self): @@ -374,14 +384,9 @@ class ManagerShared(_ManagerBase): # been told to shut down. self._connection.disconnect() # let connection_lost do cleanup - # we don't start CONNECTING until a local start() plus rx_PLEASE - IDLE.upon(rx_PLEASE, enter=WANTED, outputs=[stash_side]) - IDLE.upon(start, enter=WANTING, outputs=[send_please]) - WANTED.upon(start, enter=CONNECTING, outputs=[ - send_please, start_connecting]) + # we start CONNECTING when we get rx_PLEASE WANTING.upon(rx_PLEASE, enter=CONNECTING, - outputs=[stash_side, - ignore_message_start_connecting]) + outputs=[choose_role, start_connecting_ignore_message]) CONNECTING.upon(connection_made, enter=CONNECTED, outputs=[]) @@ -394,11 +399,11 @@ class ManagerShared(_ManagerBase): # Follower # if we notice a lost connection, just wait for the Leader to notice too CONNECTED.upon(connection_lost_follower, enter=LONELY, outputs=[]) - LONELY.upon(rx_RECONNECT, enter=CONNECTING, outputs=[start_connecting]) + LONELY.upon(rx_RECONNECT, enter=CONNECTING, + outputs=[send_reconnecting, start_connecting]) # but if they notice it first, abandon our (seemingly functional) # connection, then tell them that we're ready to try again - CONNECTED.upon(rx_RECONNECT, enter=ABANDONING, # they noticed loss - outputs=[abandon_connection]) + CONNECTED.upon(rx_RECONNECT, enter=ABANDONING, outputs=[abandon_connection]) ABANDONING.upon(connection_lost_follower, enter=CONNECTING, outputs=[send_reconnecting, start_connecting]) # and if they notice a problem while we're still connecting, abandon our @@ -410,8 +415,6 @@ class ManagerShared(_ManagerBase): start_connecting]) # rx_HINTS never changes state, they're just accepted or ignored - IDLE.upon(rx_HINTS, enter=IDLE, outputs=[]) # too early - WANTED.upon(rx_HINTS, enter=WANTED, outputs=[]) # too early WANTING.upon(rx_HINTS, enter=WANTING, outputs=[]) # too early CONNECTING.upon(rx_HINTS, enter=CONNECTING, outputs=[use_hints]) CONNECTED.upon(rx_HINTS, enter=CONNECTED, outputs=[]) # too late, ignore @@ -420,24 +423,15 @@ class ManagerShared(_ManagerBase): ABANDONING.upon(rx_HINTS, enter=ABANDONING, outputs=[]) # shouldn't happen STOPPING.upon(rx_HINTS, enter=STOPPING, outputs=[]) - IDLE.upon(stop, enter=STOPPED, outputs=[]) - WANTED.upon(stop, enter=STOPPED, outputs=[]) WANTING.upon(stop, enter=STOPPED, outputs=[]) CONNECTING.upon(stop, enter=STOPPED, outputs=[stop_connecting]) CONNECTED.upon(stop, enter=STOPPING, outputs=[abandon_connection]) ABANDONING.upon(stop, enter=STOPPING, outputs=[]) - FLUSHING.upon(stop, enter=STOPPED, outputs=[stop_connecting]) + FLUSHING.upon(stop, enter=STOPPED, outputs=[]) LONELY.upon(stop, enter=STOPPED, outputs=[]) STOPPING.upon(connection_lost_leader, enter=STOPPED, outputs=[]) STOPPING.upon(connection_lost_follower, enter=STOPPED, outputs=[]) - def allocate_subchannel_id(self): - # scid 0 is reserved for the control channel. the leader uses odd - # numbers starting with 1 - scid_num = self._next_outbound_seqnum + 1 - self._next_outbound_seqnum += 2 - return to_be4(scid_num) - @attrs @implementer(IDilator) @@ -477,15 +471,23 @@ class Dilator(object): @inlineCallbacks def _start(self): # first, we wait until we hear the VERSION message, which tells us 1: - # the PAKE key works, so we can talk securely, 2: their side, so we - # know who will lead, and 3: that they can do dilation at all + # the PAKE key works, so we can talk securely, 2: that they can do + # dilation at all (if they can't then w.dilate() errbacks) dilation_version = yield self._got_versions_d - if not dilation_version: # 1 or None + # TODO: we could probably return the endpoints earlier, if we flunk + # any connection/listen attempts upon OldPeerCannotDilateError, or + # if/when we give up on the initial connection + + if not dilation_version: # "1" or None + # TODO: be more specific about the error. dilation_version==None + # means we had no version in common with them, which could either + # be because they're so old they don't dilate at all, or because + # they're so new that they no longer accomodate our old version raise OldPeerCannotDilateError() - my_dilation_side = TODO # random + my_dilation_side = bytes_to_hexstr(os.urandom(6)) self._manager = Manager(self._S, my_dilation_side, self._transit_key, self._transit_relay_location, @@ -497,8 +499,8 @@ class Dilator(object): plaintext = self._pending_inbound_dilate_messages.popleft() self.received_dilate(plaintext) - # we could probably return the endpoints earlier yield self._manager.when_first_connected() + # we can open subchannels as soon as we get our first connection scid0 = b"\x00\x00\x00\x00" self._host_addr = _WormholeAddress() # TODO: share with Manager @@ -508,8 +510,7 @@ class Dilator(object): control_ep._subchannel_zero_opened(sc0) self._manager.set_subchannel_zero(scid0, sc0) - connect_ep = SubchannelConnectorEndpoint( - self._manager, self._host_addr) + connect_ep = SubchannelConnectorEndpoint(self._manager, self._host_addr) listen_ep = SubchannelListenerEndpoint(self._manager, self._host_addr) self._manager.set_listener_endpoint(listen_ep) @@ -526,16 +527,14 @@ class Dilator(object): LENGTH = 32 # TODO: whatever Noise wants, I guess self._transit_key = derive_key(key, purpose, LENGTH) - def got_wormhole_versions(self, our_side, their_side, - their_wormhole_versions): - # TODO: remove our_side, their_side - assert isinstance(our_side, str), str - assert isinstance(their_side, str), str + def got_wormhole_versions(self, their_wormhole_versions): # this always happens before received_dilate dilation_version = None - their_dilation_versions = their_wormhole_versions.get("can-dilate", []) - if 1 in their_dilation_versions: - dilation_version = 1 + their_dilation_versions = set(their_wormhole_versions.get("can-dilate", [])) + my_versions = set(DILATION_VERSIONS) + shared_versions = my_versions.intersection(their_dilation_versions) + if "1" in shared_versions: + dilation_version = "1" self._got_versions_d.callback(dilation_version) def received_dilate(self, plaintext): diff --git a/src/wormhole/wormhole.py b/src/wormhole/wormhole.py index 3734fd2..95f5146 100644 --- a/src/wormhole/wormhole.py +++ b/src/wormhole/wormhole.py @@ -9,6 +9,7 @@ from twisted.internet.task import Cooperator from zope.interface import implementer from ._boss import Boss +from ._dilation.manager import DILATION_VERSIONS from ._dilation.connector import Connector from ._interfaces import IDeferredWormhole, IWormhole from ._key import derive_key @@ -271,7 +272,7 @@ def create( w = _DeferredWormhole(reactor, eq) # this indicates Wormhole capabilities wormhole_versions = { - "can-dilate": [1], + "can-dilate": DILATION_VERSIONS, "dilation-abilities": Connector.get_connection_abilities(), } wormhole_versions["app_versions"] = versions # app-specific capabilities