magic-wormhole/src/wormhole/_rendezvous.py

258 lines
8.9 KiB
Python
Raw Normal View History

2017-02-24 02:11:07 +00:00
from __future__ import print_function, absolute_import, unicode_literals
2017-02-22 20:51:53 +00:00
import os
from six.moves.urllib_parse import urlparse
from attr import attrs, attrib
from attr.validators import provides, instance_of
from zope.interface import implementer
2017-02-22 20:51:53 +00:00
from twisted.python import log
from twisted.internet import defer, endpoints
from twisted.application import internet
from autobahn.twisted import websocket
from . import _interfaces
2017-02-22 20:51:53 +00:00
from .util import (bytes_to_hexstr, hexstr_to_bytes,
bytes_to_dict, dict_to_bytes)
2017-02-22 20:51:53 +00:00
class WSClient(websocket.WebSocketClientProtocol):
    """WebSocket protocol that relays connection events to ``self._RC``.

    ``_RC`` is not set here: ``WSFactory.buildProtocol`` assigns it to each
    protocol instance before the connection is used. Every callback below
    simply forwards to the matching ``ws_*`` method on that object.
    """

    def onConnect(self, response):
        # this fires during WebSocket negotiation, and isn't very useful
        # unless you want to modify the protocol settings
        pass

    def onOpen(self, *args):
        # this fires when the WebSocket is ready to go. No arguments
        self._RC.ws_open(self)

    def onMessage(self, payload, isBinary):
        """Hand one inbound text frame to the connector.

        Any exception raised while processing the message is logged through
        ``twisted.python.log`` and then re-raised unchanged.
        """
        assert not isBinary
        try:
            self._RC.ws_message(payload)
        except Exception:
            # log.err() with no arguments records the exception currently
            # being handled; re-raise so the caller still sees the failure.
            log.err()
            raise

    def onClose(self, wasClean, code, reason):
        """Report the end of the connection to the connector."""
        self._RC.ws_close(wasClean, code, reason)
class WSFactory(websocket.WebSocketClientFactory):
    """Client factory that tags every protocol it builds with ``_RC``.

    ``RC`` is the object whose ``ws_open`` / ``ws_message`` / ``ws_close``
    methods the ``WSClient`` protocol calls; all remaining arguments are
    passed straight through to ``WebSocketClientFactory``.
    """

    protocol = WSClient

    def __init__(self, RC, *args, **kwargs):
        # Explicit base-class call (rather than super()) is kept on purpose.
        websocket.WebSocketClientFactory.__init__(self, *args, **kwargs)
        self._RC = RC

    def buildProtocol(self, addr):
        """Build the protocol as usual, then attach ``_RC`` to it."""
        built = websocket.WebSocketClientFactory.buildProtocol(self, addr)
        built._RC = self._RC
        return built
2017-02-25 02:30:00 +00:00
def dmsg(side, text):
    """Print *text* behind an indent derived from the hex string *side*.

    The indent width is ``int(side, 16) % 20`` spaces, so output from
    different sides lands in different columns.
    """
    indent = " " * (int(side, 16) % 20)
    print(indent, text)
