Merge branch 'blur-usage'
This commit is contained in:
commit
4bc0fc6cc0
|
@ -41,6 +41,9 @@ sp_start.add_argument("--transit", default="tcp:3001", metavar="tcp:PORT",
|
||||||
help="endpoint specification for the transit-relay port")
|
help="endpoint specification for the transit-relay port")
|
||||||
sp_start.add_argument("--advertise-version", metavar="VERSION",
|
sp_start.add_argument("--advertise-version", metavar="VERSION",
|
||||||
help="version to recommend to clients")
|
help="version to recommend to clients")
|
||||||
|
sp_start.add_argument("--blur-usage", default=None, type=int,
|
||||||
|
metavar="SECONDS",
|
||||||
|
help="round logged access times to improve privacy")
|
||||||
sp_start.add_argument("-n", "--no-daemon", action="store_true")
|
sp_start.add_argument("-n", "--no-daemon", action="store_true")
|
||||||
#sp_start.add_argument("twistd_args", nargs="*", default=None,
|
#sp_start.add_argument("twistd_args", nargs="*", default=None,
|
||||||
# metavar="[TWISTD-ARGS..]",
|
# metavar="[TWISTD-ARGS..]",
|
||||||
|
@ -61,6 +64,9 @@ sp_restart.add_argument("--transit", default="tcp:3001", metavar="tcp:PORT",
|
||||||
help="endpoint specification for the transit-relay port")
|
help="endpoint specification for the transit-relay port")
|
||||||
sp_restart.add_argument("--advertise-version", metavar="VERSION",
|
sp_restart.add_argument("--advertise-version", metavar="VERSION",
|
||||||
help="version to recommend to clients")
|
help="version to recommend to clients")
|
||||||
|
sp_restart.add_argument("--blur-usage", default=None, type=int,
|
||||||
|
metavar="SECONDS",
|
||||||
|
help="round logged access times to improve privacy")
|
||||||
sp_restart.add_argument("-n", "--no-daemon", action="store_true")
|
sp_restart.add_argument("-n", "--no-daemon", action="store_true")
|
||||||
sp_restart.set_defaults(func=cmd_server.restart_server)
|
sp_restart.set_defaults(func=cmd_server.restart_server)
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ class MyPlugin:
|
||||||
from .server import RelayServer
|
from .server import RelayServer
|
||||||
return RelayServer(self.args.rendezvous, self.args.transit,
|
return RelayServer(self.args.rendezvous, self.args.transit,
|
||||||
self.args.advertise_version,
|
self.args.advertise_version,
|
||||||
"relay.sqlite")
|
"relay.sqlite", self.args.blur_usage)
|
||||||
|
|
||||||
def start_server(args):
|
def start_server(args):
|
||||||
from twisted.python import usage
|
from twisted.python import usage
|
||||||
|
|
|
@ -63,10 +63,11 @@ class EventsProtocol:
|
||||||
# all JSON responses include a "welcome:{..}" key
|
# all JSON responses include a "welcome:{..}" key
|
||||||
|
|
||||||
class RelayResource(resource.Resource):
|
class RelayResource(resource.Resource):
|
||||||
def __init__(self, relay, welcome):
|
def __init__(self, relay, welcome, log_requests):
|
||||||
resource.Resource.__init__(self)
|
resource.Resource.__init__(self)
|
||||||
self._relay = relay
|
self._relay = relay
|
||||||
self._welcome = welcome
|
self._welcome = welcome
|
||||||
|
self._log_requests = log_requests
|
||||||
|
|
||||||
class ChannelLister(RelayResource):
|
class ChannelLister(RelayResource):
|
||||||
def render_GET(self, request):
|
def render_GET(self, request):
|
||||||
|
@ -93,8 +94,9 @@ class Allocator(RelayResource):
|
||||||
app = self._relay.get_app(appid)
|
app = self._relay.get_app(appid)
|
||||||
channelid = app.find_available_channelid()
|
channelid = app.find_available_channelid()
|
||||||
app.allocate_channel(channelid, side)
|
app.allocate_channel(channelid, side)
|
||||||
log.msg("allocated #%d, now have %d DB channels" %
|
if self._log_requests:
|
||||||
(channelid, len(app.get_allocated())))
|
log.msg("allocated #%d, now have %d DB channels" %
|
||||||
|
(channelid, len(app.get_allocated())))
|
||||||
request.setHeader(b"content-type", b"application/json; charset=utf-8")
|
request.setHeader(b"content-type", b"application/json; charset=utf-8")
|
||||||
data = {"welcome": self._welcome, "channelid": channelid}
|
data = {"welcome": self._welcome, "channelid": channelid}
|
||||||
return (json.dumps(data)+"\n").encode("utf-8")
|
return (json.dumps(data)+"\n").encode("utf-8")
|
||||||
|
@ -207,10 +209,13 @@ class Deallocator(RelayResource):
|
||||||
|
|
||||||
|
|
||||||
class Channel:
|
class Channel:
|
||||||
def __init__(self, app, db, welcome, appid, channelid):
|
def __init__(self, app, db, welcome, blur_usage, log_requests,
|
||||||
|
appid, channelid):
|
||||||
self._app = app
|
self._app = app
|
||||||
self._db = db
|
self._db = db
|
||||||
self._welcome = welcome
|
self._welcome = welcome
|
||||||
|
self._blur_usage = blur_usage
|
||||||
|
self._log_requests = log_requests
|
||||||
self._appid = appid
|
self._appid = appid
|
||||||
self._channelid = channelid
|
self._channelid = channelid
|
||||||
self._listeners = set() # callbacks that take JSONable object
|
self._listeners = set() # callbacks that take JSONable object
|
||||||
|
@ -297,6 +302,8 @@ class Channel:
|
||||||
|
|
||||||
def _store_summary(self, summary):
|
def _store_summary(self, summary):
|
||||||
(started, result, total_time, waiting_time) = summary
|
(started, result, total_time, waiting_time) = summary
|
||||||
|
if self._blur_usage:
|
||||||
|
started = self._blur_usage * (started // self._blur_usage)
|
||||||
self._db.execute("INSERT INTO `usage`"
|
self._db.execute("INSERT INTO `usage`"
|
||||||
" (`type`, `started`, `result`,"
|
" (`type`, `started`, `result`,"
|
||||||
" `total_time`, `waiting_time`)"
|
" `total_time`, `waiting_time`)"
|
||||||
|
@ -382,9 +389,11 @@ class Channel:
|
||||||
|
|
||||||
|
|
||||||
class AppNamespace:
|
class AppNamespace:
|
||||||
def __init__(self, db, welcome, appid):
|
def __init__(self, db, welcome, blur_usage, log_requests, appid):
|
||||||
self._db = db
|
self._db = db
|
||||||
self._welcome = welcome
|
self._welcome = welcome
|
||||||
|
self._blur_usage = blur_usage
|
||||||
|
self._log_requests = log_requests
|
||||||
self._appid = appid
|
self._appid = appid
|
||||||
self._channels = {}
|
self._channels = {}
|
||||||
|
|
||||||
|
@ -418,8 +427,11 @@ class AppNamespace:
|
||||||
def get_channel(self, channelid):
|
def get_channel(self, channelid):
|
||||||
assert isinstance(channelid, int)
|
assert isinstance(channelid, int)
|
||||||
if not channelid in self._channels:
|
if not channelid in self._channels:
|
||||||
log.msg("spawning #%d for appid %s" % (channelid, self._appid))
|
if self._log_requests:
|
||||||
|
log.msg("spawning #%d for appid %s" % (channelid, self._appid))
|
||||||
self._channels[channelid] = Channel(self, self._db, self._welcome,
|
self._channels[channelid] = Channel(self, self._db, self._welcome,
|
||||||
|
self._blur_usage,
|
||||||
|
self._log_requests,
|
||||||
self._appid, channelid)
|
self._appid, channelid)
|
||||||
return self._channels[channelid]
|
return self._channels[channelid]
|
||||||
|
|
||||||
|
@ -429,10 +441,16 @@ class AppNamespace:
|
||||||
|
|
||||||
if channelid in self._channels:
|
if channelid in self._channels:
|
||||||
self._channels.pop(channelid)
|
self._channels.pop(channelid)
|
||||||
log.msg("freed+killed #%d, now have %d DB channels, %d live" %
|
if self._log_requests:
|
||||||
(channelid, len(self.get_allocated()), len(self._channels)))
|
log.msg("freed+killed #%d, now have %d DB channels, %d live" %
|
||||||
|
(channelid, len(self.get_allocated()), len(self._channels)))
|
||||||
|
|
||||||
def prune_old_channels(self):
|
def prune_old_channels(self):
|
||||||
|
# For now, pruning is logged even if log_requests is False, to debug
|
||||||
|
# the pruning process, and since pruning is triggered by a timer
|
||||||
|
# instead of by user action. It does reveal which channels were
|
||||||
|
# present when the pruning process began, though, so in the log run
|
||||||
|
# it should do less logging.
|
||||||
log.msg(" channel prune begins")
|
log.msg(" channel prune begins")
|
||||||
# a channel is deleted when there are no listeners and there have
|
# a channel is deleted when there are no listeners and there have
|
||||||
# been no messages added in CHANNEL_EXPIRATION_TIME seconds
|
# been no messages added in CHANNEL_EXPIRATION_TIME seconds
|
||||||
|
@ -448,20 +466,23 @@ class AppNamespace:
|
||||||
return bool(self._channels)
|
return bool(self._channels)
|
||||||
|
|
||||||
class Relay(resource.Resource, service.MultiService):
|
class Relay(resource.Resource, service.MultiService):
|
||||||
def __init__(self, db, welcome):
|
def __init__(self, db, welcome, blur_usage):
|
||||||
resource.Resource.__init__(self)
|
resource.Resource.__init__(self)
|
||||||
service.MultiService.__init__(self)
|
service.MultiService.__init__(self)
|
||||||
self._db = db
|
self._db = db
|
||||||
self._welcome = welcome
|
self._welcome = welcome
|
||||||
|
self._blur_usage = blur_usage
|
||||||
|
log_requests = blur_usage is None
|
||||||
|
self._log_requests = log_requests
|
||||||
self._apps = {}
|
self._apps = {}
|
||||||
t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.prune)
|
t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.prune)
|
||||||
t.setServiceParent(self)
|
t.setServiceParent(self)
|
||||||
self.putChild(b"list", ChannelLister(self, welcome))
|
self.putChild(b"list", ChannelLister(self, welcome, log_requests))
|
||||||
self.putChild(b"allocate", Allocator(self, welcome))
|
self.putChild(b"allocate", Allocator(self, welcome, log_requests))
|
||||||
self.putChild(b"add", Adder(self, welcome))
|
self.putChild(b"add", Adder(self, welcome, log_requests))
|
||||||
self.putChild(b"get", GetterOrWatcher(self, welcome))
|
self.putChild(b"get", GetterOrWatcher(self, welcome, log_requests))
|
||||||
self.putChild(b"watch", Watcher(self, welcome))
|
self.putChild(b"watch", Watcher(self, welcome, log_requests))
|
||||||
self.putChild(b"deallocate", Deallocator(self, welcome))
|
self.putChild(b"deallocate", Deallocator(self, welcome, log_requests))
|
||||||
|
|
||||||
def getChild(self, path, req):
|
def getChild(self, path, req):
|
||||||
# 0.4.0 used "POST /CID/SIDE/post/MSGNUM"
|
# 0.4.0 used "POST /CID/SIDE/post/MSGNUM"
|
||||||
|
@ -475,11 +496,15 @@ class Relay(resource.Resource, service.MultiService):
|
||||||
def get_app(self, appid):
|
def get_app(self, appid):
|
||||||
assert isinstance(appid, type(u""))
|
assert isinstance(appid, type(u""))
|
||||||
if not appid in self._apps:
|
if not appid in self._apps:
|
||||||
log.msg("spawning appid %s" % (appid,))
|
if self._log_requests:
|
||||||
self._apps[appid] = AppNamespace(self._db, self._welcome, appid)
|
log.msg("spawning appid %s" % (appid,))
|
||||||
|
self._apps[appid] = AppNamespace(self._db, self._welcome,
|
||||||
|
self._blur_usage,
|
||||||
|
self._log_requests, appid)
|
||||||
return self._apps[appid]
|
return self._apps[appid]
|
||||||
|
|
||||||
def prune(self):
|
def prune(self):
|
||||||
|
# As with AppNamespace.prune_old_channels, we log for now.
|
||||||
log.msg("beginning app prune")
|
log.msg("beginning app prune")
|
||||||
c = self._db.execute("SELECT DISTINCT `appid` FROM `messages`")
|
c = self._db.execute("SELECT DISTINCT `appid` FROM `messages`")
|
||||||
apps = set([row["appid"] for row in c.fetchall()]) # these have messages
|
apps = set([row["appid"] for row in c.fetchall()]) # these have messages
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
from __future__ import print_function
|
from __future__ import print_function
|
||||||
|
from twisted.python import log
|
||||||
from twisted.internet import reactor, endpoints
|
from twisted.internet import reactor, endpoints
|
||||||
from twisted.application import service
|
from twisted.application import service
|
||||||
from twisted.web import server, static, resource
|
from twisted.web import server, static, resource
|
||||||
|
@ -14,10 +15,17 @@ class Root(resource.Resource):
|
||||||
resource.Resource.__init__(self)
|
resource.Resource.__init__(self)
|
||||||
self.putChild(b"", static.Data(b"Wormhole Relay\n", "text/plain"))
|
self.putChild(b"", static.Data(b"Wormhole Relay\n", "text/plain"))
|
||||||
|
|
||||||
|
class PrivacyEnhancedSite(server.Site):
|
||||||
|
logRequests = True
|
||||||
|
def log(self, request):
|
||||||
|
if self.logRequests:
|
||||||
|
return server.Site.log(self, request)
|
||||||
|
|
||||||
class RelayServer(service.MultiService):
|
class RelayServer(service.MultiService):
|
||||||
def __init__(self, relayport, transitport, advertise_version,
|
def __init__(self, relayport, transitport, advertise_version,
|
||||||
db_url=":memory:"):
|
db_url=":memory:", blur_usage=None):
|
||||||
service.MultiService.__init__(self)
|
service.MultiService.__init__(self)
|
||||||
|
self._blur_usage = blur_usage
|
||||||
self.db = get_db(db_url)
|
self.db = get_db(db_url)
|
||||||
welcome = {
|
welcome = {
|
||||||
"current_version": __version__,
|
"current_version": __version__,
|
||||||
|
@ -31,15 +39,26 @@ class RelayServer(service.MultiService):
|
||||||
if advertise_version:
|
if advertise_version:
|
||||||
welcome["current_version"] = advertise_version
|
welcome["current_version"] = advertise_version
|
||||||
self.root = Root()
|
self.root = Root()
|
||||||
site = server.Site(self.root)
|
site = PrivacyEnhancedSite(self.root)
|
||||||
|
if blur_usage:
|
||||||
|
site.logRequests = False
|
||||||
r = endpoints.serverFromString(reactor, relayport)
|
r = endpoints.serverFromString(reactor, relayport)
|
||||||
self.relayport_service = ServerEndpointService(r, site)
|
self.relayport_service = ServerEndpointService(r, site)
|
||||||
self.relayport_service.setServiceParent(self)
|
self.relayport_service.setServiceParent(self)
|
||||||
self.relay = Relay(self.db, welcome) # accessible from tests
|
self.relay = Relay(self.db, welcome, blur_usage) # accessible from tests
|
||||||
self.root.putChild(b"wormhole-relay", self.relay)
|
self.root.putChild(b"wormhole-relay", self.relay)
|
||||||
if transitport:
|
if transitport:
|
||||||
self.transit = Transit(self.db)
|
self.transit = Transit(self.db, blur_usage)
|
||||||
self.transit.setServiceParent(self) # for the timer
|
self.transit.setServiceParent(self) # for the timer
|
||||||
t = endpoints.serverFromString(reactor, transitport)
|
t = endpoints.serverFromString(reactor, transitport)
|
||||||
self.transport_service = ServerEndpointService(t, self.transit)
|
self.transport_service = ServerEndpointService(t, self.transit)
|
||||||
self.transport_service.setServiceParent(self)
|
self.transport_service.setServiceParent(self)
|
||||||
|
|
||||||
|
def startService(self):
|
||||||
|
service.MultiService.startService(self)
|
||||||
|
log.msg("Wormhole relay server (Rendezvous and Transit) running")
|
||||||
|
if self._blur_usage:
|
||||||
|
log.msg("blurring access times to %d seconds" % self._blur_usage)
|
||||||
|
log.msg("not logging HTTP requests")
|
||||||
|
else:
|
||||||
|
log.msg("not blurring access times")
|
||||||
|
|
|
@ -148,9 +148,10 @@ class Transit(protocol.ServerFactory, service.MultiService):
|
||||||
MAXTIME = 60*SECONDS
|
MAXTIME = 60*SECONDS
|
||||||
protocol = TransitConnection
|
protocol = TransitConnection
|
||||||
|
|
||||||
def __init__(self, db):
|
def __init__(self, db, blur_usage):
|
||||||
service.MultiService.__init__(self)
|
service.MultiService.__init__(self)
|
||||||
self._db = db
|
self._db = db
|
||||||
|
self._blur_usage = blur_usage
|
||||||
self._pending_requests = {} # token -> TransitConnection
|
self._pending_requests = {} # token -> TransitConnection
|
||||||
self._active_connections = set() # TransitConnection
|
self._active_connections = set() # TransitConnection
|
||||||
|
|
||||||
|
@ -170,6 +171,8 @@ class Transit(protocol.ServerFactory, service.MultiService):
|
||||||
def recordUsage(self, started, result, total_bytes,
|
def recordUsage(self, started, result, total_bytes,
|
||||||
total_time, waiting_time):
|
total_time, waiting_time):
|
||||||
log.msg("Transit.recordUsage (%dB)" % total_bytes)
|
log.msg("Transit.recordUsage (%dB)" % total_bytes)
|
||||||
|
if self._blur_usage:
|
||||||
|
started = self._blur_usage * (started // self._blur_usage)
|
||||||
self._db.execute("INSERT INTO `usage`"
|
self._db.execute("INSERT INTO `usage`"
|
||||||
" (`type`, `started`, `result`, `total_bytes`,"
|
" (`type`, `started`, `result`, `total_bytes`,"
|
||||||
" `total_time`, `waiting_time`)"
|
" `total_time`, `waiting_time`)"
|
||||||
|
|
|
@ -363,7 +363,7 @@ class OneEventAtATime:
|
||||||
|
|
||||||
class Summary(unittest.TestCase):
|
class Summary(unittest.TestCase):
|
||||||
def test_summarize(self):
|
def test_summarize(self):
|
||||||
c = relay_server.Channel(None, None, None, None, None)
|
c = relay_server.Channel(None, None, None, None, False, None, None)
|
||||||
A = relay_server.ALLOCATE
|
A = relay_server.ALLOCATE
|
||||||
D = relay_server.DEALLOCATE
|
D = relay_server.DEALLOCATE
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user