rewrite pruning, add full tests

Apparently it was broken: the first time the LoopingCall fired, it would
throw an exception, and never try again. Now it should be fixed.
This commit is contained in:
Brian Warner 2016-05-26 22:43:29 -07:00
parent 0b3863fb52
commit eebc9ebd54
2 changed files with 391 additions and 119 deletions

View File

@ -52,11 +52,9 @@ SidedMessage = namedtuple("SidedMessage", ["side", "phase", "body",
"server_rx", "msg_id"])
class Mailbox:
def __init__(self, app, db, blur_usage, log_requests, app_id, mailbox_id):
def __init__(self, app, db, app_id, mailbox_id):
self._app = app
self._db = db
self._blur_usage = blur_usage
self._log_requests = log_requests
self._app_id = app_id
self._mailbox_id = mailbox_id
self._listeners = {} # handle -> (send_f, stop_f)
@ -99,12 +97,16 @@ class Mailbox:
return messages
def add_listener(self, handle, send_f, stop_f):
    """Register a listener and hand back the messages already stored.

    The `(send_f, stop_f)` pair is kept in `self._listeners` under
    `handle`; the return value is whatever `self.get_messages()` yields
    once the listener has been registered.
    """
    # TODO: update 'updated'
    callbacks = (send_f, stop_f)
    self._listeners.update({handle: callbacks})
    backlog = self.get_messages()
    return backlog
def remove_listener(self, handle):
    """Forget the listener registered under `handle`.

    Raises KeyError when no listener is registered under that handle.
    """
    del self._listeners[handle]
def has_listeners(self):
    """Report whether at least one listener is currently registered."""
    return len(self._listeners) > 0
def broadcast_message(self, sm):
for (send_f, stop_f) in self._listeners.values():
send_f(sm)
@ -133,11 +135,8 @@ class Mailbox:
return
sr = remove_side(row, side)
if sr.empty:
rows = db.execute("SELECT DISTINCT(`side`) FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?",
(self._app_id, self._mailbox_id)).fetchall()
num_sides = len(rows)
self._summarize_and_store(row, num_sides, mood, when, pruned=False)
self._app._summarize_mailbox_and_store(self._mailbox_id, row,
mood, when, pruned=False)
self._delete()
db.commit()
elif sr.changed:
@ -164,61 +163,8 @@ class Mailbox:
self._app.free_mailbox(self._mailbox_id)
# Summarize this mailbox's lifetime with self._summarize() and record the
# resulting started/total_time/waiting_time/result values as one row of
# `mailbox_usage` for this app. No commit is issued here.
# NOTE(review): the hunk header above reports 61 old lines shrinking to 8,
# so this looks like the pre-commit implementation (superseded by
# AppNamespace._summarize_mailbox_and_store) -- confirm before relying on it.
def _summarize_and_store(self, row, num_sides, second_mood, delete_time,
pruned):
u = self._summarize(row, num_sides, second_mood, delete_time, pruned)
self._db.execute("INSERT INTO `mailbox_usage`"
" (`app_id`, "
" `started`, `total_time`, `waiting_time`, `result`)"
" VALUES (?, ?,?,?,?)",
(self._app_id,
u.started, u.total_time, u.waiting_time, u.result))
# Build a Usage(started, waiting_time, total_time, result) record from a
# mailbox row. `row` must provide "started", "second", "first_mood" and
# "crowded"; `delete_time` is the timestamp at which the mailbox went away.
def _summarize(self, row, num_sides, second_mood, delete_time, pruned):
started = row["started"]
if self._blur_usage:
# round the reported start time down to a multiple of the blur interval
started = self._blur_usage * (started // self._blur_usage)
waiting_time = None
if row["second"]:
# uses the un-blurred start time, as does total_time below
waiting_time = row["second"] - row["started"]
total_time = delete_time - row["started"]
# baseline verdict from the number of distinct sides counted by the caller
if num_sides == 0:
result = u"quiet"
elif num_sides == 1:
result = u"lonely"
else:
result = u"happy"
# The checks below run in order and each one overwrites the previous
# verdict, so the effective priority (lowest to highest) is:
# lonely < errory < scary < pruney < crowded.
moods = set([row["first_mood"], second_mood])
if u"lonely" in moods:
result = u"lonely"
if u"errory" in moods:
result = u"errory"
if u"scary" in moods:
result = u"scary"
if pruned:
result = u"pruney"
if row["crowded"]:
result = u"crowded"
return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result)
# Return True when this mailbox has no listeners and either holds no
# messages at all or its most recent message is older than
# CHANNEL_EXPIRATION_TIME seconds; otherwise return False.
def is_idle(self):
# any registered listener keeps the mailbox busy, regardless of messages
if self._listeners:
return False
# fetch only the newest message timestamp for this mailbox
c = self._db.execute("SELECT `server_rx` FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?"
" ORDER BY `server_rx` DESC LIMIT 1",
(self._app_id, self._mailbox_id))
rows = c.fetchall()
if not rows:
return True
# `old` is the cutoff timestamp: anything received before it has expired
old = time.time() - CHANNEL_EXPIRATION_TIME
if rows[0]["server_rx"] < old:
return True
return False
def is_active(self):
    """Return True while at least one listener is attached to this mailbox."""
    return True if self._listeners else False
