move stats-file up to RelayServer
This moves responsibility for the periodic prune-everything Timer up to RelayServer too. That way we can be sure the stats are dumped immediately after prune, and we can incorporate stats from Transit as well.
This commit is contained in:
parent
7f389dc76e
commit
62f9a4d702
|
@ -1,14 +1,8 @@
|
||||||
from __future__ import print_function, unicode_literals
|
from __future__ import print_function, unicode_literals
|
||||||
import os, time, random, base64, json
|
import os, random, base64
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from twisted.python import log
|
from twisted.python import log
|
||||||
from twisted.application import service, internet
|
from twisted.application import service
|
||||||
|
|
||||||
SECONDS = 1.0
|
|
||||||
MINUTE = 60*SECONDS
|
|
||||||
|
|
||||||
CHANNEL_EXPIRATION_TIME = 11*MINUTE
|
|
||||||
EXPIRATION_CHECK_PERIOD = 10*MINUTE
|
|
||||||
|
|
||||||
def generate_mailbox_id():
|
def generate_mailbox_id():
|
||||||
return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii")
|
return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii")
|
||||||
|
@ -478,17 +472,14 @@ class AppNamespace:
|
||||||
channel._shutdown()
|
channel._shutdown()
|
||||||
|
|
||||||
class Rendezvous(service.MultiService):
|
class Rendezvous(service.MultiService):
|
||||||
def __init__(self, db, welcome, blur_usage, stats_file=None):
|
def __init__(self, db, welcome, blur_usage):
|
||||||
service.MultiService.__init__(self)
|
service.MultiService.__init__(self)
|
||||||
self._db = db
|
self._db = db
|
||||||
self._welcome = welcome
|
self._welcome = welcome
|
||||||
self._blur_usage = blur_usage
|
self._blur_usage = blur_usage
|
||||||
self._stats_file = stats_file
|
|
||||||
log_requests = blur_usage is None
|
log_requests = blur_usage is None
|
||||||
self._log_requests = log_requests
|
self._log_requests = log_requests
|
||||||
self._apps = {}
|
self._apps = {}
|
||||||
t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.timer)
|
|
||||||
t.setServiceParent(self)
|
|
||||||
|
|
||||||
def get_welcome(self):
|
def get_welcome(self):
|
||||||
return self._welcome
|
return self._welcome
|
||||||
|
@ -518,34 +509,19 @@ class Rendezvous(service.MultiService):
|
||||||
apps.add(row["app_id"])
|
apps.add(row["app_id"])
|
||||||
return apps
|
return apps
|
||||||
|
|
||||||
def timer(self):
|
def prune_all_apps(self, now, old):
|
||||||
self.prune_all_apps()
|
|
||||||
self.dump_stats(now=time.time(), validity=EXPIRATION_CHECK_PERIOD+60)
|
|
||||||
|
|
||||||
def prune_all_apps(self, now=None, old=None):
|
|
||||||
# As with AppNamespace.prune_old_mailboxes, we log for now.
|
# As with AppNamespace.prune_old_mailboxes, we log for now.
|
||||||
log.msg("beginning app prune")
|
log.msg("beginning app prune")
|
||||||
now = now or time.time()
|
|
||||||
old = old or (now - CHANNEL_EXPIRATION_TIME)
|
|
||||||
for app_id in sorted(self.get_all_apps()):
|
for app_id in sorted(self.get_all_apps()):
|
||||||
log.msg(" app prune checking %r" % (app_id,))
|
log.msg(" app prune checking %r" % (app_id,))
|
||||||
app = self.get_app(app_id)
|
app = self.get_app(app_id)
|
||||||
app.prune(now, old)
|
app.prune(now, old)
|
||||||
log.msg("app prune ends, %d apps" % len(self._apps))
|
log.msg("app prune ends, %d apps" % len(self._apps))
|
||||||
|
|
||||||
def dump_stats(self, now, validity):
|
def get_stats(self):
|
||||||
if not self._stats_file:
|
stats = {}
|
||||||
return
|
|
||||||
tmpfn = self._stats_file + ".tmp"
|
|
||||||
|
|
||||||
data = {}
|
return stats
|
||||||
data["created"] = now
|
|
||||||
data["valid_until"] = now + validity
|
|
||||||
|
|
||||||
with open(tmpfn, "wb") as f:
|
|
||||||
json.dump(data, f, indent=1)
|
|
||||||
f.write("\n")
|
|
||||||
os.rename(tmpfn, self._stats_file)
|
|
||||||
|
|
||||||
def stopService(self):
|
def stopService(self):
|
||||||
# This forcibly boots any clients that are still connected, which
|
# This forcibly boots any clients that are still connected, which
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
# NO unicode_literals or static.Data() will break, because it demands
|
# NO unicode_literals or static.Data() will break, because it demands
|
||||||
# a str on Python 2
|
# a str on Python 2
|
||||||
from __future__ import print_function
|
from __future__ import print_function
|
||||||
|
import os, time, json
|
||||||
from twisted.python import log
|
from twisted.python import log
|
||||||
from twisted.internet import reactor, endpoints
|
from twisted.internet import reactor, endpoints
|
||||||
from twisted.application import service
|
from twisted.application import service, internet
|
||||||
from twisted.web import server, static, resource
|
from twisted.web import server, static, resource
|
||||||
from autobahn.twisted.resource import WebSocketResource
|
from autobahn.twisted.resource import WebSocketResource
|
||||||
from .endpoint_service import ServerEndpointService
|
from .endpoint_service import ServerEndpointService
|
||||||
|
@ -13,6 +14,12 @@ from .rendezvous import Rendezvous
|
||||||
from .rendezvous_websocket import WebSocketRendezvousFactory
|
from .rendezvous_websocket import WebSocketRendezvousFactory
|
||||||
from .transit_server import Transit
|
from .transit_server import Transit
|
||||||
|
|
||||||
|
SECONDS = 1.0
|
||||||
|
MINUTE = 60*SECONDS
|
||||||
|
|
||||||
|
CHANNEL_EXPIRATION_TIME = 11*MINUTE
|
||||||
|
EXPIRATION_CHECK_PERIOD = 10*MINUTE
|
||||||
|
|
||||||
class Root(resource.Resource):
|
class Root(resource.Resource):
|
||||||
# child_FOO is a nevow thing, not a twisted.web.resource thing
|
# child_FOO is a nevow thing, not a twisted.web.resource thing
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -52,11 +59,11 @@ class RelayServer(service.MultiService):
|
||||||
if signal_error:
|
if signal_error:
|
||||||
welcome["error"] = signal_error
|
welcome["error"] = signal_error
|
||||||
|
|
||||||
rendezvous = Rendezvous(db, welcome, blur_usage, stats_file)
|
self._rendezvous = Rendezvous(db, welcome, blur_usage)
|
||||||
rendezvous.setServiceParent(self) # for the pruning timer
|
self._rendezvous.setServiceParent(self) # for the pruning timer
|
||||||
|
|
||||||
root = Root()
|
root = Root()
|
||||||
wsrf = WebSocketRendezvousFactory(None, rendezvous)
|
wsrf = WebSocketRendezvousFactory(None, self._rendezvous)
|
||||||
root.putChild(b"v1", WebSocketResource(wsrf))
|
root.putChild(b"v1", WebSocketResource(wsrf))
|
||||||
|
|
||||||
site = PrivacyEnhancedSite(root)
|
site = PrivacyEnhancedSite(root)
|
||||||
|
@ -74,9 +81,12 @@ class RelayServer(service.MultiService):
|
||||||
transit_service = ServerEndpointService(t, transit)
|
transit_service = ServerEndpointService(t, transit)
|
||||||
transit_service.setServiceParent(self)
|
transit_service.setServiceParent(self)
|
||||||
|
|
||||||
|
self._stats_file = stats_file
|
||||||
|
t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.timer)
|
||||||
|
t.setServiceParent(self)
|
||||||
|
|
||||||
# make some things accessible for tests
|
# make some things accessible for tests
|
||||||
self._db = db
|
self._db = db
|
||||||
self._rendezvous = rendezvous
|
|
||||||
self._root = root
|
self._root = root
|
||||||
self._rendezvous_web_service = rendezvous_web_service
|
self._rendezvous_web_service = rendezvous_web_service
|
||||||
self._rendezvous_websocket = wsrf
|
self._rendezvous_websocket = wsrf
|
||||||
|
@ -93,3 +103,25 @@ class RelayServer(service.MultiService):
|
||||||
log.msg("not logging HTTP requests")
|
log.msg("not logging HTTP requests")
|
||||||
else:
|
else:
|
||||||
log.msg("not blurring access times")
|
log.msg("not blurring access times")
|
||||||
|
|
||||||
|
def timer(self):
|
||||||
|
now = time.time()
|
||||||
|
old = now - CHANNEL_EXPIRATION_TIME
|
||||||
|
self._rendezvous.prune_all_apps(now, old)
|
||||||
|
self.dump_stats(now, validity=EXPIRATION_CHECK_PERIOD+60)
|
||||||
|
|
||||||
|
def dump_stats(self, now, validity):
|
||||||
|
if not self._stats_file:
|
||||||
|
return
|
||||||
|
tmpfn = self._stats_file + ".tmp"
|
||||||
|
|
||||||
|
data = {}
|
||||||
|
data["created"] = now
|
||||||
|
data["valid_until"] = now + validity
|
||||||
|
|
||||||
|
data["rendezvous"] = self._rendezvous.get_stats()
|
||||||
|
|
||||||
|
with open(tmpfn, "wb") as f:
|
||||||
|
json.dump(data, f, indent=1)
|
||||||
|
f.write("\n")
|
||||||
|
os.rename(tmpfn, self._stats_file)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user