start on transit service

This commit is contained in:
Brian Warner 2015-02-11 18:13:54 -08:00
parent ab8d9f7678
commit d5d4a3f97a

View File

@ -1,9 +1,13 @@
import re, json
import os, re, json, binascii
from collections import defaultdict
from twisted.python import log
from twisted.internet import protocol
from twisted.application import strports, service
from twisted.web import server, static, resource, http
SECONDS = 1.0
MB = 1000*1000
class Channel(resource.Resource):
isLeaf = True
@ -97,6 +101,128 @@ class Relay(resource.Resource):
log.msg("freed %d, now have %d channels" %
(channel_id, len(self.channels)))
class Transit(resource.Resource, protocol.ServerFactory, service.MultiService):
# Transit manages two simultaneous connections to a secondary TCP port,
# both forwarded to the other. Transit will allocate you a token when the
# ports are free, and will inform you of the MAXLENGTH and MAXTIME
# limits. Connect to the port, send "TOKEN\n", receive "ok\n", and all
# subsequent data you send will be delivered to the other side. All data
# you get after the "ok" will be from the other side. You will not
# receive "ok" until the other side has also connected and submitted a
# valid token. The token is different for each side. The connections will
# be dropped after MAXLENGTH bytes have been sent by either side, or
# MAXTIME seconds after the token is issued, whichever is reached first.
# These relay connections are not half-closeable (unlike full TCP
# connections, applications will not receive any data after half-closing
# their outgoing side). Applications must negotiate shutdown with their
# peer and not close the connection until all data has finished
# transferring in both directions. Applications which only need to send
# data in one direction can use close as usual.
MAXLENGTH = 10*MB
MAXTIME = 60*SECONDS
def __init__(self):
resource.Resource.__init__(self)
service.MultiService.__init__(self)
self.pending_requests = []
self.active_token = None
self.active_connection = None
self.active_timer = None
def make_token(self):
return binascii.hexlify(os.urandom(8))
def render_POST(self, request):
if self.active_connection:
self.pending_requests.append(request)
return server.NOT_DONE_YET
self.active_token = self.make_token()
request.setHeader("content-type", "application/json; charset=utf-8")
t = service.TimerService(self.MAXTIME, self.timer_expired)
self.active_timer = t
t.setServiceParent(self)
r = { "token": self.active_token,
"maxlength": self.MAXLENGTH,
"maxtime": self.MAXTIME,
}
return json.dumps(r)+"\n"
def timer_expired(self):
self.remove_timer()
for c in self.active_connections:
c.STOPSTOP()
self.active_connections[:] = []
self.active_token = None
def remove_timer(self):
self.active_timer.disownServiceParent()
self.active_timer = None
# ServerFactory methods, which manage the two TransitConnection protocols
def buildProtocol(self, addr):
p = TransitConnection(self.active_token)
p.factory = self
return p
def connection_got_token(self, p):
pass
def transitFinished(self, p):
pass
def transitFailed(self):
pass
# after getting a token, both transit clients connect to one of these
class TransitConnection(protocol.Protocol):
def __init__(self, expected_token):
self.expected_token = expected_token
self.got_token = False
self.token_buffer = b""
self.sent_ok = False
self.buddy = None
def dataReceived(self, data):
if self.sent_ok:
# TODO: connect as producer/consumer
self.buddy.transport.write(data)
return
if self.got_token: # but not yet sent_ok
return self.disconnect() # impatience yields failure
# else this should be (part of) the token
self.token_buffer += data
if b"\n" not in self.token_buffer:
return
lines = self.token_buffer.split(b"\n")
if len(lines) > 1:
return self.disconnect() # impatience yields failure
token = lines[0]
if token != self.expected_token:
return self.disconnect() # incorrectness yields failure
self.got_token = True
self.factory.connection_got_token(self)
def buddy_connected(self, them):
self.buddy = them
self.transport.write(b"ok\n")
self.sent_ok = True
def buddy_disconnected(self):
self.buddy = None
self.transport.loseConnection()
self.factory.transitFinished(self)
def connectionLost(self, reason):
if self.buddy:
self.buddy.buddy_disconnected()
def disconnect(self):
self.transport.loseConnection()
self.factory.transitFailed()
class Root(resource.Resource):
# child_FOO is a nevow thing, not a twisted.web.resource thing
def __init__(self):
@ -104,17 +230,19 @@ class Root(resource.Resource):
self.putChild("", static.Data("Wormhole Relay\n", "text/plain"))
class RelayServer(service.MultiService):
def __init__(self, listenport):
def __init__(self, relayport, transitport):
service.MultiService.__init__(self)
self.root = Root()
site = server.Site(self.root)
self.port_service = strports.service(listenport, site)
self.port_service.setServiceParent(self)
self.relayport_service = strports.service(relayport, site)
self.relayport_service.setServiceParent(self)
self.relay = Relay() # for tests
self.root.putChild("relay", self.relay)
def get_root(self):
return self.root
self.transit = Transit()
self.root.putChild("transit", self.transit)
self.transit.setServiceParent(self) # for the timer
self.transport_service = strports.service(transitport, self.transit)
self.transport_service.setServiceParent(self)
application = service.Application("foo")
RelayServer("tcp:8009").setServiceParent(application)
RelayServer("tcp:8009", "tcp:8010").setServiceParent(application)