diff --git a/src/wormhole/twisted/transcribe.py b/src/wormhole/twisted/transcribe.py deleted file mode 100644 index 555499b..0000000 --- a/src/wormhole/twisted/transcribe.py +++ /dev/null @@ -1,624 +0,0 @@ -from __future__ import print_function -import os, sys, json, re, unicodedata -from six.moves.urllib_parse import urlparse -from binascii import hexlify, unhexlify -from twisted.internet import defer, endpoints, error -from twisted.internet.threads import deferToThread, blockingCallFromThread -from twisted.internet.defer import inlineCallbacks, returnValue -from twisted.python import log -from autobahn.twisted import websocket -from nacl.secret import SecretBox -from nacl.exceptions import CryptoError -from nacl import utils -from spake2 import SPAKE2_Symmetric -from .. import __version__ -from .. import codes -from ..errors import ServerError, Timeout, WrongPasswordError, UsageError -from ..timing import DebugTiming -from hkdf import Hkdf - -def HKDF(skm, outlen, salt=None, CTXinfo=b""): - return Hkdf(salt, skm).expand(CTXinfo, outlen) - -CONFMSG_NONCE_LENGTH = 128//8 -CONFMSG_MAC_LENGTH = 256//8 -def make_confmsg(confkey, nonce): - return nonce+HKDF(confkey, CONFMSG_MAC_LENGTH, nonce) - -def to_bytes(u): - return unicodedata.normalize("NFC", u).encode("utf-8") - -# We send the following messages through the relay server to the far side (by -# sending "add" commands to the server, and getting "message" responses): -# -# phase=setup: -# * unauthenticated version strings (but why?) -# * early warmup for connection hints ("I can do tor, spin up HS") -# * wordlist l10n identifier -# phase=pake: just the SPAKE2 'start' message (binary) -# phase=confirm: key verification (HKDF(key, nonce)+nonce) -# phase=1,2,3,..: application messages - -class WSClient(websocket.WebSocketClientProtocol): - def onOpen(self): - self.wormhole_open = True - self.factory.d.callback(self) - - def onMessage(self, payload, isBinary): - assert not isBinary - self.wormhole._ws_dispatch_response(payload) - - def onClose(self, wasClean, code, reason): - if self.wormhole_open: - self.wormhole._ws_closed(wasClean, code, reason) - else: - # we closed before establishing a connection (onConnect) or - # finishing WebSocket negotiation (onOpen): errback - self.factory.d.errback(error.ConnectError(reason)) - -class WSFactory(websocket.WebSocketClientFactory): - protocol = WSClient - def buildProtocol(self, addr): - proto = websocket.WebSocketClientFactory.buildProtocol(self, addr) - proto.wormhole = self.wormhole - proto.wormhole_open = False - return proto - -class _Wormhole: - motd_displayed = False - version_warning_displayed = False - _send_confirm = True - - def __init__(self, appid, relay_url, reactor, - tor_manager=None, timing=None): - if not isinstance(appid, type(u"")): raise TypeError(type(appid)) - if not isinstance(relay_url, type(u"")): - raise TypeError(type(relay_url)) - if not relay_url.endswith(u"/"): raise UsageError - self._appid = appid - self._relay_url = relay_url - self._ws_url = relay_url.replace("http:", "ws:") + "ws" - self._tor_manager = tor_manager - self._timing = timing or DebugTiming() - self._reactor = reactor - - self._ws_connected = defer.Deferred() # XXX - - self._side = hexlify(os.urandom(5)).decode("ascii") - self._code = None - - self._nameplate_id = None - self._nameplate_claimed = False - self._nameplate_released = False - - self._mailbox_id = None - self._mailbox_opened = False - self._mailbox_closed = False - - self._key = None - self._started_get_code = False - self._next_outbound_phase = 0 - self._sent_messages = {} # phase -> body_bytes - self._delivered_messages = set() # phase - self._next_inbound_phase = 0 - self._received_messages = {} # phase -> body_bytes - self._got_phases = set() # phases, to prohibit double-read - self._sleepers = [] - self._confirmation_failed = False - self._closed = False - self._released_status = None - self._timing_started = self._timing.add("wormhole") - self._ws = None - self._ws_t = None # timing Event - self._error = None - - def _make_endpoint(self, hostname, port): - if self._tor_manager: - return self._tor_manager.get_endpoint_for(hostname, port) - # note: HostnameEndpoints have a default 30s timeout - return endpoints.HostnameEndpoint(self._reactor, hostname, port) - - @inlineCallbacks - def _get_websocket(self): - if not self._ws: - # TODO: if we lose the connection, make a new one, re-establish - # the state - assert self._side - p = urlparse(self._ws_url) - f = WSFactory(self._ws_url) - f.wormhole = self - f.d = defer.Deferred() - # TODO: if hostname="localhost", I get three factories starting - # and stopping (maybe 127.0.0.1, ::1, and something else?), and - # an error in the factory is masked. - ep = self._make_endpoint(p.hostname, p.port or 80) - # .connect errbacks if the TCP connection fails - self._ws = yield ep.connect(f) - self._ws_t = self._timing.add("websocket") - # f.d is errbacked if WebSocket negotiation fails - yield f.d # WebSocket drops data sent before onOpen() fires - self._ws_send_command(u"bind", appid=self._appid, side=self._side) - # the socket is connected, and bound, but no nameplate has been claimed - returnValue(self._ws) - - @inlineCallbacks - def _ws_send_command(self, mtype, **kwargs): - ws = yield self._get_websocket() - # msgid is used by misc/dump-timing.py to correlate our sends with - # their receives, and vice versa. They are also correlated with the - # ACKs we get back from the server (which we otherwise ignore). There - # are so few messages, 16 bits is enough to be mostly-unique. - kwargs["id"] = hexlify(os.urandom(2)).decode("ascii") - kwargs["type"] = mtype - payload = json.dumps(kwargs).encode("utf-8") - self._timing.add("ws_send", _side=self._side, **kwargs) - ws.sendMessage(payload, False) - - def _ws_dispatch_response(self, payload): - msg = json.loads(payload.decode("utf-8")) - self._timing.add("ws_receive", _side=self._side, message=msg) - mtype = msg["type"] - meth = getattr(self, "_ws_handle_"+mtype, None) - if not meth: - # make tests fail, but real application will ignore it - log.err(ValueError("Unknown inbound message type %r" % (msg,))) - return - return meth(msg) - - def _ws_handle_ack(self, msg): - pass - - def _ws_handle_welcome(self, msg): - welcome = msg["welcome"] - if ("motd" in welcome and - not self.motd_displayed): - motd_lines = welcome["motd"].splitlines() - motd_formatted = "\n ".join(motd_lines) - print("Server (at %s) says:\n %s" % - (self._ws_url, motd_formatted), file=sys.stderr) - self.motd_displayed = True - - # Only warn if we're running a release version (e.g. 0.0.6, not - # 0.0.6-DISTANCE-gHASH). Only warn once. - if ("-" not in __version__ and - not self.version_warning_displayed and - welcome["current_version"] != __version__): - print("Warning: errors may occur unless both sides are running the same version", file=sys.stderr) - print("Server claims %s is current, but ours is %s" - % (welcome["current_version"], __version__), file=sys.stderr) - self.version_warning_displayed = True - - if "error" in welcome: - return self._signal_error(welcome["error"]) - - @inlineCallbacks - def _sleep(self, wake_on_error=True): - if wake_on_error and self._error: - # don't sleep if the bed's already on fire, unless we're waiting - # for the fire department to respond, in which case sure, keep on - # sleeping - raise self._error - d = defer.Deferred() - self._sleepers.append(d) - yield d - if wake_on_error and self._error: - raise self._error - - def _wakeup(self): - sleepers = self._sleepers - self._sleepers = [] - for d in sleepers: - d.callback(None) - # NOTE: callers should avoid reentrancy themselves. An - # eventual-send would be safer here, but it makes synchronizing - # unit tests annoying. - - def _signal_error(self, error): - assert isinstance(error, Exception) - self._error = error - self._wakeup() - - def _ws_handle_error(self, msg): - err = ServerError("%s: %s" % (msg["error"], msg["orig"]), - self._ws_url) - return self._signal_error(err) - - @inlineCallbacks - def _claim_nameplate(self): - if not self._nameplate_id: raise UsageError - if self._nameplate_claimed: raise UsageError - yield self._ws_send_command(u"claim", nameplate=self._nameplate_id) - # provokes "claimed" response - - def _ws_handle_claimed(self, msg): - mailbox_id = msg["mailbox"] - assert isinstance(mailbox_id, type(u"")), type(mailbox_id) - self._mailbox_id = mailbox_id - self._open_mailbox() - - @inlineCallbacks - def _release_nameplate(self): - if not self._nameplate_claimed: raise UsageError - if self._nameplate_released: raise UsageError - yield self._ws_send_command(u"release") - self._nameplate_released = True - - - @inlineCallbacks - def _open_mailbox(self): - if not self._mailbox_id: raise UsageError - if self._mailbox_opened: raise UsageError - yield self._ws_send_command(u"open", mailbox=self._mailbox_id) - self._mailbox_opened = True - # causes old messages to be sent now, and subscribes to new messages - - @inlineCallbacks - def _close_mailbox(self): - if not self._mailbox_id: raise UsageError - if not self._mailbox_opened: raise UsageError - if self._mailbox_closed: raise UsageError - yield self._ws_send_command(u"close") - self._mailbox_closed = True - - - @inlineCallbacks - def _msg_send(self, phase, body, wait=False): - if phase in self._sent_messages: raise UsageError - if not self._mailbox_opened: raise UsageError - if self._mailbox_closed: raise UsageError - self._sent_messages[phase] = body - # TODO: retry on failure, with exponential backoff. We're guarding - # against the rendezvous server being temporarily offline. - t = self._timing.add("add", phase=phase, wait=wait) - yield self._ws_send_command(u"add", phase=phase, - body=hexlify(body).decode("ascii")) - if wait: - while phase not in self._delivered_messages: - yield self._sleep() - t.finish() - - def _ws_handle_message(self, msg): - # any message in the mailbox means we no longer need the nameplate - if not self._nameplate_released: - self._release_nameplate() # XXX returns Deferred - - m = msg["message"] - phase = m["phase"] - body = unhexlify(m["body"].encode("ascii")) - if phase in self._sent_messages and self._sent_messages[phase] == body: - self._delivered_messages.add(phase) # ack by server - self._wakeup() - return # ignore echoes of our outbound messages - if phase in self._received_messages: - # a nameplate collision would cause this - err = ServerError("got duplicate phase %s" % phase, self._ws_url) - return self._signal_error(err) - self._received_messages[phase] = body - if phase == u"confirm": - # TODO: we might not have a master key yet, if the caller wasn't - # waiting in _get_master_key() when a back-to-back pake+_confirm - # message pair arrived. - confkey = self.derive_key(u"wormhole:confirmation") - nonce = body[:CONFMSG_NONCE_LENGTH] - if body != make_confmsg(confkey, nonce): - # this makes all API calls fail - return self._signal_error(WrongPasswordError()) - # now notify anyone waiting on it - self._wakeup() - - @inlineCallbacks - def _msg_get(self, phase): - with self._timing.add("get", phase=phase): - while phase not in self._received_messages: - yield self._sleep() # we can wait a long time here - # that will throw an error if something goes wrong - msg = self._received_messages[phase] - returnValue(msg) - - # entry point 1: generate a new code - @inlineCallbacks - def get_code(self, code_length=2): # XX rename to allocate_code()? create_? - if self._code is not None: raise UsageError - if self._started_get_code: raise UsageError - self._started_get_code = True - with self._timing.add("API get_code"): - with self._timing.add("allocate"): - yield self._ws_send_command(u"allocate") - while self._nameplate_id is None: - yield self._sleep() - code = codes.make_code(self._nameplate_id, code_length) - assert isinstance(code, type(u"")), type(code) - self._set_code(code) - self._start() - returnValue(code) - - def _ws_handle_allocated(self, msg): - if self._nameplate_id is not None: - return self._signal_error("got duplicate 'allocated' response") - nid = msg["nameplate"] - assert isinstance(nid, type(u"")), type(nid) - self._nameplate_id = nid - self._wakeup() - - def _start(self): - # allocate the rest now too, so it can be serialized - with self._timing.add("pake1", waiting="crypto"): - self._sp = SPAKE2_Symmetric(to_bytes(self._code), - idSymmetric=to_bytes(self._appid)) - self._msg1 = self._sp.start() - - # entry point 2a: interactively type in a code, with completion - @inlineCallbacks - def input_code(self, prompt="Enter wormhole code: ", code_length=2): - def _lister(): - return blockingCallFromThread(self._reactor, self._list_nameplates) - # fetch the list of nameplates ahead of time, to give us a chance to - # discover the welcome message (and warn the user about an obsolete - # client) - # - # TODO: send the request early, show the prompt right away, hide the - # latency in the user's indecision and slow typing. If we're lucky - # the answer will come back before they hit TAB. - with self._timing.add("API input_code"): - initial_nameplate_ids = yield self._list_nameplates() - with self._timing.add("input code", waiting="user"): - t = self._reactor.addSystemEventTrigger("before", "shutdown", - self._warn_readline) - code = yield deferToThread(codes.input_code_with_completion, - prompt, - initial_nameplate_ids, _lister, - code_length) - self._reactor.removeSystemEventTrigger(t) - returnValue(code) # application will give this to set_code() - - def _warn_readline(self): - # When our process receives a SIGINT, Twisted's SIGINT handler will - # stop the reactor and wait for all threads to terminate before the - # process exits. However, if we were waiting for - # input_code_with_completion() when SIGINT happened, the readline - # thread will be blocked waiting for something on stdin. Trick the - # user into satisfying the blocking read so we can exit. - print("\nCommand interrupted: please press Return to quit", - file=sys.stderr) - - # Other potential approaches to this problem: - # * hard-terminate our process with os._exit(1), but make sure the - # tty gets reset to a normal mode ("cooked"?) first, so that the - # next shell command the user types is echoed correctly - # * track down the thread (t.p.threadable.getThreadID from inside the - # thread), get a cffi binding to pthread_kill, deliver SIGINT to it - # * allocate a pty pair (pty.openpty), replace sys.stdin with the - # slave, build a pty bridge that copies bytes (and other PTY - # things) from the real stdin to the master, then close the slave - # at shutdown, so readline sees EOF - # * write tab-completion and basic editing (TTY raw mode, - # backspace-is-erase) without readline, probably with curses or - # twisted.conch.insults - # * write a separate program to get codes (maybe just "wormhole - # --internal-get-code"), run it as a subprocess, let it inherit - # stdin/stdout, send it SIGINT when we receive SIGINT ourselves. It - # needs an RPC mechanism (over some extra file descriptors) to ask - # us to fetch the current nameplate_id list. - # - # Note that hard-terminating our process with os.kill(os.getpid(), - # signal.SIGKILL), or SIGTERM, doesn't seem to work: the thread - # doesn't see the signal, and we must still wait for stdin to make - # readline finish. - - @inlineCallbacks - def _list_nameplates(self): - with self._timing.add("list"): - self._latest_nameplate_ids = None - yield self._ws_send_command(u"list") - while self._latest_nameplate_ids is None: - yield self._sleep() - returnValue(self._latest_nameplate_ids) - - def _ws_handle_nameplates(self, msg): - self._latest_nameplate_ids = msg["nameplates"] - self._wakeup() - - # entry point 2b: paste in a fully-formed code - def set_code(self, code): - if not isinstance(code, type(u"")): raise TypeError(type(code)) - if self._code is not None: raise UsageError - mo = re.search(r'^(\d+)-', code) - if not mo: - raise ValueError("code (%s) must start with NN-" % code) - with self._timing.add("API set_code"): - self._nameplate_id = mo.group(1) - assert isinstance(self._nameplate_id, type(u"")), type(self._nameplate_id) - self._set_code(code) - self._start() - - def _set_code(self, code): - if self._code is not None: raise UsageError - self._timing.add("code established") - self._code = code - - def serialize(self): - # I can only be serialized after get_code/set_code and before - # get_verifier/get - if self._code is None: raise UsageError - if self._key is not None: raise UsageError - if self._sent_messages: raise UsageError - if self._got_phases: raise UsageError - data = { - "appid": self._appid, - "relay_url": self._relay_url, - "code": self._code, - "nameplate_id": self._nameplate_id, - "side": self._side, - "spake2": json.loads(self._sp.serialize().decode("ascii")), - "msg1": hexlify(self._msg1).decode("ascii"), - } - return json.dumps(data) - - # entry point 3: resume a previously-serialized session - @classmethod - def from_serialized(klass, data, reactor): - d = json.loads(data) - self = klass(d["appid"], d["relay_url"], reactor) - self._side = d["side"] - self._nameplate_id = d["nameplate_id"] - self._set_code(d["code"]) - sp_data = json.dumps(d["spake2"]).encode("ascii") - self._sp = SPAKE2_Symmetric.from_serialized(sp_data) - self._msg1 = unhexlify(d["msg1"].encode("ascii")) - return self - - @inlineCallbacks - def get_verifier(self): - if self._closed: raise UsageError - if self._code is None: raise UsageError - with self._timing.add("API get_verifier"): - yield self._get_master_key() - # If the caller cares about the verifier, then they'll probably - # also willing to wait a moment to see the _confirm message. Each - # side sends this as soon as it sees the other's PAKE message. So - # the sender should see this hot on the heels of the inbound PAKE - # message (a moment after _get_master_key() returns). The - # receiver will see this a round-trip after they send their PAKE - # (because the sender is using wait=True inside _get_master_key, - # below: otherwise the sender might go do some blocking call). - yield self._msg_get(u"confirm") - returnValue(self._verifier) - - @inlineCallbacks - def _get_master_key(self): - # TODO: prevent multiple invocation - if not self._key: - yield self._claim_nameplate_and_watch() - yield self._msg_send(u"pake", self._msg1) - pake_msg = yield self._msg_get(u"pake") - - with self._timing.add("pake2", waiting="crypto"): - self._key = self._sp.finish(pake_msg) - self._verifier = self.derive_key(u"wormhole:verifier") - self._timing.add("key established") - - if self._send_confirm: - # both sides send different (random) confirmation messages - confkey = self.derive_key(u"wormhole:confirmation") - nonce = os.urandom(CONFMSG_NONCE_LENGTH) - confmsg = make_confmsg(confkey, nonce) - yield self._msg_send(u"confirm", confmsg, wait=True) - - def derive_key(self, purpose, length=SecretBox.KEY_SIZE): - if not isinstance(purpose, type(u"")): raise TypeError(type(purpose)) - if self._key is None: - # call after get_verifier() or get() - raise UsageError - return HKDF(self._key, length, CTXinfo=to_bytes(purpose)) - - def _encrypt_data(self, key, data): - assert isinstance(key, type(b"")), type(key) - assert isinstance(data, type(b"")), type(data) - assert len(key) == SecretBox.KEY_SIZE, len(key) - box = SecretBox(key) - nonce = utils.random(SecretBox.NONCE_SIZE) - return box.encrypt(data, nonce) - - def _decrypt_data(self, key, encrypted): - assert isinstance(key, type(b"")), type(key) - assert isinstance(encrypted, type(b"")), type(encrypted) - assert len(key) == SecretBox.KEY_SIZE, len(key) - box = SecretBox(key) - data = box.decrypt(encrypted) - return data - - @inlineCallbacks - def send(self, outbound_data, wait=False): - if not isinstance(outbound_data, type(b"")): - raise TypeError(type(outbound_data)) - if self._closed: raise UsageError - if self._code is None: - raise UsageError("You must set_code() before send()") - phase = self._next_outbound_phase - self._next_outbound_phase += 1 - with self._timing.add("API send", phase=phase, wait=wait): - # Without predefined roles, we can't derive predictably unique - # keys for each side, so we use the same key for both. We use - # random nonces to keep the messages distinct, and we - # automatically ignore reflections. - yield self._get_master_key() - data_key = self.derive_key(u"wormhole:phase:%d" % phase) - outbound_encrypted = self._encrypt_data(data_key, outbound_data) - yield self._msg_send(phase, outbound_encrypted, wait) - - @inlineCallbacks - def get(self): - if self._closed: raise UsageError - if self._code is None: raise UsageError - phase = self._next_inbound_phase - self._next_inbound_phase += 1 - with self._timing.add("API get", phase=phase): - yield self._get_master_key() - body = yield self._msg_get(phase) # we can wait a long time here - try: - data_key = self.derive_key(u"wormhole:phase:%d" % phase) - inbound_data = self._decrypt_data(data_key, body) - returnValue(inbound_data) - except CryptoError: - raise WrongPasswordError - - def _ws_closed(self, wasClean, code, reason): - self._ws = None - self._ws_t.finish() - # TODO: schedule reconnect, unless we're done - - @inlineCallbacks - def close(self, f=None, mood=None): - """Do d.addBoth(w.close) at the end of your chain.""" - if self._closed: - returnValue(None) - self._closed = True - if not self._ws: - returnValue(None) - - if mood is None: - mood = u"happy" - if f: - if f.check(Timeout): - mood = u"lonely" - elif f.check(WrongPasswordError): - mood = u"scary" - elif f.check(TypeError, UsageError): - # preconditions don't warrant reporting mood - pass - else: - mood = u"errory" # other errors do - if not isinstance(mood, (type(None), type(u""))): - raise TypeError(type(mood)) - - with self._timing.add("API close"): - yield self._release(mood) - # TODO: mark WebSocket as don't-reconnect - self._ws.transport.loseConnection() # probably flushes - del self._ws - self._ws_t.finish() - self._timing_started.finish(mood=mood) - returnValue(f) - - @inlineCallbacks - def _release(self, mood): - with self._timing.add("release"): - yield self._ws_send_command(u"release", mood=mood) - while self._released_status is None: - yield self._sleep(wake_on_error=False) - # TODO: set a timeout, don't wait forever for an ack - # TODO: if the connection is lost, let it go - returnValue(self._released_status) - - def _ws_handle_released(self, msg): - self._released_status = msg["status"] - self._wakeup() - -def wormhole(appid, relay_url, reactor, tor_manager=None, timing=None): - w = _Wormhole(appid, relay_url, reactor, tor_manager, timing) - w._start() - return w - -def wormhole_from_serialized(data, reactor): - w = _Wormhole.from_serialized(data, reactor) - return w