Merge branch 'dilation-testing'
This commit is contained in:
commit
3a4b9a3b67
|
@ -1,7 +1,7 @@
|
||||||
|
|
||||||
default: images
|
default: images
|
||||||
|
|
||||||
images: allocator.png boss.png code.png input.png key.png lister.png machines.png mailbox.png nameplate.png order.png receive.png send.png terminator.png
|
images: allocator.png boss.png code.png input.png key.png lister.png machines.png mailbox.png nameplate.png order.png receive.png send.png terminator.png dilation.png
|
||||||
|
|
||||||
.PHONY: default images
|
.PHONY: default images
|
||||||
|
|
||||||
|
|
32
docs/state-machines/dilation.dot
Normal file
32
docs/state-machines/dilation.dot
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
digraph {
|
||||||
|
Manager [label="Manager" shape="box" color="blue" fontcolor="blue"]
|
||||||
|
Connector [label="Connector" shape="oval"]
|
||||||
|
Framer [label="Framer"]
|
||||||
|
DCP [label="Dilated\nConnection\nProtocol"]
|
||||||
|
|
||||||
|
DCP -> Connector [style="dashed" label="add_candidate\n"]
|
||||||
|
|
||||||
|
Record [label="Record"]
|
||||||
|
Record -> Framer [style="dashed" label="connectionMade\nsend_frame"]
|
||||||
|
Record -> Framer [style="dashed" label="add_and_parse (-> tokens)"]
|
||||||
|
|
||||||
|
ITransport -> DCP [style="dashed" label="connectionMade\ndataReceived\nconnectionLost"]
|
||||||
|
Framer -> ITransport [style="dashed" label="write"]
|
||||||
|
Manager -> DCP [style="dashed" color="green" label="disconnect"]
|
||||||
|
DCP -> Manager [style="dashed" color="green" label="got_record CClost"]
|
||||||
|
DCP -> Record [style="dashed" label="set_role\nconnectionMade\nsend_record"]
|
||||||
|
DCP -> Record [style="dashed" label="add_and_unframe (-> tokens)"]
|
||||||
|
Manager -> Connector [style="dashed" label="start\ngot_hints\nstop"]
|
||||||
|
Connector -> Manager [style="dashed" color="green" label="CCmade"]
|
||||||
|
Connector -> DCP [color="green" fontcolor="blue" label="select\nsend_record(KCM)"]
|
||||||
|
Connector -> DCP [color="red" fontcolor="red" label="disconnect"]
|
||||||
|
Connector -> Connector [color="green" fontcolor="green" label="accept"]
|
||||||
|
|
||||||
|
Inbound [label="Inbound" shape="box" color="blue" fontcolor="blue"]
|
||||||
|
Manager -> Inbound [style="dashed" label="use_connection"]
|
||||||
|
Inbound -> DCP [style="dashed" label="pauseProducing\nresumeProducing"]
|
||||||
|
|
||||||
|
Outbound [label="Outbound" shape="box" color="blue" fontcolor="blue"]
|
||||||
|
Manager -> Outbound [style="dashed" label="use_connection"]
|
||||||
|
Outbound -> DCP [style="dashed" label="send_record\ntransport.(un)registerProducer"]
|
||||||
|
}
|
|
@ -400,12 +400,15 @@ class Connector(object):
|
||||||
# our Connection protocols call: add_candidate
|
# our Connection protocols call: add_candidate
|
||||||
|
|
||||||
|
|
||||||
@attrs
|
@attrs(repr=False)
|
||||||
class OutboundConnectionFactory(ClientFactory, object):
|
class OutboundConnectionFactory(ClientFactory, object):
|
||||||
_connector = attrib(validator=provides(IDilationConnector))
|
_connector = attrib(validator=provides(IDilationConnector))
|
||||||
_relay_handshake = attrib(validator=optional(instance_of(bytes)))
|
_relay_handshake = attrib(validator=optional(instance_of(bytes)))
|
||||||
_description = attrib()
|
_description = attrib()
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "OutboundConnectionFactory(%s %s)" % (self._connector._role, self._description)
|
||||||
|
|
||||||
def buildProtocol(self, addr):
|
def buildProtocol(self, addr):
|
||||||
p = self._connector.build_protocol(addr, self._description)
|
p = self._connector.build_protocol(addr, self._description)
|
||||||
p.factory = self
|
p.factory = self
|
||||||
|
@ -420,10 +423,13 @@ def describe_inbound(addr):
|
||||||
return "<-tcp:%s:%d" % (addr.host, addr.port)
|
return "<-tcp:%s:%d" % (addr.host, addr.port)
|
||||||
return "<-%r" % addr
|
return "<-%r" % addr
|
||||||
|
|
||||||
@attrs
|
@attrs(repr=False)
|
||||||
class InboundConnectionFactory(ServerFactory, object):
|
class InboundConnectionFactory(ServerFactory, object):
|
||||||
_connector = attrib(validator=provides(IDilationConnector))
|
_connector = attrib(validator=provides(IDilationConnector))
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "InboundConnectionFactory(%s)" % (self._connector._role)
|
||||||
|
|
||||||
def buildProtocol(self, addr):
|
def buildProtocol(self, addr):
|
||||||
description = describe_inbound(addr)
|
description = describe_inbound(addr)
|
||||||
p = self._connector.build_protocol(addr, description)
|
p = self._connector.build_protocol(addr, description)
|
||||||
|
|
|
@ -5,7 +5,7 @@ from twisted.internet.defer import Deferred, inlineCallbacks, gatherResults
|
||||||
from twisted.internet.protocol import Protocol, Factory
|
from twisted.internet.protocol import Protocol, Factory
|
||||||
from twisted.trial import unittest
|
from twisted.trial import unittest
|
||||||
|
|
||||||
from ..common import ServerBase
|
from ..common import ServerBase, poll_until
|
||||||
from ...eventual import EventualQueue
|
from ...eventual import EventualQueue
|
||||||
from ..._dilation._noise import NoiseConnection
|
from ..._dilation._noise import NoiseConnection
|
||||||
|
|
||||||
|
@ -75,3 +75,106 @@ class Full(ServerBase, unittest.TestCase):
|
||||||
yield w2.close()
|
yield w2.close()
|
||||||
|
|
||||||
test_full.timeout = 30
|
test_full.timeout = 30
|
||||||
|
|
||||||
|
|
||||||
|
class ReconP(Protocol):
|
||||||
|
def eventually(self, which, data):
|
||||||
|
d = self.factory.deferreds[which]
|
||||||
|
self.factory.eq.fire_eventually(data).addCallback(d.callback)
|
||||||
|
def connectionMade(self):
|
||||||
|
self.eventually("connectionMade", self)
|
||||||
|
#self.transport.write(b"hello\n")
|
||||||
|
def dataReceived(self, data):
|
||||||
|
self.eventually("dataReceived", data)
|
||||||
|
def connectionLost(self, why):
|
||||||
|
self.eventually("connectionLost", (self, why))
|
||||||
|
|
||||||
|
class ReconF(Factory):
|
||||||
|
protocol = ReconP
|
||||||
|
def __init__(self, eq):
|
||||||
|
Factory.__init__(self)
|
||||||
|
self.eq = eq
|
||||||
|
self.deferreds = {}
|
||||||
|
for name in ["connectionMade", "dataReceived", "connectionLost"]:
|
||||||
|
self.deferreds[name] = Deferred()
|
||||||
|
def resetDeferred(self, name):
|
||||||
|
d = Deferred()
|
||||||
|
self.deferreds[name] = d
|
||||||
|
return d
|
||||||
|
|
||||||
|
class Reconnect(ServerBase, unittest.TestCase):
|
||||||
|
@inlineCallbacks
|
||||||
|
def setUp(self):
|
||||||
|
if not NoiseConnection:
|
||||||
|
raise unittest.SkipTest("noiseprotocol unavailable")
|
||||||
|
# test_welcome wants to see [current_cli_version]
|
||||||
|
yield self._setup_relay(None)
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def test_reconnect(self):
|
||||||
|
eq = EventualQueue(reactor)
|
||||||
|
w1 = wormhole.create(APPID, self.relayurl, reactor, _enable_dilate=True)
|
||||||
|
w2 = wormhole.create(APPID, self.relayurl, reactor, _enable_dilate=True)
|
||||||
|
w1.allocate_code()
|
||||||
|
code = yield w1.get_code()
|
||||||
|
w2.set_code(code)
|
||||||
|
yield doBoth(w1.get_verifier(), w2.get_verifier())
|
||||||
|
|
||||||
|
eps1_d = w1.dilate()
|
||||||
|
eps2_d = w2.dilate()
|
||||||
|
(eps1, eps2) = yield doBoth(eps1_d, eps2_d)
|
||||||
|
(control_ep1, connect_ep1, listen_ep1) = eps1
|
||||||
|
(control_ep2, connect_ep2, listen_ep2) = eps2
|
||||||
|
|
||||||
|
f1 = ReconF(eq); f2 = ReconF(eq)
|
||||||
|
d1 = control_ep1.connect(f1); d2 = control_ep2.connect(f2)
|
||||||
|
yield d1
|
||||||
|
yield d2
|
||||||
|
|
||||||
|
protocols = {}
|
||||||
|
def p_connected(p, index):
|
||||||
|
protocols[index] = p
|
||||||
|
msg = "hello from %s\n" % index
|
||||||
|
p.transport.write(msg.encode("ascii"))
|
||||||
|
f1.deferreds["connectionMade"].addCallback(p_connected, 1)
|
||||||
|
f2.deferreds["connectionMade"].addCallback(p_connected, 2)
|
||||||
|
|
||||||
|
data1 = yield f1.deferreds["dataReceived"]
|
||||||
|
data2 = yield f2.deferreds["dataReceived"]
|
||||||
|
self.assertEqual(data1, b"hello from 2\n")
|
||||||
|
self.assertEqual(data2, b"hello from 1\n")
|
||||||
|
# the ACKs are now in flight and may not arrive before we kill the
|
||||||
|
# connection
|
||||||
|
|
||||||
|
f1.resetDeferred("connectionMade")
|
||||||
|
f2.resetDeferred("connectionMade")
|
||||||
|
d1 = f1.resetDeferred("dataReceived")
|
||||||
|
d2 = f2.resetDeferred("dataReceived")
|
||||||
|
|
||||||
|
# now we reach inside and drop the connection
|
||||||
|
sc = protocols[1].transport
|
||||||
|
orig_connection = sc._manager._connection
|
||||||
|
orig_connection.disconnect()
|
||||||
|
|
||||||
|
# stall until the connection has been replaced
|
||||||
|
yield poll_until(lambda: sc._manager._connection
|
||||||
|
and (orig_connection != sc._manager._connection))
|
||||||
|
|
||||||
|
# now write some more data, which should travel over the new
|
||||||
|
# connection
|
||||||
|
protocols[1].transport.write(b"more\n")
|
||||||
|
data2 = yield d2
|
||||||
|
self.assertEqual(data2, b"more\n")
|
||||||
|
|
||||||
|
replacement_connection = sc._manager._connection
|
||||||
|
self.assertNotEqual(orig_connection, replacement_connection)
|
||||||
|
|
||||||
|
# the application-visible Protocol should not observe the
|
||||||
|
# interruption
|
||||||
|
self.assertNoResult(f1.deferreds["connectionMade"])
|
||||||
|
self.assertNoResult(f2.deferreds["connectionMade"])
|
||||||
|
self.assertNoResult(f1.deferreds["connectionLost"])
|
||||||
|
self.assertNoResult(f2.deferreds["connectionLost"])
|
||||||
|
|
||||||
|
yield w1.close()
|
||||||
|
yield w2.close()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user