From 5682ddff8ed15acb94299700ce2e6d2a0705df10 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 2 Mar 2015 00:09:17 -0800 Subject: [PATCH] fix transit relay stop using web setup for now --- src/wormhole/blocking/transcribe.py | 6 +- src/wormhole/blocking/transit.py | 31 +++-- src/wormhole/const.py | 3 +- src/wormhole/servers/relay.py | 170 ++++++++++++++-------------- 4 files changed, 108 insertions(+), 102 deletions(-) diff --git a/src/wormhole/blocking/transcribe.py b/src/wormhole/blocking/transcribe.py index f0cab4c..e568229 100644 --- a/src/wormhole/blocking/transcribe.py +++ b/src/wormhole/blocking/transcribe.py @@ -6,7 +6,7 @@ from nacl.exceptions import CryptoError from nacl import utils from .. import codes from ..util.hkdf import HKDF -from ..const import RELAY +from ..const import RENDEZVOUS_RELAY SECOND = 1 MINUTE = 60*SECOND @@ -111,7 +111,7 @@ class Common: return HKDF(self.key, length, CTXinfo=purpose) class Initiator(Common): - def __init__(self, appid, data, relay=RELAY): + def __init__(self, appid, data, relay=RENDEZVOUS_RELAY): self.appid = appid self.data = data assert relay.endswith("/") @@ -151,7 +151,7 @@ class Initiator(Common): class Receiver(Common): - def __init__(self, appid, data, relay=RELAY): + def __init__(self, appid, data, relay=RENDEZVOUS_RELAY): self.appid = appid self.data = data self.relay = relay diff --git a/src/wormhole/blocking/transit.py b/src/wormhole/blocking/transit.py index 92fa03c..1a96c73 100644 --- a/src/wormhole/blocking/transit.py +++ b/src/wormhole/blocking/transit.py @@ -3,6 +3,7 @@ import threading, socket, SocketServer from binascii import hexlify from ..util import ipaddrs from ..util.hkdf import HKDF +from ..const import TRANSIT_RELAY class TransitError(Exception): pass @@ -45,8 +46,9 @@ def build_sender_handshake(key): hexid = HKDF(key, 32, CTXinfo=b"transit_sender") return "transit sender %s ready\n\n" % hexlify(hexid) -def build_relay_token(key): - return "PLEASE RELAY\n" +def build_relay_handshake(key): + token = HKDF(key, 32, CTXinfo=b"transit_relay_token") + return "please relay %s\n" % hexlify(token) TIMEOUT=15 @@ -65,6 +67,11 @@ def force_ascii(s): return s.encode("ascii") return s +def send_to(skt, data): + sent = 0 + while sent < len(data): + sent += skt.send(data[sent:]) + def wait_for(skt, expected, hint): got = b"" while len(got) < len(expected): @@ -74,7 +81,7 @@ def wait_for(skt, expected, hint): (got, expected, hint)) def connector(owner, hint, send_handshake, expected_handshake, - relay_token=None): + relay_handshake=None): addr,port = hint.split(",") skt = None try: @@ -82,10 +89,11 @@ def connector(owner, hint, send_handshake, expected_handshake, TIMEOUT) # timeout or ECONNREFUSED skt.settimeout(TIMEOUT) #print("socket(%s) connected" % (hint,)) - if relay_token: - skt.send(relay_token) + if relay_handshake: + send_to(skt, relay_handshake) wait_for(skt, "ok\n", hint) - skt.send(send_handshake) + #print("relay ready %r" % (hint,)) + send_to(skt, send_handshake) wait_for(skt, expected_handshake, hint) #print("connector ready %r" % (hint,)) except Exception as e: @@ -100,6 +108,7 @@ def connector(owner, hint, send_handshake, expected_handshake, if not isinstance(e, (socket.error, socket.timeout, BadHandshake)): raise owner._connector_failed(hint) + return # owner is now responsible for the socket owner._negotiation_finished(skt) # note thread @@ -107,7 +116,7 @@ def handle(skt, client_address, owner, send_handshake, expected_handshake): try: #print("handle %r" % (skt,)) skt.settimeout(TIMEOUT) - skt.send(send_handshake) + send_to(skt, send_handshake) got = b"" # for the receiver, this includes the "go\n" while len(got) < len(expected_handshake): @@ -180,7 +189,7 @@ class Common: def get_direct_hints(self): return self.my_direct_hints def get_relay_hints(self): - return [] + return [TRANSIT_RELAY] def add_their_direct_hints(self, hints): self._their_direct_hints = [force_ascii(h) for h in hints] @@ -221,7 +230,7 @@ class Common: def _start_connector(self, hint, is_relay=False): args = (self, hint, self._send_this(), self._expect_this()) if is_relay: - args = args + (build_relay_token(self._transit_key),) + args = args + (build_relay_handshake(self._transit_key),) t = threading.Thread(target=connector, args=args) t.daemon = True t.start() @@ -265,11 +274,11 @@ class Common: if is_winner: if self.is_sender: - skt.send("go\n") + send_to(skt, "go\n") self.winning.set() else: if self.is_sender: - skt.send("nevermind\n") + send_to(skt, "nevermind\n") skt.close() class TransitSender(Common): diff --git a/src/wormhole/const.py b/src/wormhole/const.py index 09faa70..85ae953 100644 --- a/src/wormhole/const.py +++ b/src/wormhole/const.py @@ -1,4 +1,5 @@ # This is a relay I run on a personal server. If it gets too expensive to # run, I'll shut it down. -RELAY = "http://relay.petmail.org:8009/relay/" +RENDEZVOUS_RELAY = "http://relay.petmail.org:8009/relay/" +TRANSIT_RELAY = "relay.petmail.org,8010" diff --git a/src/wormhole/servers/relay.py b/src/wormhole/servers/relay.py index 9e5c274..5952bab 100644 --- a/src/wormhole/servers/relay.py +++ b/src/wormhole/servers/relay.py @@ -1,4 +1,5 @@ -import os, re, json, binascii +from __future__ import print_function +import re, json from collections import defaultdict from twisted.python import log from twisted.internet import protocol @@ -111,84 +112,8 @@ class Relay(resource.Resource): log.msg("freed %d, now have %d channels" % (channel_id, len(self.channels))) -class Transit(resource.Resource, protocol.ServerFactory, service.MultiService): - # Transit manages two simultaneous connections to a secondary TCP port, - # both forwarded to the other. Transit will allocate you a token when the - # ports are free, and will inform you of the MAXLENGTH and MAXTIME - # limits. Connect to the port, send "TOKEN\n", receive "ok\n", and all - # subsequent data you send will be delivered to the other side. All data - # you get after the "ok" will be from the other side. You will not - # receive "ok" until the other side has also connected and submitted a - # valid token. The token is different for each side. The connections will - # be dropped after MAXLENGTH bytes have been sent by either side, or - # MAXTIME seconds after the token is issued, whichever is reached first. - - # These relay connections are not half-closeable (unlike full TCP - # connections, applications will not receive any data after half-closing - # their outgoing side). Applications must negotiate shutdown with their - # peer and not close the connection until all data has finished - # transferring in both directions. Applications which only need to send - # data in one direction can use close as usual. - - MAXLENGTH = 10*MB - MAXTIME = 60*SECONDS - - def __init__(self): - resource.Resource.__init__(self) - service.MultiService.__init__(self) - self.pending_requests = [] - self.active_token = None - self.active_connection = None - self.active_timer = None - - def make_token(self): - return binascii.hexlify(os.urandom(8)) - - def render_POST(self, request): - if self.active_connection: - self.pending_requests.append(request) - return server.NOT_DONE_YET - self.active_token = self.make_token() - request.setHeader("content-type", "application/json; charset=utf-8") - t = service.TimerService(self.MAXTIME, self.timer_expired) - self.active_timer = t - t.setServiceParent(self) - r = { "token": self.active_token, - "maxlength": self.MAXLENGTH, - "maxtime": self.MAXTIME, - } - return json.dumps(r)+"\n" - - def timer_expired(self): - self.remove_timer() - for c in self.active_connections: - c.STOPSTOP() - self.active_connections[:] = [] - self.active_token = None - - def remove_timer(self): - self.active_timer.disownServiceParent() - self.active_timer = None - - # ServerFactory methods, which manage the two TransitConnection protocols - - def buildProtocol(self, addr): - p = TransitConnection(self.active_token) - p.factory = self - return p - - def connection_got_token(self, p): - pass - def transitFinished(self, p): - pass - def transitFailed(self): - pass - -# after getting a token, both transit clients connect to one of these - class TransitConnection(protocol.Protocol): - def __init__(self, expected_token): - self.expected_token = expected_token + def __init__(self): self.got_token = False self.token_buffer = b"" self.sent_ok = False @@ -200,37 +125,109 @@ class TransitConnection(protocol.Protocol): self.buddy.transport.write(data) return if self.got_token: # but not yet sent_ok + self.transport.write("impatient\n") + print("transit impatience failure") return self.disconnect() # impatience yields failure # else this should be (part of) the token self.token_buffer += data - if b"\n" not in self.token_buffer: + buf = self.token_buffer + wanted = len("please relay \n")+32*2 + if len(buf) < wanted-1 and "\n" in buf: + self.transport.write("bad handshake\n") + print("transit handshake early failure") + return self.disconnect() + if len(buf) < wanted: return - lines = self.token_buffer.split(b"\n") - if len(lines) > 1: + if len(buf) > wanted: + self.transport.write("impatient\n") + print("transit impatience failure") return self.disconnect() # impatience yields failure - token = lines[0] - if token != self.expected_token: + mo = re.search(r"^please relay (\w{64})\n", buf, re.M) + if not mo: + self.transport.write("bad handshake\n") + print("transit handshake failure") return self.disconnect() # incorrectness yields failure + token = mo.group(1) + self.got_token = True - self.factory.connection_got_token(self) + self.factory.connection_got_token(token, self) def buddy_connected(self, them): self.buddy = them self.transport.write(b"ok\n") self.sent_ok = True + # TODO: connect as producer/consumer def buddy_disconnected(self): + print("buddy_disconnected %r" % self) self.buddy = None self.transport.loseConnection() - self.factory.transitFinished(self) def connectionLost(self, reason): + print("connectionLost %r %s" % (self, reason)) if self.buddy: self.buddy.buddy_disconnected() + self.factory.transitFinished(self) def disconnect(self): self.transport.loseConnection() - self.factory.transitFailed() + self.factory.transitFailed(self) + +class Transit(protocol.ServerFactory, service.MultiService): + # I manage pairs of simultaneous connections to a secondary TCP port, + # both forwarded to the other. Clients must begin each connection with + # "please relay TOKEN\n". I will send "ok\n" when the matching connection + # is established, or disconnect if no matching connection is made within + # MAX_WAIT_TIME seconds. I will disconnect if you send data before the + # "ok\n". All data you get after the "ok\n" will be from the other side. + # You will not receive "ok\n" until the other side has also connected and + # submitted a matching token. The token is the same for each side. + + # In addition, the connections will be dropped after MAXLENGTH bytes have + # been sent by either side, or MAXTIME seconds have elapsed after the + # matching connections were established. A future API will reveal these + # limits to clients instead of causing mysterious spontaneous failures. + + # These relay connections are not half-closeable (unlike full TCP + # connections, applications will not receive any data after half-closing + # their outgoing side). Applications must negotiate shutdown with their + # peer and not close the connection until all data has finished + # transferring in both directions. Applications which only need to send + # data in one direction can use close() as usual. + + MAX_WAIT_TIME = 30*SECONDS + MAXLENGTH = 10*MB + MAXTIME = 60*SECONDS + protocol = TransitConnection + + def __init__(self): + service.MultiService.__init__(self) + self.pending_requests = {} # token -> TransitConnection + self.active_connections = set() # TransitConnection + + def connection_got_token(self, token, p): + if token in self.pending_requests: + print("transit relay 2: %r" % token) + buddy = self.pending_requests.pop(token) + self.active_connections.add(p) + self.active_connections.add(buddy) + p.buddy_connected(buddy) + buddy.buddy_connected(p) + else: + self.pending_requests[token] = p + print("transit relay 1: %r" % token) + # TODO: timer + def transitFinished(self, p): + print("transitFinished %r" % p) + for token,tc in self.pending_requests.items(): + if tc is p: + del self.pending_requests[token] + break + self.active_connections.discard(p) + + def transitFailed(self, p): + print("transitFailed %r" % p) + pass class Root(resource.Resource): @@ -249,7 +246,6 @@ class RelayServer(service.MultiService): self.relay = Relay() # for tests self.root.putChild("relay", self.relay) self.transit = Transit() - self.root.putChild("transit", self.transit) self.transit.setServiceParent(self) # for the timer self.transport_service = strports.service(transitport, self.transit) self.transport_service.setServiceParent(self)