change pruning algorithm
The new approach runs every 10 minutes and keeps a nameplate/mailbox/messages "channel" alive if the mailbox has been updated within 11 minutes, or if there has been an attached listener within that time. Also remove the "nameplates.updated" column. Now we only track "updated" timestamps on the "mailboxes" table, and a new mailbox will preserve any attached nameplate.
This commit is contained in:
parent
6176500cf4
commit
ffb1a9b9c9
|
@ -12,11 +12,9 @@ CREATE TABLE `nameplates`
|
||||||
`app_id` VARCHAR,
|
`app_id` VARCHAR,
|
||||||
`name` VARCHAR,
|
`name` VARCHAR,
|
||||||
`mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`),
|
`mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`),
|
||||||
`request_id` VARCHAR, -- from 'allocate' message, for future deduplication
|
`request_id` VARCHAR -- from 'allocate' message, for future deduplication
|
||||||
`updated` INTEGER -- time of last activity, used for pruning
|
|
||||||
);
|
);
|
||||||
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`);
|
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`);
|
||||||
CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`);
|
|
||||||
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
|
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
|
||||||
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
|
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
|
||||||
|
|
||||||
|
|
|
@ -17,11 +17,9 @@ CREATE TABLE `nameplates`
|
||||||
`app_id` VARCHAR,
|
`app_id` VARCHAR,
|
||||||
`name` VARCHAR,
|
`name` VARCHAR,
|
||||||
`mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`),
|
`mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`),
|
||||||
`request_id` VARCHAR, -- from 'allocate' message, for future deduplication
|
`request_id` VARCHAR -- from 'allocate' message, for future deduplication
|
||||||
`updated` INTEGER -- time of last activity, used for pruning
|
|
||||||
);
|
);
|
||||||
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`);
|
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`);
|
||||||
CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`);
|
|
||||||
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
|
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
|
||||||
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
|
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
|
||||||
|
|
||||||
|
|
|
@ -6,12 +6,9 @@ from twisted.application import service, internet
|
||||||
|
|
||||||
SECONDS = 1.0
|
SECONDS = 1.0
|
||||||
MINUTE = 60*SECONDS
|
MINUTE = 60*SECONDS
|
||||||
HOUR = 60*MINUTE
|
|
||||||
DAY = 24*HOUR
|
|
||||||
MB = 1000*1000
|
|
||||||
|
|
||||||
CHANNEL_EXPIRATION_TIME = 2*HOUR
|
CHANNEL_EXPIRATION_TIME = 11*MINUTE
|
||||||
EXPIRATION_CHECK_PERIOD = 1*HOUR
|
EXPIRATION_CHECK_PERIOD = 10*MINUTE
|
||||||
|
|
||||||
def generate_mailbox_id():
|
def generate_mailbox_id():
|
||||||
return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii")
|
return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii")
|
||||||
|
@ -203,9 +200,9 @@ class AppNamespace:
|
||||||
mailbox_id = generate_mailbox_id()
|
mailbox_id = generate_mailbox_id()
|
||||||
self._add_mailbox(mailbox_id, side, when) # ensure row exists
|
self._add_mailbox(mailbox_id, side, when) # ensure row exists
|
||||||
sql = ("INSERT INTO `nameplates`"
|
sql = ("INSERT INTO `nameplates`"
|
||||||
" (`app_id`, `name`, `mailbox_id`, `updated`)"
|
" (`app_id`, `name`, `mailbox_id`)"
|
||||||
" VALUES(?,?,?,?)")
|
" VALUES(?,?,?)")
|
||||||
npid = db.execute(sql, (self._app_id, name, mailbox_id, when)
|
npid = db.execute(sql, (self._app_id, name, mailbox_id)
|
||||||
).lastrowid
|
).lastrowid
|
||||||
else:
|
else:
|
||||||
npid = row["id"]
|
npid = row["id"]
|
||||||
|
@ -219,8 +216,6 @@ class AppNamespace:
|
||||||
" (`nameplates_id`, `claimed`, `side`, `added`)"
|
" (`nameplates_id`, `claimed`, `side`, `added`)"
|
||||||
" VALUES(?,?,?,?)",
|
" VALUES(?,?,?,?)",
|
||||||
(npid, True, side, when))
|
(npid, True, side, when))
|
||||||
db.execute("UPDATE `nameplates` SET `updated`=? WHERE `id`=?",
|
|
||||||
(when, npid))
|
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
||||||
self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError
|
self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError
|
||||||
|
@ -398,10 +393,11 @@ class AppNamespace:
|
||||||
# allowed to disconnect for up to 9 minutes without losing the
|
# allowed to disconnect for up to 9 minutes without losing the
|
||||||
# channel (nameplate, mailbox, and messages).
|
# channel (nameplate, mailbox, and messages).
|
||||||
|
|
||||||
# Each time a client does something, the "updated" field is updated.
|
# Each time a client does something, the mailbox.updated field is
|
||||||
# If a client is subscribed to the mailbox when pruning check runs,
|
# updated with the current timestamp. If a client is subscribed to
|
||||||
# the "updated" field is updated. After that check, if the "updated"
|
# the mailbox when pruning check runs, the "updated" field is also
|
||||||
# field is "old", the channel is deleted.
|
# updated. After that check, if the "updated" field is "old", the
|
||||||
|
# channel is deleted.
|
||||||
|
|
||||||
# For now, pruning is logged even if log_requests is False, to debug
|
# For now, pruning is logged even if log_requests is False, to debug
|
||||||
# the pruning process, and since pruning is triggered by a timer
|
# the pruning process, and since pruning is triggered by a timer
|
||||||
|
@ -411,58 +407,33 @@ class AppNamespace:
|
||||||
log.msg(" prune begins (%s)" % self._app_id)
|
log.msg(" prune begins (%s)" % self._app_id)
|
||||||
db = self._db
|
db = self._db
|
||||||
modified = False
|
modified = False
|
||||||
# for all `mailboxes`: classify as new or old
|
|
||||||
OLD = 0; NEW = 1
|
for mailbox in self._mailboxes.values():
|
||||||
all_mailboxes = {}
|
if mailbox.has_listeners():
|
||||||
|
mailbox._touch(now)
|
||||||
|
db.commit() # make sure the updates are visible below
|
||||||
|
|
||||||
|
new_mailboxes = set()
|
||||||
|
old_mailboxes = set()
|
||||||
for row in db.execute("SELECT * FROM `mailboxes` WHERE `app_id`=?",
|
for row in db.execute("SELECT * FROM `mailboxes` WHERE `app_id`=?",
|
||||||
(self._app_id,)).fetchall():
|
(self._app_id,)).fetchall():
|
||||||
mailbox_id = row["id"]
|
mailbox_id = row["id"]
|
||||||
if row["updated"] > old:
|
if row["updated"] > old:
|
||||||
which = NEW
|
new_mailboxes.add(mailbox_id)
|
||||||
else:
|
else:
|
||||||
which = OLD
|
old_mailboxes.add(mailbox_id)
|
||||||
all_mailboxes[mailbox_id] = which
|
#log.msg(" 2: mailboxes:", new_mailboxes, old_mailboxes)
|
||||||
#log.msg(" 2: all_mailboxes", all_mailboxes)
|
|
||||||
|
|
||||||
# for all mailbox objects with active listeners:
|
old_nameplates = set()
|
||||||
# classify the mailbox as new
|
|
||||||
for mailbox_id in self._mailboxes:
|
|
||||||
#log.msg(" -5: checking", mailbox_id, self._mailboxes[mailbox_id])
|
|
||||||
if self._mailboxes[mailbox_id].has_listeners():
|
|
||||||
all_mailboxes[mailbox_id] = NEW
|
|
||||||
#log.msg(" 5: all_mailboxes", all_mailboxes)
|
|
||||||
|
|
||||||
# for all `nameplates`:
|
|
||||||
# classify as new or old
|
|
||||||
# if the linked mailbox exists:
|
|
||||||
# if it is new:
|
|
||||||
# classify nameplate as new
|
|
||||||
# if it is old:
|
|
||||||
# if the nameplate is new:
|
|
||||||
# classify mailbox as new
|
|
||||||
old_nameplate_ids = []
|
|
||||||
for row in db.execute("SELECT * FROM `nameplates` WHERE `app_id`=?",
|
for row in db.execute("SELECT * FROM `nameplates` WHERE `app_id`=?",
|
||||||
(self._app_id,)).fetchall():
|
(self._app_id,)).fetchall():
|
||||||
npid = row["id"]
|
npid = row["id"]
|
||||||
if row["updated"] > old:
|
|
||||||
which = NEW
|
|
||||||
else:
|
|
||||||
which = OLD
|
|
||||||
mailbox_id = row["mailbox_id"]
|
mailbox_id = row["mailbox_id"]
|
||||||
if mailbox_id in all_mailboxes:
|
if mailbox_id in old_mailboxes:
|
||||||
if all_mailboxes[mailbox_id] == NEW:
|
old_nameplates.add(npid)
|
||||||
which = NEW
|
#log.msg(" 3: old_nameplates", old_nameplates)
|
||||||
else:
|
|
||||||
if which == NEW:
|
|
||||||
all_mailboxes[mailbox_id] = NEW
|
|
||||||
if which == OLD:
|
|
||||||
old_nameplate_ids.append(npid)
|
|
||||||
#log.msg(" 6: old_nameplate_ids", old_nameplate_ids)
|
|
||||||
|
|
||||||
# delete all old nameplates
|
for npid in old_nameplates:
|
||||||
# invariant check: if there is a linked mailbox, it is old
|
|
||||||
|
|
||||||
for npid in old_nameplate_ids:
|
|
||||||
log.msg(" deleting nameplate", npid)
|
log.msg(" deleting nameplate", npid)
|
||||||
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
|
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
|
||||||
" WHERE `nameplates_id`=?",
|
" WHERE `nameplates_id`=?",
|
||||||
|
@ -476,20 +447,19 @@ class AppNamespace:
|
||||||
# delete all messages for old mailboxes
|
# delete all messages for old mailboxes
|
||||||
# delete all old mailboxes
|
# delete all old mailboxes
|
||||||
|
|
||||||
for mailbox_id, which in all_mailboxes.items():
|
for mailbox_id in old_mailboxes:
|
||||||
if which == OLD:
|
log.msg(" deleting mailbox", mailbox_id)
|
||||||
log.msg(" deleting mailbox", mailbox_id)
|
side_rows = db.execute("SELECT * FROM `mailbox_sides`"
|
||||||
side_rows = db.execute("SELECT * FROM `mailbox_sides`"
|
" WHERE `mailbox_id`=?",
|
||||||
" WHERE `mailbox_id`=?",
|
(mailbox_id,)).fetchall()
|
||||||
(mailbox_id,)).fetchall()
|
db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?",
|
||||||
db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?",
|
(mailbox_id,))
|
||||||
(mailbox_id,))
|
db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?",
|
||||||
db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?",
|
(mailbox_id,))
|
||||||
(mailbox_id,))
|
db.execute("DELETE FROM `mailboxes` WHERE `id`=?",
|
||||||
db.execute("DELETE FROM `mailboxes` WHERE `id`=?",
|
(mailbox_id,))
|
||||||
(mailbox_id,))
|
self._summarize_mailbox_and_store(side_rows, now, pruned=True)
|
||||||
self._summarize_mailbox_and_store(side_rows, now, pruned=True)
|
modified = True
|
||||||
modified = True
|
|
||||||
|
|
||||||
if modified:
|
if modified:
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|
|
@ -391,7 +391,7 @@ class Prune(unittest.TestCase):
|
||||||
|
|
||||||
def test_lots(self):
|
def test_lots(self):
|
||||||
OLD = "old"; NEW = "new"
|
OLD = "old"; NEW = "new"
|
||||||
for nameplate in [None, OLD, NEW]:
|
for nameplate in [False, True]:
|
||||||
for mailbox in [OLD, NEW]:
|
for mailbox in [OLD, NEW]:
|
||||||
for has_listeners in [False, True]:
|
for has_listeners in [False, True]:
|
||||||
self.one(nameplate, mailbox, has_listeners)
|
self.one(nameplate, mailbox, has_listeners)
|
||||||
|
@ -417,8 +417,8 @@ class Prune(unittest.TestCase):
|
||||||
mailbox_survives = False
|
mailbox_survives = False
|
||||||
|
|
||||||
mbid = "mbid"
|
mbid = "mbid"
|
||||||
if nameplate is not None:
|
if nameplate:
|
||||||
mbid = app.claim_nameplate("npid", "side1", when[nameplate])
|
mbid = app.claim_nameplate("npid", "side1", when[mailbox])
|
||||||
mb = app.open_mailbox(mbid, "side1", when[mailbox])
|
mb = app.open_mailbox(mbid, "side1", when[mailbox])
|
||||||
|
|
||||||
# the pruning algorithm doesn't care about the age of messages,
|
# the pruning algorithm doesn't care about the age of messages,
|
||||||
|
@ -430,8 +430,8 @@ class Prune(unittest.TestCase):
|
||||||
if has_listeners:
|
if has_listeners:
|
||||||
mb.add_listener("handle", None, None)
|
mb.add_listener("handle", None, None)
|
||||||
|
|
||||||
if (nameplate == NEW or mailbox == NEW or has_listeners):
|
if (mailbox == NEW or has_listeners):
|
||||||
if nameplate is not None:
|
if nameplate:
|
||||||
nameplate_survives = True
|
nameplate_survives = True
|
||||||
mailbox_survives = True
|
mailbox_survives = True
|
||||||
messages_survive = mailbox_survives
|
messages_survive = mailbox_survives
|
||||||
|
|
Loading…
Reference in New Issue
Block a user