def _shutdown(self):
# used at test shutdown to accelerate client disconnects
@ -226,14 +172,22 @@ class Mailbox:
stop_f()
class AppNamespace:
def __init__(self, db, welcome, blur_usage, log_requests, app_id):
def __init__(self, db, blur_usage, log_requests, app_id):
self._db = db
self._welcome = welcome
self._blur_usage = blur_usage
self._log_requests = log_requests
self._app_id = app_id
self._mailboxes = {}
def is_active(self):
    """Return True if any in-memory mailbox of this app reports itself active.

    An idle AppNamespace does not need to be kept in memory: it can be
    reconstructed from the DB if needed. An active one must be kept
    alive.
    """
    return any(mailbox.is_active() for mailbox in self._mailboxes.values())
def get_nameplate_ids(self):
db = self._db
# TODO: filter this to numeric ids?
@ -265,7 +219,7 @@ class AppNamespace:
del mailbox_id # ignored, they'll learn it from claim()
return nameplate_id
def claim_nameplate(self, nameplate_id, side, when):
def claim_nameplate(self, nameplate_id, side, when, _test_mailbox_id=None):
# when we're done:
# * there will be one row for the nameplate
# * side1 or side2 will be populated
@ -298,7 +252,10 @@ class AppNamespace:
if self._log_requests:
log.msg("creating nameplate#%s for app_id %s" %
(nameplate_id, self._app_id))
mailbox_id = generate_mailbox_id()
if _test_mailbox_id is not None: # for unit tests
mailbox_id = _test_mailbox_id
else:
mailbox_id = generate_mailbox_id()
db.execute("INSERT INTO `nameplates`"
" (`app_id`, `id`, `mailbox_id`, `side1`, `crowded`,"
" `updated`, `started`)"
@ -365,25 +322,6 @@ class AppNamespace:
return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result)
# Delete one nameplate row (identified by row["id"] within this app) and
# record a usage summary for it, flagged as pruned, with `delete_time` as
# the deletion timestamp.
def _prune_nameplate(self, row, delete_time):
# requires caller to db.commit()
db = self._db
db.execute("DELETE FROM `nameplates` WHERE `app_id`=? AND `id`=?",
(self._app_id, row["id"]))
self._summarize_nameplate_and_store(row, delete_time, pruned=True)
# TODO: make a Nameplate object, keep track of when there's a
# websocket that's watching it, don't prune a nameplate that someone
# is watching, even if they started watching a long time ago
#
# Prune every nameplate whose `updated` timestamp is below `old` and return
# the number of nameplate rows remaining afterwards.
# NOTE(review): the call below passes only `row`, but _prune_nameplate is
# declared as (self, row, delete_time), so the first matching row raises
# TypeError. Neither query filters on `app_id`, and nothing here commits.
# This matches the "throws an exception on first firing" failure described
# in the commit message -- confirm.
def prune_nameplates(self, old):
db = self._db
for row in db.execute("SELECT * FROM `nameplates`"
" WHERE `updated` < ?",
(old,)).fetchall():
self._prune_nameplate(row)
count = db.execute("SELECT COUNT(*) FROM `nameplates`").fetchone()[0]
return count
def open_mailbox(self, mailbox_id, side, when):
assert isinstance(mailbox_id, type(u"")), type(mailbox_id)
db = self._db
@ -398,8 +336,6 @@ class AppNamespace:
db.commit() # XXX
# mailbox.open() does a SELECT to find the old sides
self._mailboxes[mailbox_id] = Mailbox(self, self._db,
self._blur_usage,
self._log_requests,
self._app_id, mailbox_id)
mailbox = self._mailboxes[mailbox_id]
mailbox.open(side, when)
@ -416,25 +352,178 @@ class AppNamespace:
# log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" %
# (mailbox_id, len(self.get_claimed()), len(self._mailboxes)))
def prune_mailboxes(self, old):
def _summarize_mailbox_and_store(self, mailbox_id, row,
                                 second_mood, delete_time, pruned):
    """Record one `mailbox_usage` row describing a mailbox being removed.

    The number of distinct sides that ever posted a message to
    `mailbox_id` is counted from the `messages` table, fed to
    `_summarize_mailbox()` together with the mailbox `row`, and the
    resulting summary is inserted. The caller is responsible for
    committing, and must call this before deleting the mailbox's messages.
    """
    db = self._db
    side_rows = db.execute("SELECT DISTINCT(`side`) FROM `messages`"
                           " WHERE `app_id`=? AND `mailbox_id`=?",
                           (self._app_id, mailbox_id)).fetchall()
    usage = self._summarize_mailbox(row, len(side_rows), second_mood,
                                    delete_time, pruned)
    values = (self._app_id, usage.started, usage.total_time,
              usage.waiting_time, usage.result)
    db.execute("INSERT INTO `mailbox_usage`"
               " (`app_id`,"
               " `started`, `total_time`, `waiting_time`, `result`)"
               " VALUES (?, ?,?,?,?)",
               values)
