get most of Manager working and tested

still need to test the subchannel interfaces, and ping/pong/kcm
This commit is contained in:
Brian Warner 2018-12-21 16:51:28 -05:00
parent e19c7d1281
commit e55787c693
3 changed files with 436 additions and 67 deletions

View File

@ -131,25 +131,24 @@ Each `DILATE-n` message is a JSON-encoded dictionary with a `type` field that
has a string value. The dictionary will have other keys that depend upon the
type.
`w.dilate()` triggers transmission of a `please-dilate` record with a set of
versions that can be accepted. Versions use strings, rather than integers, to
support experimental protocols, however there is still a total ordering of
preferability.
`w.dilate()` triggers transmission of a `please` (i.e. "please dilate")
record with a set of versions that can be accepted. Versions use strings,
rather than integers, to support experimental protocols, however there is
still a total ordering of preferability.
```
{ "type": "please-dilate",
{ "type": "please",
"side": "abcdef",
"accepted-versions": ["1"]
}
```
If one side receives a `please-dilate` before `w.dilate()` has been called
locally, the contents are stored in case `w.dilate()` is called in the
future. Once both `w.dilate()` has been called and the peer's `please-dilate`
has been received, the side determines whether it is the Leader or the
Follower. Both sides also compare `accepted-versions` fields to choose the
best mutually-compatible version to use: they should always pick the same
one.
If one side receives a `please` before `w.dilate()` has been called locally,
the contents are stored in case `w.dilate()` is called in the future. Once
both `w.dilate()` has been called and the peer's `please` has been received,
the side determines whether it is the Leader or the Follower. Both sides also
compare `accepted-versions` fields to choose the best mutually-compatible
version to use: they should always pick the same one.
Then both sides begin the connection process for generation 1 by opening
listening sockets and sending `connection-hint` records for each one. After a
@ -465,11 +464,11 @@ side seeking to send a file.
Each subchannel uses a distinct subchannel-id, which is a four-byte
identifier. Both directions share a number space (unlike L4 seqnums), so the
rule is that the Leader side sets the last bit of the last byte to a 0, while
the Follower sets it to a 1. These are not generally treated as integers,
rule is that the Leader side sets the last bit of the last byte to a 1, while
the Follower sets it to a 0. These are not generally treated as integers,
however for the sake of debugging, the implementation generates them with a
simple big-endian-encoded counter (`counter*2+2` for the Leader,
`counter*2+1` for the Follower, with id `0` reserved for the control
simple big-endian-encoded counter (`counter*2+1` for the Leader,
`counter*2+2` for the Follower, with id `0` reserved for the control
channel).
When the `client_ep.connect()` API is called, the Initiator allocates a

View File

@ -188,7 +188,7 @@ class Manager(object):
def connector_connection_lost(self):
self._stop_using_connection()
if self.role is LEADER:
if self._my_role is LEADER:
self.connection_lost_leader() # state machine
else:
self.connection_lost_follower()
@ -256,6 +256,9 @@ class Manager(object):
def start(self):
self.send_please()
def send_please(self):
self.send_dilation_phase(type="please", side=self._my_side)
@m.state(initial=True)
def WANTING(self):
pass # pragma: no cover
@ -278,7 +281,7 @@ class Manager(object):
@m.state()
def LONELY(self):
pass # pragme: no cover
pass # pragma: no cover
@m.state()
def STOPPING(self):
@ -306,7 +309,7 @@ class Manager(object):
# Connector gives us connection_made()
@m.input()
def connection_made(self, c):
def connection_made(self):
pass # pragma: no cover
# our connection_lost() fires connection_lost_leader or
@ -341,17 +344,17 @@ class Manager(object):
raise ValueError("their side shouldn't be equal: reflection?")
# these Outputs behave differently for the Leader vs the Follower
@m.output()
def send_please(self):
self.send_dilation_phase(type="please", side=self._my_side)
@m.output()
def start_connecting_ignore_message(self, message):
del message # ignored
return self.start_connecting()
return self._start_connecting()
@m.output()
def start_connecting(self):
self._start_connecting()
def _start_connecting(self):
assert self._my_role is not None
self._connector = Connector(self._transit_key,
self._transit_relay_location,
@ -447,7 +450,7 @@ class Dilator(object):
before we know whether we'll be the Leader or the Follower. Once we
hear the other side's VERSION message (which tells us that we have a
connection, they are capable of dilating, and which side we're on),
then we build a DilationManager and hand control to it.
then we build a Manager and hand control to it.
"""
_reactor = attrib()
@ -532,6 +535,7 @@ class Dilator(object):
self._transit_key = derive_key(key, purpose, LENGTH)
def got_wormhole_versions(self, their_wormhole_versions):
assert self._transit_key is not None
# this always happens before received_dilate
dilation_version = None
their_dilation_versions = set(their_wormhole_versions.get("can-dilate", []))
@ -554,11 +558,13 @@ class Dilator(object):
message = bytes_to_dict(plaintext)
type = message["type"]
if type == "please":
self._manager.rx_PLEASE() # message)
elif type == "dilate":
self._manager.rx_DILATE() # message)
self._manager.rx_PLEASE(message)
elif type == "connection-hints":
self._manager.rx_HINTS(message)
elif type == "reconnect":
self._manager.rx_RECONNECT()
elif type == "reconnecting":
self._manager.rx_RECONNECTING()
else:
log.err(UnknownDilationMessageType(message))
return

View File

@ -7,10 +7,13 @@ import mock
from ...eventual import EventualQueue
from ..._interfaces import ISend, IDilationManager
from ...util import dict_to_bytes
from ..._dilation.manager import (Dilator,
from ..._dilation import roles
from ..._dilation.encode import to_be4
from ..._dilation.manager import (Dilator, Manager, make_side,
OldPeerCannotDilateError,
UnknownDilationMessageType)
from ..._dilation.subchannel import _WormholeAddress
from ..._dilation.connection import Open, Data, Close, Ack
from .common import clear_mock_calls
@ -30,9 +33,8 @@ def make_dilator():
dil.wire(send)
return dil, send, reactor, eq, clock, coop
class TestDilator(unittest.TestCase):
def test_leader(self):
def test_manager_and_endpoints(self):
dil, send, reactor, eq, clock, coop = make_dilator()
d1 = dil.dilate()
d2 = dil.dilate()
@ -52,16 +54,15 @@ class TestDilator(unittest.TestCase):
m = mock.Mock()
alsoProvides(m, IDilationManager)
m.when_first_connected.return_value = wfc_d = Deferred()
# TODO: test missing can-dilate, and no-overlap
with mock.patch("wormhole._dilation.manager.Manager",
return_value=m) as ml:
with mock.patch("wormhole._dilation.manager.make_side",
return_value="us"):
dil.got_wormhole_versions({"can-dilate": ["1"]})
# that should create the Manager. Because "us" > "them", we're
# the leader
# that should create the Manager
self.assertEqual(ml.mock_calls, [mock.call(send, "us", transit_key,
None, reactor, eq, coop)])
# and tell it to start, and get wait-for-it-to-connect Deferred
self.assertEqual(m.mock_calls, [mock.call.start(),
mock.call.when_first_connected(),
])
@ -107,9 +108,11 @@ class TestDilator(unittest.TestCase):
eq.flush_sync()
self.assertEqual(eps, self.successResultOf(d3))
# all subsequent DILATE-n messages should get passed to the manager
self.assertEqual(m.mock_calls, [])
dil.received_dilate(dict_to_bytes(dict(type="please")))
self.assertEqual(m.mock_calls, [mock.call.rx_PLEASE()])
pleasemsg = dict(type="please", side="them")
dil.received_dilate(dict_to_bytes(pleasemsg))
self.assertEqual(m.mock_calls, [mock.call.rx_PLEASE(pleasemsg)])
clear_mock_calls(m)
hintmsg = dict(type="connection-hints")
@ -117,47 +120,27 @@ class TestDilator(unittest.TestCase):
self.assertEqual(m.mock_calls, [mock.call.rx_HINTS(hintmsg)])
clear_mock_calls(m)
dil.received_dilate(dict_to_bytes(dict(type="dilate")))
self.assertEqual(m.mock_calls, [mock.call.rx_DILATE()])
# we're nominally the LEADER, and the leader would not normally be
# receiving a RECONNECT, but since we've mocked out the Manager it
# won't notice
dil.received_dilate(dict_to_bytes(dict(type="reconnect")))
self.assertEqual(m.mock_calls, [mock.call.rx_RECONNECT()])
clear_mock_calls(m)
dil.received_dilate(dict_to_bytes(dict(type="reconnecting")))
self.assertEqual(m.mock_calls, [mock.call.rx_RECONNECTING()])
clear_mock_calls(m)
dil.received_dilate(dict_to_bytes(dict(type="unknown")))
self.assertEqual(m.mock_calls, [])
self.flushLoggedErrors(UnknownDilationMessageType)
def test_follower(self):
# todo: this no longer proceeds far enough to pick a side
dil, send, reactor, eq, clock, coop = make_dilator()
d1 = dil.dilate()
self.assertNoResult(d1)
self.assertEqual(send.mock_calls, [])
key = b"key"
transit_key = object()
with mock.patch("wormhole._dilation.manager.derive_key",
return_value=transit_key):
dil.got_key(key)
m = mock.Mock()
alsoProvides(m, IDilationManager)
m.when_first_connected.return_value = Deferred()
with mock.patch("wormhole._dilation.manager.Manager", return_value=m) as mf:
with mock.patch("wormhole._dilation.manager.make_side",
return_value="me"):
dil.got_wormhole_versions({"can-dilate": ["1"]})
# we want to dilate (dil.dilate() above), and now we know they *can*
# dilate (got_wormhole_versions), so we create and start the manager
self.assertEqual(mf.mock_calls, [mock.call(send, "me", transit_key,
None, reactor, eq, coop)])
self.assertEqual(m.mock_calls, [mock.call.start(),
mock.call.when_first_connected(),
])
def test_peer_cannot_dilate(self):
dil, send, reactor, eq, clock, coop = make_dilator()
d1 = dil.dilate()
self.assertNoResult(d1)
dil._transit_key = b"\x01"*32
dil.got_wormhole_versions({}) # missing "can-dilate"
eq.flush_sync()
f = self.failureResultOf(d1)
@ -168,6 +151,7 @@ class TestDilator(unittest.TestCase):
d1 = dil.dilate()
self.assertNoResult(d1)
dil._transit_key = b"key"
dil.got_wormhole_versions({"can-dilate": [-1]})
eq.flush_sync()
f = self.failureResultOf(d1)
@ -178,7 +162,8 @@ class TestDilator(unittest.TestCase):
dil._transit_key = b"key"
d1 = dil.dilate()
self.assertNoResult(d1)
dil.received_dilate(dict_to_bytes(dict(type="please")))
pleasemsg = dict(type="please", side="them")
dil.received_dilate(dict_to_bytes(pleasemsg))
hintmsg = dict(type="connection-hints")
dil.received_dilate(dict_to_bytes(hintmsg))
@ -194,7 +179,7 @@ class TestDilator(unittest.TestCase):
self.assertEqual(ml.mock_calls, [mock.call(send, "us", b"key",
None, reactor, eq, coop)])
self.assertEqual(m.mock_calls, [mock.call.start(),
mock.call.rx_PLEASE(),
mock.call.rx_PLEASE(pleasemsg),
mock.call.rx_HINTS(hintmsg),
mock.call.when_first_connected()])
@ -213,3 +198,382 @@ class TestDilator(unittest.TestCase):
relay, reactor, eq, coop),
mock.call().start(),
mock.call().when_first_connected()])
LEADER = "ff3456abcdef"
FOLLOWER = "123456abcdef"
def make_manager(leader=True):
class Holder:
pass
h = Holder()
h.send = mock.Mock()
alsoProvides(h.send, ISend)
if leader:
side = LEADER
else:
side = FOLLOWER
h.key = b"\x00"*32
h.relay = None
h.reactor = object()
h.clock = Clock()
h.eq = EventualQueue(h.clock)
term = mock.Mock(side_effect=lambda: True) # one write per Eventual tick
def term_factory():
return term
h.coop = Cooperator(terminationPredicateFactory=term_factory,
scheduler=h.eq.eventually)
h.inbound = mock.Mock()
h.Inbound = mock.Mock(return_value=h.inbound)
h.outbound = mock.Mock()
h.Outbound = mock.Mock(return_value=h.outbound)
h.hostaddr = object()
with mock.patch("wormhole._dilation.manager.Inbound", h.Inbound):
with mock.patch("wormhole._dilation.manager.Outbound", h.Outbound):
with mock.patch("wormhole._dilation.manager._WormholeAddress",
return_value=h.hostaddr):
m = Manager(h.send, side, h.key, h.relay, h.reactor, h.eq, h.coop)
return m, h
class TestManager(unittest.TestCase):
def test_make_side(self):
side = make_side()
self.assertEqual(type(side), type(u""))
self.assertEqual(len(side), 2*6)
def test_create(self):
m, h = make_manager()
def test_leader(self):
m, h = make_manager(leader=True)
self.assertEqual(h.send.mock_calls, [])
self.assertEqual(h.Inbound.mock_calls, [mock.call(m, h.hostaddr)])
self.assertEqual(h.Outbound.mock_calls, [mock.call(m, h.coop)])
m.start()
self.assertEqual(h.send.mock_calls, [
mock.call.send("dilate-0",
dict_to_bytes({"type": "please", "side": LEADER}))
])
clear_mock_calls(h.send)
wfc_d = m.when_first_connected()
self.assertNoResult(wfc_d)
# ignore early hints
m.rx_HINTS({})
self.assertEqual(h.send.mock_calls, [])
c = mock.Mock()
connector = mock.Mock(return_value=c)
with mock.patch("wormhole._dilation.manager.Connector", connector):
# receiving this PLEASE triggers creation of the Connector
m.rx_PLEASE({"side": FOLLOWER})
self.assertEqual(h.send.mock_calls, [])
self.assertEqual(connector.mock_calls, [
mock.call(b"\x00"*32, None, m, h.reactor, h.eq,
False, # no_listen
None, # tor
None, # timing
LEADER, roles.LEADER),
])
self.assertEqual(c.mock_calls, [mock.call.start()])
clear_mock_calls(connector, c)
self.assertNoResult(wfc_d)
# now any inbound hints should get passed to our Connector
with mock.patch("wormhole._dilation.manager.parse_hint",
side_effect=["p1", None, "p3"]) as ph:
m.rx_HINTS({"hints": [1, 2, 3]})
self.assertEqual(ph.mock_calls, [mock.call(1), mock.call(2), mock.call(3)])
self.assertEqual(c.mock_calls, [mock.call.got_hints(["p1", "p3"])])
clear_mock_calls(ph, c)
# and we send out any (listening) hints from our Connector
m.send_hints([1,2])
self.assertEqual(h.send.mock_calls, [
mock.call.send("dilate-1",
dict_to_bytes({"type": "connection-hints",
"hints": [1, 2]}))
])
clear_mock_calls(h.send)
# the first successful connection fires when_first_connected(), so
# the Dilator can create and return the endpoints
c1 = mock.Mock()
m.connector_connection_made(c1)
self.assertEqual(h.inbound.mock_calls, [mock.call.use_connection(c1)])
self.assertEqual(h.outbound.mock_calls, [mock.call.use_connection(c1)])
clear_mock_calls(h.inbound, h.outbound)
h.eq.flush_sync()
self.successResultOf(wfc_d) # fires with None
wfc_d2 = m.when_first_connected()
h.eq.flush_sync()
self.successResultOf(wfc_d2)
scid0 = b"\x00\x00\x00\x00"
sc0 = mock.Mock()
m.set_subchannel_zero(scid0, sc0)
listen_ep = mock.Mock()
m.set_listener_endpoint(listen_ep)
self.assertEqual(h.inbound.mock_calls, [
mock.call.set_subchannel_zero(scid0, sc0),
mock.call.set_listener_endpoint(listen_ep),
])
clear_mock_calls(h.inbound)
# the Leader making a new outbound channel should get scid=1
scid1 = to_be4(1)
self.assertEqual(m.allocate_subchannel_id(), scid1)
r1 = Open(10, scid1) # seqnum=10
h.outbound.build_record = mock.Mock(return_value=r1)
m.send_open(scid1)
self.assertEqual(h.outbound.mock_calls, [
mock.call.build_record(Open, scid1),
mock.call.queue_and_send_record(r1),
])
clear_mock_calls(h.outbound)
r2 = Data(11, scid1, b"data")
h.outbound.build_record = mock.Mock(return_value=r2)
m.send_data(scid1, b"data")
self.assertEqual(h.outbound.mock_calls, [
mock.call.build_record(Data, scid1, b"data"),
mock.call.queue_and_send_record(r2),
])
clear_mock_calls(h.outbound)
r3 = Close(12, scid1)
h.outbound.build_record = mock.Mock(return_value=r3)
m.send_close(scid1)
self.assertEqual(h.outbound.mock_calls, [
mock.call.build_record(Close, scid1),
mock.call.queue_and_send_record(r3),
])
clear_mock_calls(h.outbound)
# ack the OPEN
m.got_record(Ack(10))
self.assertEqual(h.outbound.mock_calls, [
mock.call.handle_ack(10)
])
clear_mock_calls(h.outbound)
# test that inbound records get acked and routed to Inbound
h.inbound.is_record_old = mock.Mock(return_value=False)
scid2 = to_be4(2)
o200 = Open(200, scid2)
m.got_record(o200)
self.assertEqual(h.outbound.mock_calls, [
mock.call.send_if_connected(Ack(200))
])
self.assertEqual(h.inbound.mock_calls, [
mock.call.is_record_old(o200),
mock.call.update_ack_watermark(200),
mock.call.handle_open(scid2),
])
clear_mock_calls(h.outbound, h.inbound)
# old (duplicate) records should provoke new Acks, but not get
# forwarded
h.inbound.is_record_old = mock.Mock(return_value=True)
m.got_record(o200)
self.assertEqual(h.outbound.mock_calls, [
mock.call.send_if_connected(Ack(200))
])
self.assertEqual(h.inbound.mock_calls, [
mock.call.is_record_old(o200),
])
clear_mock_calls(h.outbound, h.inbound)
# check Data and Close too
h.inbound.is_record_old = mock.Mock(return_value=False)
d201 = Data(201, scid2, b"data")
m.got_record(d201)
self.assertEqual(h.outbound.mock_calls, [
mock.call.send_if_connected(Ack(201))
])
self.assertEqual(h.inbound.mock_calls, [
mock.call.is_record_old(d201),
mock.call.update_ack_watermark(201),
mock.call.handle_data(scid2, b"data"),
])
clear_mock_calls(h.outbound, h.inbound)
c202 = Close(202, scid2)
m.got_record(c202)
self.assertEqual(h.outbound.mock_calls, [
mock.call.send_if_connected(Ack(202))
])
self.assertEqual(h.inbound.mock_calls, [
mock.call.is_record_old(c202),
mock.call.update_ack_watermark(202),
mock.call.handle_close(scid2),
])
clear_mock_calls(h.outbound, h.inbound)
# Now we lose the connection. The Leader should tell the other side
# that we're reconnecting.
m.connector_connection_lost()
self.assertEqual(h.send.mock_calls, [
mock.call.send("dilate-2",
dict_to_bytes({"type": "reconnect"}))
])
self.assertEqual(h.inbound.mock_calls, [
mock.call.stop_using_connection()
])
self.assertEqual(h.outbound.mock_calls, [
mock.call.stop_using_connection()
])
clear_mock_calls(h.send, h.inbound, h.outbound)
# leader does nothing (stays in FLUSHING) until the follower acks by
# sending RECONNECTING
# inbound hints should be ignored during FLUSHING
with mock.patch("wormhole._dilation.manager.parse_hint",
return_value=None) as ph:
m.rx_HINTS({"hints": [1, 2, 3]})
self.assertEqual(ph.mock_calls, []) # ignored
c2 = mock.Mock()
connector2 = mock.Mock(return_value=c2)
with mock.patch("wormhole._dilation.manager.Connector", connector2):
# this triggers creation of a new Connector
m.rx_RECONNECTING()
self.assertEqual(h.send.mock_calls, [])
self.assertEqual(connector2.mock_calls, [
mock.call(b"\x00"*32, None, m, h.reactor, h.eq,
False, # no_listen
None, # tor
None, # timing
LEADER, roles.LEADER),
])
self.assertEqual(c2.mock_calls, [mock.call.start()])
clear_mock_calls(connector2, c2)
self.assertEqual(h.inbound.mock_calls, [])
self.assertEqual(h.outbound.mock_calls, [])
# and a new connection should re-register with Inbound/Outbound,
# which are responsible for re-sending unacked queued messages
c3 = mock.Mock()
m.connector_connection_made(c3)
self.assertEqual(h.inbound.mock_calls, [mock.call.use_connection(c3)])
self.assertEqual(h.outbound.mock_calls, [mock.call.use_connection(c3)])
clear_mock_calls(h.inbound, h.outbound)
def test_follower(self):
m, h = make_manager(leader=False)
m.start()
self.assertEqual(h.send.mock_calls, [
mock.call.send("dilate-0",
dict_to_bytes({"type": "please", "side": FOLLOWER}))
])
clear_mock_calls(h.send)
c = mock.Mock()
connector = mock.Mock(return_value=c)
with mock.patch("wormhole._dilation.manager.Connector", connector):
# receiving this PLEASE triggers creation of the Connector
m.rx_PLEASE({"side": LEADER})
self.assertEqual(h.send.mock_calls, [])
self.assertEqual(connector.mock_calls, [
mock.call(b"\x00"*32, None, m, h.reactor, h.eq,
False, # no_listen
None, # tor
None, # timing
FOLLOWER, roles.FOLLOWER),
])
self.assertEqual(c.mock_calls, [mock.call.start()])
clear_mock_calls(connector, c)
# get connected, then lose the connection
c1 = mock.Mock()
m.connector_connection_made(c1)
self.assertEqual(h.inbound.mock_calls, [mock.call.use_connection(c1)])
self.assertEqual(h.outbound.mock_calls, [mock.call.use_connection(c1)])
clear_mock_calls(h.inbound, h.outbound)
# now lose the connection. As the follower, we don't notify the
# leader, we just wait for them to notice
m.connector_connection_lost()
self.assertEqual(h.send.mock_calls, [])
self.assertEqual(h.inbound.mock_calls, [
mock.call.stop_using_connection()
])
self.assertEqual(h.outbound.mock_calls, [
mock.call.stop_using_connection()
])
clear_mock_calls(h.send, h.inbound, h.outbound)
# now we get a RECONNECT: we should send RECONNECTING
c2 = mock.Mock()
connector2 = mock.Mock(return_value=c2)
with mock.patch("wormhole._dilation.manager.Connector", connector2):
m.rx_RECONNECT()
self.assertEqual(h.send.mock_calls, [
mock.call.send("dilate-1",
dict_to_bytes({"type": "reconnecting"}))
])
self.assertEqual(connector2.mock_calls, [
mock.call(b"\x00"*32, None, m, h.reactor, h.eq,
False, # no_listen
None, # tor
None, # timing
FOLLOWER, roles.FOLLOWER),
])
self.assertEqual(c2.mock_calls, [mock.call.start()])
clear_mock_calls(connector2, c2)
# while we're trying to connect, we get told to stop again, so we
# should abandon the connection attempt and start another
c3 = mock.Mock()
connector3 = mock.Mock(return_value=c3)
with mock.patch("wormhole._dilation.manager.Connector", connector3):
m.rx_RECONNECT()
self.assertEqual(c2.mock_calls, [mock.call.stop()])
self.assertEqual(connector3.mock_calls, [
mock.call(b"\x00"*32, None, m, h.reactor, h.eq,
False, # no_listen
None, # tor
None, # timing
FOLLOWER, roles.FOLLOWER),
])
self.assertEqual(c3.mock_calls, [mock.call.start()])
clear_mock_calls(c2, connector3, c3)
m.connector_connection_made(c3)
# finally if we're already connected, rx_RECONNECT means we should
# abandon this connection (even though it still looks ok to us), then
# when the attempt is finished stopping, we should start another
m.rx_RECONNECT()
c4 = mock.Mock()
connector4 = mock.Mock(return_value=c4)
with mock.patch("wormhole._dilation.manager.Connector", connector4):
m.connector_connection_lost()
self.assertEqual(c3.mock_calls, [mock.call.disconnect()])
self.assertEqual(connector4.mock_calls, [
mock.call(b"\x00"*32, None, m, h.reactor, h.eq,
False, # no_listen
None, # tor
None, # timing
FOLLOWER, roles.FOLLOWER),
])
self.assertEqual(c4.mock_calls, [mock.call.start()])
clear_mock_calls(c3, connector4, c4)
def test_stop(self):
pass
def test_mirror(self):
# receive a PLEASE with the same side as us: shouldn't happen
pass