2017-02-22 20:51:53 +00:00
@attrs
@implementer(_interfaces.IRendezvousConnector)
class RendezvousConnector(object):
    """Own the WebSocket connection described by ``url``.

    Outbound ``tx_*`` calls are serialized by ``_tx`` and written to the
    currently-open WebSocket. Inbound messages are decoded in
    ``ws_message`` and dispatched on their ``type`` field to the matching
    ``_response_handle_<type>`` method, which forwards to one of the
    collaborators supplied through ``wire()``.
    """
    _url = attrib(validator=instance_of(type(u"")))
    _appid = attrib(validator=instance_of(type(u"")))
    _side = attrib(validator=instance_of(type(u"")))
    _reactor = attrib()
    _journal = attrib(validator=provides(_interfaces.IJournal))
    _tor_manager = attrib()  # TODO: ITorManager or None
    _timing = attrib(validator=provides(_interfaces.ITiming))

    def __attrs_post_init__(self):
        """Build the WebSocket factory and the (not yet started) service."""
        self._trace = None
        self._ws = None  # the connected WSClient, or None while offline
        f = WSFactory(self, self._url)
        f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600)
        p = urlparse(self._url)
        # "wss" URLs are WebSocket-over-TLS: they default to port 443 and
        # need a TLS endpoint. Everything else keeps the port-80 default.
        tls = (p.scheme == "wss")
        port = p.port or (443 if tls else 80)
        ep = self._make_endpoint(p.hostname, port, tls=tls)
        # TODO: change/wrap ClientService to fail if the first attempt fails
        self._connector = internet.ClientService(ep, f)

    def set_trace(self, f):
        """Install ``f`` as the callable that ``_debug`` reports to."""
        self._trace = f

    def _debug(self, what):
        """Report ``what`` to the trace function, if one is installed."""
        if self._trace:
            self._trace(old_state="", input=what, new_state="")

    def _make_endpoint(self, hostname, port, tls=False):
        """Return the client endpoint used to reach ``hostname:port``.

        When a Tor manager is configured the endpoint comes from it.
        Otherwise ``tls=True`` yields a ``tls:`` client endpoint and
        ``tls=False`` (the default) a plain ``HostnameEndpoint``.
        """
        if self._tor_manager:
            # NOTE(review): get_endpoint_for() is only given host and port;
            # whether it can provide TLS for wss:// URLs is not visible
            # from this file -- confirm against the Tor manager.
            return self._tor_manager.get_endpoint_for(hostname, port)
        if tls:
            return endpoints.clientFromString(self._reactor,
                                              "tls:%s:%s" % (hostname, port))
        return endpoints.HostnameEndpoint(self._reactor, hostname, port)

    def wire(self, boss, nameplate, mailbox, code, lister, terminator):
        """Adapt and remember the collaborators this object reports to."""
        self._B = _interfaces.IBoss(boss)
        self._N = _interfaces.INameplate(nameplate)
        self._M = _interfaces.IMailbox(mailbox)
        self._C = _interfaces.ICode(code)
        self._L = _interfaces.ILister(lister)
        self._T = _interfaces.ITerminator(terminator)

    # from Boss
    def start(self):
        """Start the underlying ClientService."""
        self._connector.startService()

    # from Mailbox
    def tx_claim(self, nameplate):
        self._tx("claim", nameplate=nameplate)

    def tx_open(self, mailbox):
        self._tx("open", mailbox=mailbox)

    def tx_add(self, phase, body):
        """Send an "add" message; ``body`` (bytes) is hex-encoded first."""
        assert isinstance(phase, type("")), type(phase)
        assert isinstance(body, type(b"")), type(body)
        self._tx("add", phase=phase, body=bytes_to_hexstr(body))

    def tx_release(self, nameplate):
        self._tx("release", nameplate=nameplate)

    def tx_close(self, mailbox, mood):
        self._tx("close", mailbox=mailbox, mood=mood)

    def stop(self):
        """Stop the service, then tell the Terminator we have stopped.

        A failure while stopping is logged; ``_stopped`` runs either way.
        """
        d = defer.maybeDeferred(self._connector.stopService)
        d.addErrback(log.err)  # TODO: deliver error upstairs?
        d.addBoth(self._stopped)

    # from Lister
    def tx_list(self):
        self._tx("list")

    # from Code
    def tx_allocate(self):
        self._tx("allocate")

    # from our WSClient (the WebSocket protocol)
    def ws_open(self, proto):
        """Record the new connection, send "bind", and notify listeners.

        Any exception raised along the way is reported to the Boss and
        then re-raised.
        """
        self._debug("R.connected")
        self._ws = proto
        try:
            self._tx("bind", appid=self._appid, side=self._side)
            self._C.connected()
            self._N.connected()
            self._M.connected()
            self._L.connected()
        except Exception as e:
            self._B.error(e)
            raise
        self._debug("R.connected finished notifications")

    def ws_message(self, payload):
        """Decode one inbound message and dispatch it by its "type".

        Unknown types are logged and otherwise ignored. An exception raised
        by a handler is logged, reported to the Boss, and re-raised.
        """
        msg = bytes_to_dict(payload)
        if msg["type"] != "ack":
            self._debug("R.rx(%s %s%s)" %
                        (msg["type"], msg.get("phase", ""),
                         "[mine]" if msg.get("side", "") == self._side else "",
                         ))

        self._timing.add("ws_receive", _side=self._side, message=msg)
        mtype = msg["type"]
        meth = getattr(self, "_response_handle_"+mtype, None)
        if not meth:
            # make tests fail, but real application will ignore it
            log.err(ValueError("Unknown inbound message type %r" % (msg,)))
            return
        try:
            return meth(msg)
        except Exception as e:
            log.err(e)
            self._B.error(e)
            raise

    def ws_close(self, wasClean, code, reason):
        """Forget the connection and tell every listener it was lost."""
        self._debug("R.lost")
        self._ws = None
        self._C.lost()
        self._N.lost()
        self._M.lost()
        self._L.lost()

    # internal
    def _stopped(self, res):
        self._T.stopped()

    def _tx(self, mtype, **kwargs):
        """Serialize and send one message of type ``mtype``.

        Requires an open WebSocket (``self._ws``). ``kwargs`` become the
        message fields, with "id" and "type" added here.
        """
        assert self._ws
        # msgid is used by misc/dump-timing.py to correlate our sends with
        # their receives, and vice versa. They are also correlated with the
        # ACKs we get back from the server (which we otherwise ignore). There
        # are so few messages, 16 bits is enough to be mostly-unique.
        kwargs["id"] = bytes_to_hexstr(os.urandom(2))
        kwargs["type"] = mtype
        self._debug("R.tx(%s %s)" % (mtype.upper(), kwargs.get("phase", "")))
        payload = dict_to_bytes(kwargs)
        self._timing.add("ws_send", _side=self._side, **kwargs)
        self._ws.sendMessage(payload, False)

    def _response_handle_allocated(self, msg):
        nameplate = msg["nameplate"]
        assert isinstance(nameplate, type("")), type(nameplate)
        self._C.rx_allocated(nameplate)

    def _response_handle_nameplates(self, msg):
        """Collect the "id" of every listed nameplate for the Lister."""
        nameplates = msg["nameplates"]
        assert isinstance(nameplates, list), type(nameplates)
        nids = []
        for n in nameplates:
            assert isinstance(n, dict), type(n)
            nameplate_id = n["id"]
            assert isinstance(nameplate_id, type("")), type(nameplate_id)
            nids.append(nameplate_id)
        self._L.rx_nameplates(nids)

    def _response_handle_ack(self, msg):
        pass

    def _response_handle_error(self, msg):
        # the server sent us a type=error. Most cases are due to our mistakes
        # (malformed protocol messages, sending things in the wrong order),
        # but it can also result from CrowdedError (more than two clients
        # using the same channel).
        err = msg["error"]
        orig = msg["orig"]
        self._B.rx_error(err, orig)

    def _response_handle_welcome(self, msg):
        self._B.rx_welcome(msg["welcome"])

    def _response_handle_claimed(self, msg):
        mailbox = msg["mailbox"]
        assert isinstance(mailbox, type("")), type(mailbox)
        self._N.rx_claimed(mailbox)

    def _response_handle_message(self, msg):
        """Hex-decode the body and deliver the message to the Mailbox."""
        side = msg["side"]
        phase = msg["phase"]
        assert isinstance(phase, type("")), type(phase)
        body = hexstr_to_bytes(msg["body"])  # bytes
        self._M.rx_message(side, phase, body)

    def _response_handle_released(self, msg):
        self._N.rx_released()

    def _response_handle_closed(self, msg):
        self._M.rx_closed()
# record, message, payload, packet, bundle, ciphertext, plaintext