fix transit relay

stop using web setup for now
This commit is contained in:
Brian Warner 2015-03-02 00:09:17 -08:00
parent dc8d6e979f
commit 5682ddff8e
4 changed files with 108 additions and 102 deletions

View File

@ -6,7 +6,7 @@ from nacl.exceptions import CryptoError
from nacl import utils from nacl import utils
from .. import codes from .. import codes
from ..util.hkdf import HKDF from ..util.hkdf import HKDF
from ..const import RELAY from ..const import RENDEZVOUS_RELAY
SECOND = 1 SECOND = 1
MINUTE = 60*SECOND MINUTE = 60*SECOND
@ -111,7 +111,7 @@ class Common:
return HKDF(self.key, length, CTXinfo=purpose) return HKDF(self.key, length, CTXinfo=purpose)
class Initiator(Common): class Initiator(Common):
def __init__(self, appid, data, relay=RELAY): def __init__(self, appid, data, relay=RENDEZVOUS_RELAY):
self.appid = appid self.appid = appid
self.data = data self.data = data
assert relay.endswith("/") assert relay.endswith("/")
@ -151,7 +151,7 @@ class Initiator(Common):
class Receiver(Common): class Receiver(Common):
def __init__(self, appid, data, relay=RELAY): def __init__(self, appid, data, relay=RENDEZVOUS_RELAY):
self.appid = appid self.appid = appid
self.data = data self.data = data
self.relay = relay self.relay = relay

View File

