Merge branch '180-websocket-error'
This commit is contained in:
commit
3b0a8de114
|
@ -151,7 +151,7 @@ class RendezvousConnector(object):
|
||||||
# from our ClientService
|
# from our ClientService
|
||||||
def _initial_connection_failed(self, f):
|
def _initial_connection_failed(self, f):
|
||||||
if not self._stopping:
|
if not self._stopping:
|
||||||
sce = errors.ServerConnectionError(f.value)
|
sce = errors.ServerConnectionError(self._url, f.value)
|
||||||
d = defer.maybeDeferred(self._connector.stopService)
|
d = defer.maybeDeferred(self._connector.stopService)
|
||||||
# this should happen right away: the ClientService ought to be in
|
# this should happen right away: the ClientService ought to be in
|
||||||
# the "_waiting" state, and everything in the _waiting.stop
|
# the "_waiting" state, and everything in the _waiting.stop
|
||||||
|
@ -201,12 +201,36 @@ class RendezvousConnector(object):
|
||||||
|
|
||||||
def ws_close(self, wasClean, code, reason):
|
def ws_close(self, wasClean, code, reason):
|
||||||
self._debug("R.lost")
|
self._debug("R.lost")
|
||||||
|
was_open = bool(self._ws)
|
||||||
self._ws = None
|
self._ws = None
|
||||||
|
# when Autobahn connects to a non-websocket server, it gets a
|
||||||
|
# CLOSE_STATUS_CODE_ABNORMAL_CLOSE, and delivers onClose() without
|
||||||
|
# ever calling onOpen first. This confuses our state machines, so
|
||||||
|
# avoid telling them we've lost the connection unless we'd previously
|
||||||
|
# told them we'd connected.
|
||||||
|
if was_open:
|
||||||
self._N.lost()
|
self._N.lost()
|
||||||
self._M.lost()
|
self._M.lost()
|
||||||
self._L.lost()
|
self._L.lost()
|
||||||
self._A.lost()
|
self._A.lost()
|
||||||
|
|
||||||
|
# and if this happens on the very first connection, then we treat it
|
||||||
|
# as a failed initial connection, even though ClientService didn't
|
||||||
|
# notice it. There's a Twisted ticket (#8375) about giving
|
||||||
|
# ClientService an extra setup function to use, so it can tell
|
||||||
|
# whether post-connection negotiation was successful or not, and
|
||||||
|
# restart the process if it fails. That would be useful here, so that
|
||||||
|
# failAfterFailures=1 would do the right thing if the initial TCP
|
||||||
|
# connection succeeds but the first WebSocket negotiation fails.
|
||||||
|
if not self._have_made_a_successful_connection:
|
||||||
|
# shut down the ClientService, which currently thinks it has a
|
||||||
|
# valid connection
|
||||||
|
sce = errors.ServerConnectionError(self._url, reason)
|
||||||
|
d = defer.maybeDeferred(self._connector.stopService)
|
||||||
|
d.addErrback(log.err) # just in case something goes wrong
|
||||||
|
# tell the Boss to quit and inform the user
|
||||||
|
d.addCallback(lambda _: self._B.error(sce))
|
||||||
|
|
||||||
# internal
|
# internal
|
||||||
def _stopped(self, res):
|
def _stopped(self, res):
|
||||||
self._T.stopped()
|
self._T.stopped()
|
||||||
|
|
|
@ -120,8 +120,9 @@ def _dispatch_command(reactor, cfg, command):
|
||||||
print(u"TransferError: %s" % six.text_type(e), file=cfg.stderr)
|
print(u"TransferError: %s" % six.text_type(e), file=cfg.stderr)
|
||||||
raise SystemExit(1)
|
raise SystemExit(1)
|
||||||
except ServerConnectionError as e:
|
except ServerConnectionError as e:
|
||||||
msg = fill("ERROR: " + dedent(e.__doc__))
|
msg = fill("ERROR: " + dedent(e.__doc__)) + "\n"
|
||||||
msg += "\n" + six.text_type(e)
|
msg += "(relay URL was %s)\n" % e.url
|
||||||
|
msg += six.text_type(e)
|
||||||
print(msg, file=cfg.stderr)
|
print(msg, file=cfg.stderr)
|
||||||
raise SystemExit(1)
|
raise SystemExit(1)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
@ -18,7 +18,8 @@ class ServerError(WormholeError):
|
||||||
|
|
||||||
class ServerConnectionError(WormholeError):
|
class ServerConnectionError(WormholeError):
|
||||||
"""We had a problem connecting to the relay server:"""
|
"""We had a problem connecting to the relay server:"""
|
||||||
def __init__(self, reason):
|
def __init__(self, url, reason):
|
||||||
|
self.url = url
|
||||||
self.reason = reason
|
self.reason = reason
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return str(self.reason)
|
return str(self.reason)
|
||||||
|
|
|
@ -1144,10 +1144,11 @@ class Dispatch(unittest.TestCase):
|
||||||
cfg = config("send")
|
cfg = config("send")
|
||||||
cfg.stderr = io.StringIO()
|
cfg.stderr = io.StringIO()
|
||||||
def fake():
|
def fake():
|
||||||
raise ServerConnectionError(ValueError("abcd"))
|
raise ServerConnectionError("URL", ValueError("abcd"))
|
||||||
yield self.assertFailure(cli._dispatch_command(reactor, cfg, fake),
|
yield self.assertFailure(cli._dispatch_command(reactor, cfg, fake),
|
||||||
SystemExit)
|
SystemExit)
|
||||||
expected = fill("ERROR: " + dedent(ServerConnectionError.__doc__))+"\n"
|
expected = fill("ERROR: " + dedent(ServerConnectionError.__doc__))+"\n"
|
||||||
|
expected += "(relay URL was URL)\n"
|
||||||
expected += "abcd\n"
|
expected += "abcd\n"
|
||||||
self.assertEqual(cfg.stderr.getvalue(), expected)
|
self.assertEqual(cfg.stderr.getvalue(), expected)
|
||||||
|
|
||||||
|
|
|
@ -4,13 +4,15 @@ import mock
|
||||||
from zope.interface import directlyProvides, implementer
|
from zope.interface import directlyProvides, implementer
|
||||||
from twisted.trial import unittest
|
from twisted.trial import unittest
|
||||||
from .. import (errors, timing, _order, _receive, _key, _code, _lister, _boss,
|
from .. import (errors, timing, _order, _receive, _key, _code, _lister, _boss,
|
||||||
_input, _allocator, _send, _terminator, _nameplate, _mailbox)
|
_input, _allocator, _send, _terminator, _nameplate, _mailbox,
|
||||||
|
_rendezvous)
|
||||||
from .._interfaces import (IKey, IReceive, IBoss, ISend, IMailbox, IOrder,
|
from .._interfaces import (IKey, IReceive, IBoss, ISend, IMailbox, IOrder,
|
||||||
IRendezvousConnector, ILister, IInput, IAllocator,
|
IRendezvousConnector, ILister, IInput, IAllocator,
|
||||||
INameplate, ICode, IWordlist, ITerminator)
|
INameplate, ICode, IWordlist, ITerminator)
|
||||||
from .._key import derive_key, derive_phase_key, encrypt_data
|
from .._key import derive_key, derive_phase_key, encrypt_data
|
||||||
from ..journal import ImmediateJournal
|
from ..journal import ImmediateJournal
|
||||||
from ..util import dict_to_bytes, hexstr_to_bytes, bytes_to_hexstr, to_bytes
|
from ..util import (dict_to_bytes, bytes_to_dict,
|
||||||
|
hexstr_to_bytes, bytes_to_hexstr, to_bytes)
|
||||||
from spake2 import SPAKE2_Symmetric
|
from spake2 import SPAKE2_Symmetric
|
||||||
from nacl.secret import SecretBox
|
from nacl.secret import SecretBox
|
||||||
|
|
||||||
|
@ -1380,6 +1382,75 @@ class Boss(unittest.TestCase):
|
||||||
b.allocate_code(3)
|
b.allocate_code(3)
|
||||||
|
|
||||||
|
|
||||||
|
class Rendezvous(unittest.TestCase):
|
||||||
|
def build(self):
|
||||||
|
events = []
|
||||||
|
reactor = object()
|
||||||
|
journal = ImmediateJournal()
|
||||||
|
tor_manager = None
|
||||||
|
rc = _rendezvous.RendezvousConnector("ws://host:4000/v1", "appid",
|
||||||
|
"side", reactor,
|
||||||
|
journal, tor_manager,
|
||||||
|
timing.DebugTiming())
|
||||||
|
b = Dummy("b", events, IBoss, "error")
|
||||||
|
n = Dummy("n", events, INameplate, "connected", "lost")
|
||||||
|
m = Dummy("m", events, IMailbox, "connected", "lost")
|
||||||
|
a = Dummy("a", events, IAllocator, "connected", "lost")
|
||||||
|
l = Dummy("l", events, ILister, "connected", "lost")
|
||||||
|
t = Dummy("t", events, ITerminator)
|
||||||
|
rc.wire(b, n, m, a, l, t)
|
||||||
|
return rc, events
|
||||||
|
|
||||||
|
def test_basic(self):
|
||||||
|
rc, events = self.build()
|
||||||
|
del rc, events
|
||||||
|
|
||||||
|
def test_websocket_failure(self):
|
||||||
|
# if the TCP connection succeeds, but the subsequent WebSocket
|
||||||
|
# negotiation fails, then we'll see an onClose without first seeing
|
||||||
|
# onOpen
|
||||||
|
rc, events = self.build()
|
||||||
|
rc.ws_close(False, 1006, "connection was closed uncleanly")
|
||||||
|
# this should cause the ClientService to be shut down, and an error
|
||||||
|
# delivered to the Boss
|
||||||
|
self.assertEqual(len(events), 1, events)
|
||||||
|
self.assertEqual(events[0][0], "b.error")
|
||||||
|
self.assertIsInstance(events[0][1], errors.ServerConnectionError)
|
||||||
|
self.assertEqual(str(events[0][1]), "connection was closed uncleanly")
|
||||||
|
|
||||||
|
def test_websocket_lost(self):
|
||||||
|
# if the TCP connection succeeds, and negotiation completes, then the
|
||||||
|
# connection is lost, several machines should be notified
|
||||||
|
rc, events = self.build()
|
||||||
|
|
||||||
|
ws = mock.Mock()
|
||||||
|
def notrandom(length):
|
||||||
|
return b"\x00" * length
|
||||||
|
with mock.patch("os.urandom", notrandom):
|
||||||
|
rc.ws_open(ws)
|
||||||
|
self.assertEqual(events, [("n.connected", ),
|
||||||
|
("m.connected", ),
|
||||||
|
("l.connected", ),
|
||||||
|
("a.connected", ),
|
||||||
|
])
|
||||||
|
events[:] = []
|
||||||
|
def sent_messages(ws):
|
||||||
|
for c in ws.mock_calls:
|
||||||
|
self.assertEqual(c[0], "sendMessage", ws.mock_calls)
|
||||||
|
self.assertEqual(c[1][1], False, ws.mock_calls)
|
||||||
|
yield bytes_to_dict(c[1][0])
|
||||||
|
self.assertEqual(list(sent_messages(ws)),
|
||||||
|
[dict(appid="appid", side="side", id="0000",
|
||||||
|
type="bind"),
|
||||||
|
])
|
||||||
|
|
||||||
|
rc.ws_close(True, None, None)
|
||||||
|
self.assertEqual(events, [("n.lost", ),
|
||||||
|
("m.lost", ),
|
||||||
|
("l.lost", ),
|
||||||
|
("a.lost", ),
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# TODO
|
# TODO
|
||||||
|
|
Loading…
Reference in New Issue
Block a user