checkpointing: server roughed out

This commit is contained in:
Brian Warner 2016-05-19 18:09:17 -07:00
parent 0e72422ffa
commit e39a8291e3
3 changed files with 298 additions and 266 deletions

View File

@ -18,12 +18,14 @@ CREATE TABLE `nameplates`
`mailbox_id` VARCHAR, -- really a foreign key `mailbox_id` VARCHAR, -- really a foreign key
`side1` VARCHAR, -- side name, or NULL `side1` VARCHAR, -- side name, or NULL
`side2` VARCHAR, -- side name, or NULL `side2` VARCHAR, -- side name, or NULL
`crowded` BOOLEAN, -- at some point, three or more sides were involved
`updated` INTEGER, -- time of last activity, used for pruning
-- timing data -- timing data
`started` INTEGER, -- time when nameplace was opened `started` INTEGER, -- time when nameplace was opened
`second` INTEGER, -- time when second side opened `second` INTEGER -- time when second side opened
`closed` INTEGER -- time when closed
); );
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `id`); CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `id`);
CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`);
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
-- Clients exchange messages through a "mailbox", which has a long (randomly -- Clients exchange messages through a "mailbox", which has a long (randomly
@ -34,10 +36,11 @@ CREATE TABLE `mailboxes`
`id` VARCHAR, `id` VARCHAR,
`side1` VARCHAR -- side name, or NULL `side1` VARCHAR -- side name, or NULL
`side2` VARCHAR -- side name, or NULL `side2` VARCHAR -- side name, or NULL
`crowded` BOOLEAN, -- at some point, three or more sides were involved
`first_mood` VARCHAR,
-- timing data for the mailbox itself -- timing data for the mailbox itself
`started` INTEGER, -- time when opened `started` INTEGER, -- time when opened
`second` INTEGER, -- time when second side opened `second` INTEGER -- time when second side opened
`closed` INTEGER -- time when closed
); );
CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`);
@ -55,20 +58,22 @@ CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`);
CREATE TABLE `nameplate_usage` CREATE TABLE `nameplate_usage`
( (
`app_id` VARCHAR,
`started` INTEGER, -- seconds since epoch, rounded to "blur time" `started` INTEGER, -- seconds since epoch, rounded to "blur time"
`total_time` INTEGER, -- seconds from open to last close `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`result` VARCHAR, -- happy, lonely, pruney, crowded `total_time` INTEGER, -- seconds from open to last close/prune
`result` VARCHAR -- happy, lonely, pruney, crowded
-- nameplate moods: -- nameplate moods:
-- "happy": two sides open and close -- "happy": two sides open and close
-- "lonely": one side opens and closes (no response from 2nd side) -- "lonely": one side opens and closes (no response from 2nd side)
-- "pruney": channels which get pruned for inactivity -- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved -- "crowded": three or more sides were involved
`waiting_time` INTEGER -- seconds from start to 2nd side appearing, or None
); );
CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`started`); CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`);
CREATE TABLE `mailbox_usage` CREATE TABLE `mailbox_usage`
( (
`app_id` VARCHAR,
`started` INTEGER, -- seconds since epoch, rounded to "blur time" `started` INTEGER, -- seconds since epoch, rounded to "blur time"
`total_time` INTEGER, -- seconds from open to last close `total_time` INTEGER, -- seconds from open to last close
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
@ -81,7 +86,7 @@ CREATE TABLE `mailbox_usage`
-- "pruney": channels which get pruned for inactivity -- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved -- "crowded": three or more sides were involved
); );
CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`started`); CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`);
CREATE TABLE `transit_usage` CREATE TABLE `transit_usage`
( (

View File

@ -1,5 +1,6 @@
from __future__ import print_function from __future__ import print_function
import time, random import os, time, random, base64
from collections import namedtuple
from twisted.python import log from twisted.python import log
from twisted.application import service, internet from twisted.application import service, internet
@ -22,83 +23,78 @@ def make_sides(sides):
def generate_mailbox_id(): def generate_mailbox_id():
return base64.b32encode(os.urandom(8)).lower().strip("=") return base64.b32encode(os.urandom(8)).lower().strip("=")
# Unlike Channels, these instances are ephemeral, and are created and
# destroyed casually.
class Nameplate:
def __init__(self, app_id, db, id, mailbox_id):
self._app_id = app_id
self._db = db
self._id = id
self._mailbox_id = mailbox_id
def get_id(self): SideResult = namedtuple("SideResult", ["changed", "empty", "side1", "side2"])
return self._id Unchanged = SideResult(changed=False, empty=False, side1=None, side2=None)
class CrowdedError(Exception):
def get_mailbox_id(self):
return self._mailbox_id
def claim(self, side, when):
db = self._db
sides = get_sides(db.execute("SELECT `side1`, `side2` FROM `nameplates`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, self._id)).fetchone())
old_sides = len(sides)
sides.add(side)
if len(sides) > 2:
# XXX: crowded: bail
pass pass
sides12 = make_sides(sides)
db.execute("UPDATE `nameplates` SET `side1`=?, `side2`=?"
" WHERE `app_id`=? AND `id`=?",
(sides12[0], sides12[1], self._app_id, self._id))
if old_sides == 0:
db.execute("UPDATE `mailboxes` SET `nameplate_started`=?"
" WHERE `app_id`=? AND `id`=?",
(when, self._app_id, self._mailbox_id))
else:
db.execute("UPDATE `mailboxes` SET `nameplate_second`=?"
" WHERE `app_id`=? AND `id`=?",
(when, self._app_id, self._mailbox_id))
db.commit()
def release(self, side, when): def add_side(row, new_side):
db = self._db old_sides = [s for s in [row["side1"], row["side2"]] if s]
sides = get_sides(db.execute("SELECT `side1`, `side2` FROM `nameplates`" if new_side in old_sides:
" WHERE `app_id`=? AND `id`=?", return Unchanged
(self._app_id, self._id)).fetchone()) if len(old_sides) == 2:
sides.discard(side) raise CrowdedError("too many sides for this thing")
sides12 = make_sides(sides) return SideResult(changed=True, empty=False,
db.execute("UPDATE `nameplates` SET `side1`=?, `side2`=?" side1=old_sides[0], side2=new_side)
" WHERE `app_id`=? AND `id`=?",
(sides12[0], sides12[1], self._app_id, self._id)) def remove_side(row, side):
if len(sides) == 0: old_sides = [s for s in [row["side1"], row["side2"]] if s]
db.execute("UPDATE `mailboxes` SET `nameplate_closed`=?" if side not in old_sides:
" WHERE `app_id`=? AND `id`=?", return Unchanged
(when, self._app_id, self._mailbox_id)) remaining_sides = old_sides[:]
db.commit() remaining_sides.remove(side)
if remaining_sides:
return SideResult(changed=True, empty=False, side1=remaining_sides[0],
side2=None)
return SideResult(changed=True, empty=True, side1=None, side2=None)
Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"])
TransitUsage = namedtuple("TransitUsage",
["started", "waiting_time", "total_time",
"total_bytes", "result"])
class Mailbox: class Mailbox:
def __init__(self, app, db, blur_usage, log_requests, app_id, channelid): def __init__(self, app, db, blur_usage, log_requests, app_id, mailbox_id):
self._app = app self._app = app
self._db = db self._db = db
self._blur_usage = blur_usage self._blur_usage = blur_usage
self._log_requests = log_requests self._log_requests = log_requests
self._app_id = app_id self._app_id = app_id
self._channelid = channelid self._mailbox_id = mailbox_id
self._listeners = {} # handle -> (send_f, stop_f) self._listeners = {} # handle -> (send_f, stop_f)
# "handle" is a hashable object, for deregistration # "handle" is a hashable object, for deregistration
# send_f() takes a JSONable object, stop_f() has no args # send_f() takes a JSONable object, stop_f() has no args
def get_channelid(self): def open(self, side, when):
return self._channelid # requires caller to db.commit()
assert isinstance(side, type(u"")), type(side)
db = self._db
row = db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, self._mailbox_id)).fetchone()
try:
sr = add_side(row, side)
except CrowdedError:
db.execute("UPDATE `mailboxes` SET `crowded`=?"
" WHERE `app_id`=? AND `id`=?",
(True, self._app_id, self._mailbox_id))
db.commit()
raise
if sr.changed:
db.execute("UPDATE `mailboxes` SET"
" `side1`=?, `side2`=?, `second`=?"
" WHERE `app_id`=? AND `id`=?",
(sr.side1, sr.side2, when,
self._app_id, self._mailbox_id))
def get_messages(self): def get_messages(self):
messages = [] messages = []
db = self._db db = self._db
for row in db.execute("SELECT * FROM `messages`" for row in db.execute("SELECT * FROM `messages`"
" WHERE `app_id`=? AND `channelid`=?" " WHERE `app_id`=? AND `mailbox_id`=?"
" ORDER BY `server_rx` ASC", " ORDER BY `server_rx` ASC",
(self._app_id, self._channelid)).fetchall(): (self._app_id, self._mailbox_id)).fetchall():
if row["phase"] in (CLAIM, RELEASE): if row["phase"] in (CLAIM, RELEASE):
continue continue
messages.append({"phase": row["phase"], "body": row["body"], messages.append({"phase": row["phase"], "body": row["body"],
@ -120,45 +116,107 @@ class Mailbox:
def _add_message(self, side, phase, body, server_rx, msgid): def _add_message(self, side, phase, body, server_rx, msgid):
db = self._db db = self._db
db.execute("INSERT INTO `messages`" db.execute("INSERT INTO `messages`"
" (`app_id`, `channelid`, `side`, `phase`, `body`," " (`app_id`, `mailbox_id`, `side`, `phase`, `body`,"
" `server_rx`, `msgid`)" " `server_rx`, `msgid`)"
" VALUES (?,?,?,?,?, ?,?)", " VALUES (?,?,?,?,?, ?,?)",
(self._app_id, self._channelid, side, phase, body, (self._app_id, self._mailbox_id, side, phase, body,
server_rx, msgid)) server_rx, msgid))
db.commit() db.commit()
def claim(self, side):
self._add_message(side, CLAIM, None, time.time(), None)
def add_message(self, side, phase, body, server_rx, msgid): def add_message(self, side, phase, body, server_rx, msgid):
self._add_message(side, phase, body, server_rx, msgid) self._add_message(side, phase, body, server_rx, msgid)
self.broadcast_message(phase, body, server_rx, msgid) self.broadcast_message(phase, body, server_rx, msgid)
return self.get_messages() # for rendezvous_web.py POST /add return self.get_messages() # for rendezvous_web.py POST /add
def release(self, side, mood): def close(self, side, mood, when):
self._add_message(side, RELEASE, mood, time.time(), None) assert isinstance(side, type(u"")), type(side)
db = self._db db = self._db
seen = set([row["side"] for row in row = db.execute("SELECT * FROM `mailboxes`"
db.execute("SELECT `side` FROM `messages`" " WHERE `app_id`=? AND `id`=?",
" WHERE `app_id`=? AND `channelid`=?", (self._app_id, self._mailbox_id)).fetchone()
(self._app_id, self._channelid))]) if not row:
freed = set([row["side"] for row in return
db.execute("SELECT `side` FROM `messages`" sr = remove_side(row, side)
" WHERE `app_id`=? AND `channelid`=?" if sr.empty:
" AND `phase`=?", rows = db.execute("SELECT DISTINCT(side) FROM `messages`"
(self._app_id, self._channelid, RELEASE))]) " WHERE `app_id`=? AND `id`=?",
if seen - freed: (self._app_id, self._mailbox_id)).fetchall()
return False num_sides = len(rows)
self.delete_and_summarize() self._summarize_and_store(row, num_sides, mood, when, pruned=False)
return True self._delete()
db.commit()
elif sr.changed:
db.execute("UPDATE `mailboxes`"
" SET `side1`=?, `side2`=?, `first_mood`=?"
" WHERE `app_id`=? AND `id`=?",
(sr.side1, sr.side2, mood,
self._app_id, self._mailbox_id))
db.commit()
def _delete(self):
# requires caller to db.commit()
self._db.execute("DELETE FROM `mailboxes`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, self._mailbox_id))
self._db.execute("DELETE FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?",
(self._app_id, self._mailbox_id))
# Shut down any listeners, just in case they're still lingering
# around.
for (send_f, stop_f) in self._listeners.values():
stop_f()
self._app.free_mailbox(self._mailbox_id)
def _summarize_and_store(self, row, num_sides, second_mood, delete_time,
pruned):
u = self._summarize(row, num_sides, second_mood, delete_time, pruned)
self._db.execute("INSERT INTO `mailbox_usage`"
" (`app_id`, "
" `started`, `total_time`, `waiting_time`, `result`)"
" VALUES (?, ?,?,?,?)",
(self._app_id,
u.started, u.total_time, u.waiting_time, u.result))
def _summarize(self, row, num_sides, second_mood, delete_time, pruned):
started = row["started"]
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
waiting_time = None
if row["second"]:
waiting_time = row["second"] - row["started"]
total_time = delete_time - row["started"]
if len(num_sides) == 0:
result = u"quiet"
elif len(num_sides) == 1:
result = u"lonely"
else:
result = u"happy"
moods = set([row["first_mood"], second_mood])
if u"lonely" in moods:
result = u"lonely"
if u"errory" in moods:
result = u"errory"
if u"scary" in moods:
result = u"scary"
if pruned:
result = u"pruney"
if row["crowded"]:
result = u"crowded"
return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result)
def is_idle(self): def is_idle(self):
if self._listeners: if self._listeners:
return False return False
c = self._db.execute("SELECT `server_rx` FROM `messages`" c = self._db.execute("SELECT `server_rx` FROM `messages`"
" WHERE `app_id`=? AND `channelid`=?" " WHERE `app_id`=? AND `mailbox_id`=?"
" ORDER BY `server_rx` DESC LIMIT 1", " ORDER BY `server_rx` DESC LIMIT 1",
(self._app_id, self._channelid)) (self._app_id, self._mailbox_id))
rows = c.fetchall() rows = c.fetchall()
if not rows: if not rows:
return True return True
@ -167,88 +225,6 @@ class Mailbox:
return True return True
return False return False
def _store_summary(self, summary):
(started, result, total_time, waiting_time) = summary
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
self._db.execute("INSERT INTO `usage`"
" (`type`, `started`, `result`,"
" `total_time`, `waiting_time`)"
" VALUES (?,?,?, ?,?)",
(u"rendezvous", started, result,
total_time, waiting_time))
self._db.commit()
def _summarize(self, messages, delete_time):
all_sides = set([m["side"] for m in messages])
if len(all_sides) == 0:
log.msg("_summarize was given zero messages") # shouldn't happen
return
started = min([m["server_rx"] for m in messages])
# 'total_time' is how long the channel was occupied. That ends now,
# both for channels that got pruned for inactivity, and for channels
# that got pruned because of two RELEASE messages
total_time = delete_time - started
if len(all_sides) == 1:
return (started, "lonely", total_time, None)
if len(all_sides) > 2:
# TODO: it'll be useful to have more detail here
return (started, "crowded", total_time, None)
# exactly two sides were involved
A_side = sorted(messages, key=lambda m: m["server_rx"])[0]["side"]
B_side = list(all_sides - set([A_side]))[0]
# How long did the first side wait until the second side showed up?
first_A = min([m["server_rx"] for m in messages if m["side"] == A_side])
first_B = min([m["server_rx"] for m in messages if m["side"] == B_side])
waiting_time = first_B - first_A
# now, were all sides closed? If not, this is "pruney"
A_deallocs = [m for m in messages
if m["phase"] == RELEASE and m["side"] == A_side]
B_deallocs = [m for m in messages
if m["phase"] == RELEASE and m["side"] == B_side]
if not A_deallocs or not B_deallocs:
return (started, "pruney", total_time, None)
# ok, both sides closed. figure out the mood
A_mood = A_deallocs[0]["body"] # maybe None
B_mood = B_deallocs[0]["body"] # maybe None
mood = "quiet"
if A_mood == u"happy" and B_mood == u"happy":
mood = "happy"
if A_mood == u"lonely" or B_mood == u"lonely":
mood = "lonely"
if A_mood == u"errory" or B_mood == u"errory":
mood = "errory"
if A_mood == u"scary" or B_mood == u"scary":
mood = "scary"
return (started, mood, total_time, waiting_time)
def delete_and_summarize(self):
db = self._db
c = self._db.execute("SELECT * FROM `messages`"
" WHERE `app_id`=? AND `channelid`=?"
" ORDER BY `server_rx`",
(self._app_id, self._channelid))
messages = c.fetchall()
summary = self._summarize(messages, time.time())
self._store_summary(summary)
db.execute("DELETE FROM `messages`"
" WHERE `app_id`=? AND `channelid`=?",
(self._app_id, self._channelid))
db.commit()
# Shut down any listeners, just in case they're still lingering
# around.
for (send_f, stop_f) in self._listeners.values():
stop_f()
self._app.free_channel(self._channelid)
def _shutdown(self): def _shutdown(self):
# used at test shutdown to accelerate client disconnects # used at test shutdown to accelerate client disconnects
for (send_f, stop_f) in self._listeners.values(): for (send_f, stop_f) in self._listeners.values():
@ -261,7 +237,7 @@ class AppNamespace:
self._blur_usage = blur_usage self._blur_usage = blur_usage
self._log_requests = log_requests self._log_requests = log_requests
self._app_id = app_id self._app_id = app_id
self._channels = {} self._mailboxes = {}
def get_nameplate_ids(self): def get_nameplate_ids(self):
db = self._db db = self._db
@ -315,15 +291,19 @@ class AppNamespace:
(self._app_id, nameplate_id)).fetchone() (self._app_id, nameplate_id)).fetchone()
if row: if row:
mailbox_id = row["mailbox_id"] mailbox_id = row["mailbox_id"]
sides = [row["side1"], row["sides2"]] try:
if side not in sides: sr = add_side(row, side)
if sides[0] and sides[1]: except CrowdedError:
raise XXXERROR("crowded") db.execute("UPDATE `nameplates` SET `crowded`=?"
sides[1] = side
db.execute("UPDATE `nameplates` SET "
"`side1`=?, `side2`=?, `mailbox_id`=?, `second`=?"
" WHERE `app_id`=? AND `id`=?", " WHERE `app_id`=? AND `id`=?",
(sides[0], sides[1], mailbox_id, when, (True, self._app_id, nameplate_id))
db.commit()
raise
if sr.changed:
db.execute("UPDATE `nameplates` SET"
" `side1`=?, `side2`=?, `updated`=?, `second`=?"
" WHERE `app_id`=? AND `id`=?",
(sr.side1, sr.side2, when, when,
self._app_id, nameplate_id)) self._app_id, nameplate_id))
else: else:
if self._log_requests: if self._log_requests:
@ -331,9 +311,11 @@ class AppNamespace:
(nameplate_id, self._app_id)) (nameplate_id, self._app_id))
mailbox_id = generate_mailbox_id() mailbox_id = generate_mailbox_id()
db.execute("INSERT INTO `nameplates`" db.execute("INSERT INTO `nameplates`"
" (`app_id`, `id`, `mailbox_id`, `side1`, `started`)" " (`app_id`, `id`, `mailbox_id`, `side1`,"
" VALUES(?,?,?,?,?)", " `updated`, `started`)"
(self._app_id, nameplate_id, mailbox_id, side, when)) " VALUES(?,?,?,?, ?,?)",
(self._app_id, nameplate_id, mailbox_id, side,
when, when))
db.commit() db.commit()
return mailbox_id return mailbox_id
@ -351,75 +333,120 @@ class AppNamespace:
(self._app_id, nameplate_id)).fetchone() (self._app_id, nameplate_id)).fetchone()
if not row: if not row:
return return
sides = get_sides(row) sr = remove_side(row, side)
if side not in sides: if sr.empty:
return
sides.discard(side)
if sides:
s12 = make_sides(sides)
db.execute("UPDATE `nameplates` SET `side1`=?, `side2`=?"
" WHERE `app_id`=? AND `id`=?",
(s12[0], s12[1], self._app_id, nameplate_id))
else:
db.execute("DELETE FROM `nameplates`" db.execute("DELETE FROM `nameplates`"
" WHERE `app_id`=? AND `id`=?", " WHERE `app_id`=? AND `id`=?",
(self._app_id, nameplate_id)) (self._app_id, nameplate_id))
self._summarize_nameplate(row) self._summarize_nameplate_and_store(row, when, pruned=False)
db.commit()
elif sr.changed:
db.execute("UPDATE `nameplates`"
" SET `side1`=?, `side2`=?, `updated`=?"
" WHERE `app_id`=? AND `id`=?",
(sr.side1, sr.side2, when,
self._app_id, nameplate_id))
db.commit()
def open_mailbox(self, channelid, side): def _summarize_nameplate_and_store(self, row, delete_time, pruned):
assert isinstance(channelid, type(u"")), type(channelid) # requires caller to db.commit()
channel = self.get_channel(channelid) u = self._summarize_nameplate_usage(row, delete_time, pruned)
channel.claim(side) self._db.execute("INSERT INTO `nameplate_usage`"
return channel " (`app_id`,"
# some of this overlaps with open() on a new mailbox " `started`, `total_time`, `waiting_time`, `result`)"
db.execute("INSERT INTO `mailboxes`" " VALUES (?, ?,?,?,?)",
" (`app_id`, `id`, `nameplate_started`, `started`)" (self._app_id,
" VALUES(?,?,?,?)", u.started, u.total_time, u.waiting_time, u.result))
(self._app_id, mailbox_id, when, when))
def get_channel(self, channelid): def _summarize_nameplate_usage(self, row, delete_time, pruned):
assert isinstance(channelid, type(u"")) started = row["started"]
if not channelid in self._channels: if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
waiting_time = None
if row["second"]:
waiting_time = row["second"] - row["started"]
total_time = delete_time - row["started"]
result = u"lonely"
if pruned:
result = u"pruney"
if row["second"]:
result = u"happy"
if row["crowded"]:
result = u"crowded"
return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result)
def _prune_nameplate(self, row, delete_time):
# requires caller to db.commit()
db = self._db
db.execute("DELETE FROM `nameplates` WHERE `app_id`=? AND `id`=?",
(self._app_id, row["id"]))
self._summarize_nameplate_and_store(row, delete_time, pruned=True)
# TODO: make a Nameplate object, keep track of when there's a
# websocket that's watching it, don't prune a nameplate that someone
# is watching, even if they started watching a long time ago
def prune_nameplates(self, old):
db = self._db
for row in db.execute("SELECT * FROM `nameplates`"
" WHERE `updated` < ?",
(old,)).fetchall():
self._prune_nameplate(row)
count = db.execute("SELECT COUNT(*) FROM `nameplates`").fetchone()[0]
return count
def open_mailbox(self, mailbox_id, side, when):
assert isinstance(mailbox_id, type(u"")), type(mailbox_id)
db = self._db
if not mailbox_id in self._mailboxes:
if self._log_requests: if self._log_requests:
log.msg("spawning #%s for app_id %s" % (channelid, self._app_id)) log.msg("spawning #%s for app_id %s" % (mailbox_id,
self._channels[channelid] = Channel(self, self._db, self._app_id))
db.execute("INSERT INTO `mailboxes`"
" (`app_id`, `id`, `started`)"
" VALUES(?,?,?)",
(self._app_id, mailbox_id, when))
self._mailboxes[mailbox_id] = Mailbox(self, self._db,
self._blur_usage, self._blur_usage,
self._log_requests, self._log_requests,
self._app_id, channelid) self._app_id, mailbox_id)
return self._channels[channelid] mailbox = self._mailboxes[mailbox_id]
mailbox.open(side)
db.commit()
return mailbox
def free_channel(self, channelid): def free_mailbox(self, mailbox_id):
# called from Channel.delete_and_summarize(), which deletes any # called from Mailbox.delete_and_summarize(), which deletes any
# messages # messages
if channelid in self._channels: if mailbox_id in self._mailboxes:
self._channels.pop(channelid) self._mailboxes.pop(mailbox_id)
if self._log_requests: if self._log_requests:
log.msg("freed+killed #%s, now have %d DB channels, %d live" % log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" %
(channelid, len(self.get_claimed()), len(self._channels))) (mailbox_id, len(self.get_claimed()), len(self._mailboxes)))
def prune_old_channels(self): def prune_mailboxes(self, old):
# For now, pruning is logged even if log_requests is False, to debug # For now, pruning is logged even if log_requests is False, to debug
# the pruning process, and since pruning is triggered by a timer # the pruning process, and since pruning is triggered by a timer
# instead of by user action. It does reveal which channels were # instead of by user action. It does reveal which mailboxes were
# present when the pruning process began, though, so in the log run # present when the pruning process began, though, so in the log run
# it should do less logging. # it should do less logging.
log.msg(" channel prune begins") log.msg(" channel prune begins")
# a channel is deleted when there are no listeners and there have # a channel is deleted when there are no listeners and there have
# been no messages added in CHANNEL_EXPIRATION_TIME seconds # been no messages added in CHANNEL_EXPIRATION_TIME seconds
channels = set(self.get_claimed()) # these have messages mailboxes = set(self.get_claimed()) # these have messages
channels.update(self._channels) # these might have listeners mailboxes.update(self._mailboxes) # these might have listeners
for channelid in channels: for mailbox_id in mailboxes:
log.msg(" channel prune checking %d" % channelid) log.msg(" channel prune checking %d" % mailbox_id)
channel = self.get_channel(channelid) channel = self.get_channel(mailbox_id)
if channel.is_idle(): if channel.is_idle():
log.msg(" channel prune expiring %d" % channelid) log.msg(" channel prune expiring %d" % mailbox_id)
channel.delete_and_summarize() # calls self.free_channel channel.delete_and_summarize() # calls self.free_channel
log.msg(" channel prune done, %r left" % (self._channels.keys(),)) log.msg(" channel prune done, %r left" % (self._mailboxes.keys(),))
return bool(self._channels) return bool(self._mailboxes)
def _shutdown(self): def _shutdown(self):
for channel in self._channels.values(): for channel in self._mailboxes.values():
channel._shutdown() channel._shutdown()
class Rendezvous(service.MultiService): class Rendezvous(service.MultiService):
@ -427,7 +454,7 @@ class Rendezvous(service.MultiService):
service.MultiService.__init__(self) service.MultiService.__init__(self)
self._db = db self._db = db
self._welcome = welcome self._welcome = welcome
self._blur_usage = blur_usage self._blur_usage = None
log_requests = blur_usage is None log_requests = blur_usage is None
self._log_requests = log_requests self._log_requests = log_requests
self._apps = {} self._apps = {}
@ -449,15 +476,18 @@ class Rendezvous(service.MultiService):
self._log_requests, app_id) self._log_requests, app_id)
return self._apps[app_id] return self._apps[app_id]
def prune(self): def prune(self, old=None):
# As with AppNamespace.prune_old_channels, we log for now. # As with AppNamespace.prune_old_mailboxes, we log for now.
log.msg("beginning app prune") log.msg("beginning app prune")
if old is None:
old = time.time() - CHANNEL_EXPIRATION_TIME
c = self._db.execute("SELECT DISTINCT `app_id` FROM `messages`") c = self._db.execute("SELECT DISTINCT `app_id` FROM `messages`")
apps = set([row["app_id"] for row in c.fetchall()]) # these have messages apps = set([row["app_id"] for row in c.fetchall()]) # these have messages
apps.update(self._apps) # these might have listeners apps.update(self._apps) # these might have listeners
for app_id in apps: for app_id in apps:
log.msg(" app prune checking %r" % (app_id,)) log.msg(" app prune checking %r" % (app_id,))
still_active = self.get_app(app_id).prune_old_channels() app = self.get_app(app_id)
still_active = app.prune_nameplates(old) + app.prune_mailboxes(old)
if not still_active: if not still_active:
log.msg("prune pops app %r" % (app_id,)) log.msg("prune pops app %r" % (app_id,))
self._apps.pop(app_id) self._apps.pop(app_id)