@ -3,6 +3,7 @@ import threading, socket, SocketServer
from binascii import hexlify from binascii import hexlify
from ..util import ipaddrs from ..util import ipaddrs
from ..util.hkdf import HKDF from ..util.hkdf import HKDF
from ..const import TRANSIT_RELAY
class TransitError(Exception): class TransitError(Exception):
pass pass
@ -45,8 +46,9 @@ def build_sender_handshake(key):
hexid = HKDF(key, 32, CTXinfo=b"transit_sender") hexid = HKDF(key, 32, CTXinfo=b"transit_sender")
return "transit sender %s ready\n\n" % hexlify(hexid) return "transit sender %s ready\n\n" % hexlify(hexid)
def build_relay_token(key): def build_relay_handshake(key):
return "PLEASE RELAY\n" token = HKDF(key, 32, CTXinfo=b"transit_relay_token")
return "please relay %s\n" % hexlify(token)
TIMEOUT=15 TIMEOUT=15
@ -65,6 +67,11 @@ def force_ascii(s):
return s.encode("ascii") return s.encode("ascii")
return s return s
def send_to(skt, data):
sent = 0
while sent < len(data):
sent += skt.send(data[sent:])
def wait_for(skt, expected, hint): def wait_for(skt, expected, hint):
got = b"" got = b""
while len(got) < len(expected): while len(got) < len(expected):
@ -74,7 +81,7 @@ def wait_for(skt, expected, hint):
(got, expected, hint)) (got, expected, hint))
def connector(owner, hint, send_handshake, expected_handshake, def connector(owner, hint, send_handshake, expected_handshake,
relay_token=None): relay_handshake=None):
addr,port = hint.split(",") addr,port = hint.split(",")
skt = None skt = None
try: try:
@ -82,10 +89,11 @@ def connector(owner, hint, send_handshake, expected_handshake,
TIMEOUT) # timeout or ECONNREFUSED TIMEOUT) # timeout or ECONNREFUSED
skt.settimeout(TIMEOUT) skt.settimeout(TIMEOUT)
#print("socket(%s) connected" % (hint,)) #print("socket(%s) connected" % (hint,))
if relay_token: if relay_handshake:
skt.send(relay_token) send_to(skt, relay_handshake)
wait_for(skt, "ok\n", hint) wait_for(skt, "ok\n", hint)
skt.send(send_handshake) #print("relay ready %r" % (hint,))
send_to(skt, send_handshake)
wait_for(skt, expected_handshake, hint) wait_for(skt, expected_handshake, hint)
#print("connector ready %r" % (hint,)) #print("connector ready %r" % (hint,))
except Exception as e: except Exception as e:
@ -100,6 +108,7 @@ def connector(owner, hint, send_handshake, expected_handshake,
if not isinstance(e, (socket.error, socket.timeout, BadHandshake)): if not isinstance(e, (socket.error, socket.timeout, BadHandshake)):
raise raise
owner._connector_failed(hint) owner._connector_failed(hint)
return
# owner is now responsible for the socket # owner is now responsible for the socket
owner._negotiation_finished(skt) # note thread owner._negotiation_finished(skt) # note thread
@ -107,7 +116,7 @@ def handle(skt, client_address, owner, send_handshake, expected_handshake):
try: try:
#print("handle %r" % (skt,)) #print("handle %r" % (skt,))
skt.settimeout(TIMEOUT) skt.settimeout(TIMEOUT)
skt.send(send_handshake) send_to(skt, send_handshake)
got = b"" got = b""
# for the receiver, this includes the "go\n" # for the receiver, this includes the "go\n"
while len(got) < len(expected_handshake): while len(got) < len(expected_handshake):
@ -180,7 +189,7 @@ class Common:
def get_direct_hints(self): def get_direct_hints(self):
return self.my_direct_hints return self.my_direct_hints
def get_relay_hints(self): def get_relay_hints(self):
return [] return [TRANSIT_RELAY]
def add_their_direct_hints(self, hints): def add_their_direct_hints(self, hints):
self._their_direct_hints = [force_ascii(h) for h in hints] self._their_direct_hints = [force_ascii(h) for h in hints]
@ -221,7 +230,7 @@ class Common:
def _start_connector(self, hint, is_relay=False): def _start_connector(self, hint, is_relay=False):
args = (self, hint, self._send_this(), self._expect_this()) args = (self, hint, self._send_this(), self._expect_this())
if is_relay: if is_relay:
args = args + (build_relay_token(self._transit_key),) args = args + (build_relay_handshake(self._transit_key),)
t = threading.Thread(target=connector, args=args) t = threading.Thread(target=connector, args=args)
t.daemon = True t.daemon = True
t.start() t.start()
@ -265,11 +274,11 @@ class Common:
if is_winner: if is_winner:
if self.is_sender: if self.is_sender:
skt.send("go\n") send_to(skt, "go\n")
self.winning.set() self.winning.set()
else: else:
if self.is_sender: if self.is_sender:
skt.send("nevermind\n") send_to(skt, "nevermind\n")
skt.close() skt.close()
class TransitSender(Common): class TransitSender(Common):

View File

@ -1,4 +1,5 @@
# This is a relay I run on a personal server. If it gets too expensive to # This is a relay I run on a personal server. If it gets too expensive to
# run, I'll shut it down. # run, I'll shut it down.
RELAY = "http://relay.petmail.org:8009/relay/" RENDEZVOUS_RELAY = "http://relay.petmail.org:8009/relay/"
TRANSIT_RELAY = "relay.petmail.org,8010"

View File

@ -1,4 +1,5 @@
import os, re, json, binascii from __future__ import print_function
import re, json
from collections import defaultdict from collections import defaultdict
from twisted.python import log from twisted.python import log
from twisted.internet import protocol from twisted.internet import protocol
@ -111,84 +112,8 @@ class Relay(resource.Resource):
log.msg("freed %d, now have %d channels" % log.msg("freed %d, now have %d channels" %
(channel_id, len(self.channels))) (channel_id, len(self.channels)))
class Transit(resource.Resource, protocol.ServerFactory, service.MultiService):
# Transit manages two simultaneous connections to a secondary TCP port,
# both forwarded to the other. Transit will allocate you a token when the
# ports are free, and will inform you of the MAXLENGTH and MAXTIME
# limits. Connect to the port, send "TOKEN\n", receive "ok\n", and all
# subsequent data you send will be delivered to the other side. All data
# you get after the "ok" will be from the other side. You will not
# receive "ok" until the other side has also connected and submitted a
# valid token. The token is different for each side. The connections will
# be dropped after MAXLENGTH bytes have been sent by either side, or
# MAXTIME seconds after the token is issued, whichever is reached first.
# These relay connections are not half-closeable (unlike full TCP
# connections, applications will not receive any data after half-closing
# their outgoing side). Applications must negotiate shutdown with their
# peer and not close the connection until all data has finished
# transferring in both directions. Applications which only need to send
# data in one direction can use close as usual.
MAXLENGTH = 10*MB
MAXTIME = 60*SECONDS
def __init__(self):
resource.Resource.__init__(self)
service.MultiService.__init__(self)
self.pending_requests = []
self.active_token = None
self.active_connection = None
self.active_timer = None
def make_token(self):
return binascii.hexlify(os.urandom(8))
def render_POST(self, request):
if self.active_connection:
self.pending_requests.append(request)
return server.NOT_DONE_YET
self.active_token = self.make_token()
request.setHeader("content-type", "application/json; charset=utf-8")
t = service.TimerService(self.MAXTIME, self.timer_expired)
self.active_timer = t
t.setServiceParent(self)
r = { "token": self.active_token,
"maxlength": self.MAXLENGTH,
"maxtime": self.MAXTIME,
}
return json.dumps(r)+"\n"
def timer_expired(self):
self.remove_timer()
for c in self.active_connections:
c.STOPSTOP()
self.active_connections[:] = []
self.active_token = None
def remove_timer(self):
self.active_timer.disownServiceParent()
self.active_timer = None
# ServerFactory methods, which manage the two TransitConnection protocols
def buildProtocol(self, addr):
p = TransitConnection(self.active_token)
p.factory = self
return p
def connection_got_token(self, p):
pass
def transitFinished(self, p):
pass
def transitFailed(self):
pass
# after getting a token, both transit clients connect to one of these
class TransitConnection(protocol.Protocol): class TransitConnection(protocol.Protocol):
def __init__(self, expected_token): def __init__(self):
self.expected_token = expected_token
self.got_token = False self.got_token = False
self.token_buffer = b"" self.token_buffer = b""
self.sent_ok = False self.sent_ok = False
@ -200,37 +125,109 @@ class TransitConnection(protocol.Protocol):
self.buddy.transport.write(data) self.buddy.transport.write(data)
return return
if self.got_token: # but not yet sent_ok if self.got_token: # but not yet sent_ok
self.transport.write("impatient\n")
print("transit impatience failure")
return self.disconnect() # impatience yields failure return self.disconnect() # impatience yields failure
# else this should be (part of) the token # else this should be (part of) the token
self.token_buffer += data self.token_buffer += data
if b"\n" not in self.token_buffer: buf = self.token_buffer
wanted = len("please relay \n")+32*2
if len(buf) < wanted-1 and "\n" in buf:
self.transport.write("bad handshake\n")
print("transit handshake early failure")
return self.disconnect()
if len(buf) < wanted:
return return
lines = self.token_buffer.split(b"\n") if len(buf) > wanted:
if len(lines) > 1: self.transport.write("impatient\n")
print("transit impatience failure")
return self.disconnect() # impatience yields failure return self.disconnect() # impatience yields failure
token = lines[0] mo = re.search(r"^please relay (\w{64})\n", buf, re.M)
if token != self.expected_token: if not mo:
self.transport.write("bad handshake\n")
print("transit handshake failure")
return self.disconnect() # incorrectness yields failure return self.disconnect() # incorrectness yields failure
token = mo.group(1)
self.got_token = True self.got_token = True
self.factory.connection_got_token(self) self.factory.connection_got_token(token, self)
def buddy_connected(self, them): def buddy_connected(self, them):
self.buddy = them self.buddy = them
self.transport.write(b"ok\n") self.transport.write(b"ok\n")
self.sent_ok = True self.sent_ok = True
# TODO: connect as producer/consumer
def buddy_disconnected(self): def buddy_disconnected(self):
print("buddy_disconnected %r" % self)
self.buddy = None self.buddy = None
self.transport.loseConnection() self.transport.loseConnection()
self.factory.transitFinished(self)
def connectionLost(self, reason): def connectionLost(self, reason):
print("connectionLost %r %s" % (self, reason))
if self.buddy: if self.buddy:
self.buddy.buddy_disconnected() self.buddy.buddy_disconnected()
self.factory.transitFinished(self)
def disconnect(self): def disconnect(self):
self.transport.loseConnection() self.transport.loseConnection()
self.factory.transitFailed() self.factory.transitFailed(self)
class Transit(protocol.ServerFactory, service.MultiService):
# I manage pairs of simultaneous connections to a secondary TCP port,
# both forwarded to the other. Clients must begin each connection with
# "please relay TOKEN\n". I will send "ok\n" when the matching connection
# is established, or disconnect if no matching connection is made within
# MAX_WAIT_TIME seconds. I will disconnect if you send data before the
# "ok\n". All data you get after the "ok\n" will be from the other side.
# You will not receive "ok\n" until the other side has also connected and
# submitted a matching token. The token is the same for each side.
# In addition, the connections will be dropped after MAXLENGTH bytes have
# been sent by either side, or MAXTIME seconds have elapsed after the
# matching connections were established. A future API will reveal these
# limits to clients instead of causing mysterious spontaneous failures.
# These relay connections are not half-closeable (unlike full TCP
# connections, applications will not receive any data after half-closing
# their outgoing side). Applications must negotiate shutdown with their
# peer and not close the connection until all data has finished
# transferring in both directions. Applications which only need to send
# data in one direction can use close() as usual.
MAX_WAIT_TIME = 30*SECONDS
MAXLENGTH = 10*MB
MAXTIME = 60*SECONDS
protocol = TransitConnection
def __init__(self):
service.MultiService.__init__(self)
self.pending_requests = {} # token -> TransitConnection
self.active_connections = set() # TransitConnection
def connection_got_token(self, token, p):
if token in self.pending_requests:
print("transit relay 2: %r" % token)
buddy = self.pending_requests.pop(token)
self.active_connections.add(p)
self.active_connections.add(buddy)
p.buddy_connected(buddy)
buddy.buddy_connected(p)
else:
self.pending_requests[token] = p
print("transit relay 1: %r" % token)
# TODO: timer
def transitFinished(self, p):
print("transitFinished %r" % p)
for token,tc in self.pending_requests.items():
if tc is p:
del self.pending_requests[token]
break
self.active_connections.discard(p)
def transitFailed(self, p):
print("transitFailed %r" % p)
pass
class Root(resource.Resource): class Root(resource.Resource):
@ -249,7 +246,6 @@ class RelayServer(service.MultiService):
self.relay = Relay() # for tests self.relay = Relay() # for tests
self.root.putChild("relay", self.relay) self.root.putChild("relay", self.relay)
self.transit = Transit() self.transit = Transit()
self.root.putChild("transit", self.transit)
self.transit.setServiceParent(self) # for the timer self.transit.setServiceParent(self) # for the timer
self.transport_service = strports.service(transitport, self.transit) self.transport_service = strports.service(transitport, self.transit)
self.transport_service.setServiceParent(self) self.transport_service.setServiceParent(self)