diff --git a/docs/state-machines/Makefile b/docs/state-machines/Makefile index a9072c3..ea42875 100644 --- a/docs/state-machines/Makefile +++ b/docs/state-machines/Makefile @@ -1,7 +1,7 @@ default: images -images: allocator.png boss.png code.png input.png key.png lister.png machines.png mailbox.png nameplate.png order.png receive.png send.png terminator.png +images: allocator.png boss.png code.png input.png key.png lister.png machines.png mailbox.png nameplate.png order.png receive.png send.png terminator.png dilation.png .PHONY: default images diff --git a/docs/state-machines/dilation.dot b/docs/state-machines/dilation.dot new file mode 100644 index 0000000..1465802 --- /dev/null +++ b/docs/state-machines/dilation.dot @@ -0,0 +1,32 @@ +digraph { + Manager [label="Manager" shape="box" color="blue" fontcolor="blue"] + Connector [label="Connector" shape="oval"] + Framer [label="Framer"] + DCP [label="Dilated\nConnection\nProtocol"] + + DCP -> Connector [style="dashed" label="add_candidate\n"] + + Record [label="Record"] + Record -> Framer [style="dashed" label="connectionMade\nsend_frame"] + Record -> Framer [style="dashed" label="add_and_parse (-> tokens)"] + + ITransport -> DCP [style="dashed" label="connectionMade\ndataReceived\nconnectionLost"] + Framer -> ITransport [style="dashed" label="write"] + Manager -> DCP [style="dashed" color="green" label="disconnect"] + DCP -> Manager [style="dashed" color="green" label="got_record CClost"] + DCP -> Record [style="dashed" label="set_role\nconnectionMade\nsend_record"] + DCP -> Record [style="dashed" label="add_and_unframe (-> tokens)"] + Manager -> Connector [style="dashed" label="start\ngot_hints\nstop"] + Connector -> Manager [style="dashed" color="green" label="CCmade"] + Connector -> DCP [color="green" fontcolor="blue" label="select\nsend_record(KCM)"] + Connector -> DCP [color="red" fontcolor="red" label="disconnect"] + Connector -> Connector [color="green" fontcolor="green" label="accept"] + + Inbound [label="Inbound" shape="box" color="blue" fontcolor="blue"] + Manager -> Inbound [style="dashed" label="use_connection"] + Inbound -> DCP [style="dashed" label="pauseProducing\nresumeProducing"] + + Outbound [label="Outbound" shape="box" color="blue" fontcolor="blue"] + Manager -> Outbound [style="dashed" label="use_connection"] + Outbound -> DCP [style="dashed" label="send_record\ntransport.(un)registerProducer"] +} diff --git a/src/wormhole/_dilation/connector.py b/src/wormhole/_dilation/connector.py index 83914e3..2ff6ae0 100644 --- a/src/wormhole/_dilation/connector.py +++ b/src/wormhole/_dilation/connector.py @@ -400,12 +400,15 @@ class Connector(object): # our Connection protocols call: add_candidate -@attrs +@attrs(repr=False) class OutboundConnectionFactory(ClientFactory, object): _connector = attrib(validator=provides(IDilationConnector)) _relay_handshake = attrib(validator=optional(instance_of(bytes))) _description = attrib() + def __repr__(self): + return "OutboundConnectionFactory(%s %s)" % (self._connector._role, self._description) + def buildProtocol(self, addr): p = self._connector.build_protocol(addr, self._description) p.factory = self @@ -420,10 +423,13 @@ def describe_inbound(addr): return "<-tcp:%s:%d" % (addr.host, addr.port) return "<-%r" % addr -@attrs +@attrs(repr=False) class InboundConnectionFactory(ServerFactory, object): _connector = attrib(validator=provides(IDilationConnector)) + def __repr__(self): + return "InboundConnectionFactory(%s)" % (self._connector._role) + def buildProtocol(self, addr): description = describe_inbound(addr) p = self._connector.build_protocol(addr, description) diff --git a/src/wormhole/test/dilate/test_full.py b/src/wormhole/test/dilate/test_full.py index 16d990a..50fd3a2 100644 --- a/src/wormhole/test/dilate/test_full.py +++ b/src/wormhole/test/dilate/test_full.py @@ -5,7 +5,7 @@ from twisted.internet.defer import Deferred, inlineCallbacks, gatherResults from twisted.internet.protocol import Protocol, Factory from twisted.trial import unittest -from ..common import ServerBase +from ..common import ServerBase, poll_until from ...eventual import EventualQueue from ..._dilation._noise import NoiseConnection @@ -75,3 +75,106 @@ class Full(ServerBase, unittest.TestCase): yield w2.close() test_full.timeout = 30 + + +class ReconP(Protocol): + def eventually(self, which, data): + d = self.factory.deferreds[which] + self.factory.eq.fire_eventually(data).addCallback(d.callback) + def connectionMade(self): + self.eventually("connectionMade", self) + #self.transport.write(b"hello\n") + def dataReceived(self, data): + self.eventually("dataReceived", data) + def connectionLost(self, why): + self.eventually("connectionLost", (self, why)) + +class ReconF(Factory): + protocol = ReconP + def __init__(self, eq): + Factory.__init__(self) + self.eq = eq + self.deferreds = {} + for name in ["connectionMade", "dataReceived", "connectionLost"]: + self.deferreds[name] = Deferred() + def resetDeferred(self, name): + d = Deferred() + self.deferreds[name] = d + return d + +class Reconnect(ServerBase, unittest.TestCase): + @inlineCallbacks + def setUp(self): + if not NoiseConnection: + raise unittest.SkipTest("noiseprotocol unavailable") + # test_welcome wants to see [current_cli_version] + yield self._setup_relay(None) + + @inlineCallbacks + def test_reconnect(self): + eq = EventualQueue(reactor) + w1 = wormhole.create(APPID, self.relayurl, reactor, _enable_dilate=True) + w2 = wormhole.create(APPID, self.relayurl, reactor, _enable_dilate=True) + w1.allocate_code() + code = yield w1.get_code() + w2.set_code(code) + yield doBoth(w1.get_verifier(), w2.get_verifier()) + + eps1_d = w1.dilate() + eps2_d = w2.dilate() + (eps1, eps2) = yield doBoth(eps1_d, eps2_d) + (control_ep1, connect_ep1, listen_ep1) = eps1 + (control_ep2, connect_ep2, listen_ep2) = eps2 + + f1 = ReconF(eq); f2 = ReconF(eq) + d1 = control_ep1.connect(f1); d2 = control_ep2.connect(f2) + yield d1 + yield d2 + + protocols = {} + def p_connected(p, index): + protocols[index] = p + msg = "hello from %s\n" % index + p.transport.write(msg.encode("ascii")) + f1.deferreds["connectionMade"].addCallback(p_connected, 1) + f2.deferreds["connectionMade"].addCallback(p_connected, 2) + + data1 = yield f1.deferreds["dataReceived"] + data2 = yield f2.deferreds["dataReceived"] + self.assertEqual(data1, b"hello from 2\n") + self.assertEqual(data2, b"hello from 1\n") + # the ACKs are now in flight and may not arrive before we kill the + # connection + + f1.resetDeferred("connectionMade") + f2.resetDeferred("connectionMade") + d1 = f1.resetDeferred("dataReceived") + d2 = f2.resetDeferred("dataReceived") + + # now we reach inside and drop the connection + sc = protocols[1].transport + orig_connection = sc._manager._connection + orig_connection.disconnect() + + # stall until the connection has been replaced + yield poll_until(lambda: sc._manager._connection + and (orig_connection != sc._manager._connection)) + + # now write some more data, which should travel over the new + # connection + protocols[1].transport.write(b"more\n") + data2 = yield d2 + self.assertEqual(data2, b"more\n") + + replacement_connection = sc._manager._connection + self.assertNotEqual(orig_connection, replacement_connection) + + # the application-visible Protocol should not observe the + # interruption + self.assertNoResult(f1.deferreds["connectionMade"]) + self.assertNoResult(f2.deferreds["connectionMade"]) + self.assertNoResult(f1.deferreds["connectionLost"]) + self.assertNoResult(f2.deferreds["connectionLost"]) + + yield w1.close() + yield w2.close()