diff --git a/src/wormhole/_dilation/manager.py b/src/wormhole/_dilation/manager.py index 574c7b2..581a721 100644 --- a/src/wormhole/_dilation/manager.py +++ b/src/wormhole/_dilation/manager.py @@ -116,6 +116,7 @@ class Manager(object): self._made_first_connection = False self._first_connected = OneShotObserver(self._eventual_queue) self._stopped = OneShotObserver(self._eventual_queue) + self._debug_stall_connector = False self._next_dilation_phase = 0 @@ -382,6 +383,11 @@ class Manager(object): self._timing, self._my_side, # needed for relay handshake self._my_role) + if self._debug_stall_connector: + # unit tests use this hook to send messages while we know we + # don't have a connection + self._eventual_queue.eventually(self._debug_stall_connector, self._connector) + return self._connector.start() @m.output() diff --git a/src/wormhole/test/dilate/test_full.py b/src/wormhole/test/dilate/test_full.py index 50fd3a2..ee089c7 100644 --- a/src/wormhole/test/dilate/test_full.py +++ b/src/wormhole/test/dilate/test_full.py @@ -6,6 +6,7 @@ from twisted.internet.protocol import Protocol, Factory from twisted.trial import unittest from ..common import ServerBase, poll_until +from ..._interfaces import IDilationConnector from ...eventual import EventualQueue from ..._dilation._noise import NoiseConnection @@ -178,3 +179,87 @@ class Reconnect(ServerBase, unittest.TestCase): yield w1.close() yield w2.close() + + @inlineCallbacks + def test_data_while_offline(self): + eq = EventualQueue(reactor) + w1 = wormhole.create(APPID, self.relayurl, reactor, _enable_dilate=True) + w2 = wormhole.create(APPID, self.relayurl, reactor, _enable_dilate=True) + w1.allocate_code() + code = yield w1.get_code() + w2.set_code(code) + yield doBoth(w1.get_verifier(), w2.get_verifier()) + + eps1_d = w1.dilate() + eps2_d = w2.dilate() + (eps1, eps2) = yield doBoth(eps1_d, eps2_d) + (control_ep1, connect_ep1, listen_ep1) = eps1 + (control_ep2, connect_ep2, listen_ep2) = eps2 + + f1 = ReconF(eq); f2 = ReconF(eq) + d1 = control_ep1.connect(f1); d2 = control_ep2.connect(f2) + yield d1 + yield d2 + + protocols = {} + def p_connected(p, index): + protocols[index] = p + msg = "hello from %s\n" % index + p.transport.write(msg.encode("ascii")) + f1.deferreds["connectionMade"].addCallback(p_connected, 1) + f2.deferreds["connectionMade"].addCallback(p_connected, 2) + + data1 = yield f1.deferreds["dataReceived"] + data2 = yield f2.deferreds["dataReceived"] + self.assertEqual(data1, b"hello from 2\n") + self.assertEqual(data2, b"hello from 1\n") + # the ACKs are now in flight and may not arrive before we kill the + # connection + + f1.resetDeferred("connectionMade") + f2.resetDeferred("connectionMade") + d1 = f1.resetDeferred("dataReceived") + d2 = f2.resetDeferred("dataReceived") + + # switch off connections + assert w1._boss._D._manager._debug_stall_connector == False + cd1 = Deferred(); cd2 = Deferred() + w1._boss._D._manager._debug_stall_connector = cd1.callback + w2._boss._D._manager._debug_stall_connector = cd2.callback + + # now we reach inside and drop the connection + sc = protocols[1].transport + orig_connection = sc._manager._connection + orig_connection.disconnect() + + c1 = yield cd1 + c2 = yield cd2 + assert IDilationConnector.providedBy(c1) + assert IDilationConnector.providedBy(c2) + assert c1 is not orig_connection + w1._boss._D._manager._debug_stall_connector = False + w2._boss._D._manager._debug_stall_connector = False + + # now write some data while the connection is definitely offline + protocols[1].transport.write(b"more 1->2\n") + protocols[2].transport.write(b"more 2->1\n") + + # allow the connections to proceed + c1.start() + c2.start() + + # and wait for the data to arrive + data2 = yield d2 + self.assertEqual(data2, b"more 1->2\n") + data1 = yield d1 + self.assertEqual(data1, b"more 2->1\n") + + # the application-visible Protocol should not observe the + # interruption + self.assertNoResult(f1.deferreds["connectionMade"]) + self.assertNoResult(f2.deferreds["connectionMade"]) + self.assertNoResult(f1.deferreds["connectionLost"]) + self.assertNoResult(f2.deferreds["connectionLost"]) + + yield w1.close() + yield w2.close()