dilate/test_full: test reconnection

This commit is contained in:
Brian Warner 2019-07-05 15:47:49 -07:00
parent 98cce7fab7
commit ee8c1acffa

View File

@ -5,7 +5,7 @@ from twisted.internet.defer import Deferred, inlineCallbacks, gatherResults
from twisted.internet.protocol import Protocol, Factory from twisted.internet.protocol import Protocol, Factory
from twisted.trial import unittest from twisted.trial import unittest
from ..common import ServerBase from ..common import ServerBase, poll_until
from ...eventual import EventualQueue from ...eventual import EventualQueue
from ..._dilation._noise import NoiseConnection from ..._dilation._noise import NoiseConnection
@ -75,3 +75,106 @@ class Full(ServerBase, unittest.TestCase):
yield w2.close() yield w2.close()
test_full.timeout = 30 test_full.timeout = 30
class ReconP(Protocol):
def eventually(self, which, data):
d = self.factory.deferreds[which]
self.factory.eq.fire_eventually(data).addCallback(d.callback)
def connectionMade(self):
self.eventually("connectionMade", self)
#self.transport.write(b"hello\n")
def dataReceived(self, data):
self.eventually("dataReceived", data)
def connectionLost(self, why):
self.eventually("connectionLost", (self, why))
class ReconF(Factory):
protocol = ReconP
def __init__(self, eq):
Factory.__init__(self)
self.eq = eq
self.deferreds = {}
for name in ["connectionMade", "dataReceived", "connectionLost"]:
self.deferreds[name] = Deferred()
def resetDeferred(self, name):
d = Deferred()
self.deferreds[name] = d
return d
class Reconnect(ServerBase, unittest.TestCase):
@inlineCallbacks
def setUp(self):
if not NoiseConnection:
raise unittest.SkipTest("noiseprotocol unavailable")
# test_welcome wants to see [current_cli_version]
yield self._setup_relay(None)
@inlineCallbacks
def test_reconnect(self):
eq = EventualQueue(reactor)
w1 = wormhole.create(APPID, self.relayurl, reactor, _enable_dilate=True)
w2 = wormhole.create(APPID, self.relayurl, reactor, _enable_dilate=True)
w1.allocate_code()
code = yield w1.get_code()
w2.set_code(code)
yield doBoth(w1.get_verifier(), w2.get_verifier())
eps1_d = w1.dilate()
eps2_d = w2.dilate()
(eps1, eps2) = yield doBoth(eps1_d, eps2_d)
(control_ep1, connect_ep1, listen_ep1) = eps1
(control_ep2, connect_ep2, listen_ep2) = eps2
f1 = ReconF(eq); f2 = ReconF(eq)
d1 = control_ep1.connect(f1); d2 = control_ep2.connect(f2)
yield d1
yield d2
protocols = {}
def p_connected(p, index):
protocols[index] = p
msg = "hello from %s\n" % index
p.transport.write(msg.encode("ascii"))
f1.deferreds["connectionMade"].addCallback(p_connected, 1)
f2.deferreds["connectionMade"].addCallback(p_connected, 2)
data1 = yield f1.deferreds["dataReceived"]
data2 = yield f2.deferreds["dataReceived"]
self.assertEqual(data1, b"hello from 2\n")
self.assertEqual(data2, b"hello from 1\n")
# the ACKs are now in flight and may not arrive before we kill the
# connection
f1.resetDeferred("connectionMade")
f2.resetDeferred("connectionMade")
d1 = f1.resetDeferred("dataReceived")
d2 = f2.resetDeferred("dataReceived")
# now we reach inside and drop the connection
sc = protocols[1].transport
orig_connection = sc._manager._connection
orig_connection.disconnect()
# stall until the connection has been replaced
yield poll_until(lambda: sc._manager._connection
and (orig_connection != sc._manager._connection))
# now write some more data, which should travel over the new
# connection
protocols[1].transport.write(b"more\n")
data2 = yield d2
self.assertEqual(data2, b"more\n")
replacement_connection = sc._manager._connection
self.assertNotEqual(orig_connection, replacement_connection)
# the application-visible Protocol should not observe the
# interruption
self.assertNoResult(f1.deferreds["connectionMade"])
self.assertNoResult(f2.deferreds["connectionMade"])
self.assertNoResult(f1.deferreds["connectionLost"])
self.assertNoResult(f2.deferreds["connectionLost"])
yield w1.close()
yield w2.close()