def _summarize_mailbox(self, row, num_sides, second_mood, delete_time,
                       pruned):
    """Condense a mailbox row into a Usage record.

    `row` must provide "started", "second", "first_mood" and "crowded".
    `num_sides` is the count of distinct sides that posted messages,
    `second_mood` the mood reported by the closing side, `delete_time`
    the moment of deletion and `pruned` whether the deletion came from
    pruning. The reported start time is rounded down to a multiple of
    `self._blur_usage` when that is set; the durations are computed from
    the exact start time.
    """
    opened = row["started"]
    started = opened
    if self._blur_usage:
        started = self._blur_usage * (started // self._blur_usage)
    waiting_time = (row["second"] - row["started"]) if row["second"] else None
    total_time = delete_time - row["started"]

    # baseline verdict from the number of participating sides
    result = {0: u"quiet", 1: u"lonely"}.get(num_sides, u"happy")

    # Later checks override earlier ones, so the priority from lowest to
    # highest is: lonely, errory, scary, pruney, crowded.
    moods = {row["first_mood"], second_mood}
    for mood in (u"lonely", u"errory", u"scary"):
        if mood in moods:
            result = mood
    if pruned:
        result = u"pruney"
    if row["crowded"]:
        result = u"crowded"
    return Usage(started=started, waiting_time=waiting_time,
                 total_time=total_time, result=result)
# Expire stale nameplates, mailboxes and messages belonging to this app.
# `old` is the cutoff timestamp (rows are compared against it with `>`),
# `now` is the deletion time recorded in the usage summaries. Everything is
# first classified as OLD or NEW in memory, newness is propagated between
# linked nameplates and mailboxes, and only then are the OLD rows summarized
# and deleted. A single commit is issued at the end if anything changed.
def prune(self, now, old):
# For now, pruning is logged even if log_requests is False, to debug
# the pruning process, and since pruning is triggered by a timer
# instead of by user action. It does reveal which mailboxes were
# present when the pruning process began, though, so in the long run
# it should do less logging.
# NOTE(review): the statements from here down to
# `return bool(self._mailboxes)` look like the previous prune_mailboxes
# body kept by the diff view. They call get_claimed()/get_channel(), which
# are not defined in the visible text, and format mailbox ids with %d
# although open_mailbox asserts that ids are text -- confirm this is not
# live code.
log.msg(" channel prune begins")
# a channel is deleted when there are no listeners and there have
# been no messages added in CHANNEL_EXPIRATION_TIME seconds
mailboxes = set(self.get_claimed()) # these have messages
mailboxes.update(self._mailboxes) # these might have listeners
for mailbox_id in mailboxes:
log.msg(" channel prune checking %d" % mailbox_id)
channel = self.get_channel(mailbox_id)
if channel.is_idle():
log.msg(" channel prune expiring %d" % mailbox_id)
channel.delete_and_summarize() # calls self.free_channel
log.msg(" channel prune done, %r left" % (self._mailboxes.keys(),))
return bool(self._mailboxes)
log.msg(" prune begins (%s)" % self._app_id)
db = self._db
# set whenever a DELETE/INSERT is issued, so the commit can be skipped
# on a no-op pass
modified = False
# for all `mailboxes`: classify as new or old
OLD = 0; NEW = 1
all_mailboxes = {}
all_mailbox_rows = {}
for row in db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`=?",
(self._app_id,)).fetchall():
mailbox_id = row["id"]
all_mailbox_rows[mailbox_id] = row
# a mailbox is NEW if either its first or its second open is recent
if row["started"] > old:
which = NEW
elif row["second"] and row["second"] > old:
which = NEW
else:
which = OLD
all_mailboxes[mailbox_id] = which
#log.msg(" 2: all_mailboxes", all_mailboxes, all_mailbox_rows)
# for all mailbox ids used by `messages`:
# if there is no matching mailbox: delete the messages
# if there is at least one new message (select when>old limit 1):
# classify the mailbox as new
for row in db.execute("SELECT DISTINCT(`mailbox_id`)"
" FROM `messages`"
" WHERE `app_id`=?",
(self._app_id,)).fetchall():
mailbox_id = row["mailbox_id"]
# This runs before the listener pass below, so orphaned messages are
# deleted even when a Mailbox object with listeners exists for the id.
if mailbox_id not in all_mailboxes:
log.msg(" deleting orphan messages", mailbox_id)
db.execute("DELETE FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?",
(self._app_id, mailbox_id))
modified = True
else:
new_msgs = db.execute("SELECT * FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?"
" AND `server_rx` > ?"
" LIMIT 1",
(self._app_id, mailbox_id, old)
).fetchall()
if new_msgs:
#log.msg(" 3-: saved by new messages", new_msgs)
all_mailboxes[mailbox_id] = NEW
#log.msg(" 4: all_mailboxes", all_mailboxes)
# for all mailbox objects with active listeners:
# classify the mailbox as new
# (this can add ids that have no `mailboxes` row; they are only ever
# marked NEW, so the deletion loop below never looks up a row for them)
for mailbox_id in self._mailboxes:
#log.msg(" -5: checking", mailbox_id, self._mailboxes[mailbox_id])
if self._mailboxes[mailbox_id].has_listeners():
all_mailboxes[mailbox_id] = NEW
#log.msg(" 5: all_mailboxes", all_mailboxes)
# for all `nameplates`:
# classify as new or old
# if the linked mailbox exists:
# if it is new:
# classify nameplate as new
# if it is old:
# if the nameplate is new:
# classify mailbox as new
all_nameplates = {}
all_nameplate_rows = {}
for row in db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=?",
(self._app_id,)).fetchall():
nameplate_id = row["id"]
all_nameplate_rows[nameplate_id] = row
if row["updated"] > old:
which = NEW
else:
which = OLD
mailbox_id = row["mailbox_id"]
# NOTE(review): indentation is lost in this view; per the outline
# comment above, the `else` below pairs with the inner
# `== NEW` test, not with `if mailbox_id in all_mailboxes` -- confirm.
if mailbox_id in all_mailboxes:
if all_mailboxes[mailbox_id] == NEW:
which = NEW
else:
if which == NEW:
all_mailboxes[mailbox_id] = NEW
all_nameplates[nameplate_id] = which
#log.msg(" 6: all_nameplates", all_nameplates, all_nameplate_rows)
# delete all old nameplates
# invariant check: if there is a linked mailbox, it is old
for nameplate_id, which in all_nameplates.items():
if which == OLD:
log.msg(" deleting nameplate", nameplate_id)
row = all_nameplate_rows[nameplate_id]
self._summarize_nameplate_and_store(row, now, pruned=True)
db.execute("DELETE FROM `nameplates`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, nameplate_id))
modified = True
# delete all messages for old mailboxes
# delete all old mailboxes
for mailbox_id, which in all_mailboxes.items():
if which == OLD:
log.msg(" deleting mailbox", mailbox_id)
# summarize first: the summary counts distinct sides from `messages`,
# which the next statement deletes
self._summarize_mailbox_and_store(mailbox_id,
all_mailbox_rows[mailbox_id],
u"pruney", now, pruned=True)
db.execute("DELETE FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?",
(self._app_id, mailbox_id))
db.execute("DELETE FROM `mailboxes`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, mailbox_id))
modified = True
if modified:
db.commit()
log.msg(" prune complete, modified:", modified)
def _shutdown(self):
for channel in self._mailboxes.values():
@ -462,25 +551,35 @@ class Rendezvous(service.MultiService):
if not app_id in self._apps:
if self._log_requests:
log.msg("spawning app_id %s" % (app_id,))
self._apps[app_id] = AppNamespace(self._db, self._welcome,
self._blur_usage,
self._log_requests, app_id)
self._apps[app_id] = AppNamespace(self._db,
self._blur_usage,
self._log_requests, app_id)
return self._apps[app_id]
def prune(self, old=None):
def get_all_apps(self):
    """Return the set of every app_id that appears in the database.

    The `nameplates`, `mailboxes` and `messages` tables are consulted in
    that order, so an app is reported if it owns a row in any of them.
    """
    queries = (
        "SELECT DISTINCT `app_id`"
        " FROM `nameplates`",
        "SELECT DISTINCT `app_id`"
        " FROM `mailboxes`",
        "SELECT DISTINCT `app_id`"
        " FROM `messages`",
    )
    found = set()
    for query in queries:
        matches = self._db.execute(query).fetchall()
        found.update(match["app_id"] for match in matches)
    return found
# Prune every app known to the database. `now` defaults to the current time
# and `old` (the expiry cutoff) to `now - CHANNEL_EXPIRATION_TIME`. Each app
# is asked to prune itself, and an app whose is_active() is false afterwards
# is dropped from the in-memory `self._apps` table.
# NOTE(review): this span interleaves two versions of the body. The
# statements using `c`, `apps`, `still_active`, prune_nameplates() and
# prune_mailboxes() appear to be the earlier implementation kept by the diff
# view -- confirm which lines are live.
def prune(self, now=None, old=None):
# As with AppNamespace.prune, we log for now.
log.msg("beginning app prune")
if old is None:
old = time.time() - CHANNEL_EXPIRATION_TIME
c = self._db.execute("SELECT DISTINCT `app_id` FROM `messages`")
apps = set([row["app_id"] for row in c.fetchall()]) # these have messages
apps.update(self._apps) # these might have listeners
for app_id in apps:
# NOTE(review): `or` treats an explicit 0 the same as None, so now=0 or
# old=0 is silently replaced by the computed default.
now = now or time.time()
old = old or (now - CHANNEL_EXPIRATION_TIME)
# sorted() makes the per-app processing (and log) order deterministic
for app_id in sorted(self.get_all_apps()):
log.msg(" app prune checking %r" % (app_id,))
# get_app() creates and caches the AppNamespace if it is not loaded yet,
# which is why the pop() below can assume the key is present
app = self.get_app(app_id)
still_active = app.prune_nameplates(old) + app.prune_mailboxes(old)
if not still_active:
log.msg("prune pops app %r" % (app_id,))
app.prune(now, old)
if not app.is_active(): # meaning no websockets
log.msg(" pruning idle app", app_id)
self._apps.pop(app_id)
log.msg("app prune ends, %d remaining apps" % len(self._apps))

View File

@ -1,7 +1,9 @@
from __future__ import print_function
import json, itertools
from binascii import hexlify
import mock
from twisted.trial import unittest
from twisted.python import log
from twisted.internet import protocol, reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.endpoints import clientFromString, connectProtocol
@ -9,7 +11,8 @@ from autobahn.twisted import websocket
from .. import __version__
from .common import ServerBase
from ..server import rendezvous, transit_server
from ..server.rendezvous import Usage, SidedMessage
from ..server.rendezvous import Usage, SidedMessage, Mailbox
from ..server.database import get_db
class Server(ServerBase, unittest.TestCase):
def test_apps(self):
@ -281,6 +284,175 @@ class Server(ServerBase, unittest.TestCase):
self.assertEqual(len(msgs), 5)
self.assertEqual(msgs[-1]["body"], u"body")
# Tests for the pruning logic: Rendezvous.prune() delegating to each app,
# the is_active() bookkeeping, and which nameplates / mailboxes / messages
# survive a prune for each combination of old and new timestamps.
class Prune(unittest.TestCase):
# Rendezvous.prune() must call the app's prune() exactly once, forwarding
# the `now` and `old` values it was given.
def test_apps(self):
rv = rendezvous.Rendezvous(get_db(":memory:"), None, None)
app = rv.get_app(u"appid")
# presumably this gives the app a DB row so the rendezvous-level prune
# discovers it -- verify against allocate_nameplate
app.allocate_nameplate(u"side", 121)
# replace the real prune so only the delegation is observed
app.prune = mock.Mock()
rv.prune(now=123, old=122)
self.assertEqual(app.prune.mock_calls, [mock.call(123, 122)])
# A mailbox (and therefore its app) is active exactly while it has a
# listener attached; merely opening the mailbox is not enough.
def test_active(self):
rv = rendezvous.Rendezvous(get_db(":memory:"), None, None)
app = rv.get_app(u"appid1")
self.assertFalse(app.is_active())
mb = app.open_mailbox(u"mbid", u"side1", 0)
self.assertFalse(mb.is_active())
self.assertFalse(app.is_active())
mb.add_listener(u"handle", None, None)
self.assertTrue(mb.is_active())
self.assertTrue(app.is_active())
mb.remove_listener(u"handle")
self.assertFalse(mb.is_active())
self.assertFalse(app.is_active())
# Nameplates and mailboxes touched only at old timestamps are pruned; one
# touched at a new timestamp by either side survives.
def test_basic(self):
db = get_db(":memory:")
rv = rendezvous.Rendezvous(db, None, 3600)
# timestamps <=50 are "old", >=51 are "new"
#OLD = "old"; NEW = "new"
#when = {OLD: 1, NEW: 60}
# ids expected to remain in each table after the prune
new_nameplates = set()
new_mailboxes = set()
# never added to, so the final check asserts that no messages exist
new_messages = set()
APPID = u"appid"
app = rv.get_app(APPID)
# Exercise the first-vs-second newness tests. These nameplates have
# no mailbox.
app.claim_nameplate(u"np-1", u"side1", 1)
app.claim_nameplate(u"np-2", u"side1", 1)
app.claim_nameplate(u"np-2", u"side2", 2)
app.claim_nameplate(u"np-3", u"side1", 60)
new_nameplates.add(u"np-3")
app.claim_nameplate(u"np-4", u"side1", 1)
app.claim_nameplate(u"np-4", u"side2", 60)
new_nameplates.add(u"np-4")
app.claim_nameplate(u"np-5", u"side1", 60)
app.claim_nameplate(u"np-5", u"side2", 61)
new_nameplates.add(u"np-5")
# same for mailboxes
app.open_mailbox(u"mb-11", u"side1", 1)
app.open_mailbox(u"mb-12", u"side1", 1)
app.open_mailbox(u"mb-12", u"side2", 2)
app.open_mailbox(u"mb-13", u"side1", 60)
new_mailboxes.add(u"mb-13")
app.open_mailbox(u"mb-14", u"side1", 1)
app.open_mailbox(u"mb-14", u"side2", 60)
new_mailboxes.add(u"mb-14")
app.open_mailbox(u"mb-15", u"side1", 60)
app.open_mailbox(u"mb-15", u"side2", 61)
new_mailboxes.add(u"mb-15")
rv.prune(now=123, old=50)
nameplates = set([row["id"] for row in
db.execute("SELECT * FROM `nameplates`").fetchall()])
self.assertEqual(new_nameplates, nameplates)
mailboxes = set([row["id"] for row in
db.execute("SELECT * FROM `mailboxes`").fetchall()])
self.assertEqual(new_mailboxes, mailboxes)
messages = set([row["msg_id"] for row in
db.execute("SELECT * FROM `messages`").fetchall()])
self.assertEqual(new_messages, messages)
# Run self.one() over every combination of nameplate / mailbox / messages
# being absent, old or new, with and without listeners. The with-listeners
# case is only generated when a mailbox row exists.
def test_lots(self):
OLD = "old"; NEW = "new"
for nameplate in [None, OLD, NEW]:
for mailbox in [None, OLD, NEW]:
listeners = [False]
if mailbox is not None:
listeners = [False, True]
for has_listeners in listeners:
for messages in [None, OLD, NEW]:
self.one(nameplate, mailbox, has_listeners, messages)
#def test_one(self):
# # to debug specific problems found by test_lots
# self.one(None, "old", True, None)
# Build one scenario in a fresh in-memory DB, prune with cutoff 50, and
# check which of the nameplate, mailbox and messages are left. Each of
# `nameplate`, `mailbox` and `messages` is None (absent), "old" or "new".
def one(self, nameplate, mailbox, has_listeners, messages):
# scenario description, logged and attached to every assertion message
desc = ("nameplate=%s, mailbox=%s, has_listeners=%s,"
" messages=%s" %
(nameplate, mailbox, has_listeners, messages))
log.msg(desc)
db = get_db(":memory:")
rv = rendezvous.Rendezvous(db, None, 3600)
APPID = u"appid"
app = rv.get_app(APPID)
# timestamps <=50 are "old", >=51 are "new"
OLD = "old"; NEW = "new"
when = {OLD: 1, NEW: 60}
# expectations start pessimistic and are raised further down
nameplate_survives = False
mailbox_survives = False
messages_survive = False
mbid = u"mbid"
if nameplate is not None:
# pin the generated mailbox id so the nameplate links to `mbid`
app.claim_nameplate(u"npid", u"side1", when[nameplate],
_test_mailbox_id=mbid)
if mailbox is not None:
mb = app.open_mailbox(mbid, u"side1", when[mailbox])
else:
# We might want a Mailbox, because that's the easiest way to add
# a "messages" row, but we can't use app.open_mailbox() because
# that modifies both the "mailboxes" table and app._mailboxes,
# and sometimes we're testing what happens when there are
# messages but not a mailbox
mb = Mailbox(app, db, APPID, mbid)
# we need app._mailboxes to know about this, because that's
# where it looks to find listeners
app._mailboxes[mbid] = mb
if messages is not None:
sm = SidedMessage(u"side1", u"phase", u"body", when[messages],
u"msgid")
mb.add_message(sm)
if has_listeners:
mb.add_listener("handle", None, None)
if mailbox is None and messages is not None:
# orphaned messages, even new ones, can't keep a nameplate alive
messages = None
messages_survive = False
# anything new (or a listener) keeps every linked item that exists alive
if (nameplate is NEW or mailbox is NEW
or has_listeners or messages is NEW):
if nameplate is not None:
nameplate_survives = True
if mailbox is not None:
mailbox_survives = True
if messages is not None:
messages_survive = True
rv.prune(now=123, old=50)
nameplates = set([row["id"] for row in
db.execute("SELECT * FROM `nameplates`").fetchall()])
self.assertEqual(nameplate_survives, bool(nameplates),
("nameplate", nameplate_survives, nameplates, desc))
mailboxes = set([row["id"] for row in
db.execute("SELECT * FROM `mailboxes`").fetchall()])
self.assertEqual(mailbox_survives, bool(mailboxes),
("mailbox", mailbox_survives, mailboxes, desc))
# note: rebinds the `messages` parameter; `desc` was built beforehand
messages = set([row["msg_id"] for row in
db.execute("SELECT * FROM `messages`").fetchall()])
self.assertEqual(messages_survive, bool(messages),
("messages", messages_survive, messages, desc))
def strip_message(msg):
m2 = msg.copy()
@ -726,14 +898,15 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
class Summary(unittest.TestCase):
def test_mailbox(self):
c = rendezvous.Mailbox(None, None, None, False, None, None)
app = rendezvous.AppNamespace(None, None, False, None)
# starts at time 1, maybe gets second open at time 3, closes at 5
base_row = {u"started": 1, u"second": None,
u"first_mood": None, u"crowded": False}
def summ(num_sides, second_mood=None, pruned=False, **kwargs):
row = base_row.copy()
row.update(kwargs)
return c._summarize(row, num_sides, second_mood, 5, pruned)
return app._summarize_mailbox(row, num_sides, second_mood, 5,
pruned)
self.assertEqual(summ(1), Usage(1, None, 4, u"lonely"))
self.assertEqual(summ(1, u"lonely"), Usage(1, None, 4, u"lonely"))
@ -764,7 +937,7 @@ class Summary(unittest.TestCase):
Usage(1, 2, 4, u"pruney"))
def test_nameplate(self):
a = rendezvous.AppNamespace(None, None, None, False, None)
a = rendezvous.AppNamespace(None, None, False, None)
# starts at time 1, maybe gets second open at time 3, closes at 5
base_row = {u"started": 1, u"second": None, u"crowded": False}
def summ(num_sides, pruned=False, **kwargs):