View File

@ -2,6 +2,7 @@ import json, time
from twisted.internet import reactor from twisted.internet import reactor
from twisted.python import log from twisted.python import log
from autobahn.twisted import websocket from autobahn.twisted import websocket
from .rendezvous import CrowdedError
# The WebSocket allows the client to send "commands" to the server, and the # The WebSocket allows the client to send "commands" to the server, and the
# server to send "responses" to the client. Note that commands and responses # server to send "responses" to the client. Note that commands and responses
@ -84,7 +85,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
self._app = None self._app = None
self._side = None self._side = None
self._did_allocate = False # only one allocate() per websocket self._did_allocate = False # only one allocate() per websocket
self._channels = {} # channel-id -> Channel (claimed) self._mailbox = None
def onConnect(self, request): def onConnect(self, request):
rv = self.factory.rendezvous rv = self.factory.rendezvous
@ -122,11 +123,11 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
return self.handle_release(msg, server_rx) return self.handle_release(msg, server_rx)
if mtype == "open": if mtype == "open":
return self.handle_open(msg) return self.handle_open(msg, server_rx)
if mtype == "add": if mtype == "add":
return self.handle_add(msg, server_rx) return self.handle_add(msg, server_rx)
if mtype == "close": if mtype == "close":
return self.handle_close(msg) return self.handle_close(msg, server_rx)
raise Error("Unknown type") raise Error("Unknown type")
except Error as e: except Error as e:
@ -154,7 +155,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
def handle_allocate(self, server_rx): def handle_allocate(self, server_rx):
if self._did_allocate: if self._did_allocate:
raise Error("You already allocated one channel, don't be greedy") raise Error("You already allocated one mailbox, don't be greedy")
nameplate_id = self._app.allocate_nameplate(self._side, server_rx) nameplate_id = self._app.allocate_nameplate(self._side, server_rx)
assert isinstance(nameplate_id, type(u"")) assert isinstance(nameplate_id, type(u""))
self._did_allocate = True self._did_allocate = True
@ -164,56 +165,52 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
if "nameplate" not in msg: if "nameplate" not in msg:
raise Error("claim requires 'nameplate'") raise Error("claim requires 'nameplate'")
nameplate_id = msg["nameplate"] nameplate_id = msg["nameplate"]
assert isinstance(nameplate_id, type(u"")), type(nameplate_id)
self._nameplate_id = nameplate_id self._nameplate_id = nameplate_id
assert isinstance(nameplate_id, type(u"")), type(nameplate) try:
mailbox_id = self._app.claim_nameplate(nameplate_id, self._side, mailbox_id = self._app.claim_nameplate(nameplate_id, self._side,
server_rx) server_rx)
except CrowdedError:
raise Error("crowded")
self.send("mailbox", mailbox=mailbox_id) self.send("mailbox", mailbox=mailbox_id)
def handle_release(self, server_rx): def handle_release(self, server_rx):
if not self._nameplate_id: if not self._nameplate_id:
raise Error("must claim a nameplate before releasing it") raise Error("must claim a nameplate before releasing it")
self._app.release_nameplate(self._nameplate_id, self._side, server_rx)
deleted = self._app.release_nameplate(self._nameplate_id,
self._side, server_rx)
self._nameplate_id = None self._nameplate_id = None
def handle_open(self, msg): def handle_open(self, msg, server_rx):
channelid = msg["channelid"] if self._mailbox:
if channelid not in self._channels: raise Error("you already have a mailbox open")
raise Error("must claim channel before watching") mailbox_id = msg["mailbox_id"]
assert isinstance(channelid, type(u"")) assert isinstance(mailbox_id, type(u""))
channel = self._channels[channelid] self._mailbox = self._app.open_mailbox(mailbox_id, self._side,
server_rx)
def _send(event): def _send(event):
self.send("message", channelid=channelid, message=event) self.send("message", message=event)
def _stop(): def _stop():
self._reactor.callLater(0, self.transport.loseConnection) self._reactor.callLater(0, self.transport.loseConnection)
for old_message in channel.add_listener(self, _send, _stop): for old_message in self._mailbox.add_listener(self, _send, _stop):
_send(old_message) _send(old_message)
def handle_add(self, msg, server_rx): def handle_add(self, msg, server_rx):
channelid = msg["channelid"] if not self._mailbox:
if channelid not in self._channels: raise Error("must open mailbox before adding")
raise Error("must claim channel before adding")
assert isinstance(channelid, type(u""))
channel = self._channels[channelid]
if "phase" not in msg: if "phase" not in msg:
raise Error("missing 'phase'") raise Error("missing 'phase'")
if "body" not in msg: if "body" not in msg:
raise Error("missing 'body'") raise Error("missing 'body'")
msgid = msg.get("id") # optional msgid = msg.get("id") # optional
channel.add_message(self._side, msg["phase"], msg["body"], self._mailbox.add_message(self._side, msg["phase"], msg["body"],
server_rx, msgid) server_rx, msgid)
def handle_close(self, msg): def handle_close(self, msg, server_rx):
channelid = msg["channelid"] if not self._mailbox:
if channelid not in self._channels: raise Error("must open mailbox before closing")
raise Error("must claim channel before releasing") deleted = self._mailbox.close(self._side, msg.get("mood"), server_rx)
assert isinstance(channelid, type(u"")) self._mailbox = None
channel = self._channels[channelid]
deleted = channel.release(self._side, msg.get("mood"))
del self._channels[channelid]
self.send("released", status="deleted" if deleted else "waiting") self.send("released", status="deleted" if deleted else "waiting")
def send(self, mtype, **kwargs): def send(self, mtype, **kwargs):