Merge branch 'new-prune'

This changes the DB schema and rewrites the expiration/pruning
algorithm. The previous code had several bugs that left nameplates,
mailboxes, and messages behind when clients didn't explicitly close
them before disconnecting (and sometimes even when they did).

The new code runs an expiration loop every 10 minutes and prunes
anything that is more than 11 minutes old. Mailboxes with a connected
listener (a websocket that has open()ed the Mailbox) get their "in-use"
timestamp refreshed at the beginning of each loop. As a result,
nameplates and mailboxes should be pruned between 10 and 20 minutes
after the last activity.
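
For illustration, the timing fits together roughly as in the sketch
below. This is not the server's actual wiring: start_pruning(), expire(),
and the two constant names are made up here; only prune_all_apps(now, old)
comes from the code in this diff.

    # sketch only: illustrative names, not the server's real identifiers
    import time
    from twisted.application import internet

    EXPIRATION_CHECK_PERIOD = 10*60   # run the expiration loop every 10 minutes
    CHANNEL_EXPIRATION_TIME = 11*60   # prune anything idle longer than this

    def start_pruning(parent_service, rendezvous):
        # prune_all_apps() first refreshes the 'updated' timestamp of mailboxes
        # that still have listeners, then deletes nameplates, mailboxes, and
        # messages whose timestamp is older than 'old'
        def expire():
            now = time.time()
            rendezvous.prune_all_apps(now=now, old=now - CHANNEL_EXPIRATION_TIME)
        t = internet.TimerService(EXPIRATION_CHECK_PERIOD, expire)
        t.setServiceParent(parent_service)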

Clients are expected to reconnect within 9 minutes of losing a
connection. The current release does not survive a dropped connection,
but a future one will. By raising the 10-minute/11-minute settings for
specific applications, a "Persistent Wormhole" mode will tolerate even
longer periods of being offline.

The server now has an option to write stats to a file at the end of each
expiration loop, and the munin plugins have been rewritten to read this
file instead of querying the database directly. The file includes a
"valid_until" timestamp so the plugins can detect and ignore stale data.
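
For reference, the stats file is JSON shaped roughly as follows. The
field names are taken from get_stats() and the plugins in this diff; the
numbers are purely illustrative and some sections are abridged.

    {
      "created": 1466988000,
      "valid_until": 1466988660,
      "rendezvous": {
        "active": {"apps": 1, "nameplates_total": 2,
                   "mailboxes_total": 3, "messages_total": 5},
        "since_reboot": {"nameplates_total": 10,
                         "nameplate_moods": {"happy": 9, "lonely": 1},
                         "mailboxes_total": 10,
                         "mailbox_moods": {"happy": 9, "pruney": 1}},
        "all_time": {"nameplates_total": 250,
                     "nameplate_moods": {"happy": 230, "lonely": 15,
                                         "pruney": 4, "crowded": 1},
                     "mailboxes_total": 250,
                     "mailboxes_standalone": 0,
                     "mailbox_moods": {"happy": 231, "scary": 1, "lonely": 13,
                                       "quiet": 0, "errory": 1, "pruney": 4,
                                       "crowded": 0}}
      },
      "transit": {
        "active": {"waiting": 0, "connected": 0},
        "since_reboot": {"total": 40, "bytes": 123456789,
                         "moods": {"happy": 38, "lonely": 2}}
      }
    }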

When this new server is run against an old (v2) database, it should
automatically upgrade it to the new v3 schema. The nameplate and mailbox
tables will be erased, but the usage (history) tables will be preserved.
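
The upgrade is driven by the versioned SQL under db-schemas/ (see the
upgrade-to-v3 SQL added in this diff, and TARGET_VERSION = 3 in the
database module). A minimal sketch of that idea, assuming the scripts
are applied with sqlite3's executescript(); the real logic lives in
get_db() and may differ:

    # sketch only: a hypothetical upgrade helper, not the project's actual
    # get_db(); 'db' is an open sqlite3.Connection
    def upgrade_to(db, current_version, target_version=3):
        while current_version < target_version:
            current_version += 1
            # e.g. upgrade-to-v3.sql drops and recreates the nameplates,
            # mailboxes, and messages tables and sets the `version` row to 3,
            # leaving the *_usage history tables alone
            with open("db-schemas/upgrade-to-v%d.sql" % current_version) as f:
                db.executescript(f.read())
            db.commit()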
commit f005d8a9ce
Brian Warner, 2016-06-26 17:50:15 -07:00
20 changed files with 1112 additions and 862 deletions

misc/dump-stats.py (new executable file)

@@ -0,0 +1,19 @@
from __future__ import print_function
import time, json
# Run this as 'watch python misc/dump-stats.py' against a 'wormhole-server
# start --stats-file=stats.json'
with open("stats.json") as f:
data_s = f.read()
now = time.time()
data = json.loads(data_s)
if now < data["valid_until"]:
valid = "valid"
else:
valid = "EXPIRED"
age = now - data["created"]
print("age: %d (%s)" % (age, valid))
print(data_s)

misc/munin/wormhole_active (new executable file)

@@ -0,0 +1,50 @@
#! /usr/bin/env python
"""
Use the following in /etc/munin/plugin-conf.d/wormhole :
[wormhole_*]
env.serverdir /path/to/your/wormhole/server
"""
import os, sys, time, json
CONFIG = """\
graph_title Magic-Wormhole Active Channels
graph_vlabel Channels
graph_category network
nameplates.label Nameplates
nameplates.draw LINE2
nameplates.type GAUGE
mailboxes.label Mailboxes
mailboxes.draw LINE2
mailboxes.type GAUGE
messages.label Messages
messages.draw LINE1
messages.type GAUGE
transit_waiting.label Transit Waiting
transit_waiting.draw LINE1
transit_waiting.type GAUGE
transit_connected.label Transit Connected
transit_connected.draw LINE1
transit_connected.type GAUGE
"""
if len(sys.argv) > 1 and sys.argv[1] == "config":
    print CONFIG.rstrip()
    sys.exit(0)
serverdir = os.environ["serverdir"]
fn = os.path.join(serverdir, "stats.json")
with open(fn) as f:
    data = json.load(f)
if time.time() > data["valid_until"]:
    sys.exit(1) # expired
ra = data["rendezvous"]["active"]
print "nameplates.value", ra["nameplates_total"]
print "mailboxes.value", ra["mailboxes_total"]
print "messages.value", ra["messages_total"]
ta = data["transit"]["active"]
print "transit_waiting.value", ta["waiting"]
print "transit_connected.value", ta["connected"]

@@ -1,85 +0,0 @@
#! /usr/bin/env python
"""
Use the following in /etc/munin/plugin-conf.d/wormhole :
[wormhole_*]
env.serverdir /path/to/your/wormhole/server
"""
import os, sys, time, sqlite3
def count_events():
    serverdir = os.environ["serverdir"]
    dbfile = os.path.join(serverdir, "relay.sqlite")
    if not os.path.exists(dbfile):
        print "cannot find relay.sqlite, please set env.serverdir"
        sys.exit(1)
    db = sqlite3.connect(dbfile)
    db.row_factory = sqlite3.Row
    c_list = []
    c_dict = {}
    def add(key, value):
        c_list.append((key, value))
        c_dict[key] = value
    def q(query, values=()):
        return db.execute(query, values).fetchone()[0]
    OLD = time.time() - 10*60
    add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplates`"))
    add("total nameplates", q("SELECT COUNT() FROM `nameplates`"))
    add("waiting nameplates", q("SELECT COUNT() FROM `nameplates`"
                                " WHERE `second` is null"))
    add("connected nameplates", q("SELECT COUNT() FROM `nameplates`"
                                  " WHERE `second` is not null"))
    add("stale nameplates", q("SELECT COUNT() FROM `nameplates`"
                              " where `updated` < ?", (OLD,)))
    add("total mailboxes", q("SELECT COUNT() FROM `mailboxes`"))
    add("waiting mailboxes", q("SELECT COUNT() FROM `mailboxes`"
                               " WHERE `second` is null"))
    add("connected mailboxes", q("SELECT COUNT() FROM `mailboxes`"
                                 " WHERE `second` is not null"))
    stale_mailboxes = 0
    for mbox_row in db.execute("SELECT * FROM `mailboxes`").fetchall():
        newest = db.execute("SELECT `server_rx` FROM `messages`"
                            " WHERE `app_id`=? AND `mailbox_id`=?"
                            " ORDER BY `server_rx` DESC LIMIT 1",
                            (mbox_row["app_id"], mbox_row["id"])).fetchone()
        if newest and newest[0] < OLD:
            stale_mailboxes += 1
    add("stale mailboxes", stale_mailboxes)
    add("messages", q("SELECT COUNT() FROM `messages`"))
    return c_dict
CONFIG = """\
graph_title Magic-Wormhole Active Channels
graph_vlabel Channels
graph_category network
nameplates.label Total Nameplates
nameplates.draw LINE2
nameplates.type GAUGE
waiting_nameplates.label Waiting Nameplates
waiting_nameplates.draw LINE2
waiting_nameplates.type GAUGE
mailboxes.label Total Mailboxes
mailboxes.draw LINE2
mailboxes.type GAUGE
waiting_mailboxes.label Waiting Mailboxes
waiting_mailboxes.draw LINE2
waiting_mailboxes.type GAUGE
"""
if len(sys.argv) > 1 and sys.argv[1] == "config":
    print CONFIG.rstrip()
    sys.exit(0)
c = count_events()
print "nameplates.value", c["total nameplates"]
print "waiting_nameplates.value", c["waiting nameplates"]
print "mailboxes.value", c["total mailboxes"]
print "waiting_mailboxes.value", c["waiting mailboxes"]

@@ -7,88 +7,41 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
env.serverdir /path/to/your/wormhole/server env.serverdir /path/to/your/wormhole/server
""" """
import os, sys, sqlite3 import os, sys, time, json
def count_events():
serverdir = os.environ["serverdir"]
dbfile = os.path.join(serverdir, "relay.sqlite")
if not os.path.exists(dbfile):
print "cannot find relay.sqlite, please set env.serverdir"
sys.exit(1)
db = sqlite3.connect(dbfile)
c_list = []
c_dict = {}
def add(key, value):
c_list.append((key, value))
c_dict[key] = value
def q(query, values=()):
return db.execute(query, values).fetchone()[0]
add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplate_usage`"))
add("total nameplates", q("SELECT COUNT() FROM `nameplate_usage`"))
add("happy nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='happy'"))
add("lonely nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='lonely'"))
add("pruney nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='pruney'"))
add("crowded nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='crowded'"))
add("total mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"))
add("happy mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='happy'"))
add("scary mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='scary'"))
add("lonely mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='lonely'"))
add("errory mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='errory'"))
add("pruney mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='pruney'"))
add("crowded mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='crowded'"))
add("total transit", q("SELECT COUNT() FROM `transit_usage`"))
add("happy transit", q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='happy'"))
add("lonely transit", q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='lonely'"))
add("errory transit", q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='errory'"))
add("transit bytes", q("SELECT SUM(`total_bytes`) FROM `transit_usage`"))
return c_dict
CONFIG = """\ CONFIG = """\
graph_title Magic-Wormhole Server Errors graph_title Magic-Wormhole Server Errors
graph_vlabel Events per Hour graph_vlabel Events Since Reboot
graph_category network graph_category network
nameplates.label Nameplates nameplates.label Nameplate Errors (total)
nameplates.draw LINE1 nameplates.draw LINE1
nameplates.type DERIVE nameplates.type GAUGE
nameplates.min 0 mailboxes.label Mailboxes (total)
nameplates.cdef nameplates,3600,*
mailboxes.label Mailboxes
mailboxes.draw LINE1 mailboxes.draw LINE1
mailboxes.type DERIVE mailboxes.type GAUGE
mailboxes.min 0 mailboxes_scary.label Mailboxes (scary)
mailboxes.cdef mailboxes,3600,* mailboxes_scary.draw LINE1
mailboxes_scary.type GAUGE
transit.label Transit transit.label Transit
transit.draw LINE1 transit.draw LINE1
transit.type DERIVE transit.type GAUGE
transit.min 0
transit.cdef transit,3600,*
""" """
if len(sys.argv) > 1 and sys.argv[1] == "config": if len(sys.argv) > 1 and sys.argv[1] == "config":
print CONFIG.rstrip() print CONFIG.rstrip()
sys.exit(0) sys.exit(0)
c = count_events() serverdir = os.environ["serverdir"]
print "nameplates.value", c["total nameplates"] - c["happy nameplates"] fn = os.path.join(serverdir, "stats.json")
print "mailboxes.value", c["total mailboxes"] - c["happy mailboxes"] with open(fn) as f:
print "transit.value", c["total transit"] - c["happy transit"] data = json.load(f)
if time.time() > data["valid_until"]:
sys.exit(1) # expired
r = data["rendezvous"]["since_reboot"]
print "nameplates.value", (r["nameplates_total"]
- r["nameplate_moods"].get("happy", 0))
print "mailboxes.value", (r["mailboxes_total"]
- r["mailbox_moods"].get("happy", 0))
t = data["transit"]["since_reboot"]
print "transit.value", (t["total"] - t["moods"].get("happy", 0))

misc/munin/wormhole_event_rate (new executable file)

@@ -0,0 +1,50 @@
#! /usr/bin/env python
"""
Use the following in /etc/munin/plugin-conf.d/wormhole :
[wormhole_*]
env.serverdir /path/to/your/wormhole/server
"""
import os, sys, time, json
CONFIG = """\
graph_title Magic-Wormhole Server Events
graph_vlabel Events per Hour
graph_category network
happy.label Happy
happy.draw LINE
happy.type DERIVE
happy.min 0
happy.max 60
happy.cdef happy,3600,*
incomplete.label Incomplete
incomplete.draw LINE
incomplete.type DERIVE
incomplete.min 0
incomplete.max 60
incomplete.cdef happy,3600,*
scary.label Scary
scary.draw LINE
scary.type DERIVE
scary.min 0
scary.max 60
scary.cdef happy,3600,*
"""
if len(sys.argv) > 1 and sys.argv[1] == "config":
    print CONFIG.rstrip()
    sys.exit(0)
serverdir = os.environ["serverdir"]
fn = os.path.join(serverdir, "stats.json")
with open(fn) as f:
    data = json.load(f)
if time.time() > data["valid_until"]:
    sys.exit(1) # expired
atm = data["rendezvous"]["all_time"]["mailbox_moods"]
print "happy.value", atm.get("happy", 0)
print "incomplete.value", (atm.get("pruney", 0) + atm.get("lonely", 0))
print "scary.value", atm.get("scary", 0)

@@ -7,88 +7,48 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
env.serverdir /path/to/your/wormhole/server env.serverdir /path/to/your/wormhole/server
""" """
import os, sys, sqlite3 import os, sys, time, json
def count_events():
serverdir = os.environ["serverdir"]
dbfile = os.path.join(serverdir, "relay.sqlite")
if not os.path.exists(dbfile):
print "cannot find relay.sqlite, please set env.serverdir"
sys.exit(1)
db = sqlite3.connect(dbfile)
c_list = []
c_dict = {}
def add(key, value):
c_list.append((key, value))
c_dict[key] = value
def q(query, values=()):
return db.execute(query, values).fetchone()[0]
add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplate_usage`"))
add("total nameplates", q("SELECT COUNT() FROM `nameplate_usage`"))
add("happy nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='happy'"))
add("lonely nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='lonely'"))
add("pruney nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='pruney'"))
add("crowded nameplates", q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='crowded'"))
add("total mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"))
add("happy mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='happy'"))
add("scary mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='scary'"))
add("lonely mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='lonely'"))
add("errory mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='errory'"))
add("pruney mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='pruney'"))
add("crowded mailboxes", q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='crowded'"))
add("total transit", q("SELECT COUNT() FROM `transit_usage`"))
add("happy transit", q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='happy'"))
add("lonely transit", q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='lonely'"))
add("errory transit", q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='errory'"))
add("transit bytes", q("SELECT SUM(`total_bytes`) FROM `transit_usage`"))
return c_dict
CONFIG = """\ CONFIG = """\
graph_title Magic-Wormhole Server Events graph_title Magic-Wormhole Mailbox Events
graph_vlabel Events per Hour graph_vlabel Events Since Reboot
graph_category network graph_category network
nameplates.label Nameplates total.label Total
nameplates.draw LINE total.draw LINE1
nameplates.type DERIVE total.type GAUGE
nameplates.min 0 happy.label Happy
nameplates.cdef nameplates,3600,* happy.draw LINE2
mailboxes.label Mailboxes happy.type GAUGE
mailboxes.draw LINE pruney.label Pruney
mailboxes.type DERIVE pruney.draw LINE1
mailboxes.min 0 pruney.type GAUGE
mailboxes.cdef mailboxes,3600,* incomplete.label Incomplete (pruned/lonely)
transit.label Transit incomplete.draw LINE2
transit.draw LINE incomplete.type GAUGE
transit.type DERIVE scary.label Scary
transit.min 0 scary.draw LINE1
transit.cdef transit,3600,* scary.type GAUGE
errory.label Errory
errory.draw LINE1
errory.type GAUGE
""" """
if len(sys.argv) > 1 and sys.argv[1] == "config": if len(sys.argv) > 1 and sys.argv[1] == "config":
print CONFIG.rstrip() print CONFIG.rstrip()
sys.exit(0) sys.exit(0)
c = count_events() serverdir = os.environ["serverdir"]
print "nameplates.value", c["total nameplates"] fn = os.path.join(serverdir, "stats.json")
print "mailboxes.value", c["total mailboxes"] with open(fn) as f:
print "transit.value", c["total transit"] data = json.load(f)
if time.time() > data["valid_until"]:
sys.exit(1) # expired
r = data["rendezvous"]["since_reboot"]
print "total.value", r["mailboxes_total"]
print "happy.value", r["mailbox_moods"].get("happy", 0)
print "pruney.value", r["mailbox_moods"].get("pruney", 0)
print "incomplete.value", (r["mailbox_moods"].get("pruney", 0)
+ r["mailbox_moods"].get("lonely", 0))
print "scary.value", r["mailbox_moods"].get("scary", 0)
print "errory.value", r["mailbox_moods"].get("errory", 0)

@@ -1,73 +0,0 @@
#! /usr/bin/env python
"""
Use the following in /etc/munin/plugin-conf.d/wormhole :
[wormhole_*]
env.serverdir /path/to/your/wormhole/server
"""
import os, sys, time, sqlite3
def count_events():
    serverdir = os.environ["serverdir"]
    dbfile = os.path.join(serverdir, "relay.sqlite")
    if not os.path.exists(dbfile):
        print "cannot find relay.sqlite, please set env.serverdir"
        sys.exit(1)
    db = sqlite3.connect(dbfile)
    db.row_factory = sqlite3.Row
    c_list = []
    c_dict = {}
    def add(key, value):
        c_list.append((key, value))
        c_dict[key] = value
    def q(query, values=()):
        return db.execute(query, values).fetchone()[0]
    OLD = time.time() - 10*60
    add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplates`"))
    add("total nameplates", q("SELECT COUNT() FROM `nameplates`"))
    add("waiting nameplates", q("SELECT COUNT() FROM `nameplates`"
                                " WHERE `second` is null"))
    add("connected nameplates", q("SELECT COUNT() FROM `nameplates`"
                                  " WHERE `second` is not null"))
    add("stale nameplates", q("SELECT COUNT() FROM `nameplates`"
                              " where `updated` < ?", (OLD,)))
    add("total mailboxes", q("SELECT COUNT() FROM `mailboxes`"))
    add("waiting mailboxes", q("SELECT COUNT() FROM `mailboxes`"
                               " WHERE `second` is null"))
    add("connected mailboxes", q("SELECT COUNT() FROM `mailboxes`"
                                 " WHERE `second` is not null"))
    stale_mailboxes = 0
    for mbox_row in db.execute("SELECT * FROM `mailboxes`").fetchall():
        newest = db.execute("SELECT `server_rx` FROM `messages`"
                            " WHERE `app_id`=? AND `mailbox_id`=?"
                            " ORDER BY `server_rx` DESC LIMIT 1",
                            (mbox_row["app_id"], mbox_row["id"])).fetchone()
        if newest and newest[0] < OLD:
            stale_mailboxes += 1
    add("stale mailboxes", stale_mailboxes)
    add("messages", q("SELECT COUNT() FROM `messages`"))
    return c_dict
CONFIG = """\
graph_title Magic-Wormhole Queued Messages
graph_vlabel Messages
graph_category network
messages.label Total Messages
messages.draw LINE2
messages.type GAUGE
"""
if len(sys.argv) > 1 and sys.argv[1] == "config":
    print CONFIG.rstrip()
    sys.exit(0)
c = count_events()
print "messages.value", c["messages"]

@@ -7,7 +7,7 @@ Use the following in /etc/munin/plugin-conf.d/wormhole :
env.serverdir /path/to/your/wormhole/server env.serverdir /path/to/your/wormhole/server
""" """
import os, sys, sqlite3 import os, sys, time, json
def count_events(): def count_events():
serverdir = os.environ["serverdir"] serverdir = os.environ["serverdir"]
@ -65,18 +65,23 @@ def count_events():
CONFIG = """\ CONFIG = """\
graph_title Magic-Wormhole Transit Usage graph_title Magic-Wormhole Transit Usage
graph_vlabel Bytes per Hour graph_vlabel Bytes Since Reboot
graph_category network graph_category network
bytes.label Transit Bytes bytes.label Transit Bytes
bytes.draw LINE1 bytes.draw LINE1
bytes.type DERIVE bytes.type GAUGE
bytes.min 0
bytes.cdef bytes,3600,*
""" """
if len(sys.argv) > 1 and sys.argv[1] == "config": if len(sys.argv) > 1 and sys.argv[1] == "config":
print CONFIG.rstrip() print CONFIG.rstrip()
sys.exit(0) sys.exit(0)
c = count_events() serverdir = os.environ["serverdir"]
print "bytes.value", c["transit bytes"] fn = os.path.join(serverdir, "stats.json")
with open(fn) as f:
data = json.load(f)
if time.time() > data["valid_until"]:
sys.exit(1) # expired
t = data["transit"]["since_reboot"]
print "bytes.value", t["bytes"]

@@ -48,8 +48,13 @@ def server(ctx):
"--signal-error", is_flag=True, "--signal-error", is_flag=True,
help="force all clients to fail with a message", help="force all clients to fail with a message",
) )
@click.option(
"--stats-file", default=None, type=type(u""), metavar="stats.json",
help="periodically write stats to a file for monitoring tools like Munin",
)
@click.pass_obj @click.pass_obj
def start(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit, rendezvous): def start(cfg, signal_error, no_daemon, blur_usage, advertise_version,
transit, rendezvous, stats_file):
""" """
Start a relay server Start a relay server
""" """
@ -60,6 +65,7 @@ def start(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit,
cfg.transit = str(transit) cfg.transit = str(transit)
cfg.rendezvous = str(rendezvous) cfg.rendezvous = str(rendezvous)
cfg.signal_error = signal_error cfg.signal_error = signal_error
cfg.stats_file = stats_file
start_server(cfg) start_server(cfg)
@ -92,8 +98,13 @@ def start(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit,
"--signal-error", is_flag=True, "--signal-error", is_flag=True,
help="force all clients to fail with a message", help="force all clients to fail with a message",
) )
@click.option(
"--stats-file", default=None, type=type(u""), metavar="stats.json",
help="periodically write stats to a file for monitoring tools like Munin",
)
@click.pass_obj @click.pass_obj
def restart(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit, rendezvous): def restart(cfg, signal_error, no_daemon, blur_usage, advertise_version,
transit, rendezvous, stats_file):
""" """
Re-start a relay server Re-start a relay server
""" """
@ -104,6 +115,7 @@ def restart(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit
cfg.transit = str(transit) cfg.transit = str(transit)
cfg.rendezvous = str(rendezvous) cfg.rendezvous = str(rendezvous)
cfg.signal_error = signal_error cfg.signal_error = signal_error
cfg.stats_file = stats_file
restart_server(cfg) restart_server(cfg)

@@ -15,6 +15,7 @@ class MyPlugin:
self.args.advertise_version, self.args.advertise_version,
"relay.sqlite", self.args.blur_usage, "relay.sqlite", self.args.blur_usage,
signal_error=self.args.signal_error, signal_error=self.args.signal_error,
stats_file=self.args.stats_file,
) )
class MyTwistdConfig(twistd.ServerOptions): class MyTwistdConfig(twistd.ServerOptions):

@@ -16,7 +16,7 @@ def get_upgrader(new_version):
"db-schemas/upgrade-to-v%d.sql" % new_version) "db-schemas/upgrade-to-v%d.sql" % new_version)
return schema_bytes.decode("utf-8") return schema_bytes.decode("utf-8")
TARGET_VERSION = 2 TARGET_VERSION = 3
def dict_factory(cursor, row): def dict_factory(cursor, row):
d = {} d = {}
@ -35,6 +35,10 @@ def get_db(dbfile, target_version=TARGET_VERSION, stderr=sys.stderr):
except (EnvironmentError, sqlite3.OperationalError) as e: except (EnvironmentError, sqlite3.OperationalError) as e:
raise DBError("Unable to create/open db file %s: %s" % (dbfile, e)) raise DBError("Unable to create/open db file %s: %s" % (dbfile, e))
db.row_factory = dict_factory db.row_factory = dict_factory
db.execute("PRAGMA foreign_keys = ON")
problems = db.execute("PRAGMA foreign_key_check").fetchall()
if problems:
raise DBError("failed foreign key check: %s" % (problems,))
if must_create: if must_create:
schema = get_schema(target_version) schema = get_schema(target_version)

@@ -0,0 +1,68 @@
DROP TABLE `nameplates`;
DROP TABLE `messages`;
DROP TABLE `mailboxes`;
-- Wormhole codes use a "nameplate": a short name which is only used to
-- reference a specific (long-named) mailbox. The codes only use numeric
-- nameplates, but the protocol and server allow arbitrary strings.
CREATE TABLE `nameplates`
(
`id` INTEGER PRIMARY KEY AUTOINCREMENT,
`app_id` VARCHAR,
`name` VARCHAR,
`mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`),
`request_id` VARCHAR -- from 'allocate' message, for future deduplication
);
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`);
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
CREATE TABLE `nameplate_sides`
(
`nameplates_id` REFERENCES `nameplates`(`id`),
`claimed` BOOLEAN, -- True after claim(), False after release()
`side` VARCHAR,
`added` INTEGER -- time when this side first claimed the nameplate
);
-- Clients exchange messages through a "mailbox", which has a long (randomly
-- unique) identifier and a queue of messages.
-- `id` is randomly-generated and unique across all apps.
CREATE TABLE `mailboxes`
(
`app_id` VARCHAR,
`id` VARCHAR PRIMARY KEY,
`updated` INTEGER, -- time of last activity, used for pruning
`for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone
);
CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`);
CREATE TABLE `mailbox_sides`
(
`mailbox_id` REFERENCES `mailboxes`(`id`),
`opened` BOOLEAN, -- True after open(), False after close()
`side` VARCHAR,
`added` INTEGER, -- time when this side first opened the mailbox
`mood` VARCHAR
);
CREATE TABLE `messages`
(
`app_id` VARCHAR,
`mailbox_id` VARCHAR,
`side` VARCHAR,
`phase` VARCHAR, -- numeric or string
`body` VARCHAR,
`server_rx` INTEGER,
`msg_id` VARCHAR
);
CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`);
ALTER TABLE `mailbox_usage` ADD COLUMN `for_nameplate` BOOLEAN;
CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`);
CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`);
DELETE FROM `version`;
INSERT INTO `version` (`version`) VALUES (3);

@@ -0,0 +1,115 @@
-- note: anything which isn't a boolean, integer, or human-readable unicode
-- string (i.e. binary strings) will be stored as hex
CREATE TABLE `version`
(
`version` INTEGER -- contains one row, set to 3
);
-- Wormhole codes use a "nameplate": a short name which is only used to
-- reference a specific (long-named) mailbox. The codes only use numeric
-- nameplates, but the protocol and server allow arbitrary strings.
CREATE TABLE `nameplates`
(
`id` INTEGER PRIMARY KEY AUTOINCREMENT,
`app_id` VARCHAR,
`name` VARCHAR,
`mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`),
`request_id` VARCHAR -- from 'allocate' message, for future deduplication
);
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`);
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
CREATE TABLE `nameplate_sides`
(
`nameplates_id` REFERENCES `nameplates`(`id`),
`claimed` BOOLEAN, -- True after claim(), False after release()
`side` VARCHAR,
`added` INTEGER -- time when this side first claimed the nameplate
);
-- Clients exchange messages through a "mailbox", which has a long (randomly
-- unique) identifier and a queue of messages.
-- `id` is randomly-generated and unique across all apps.
CREATE TABLE `mailboxes`
(
`app_id` VARCHAR,
`id` VARCHAR PRIMARY KEY,
`updated` INTEGER, -- time of last activity, used for pruning
`for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone
);
CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`);
CREATE TABLE `mailbox_sides`
(
`mailbox_id` REFERENCES `mailboxes`(`id`),
`opened` BOOLEAN, -- True after open(), False after close()
`side` VARCHAR,
`added` INTEGER, -- time when this side first opened the mailbox
`mood` VARCHAR
);
CREATE TABLE `messages`
(
`app_id` VARCHAR,
`mailbox_id` VARCHAR,
`side` VARCHAR,
`phase` VARCHAR, -- numeric or string
`body` VARCHAR,
`server_rx` INTEGER,
`msg_id` VARCHAR
);
CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`);
CREATE TABLE `nameplate_usage`
(
`app_id` VARCHAR,
`started` INTEGER, -- seconds since epoch, rounded to "blur time"
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`total_time` INTEGER, -- seconds from open to last close/prune
`result` VARCHAR -- happy, lonely, pruney, crowded
-- nameplate moods:
-- "happy": two sides open and close
-- "lonely": one side opens and closes (no response from 2nd side)
-- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved
);
CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`);
CREATE TABLE `mailbox_usage`
(
`app_id` VARCHAR,
`for_nameplate` BOOLEAN, -- allocated for a nameplate, not standalone
`started` INTEGER, -- seconds since epoch, rounded to "blur time"
`total_time` INTEGER, -- seconds from open to last close
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`result` VARCHAR -- happy, scary, lonely, errory, pruney
-- rendezvous moods:
-- "happy": both sides close with mood=happy
-- "scary": any side closes with mood=scary (bad MAC, probably wrong pw)
-- "lonely": any side closes with mood=lonely (no response from 2nd side)
-- "errory": any side closes with mood=errory (other errors)
-- "pruney": channels which get pruned for inactivity
-- "crowded": three or more sides were involved
);
CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`);
CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`);
CREATE TABLE `transit_usage`
(
`started` INTEGER, -- seconds since epoch, rounded to "blur time"
`total_time` INTEGER, -- seconds from open to last close
`waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None
`total_bytes` INTEGER, -- total bytes relayed (both directions)
`result` VARCHAR -- happy, scary, lonely, errory, pruney
-- transit moods:
-- "errory": one side gave the wrong handshake
-- "lonely": good handshake, but the other side never showed up
-- "happy": both sides gave correct handshake
);
CREATE INDEX `transit_usage_idx` ON `transit_usage` (`started`);
CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`);

@@ -1,48 +1,15 @@
from __future__ import print_function, unicode_literals from __future__ import print_function, unicode_literals
import os, time, random, base64 import os, random, base64, collections
from collections import namedtuple from collections import namedtuple
from twisted.python import log from twisted.python import log
from twisted.application import service, internet from twisted.application import service
SECONDS = 1.0
MINUTE = 60*SECONDS
HOUR = 60*MINUTE
DAY = 24*HOUR
MB = 1000*1000
CHANNEL_EXPIRATION_TIME = 2*HOUR
EXPIRATION_CHECK_PERIOD = 1*HOUR
def generate_mailbox_id(): def generate_mailbox_id():
return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii") return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii")
SideResult = namedtuple("SideResult", ["changed", "empty", "side1", "side2"])
Unchanged = SideResult(changed=False, empty=False, side1=None, side2=None)
class CrowdedError(Exception): class CrowdedError(Exception):
pass pass
def add_side(row, new_side):
old_sides = [s for s in [row["side1"], row["side2"]] if s]
assert old_sides
if new_side in old_sides:
return Unchanged
if len(old_sides) == 2:
raise CrowdedError("too many sides for this thing")
return SideResult(changed=True, empty=False,
side1=old_sides[0], side2=new_side)
def remove_side(row, side):
old_sides = [s for s in [row["side1"], row["side2"]] if s]
if side not in old_sides:
return Unchanged
remaining_sides = old_sides[:]
remaining_sides.remove(side)
if remaining_sides:
return SideResult(changed=True, empty=False, side1=remaining_sides[0],
side2=None)
return SideResult(changed=True, empty=True, side1=None, side2=None)
Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"]) Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"])
TransitUsage = namedtuple("TransitUsage", TransitUsage = namedtuple("TransitUsage",
["started", "waiting_time", "total_time", ["started", "waiting_time", "total_time",
@ -65,23 +32,21 @@ class Mailbox:
# requires caller to db.commit() # requires caller to db.commit()
assert isinstance(side, type("")), type(side) assert isinstance(side, type("")), type(side)
db = self._db db = self._db
row = db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`=? AND `id`=?", already = db.execute("SELECT * FROM `mailbox_sides`"
(self._app_id, self._mailbox_id)).fetchone() " WHERE `mailbox_id`=? AND `side`=?",
try: (self._mailbox_id, side)).fetchone()
sr = add_side(row, side) if not already:
except CrowdedError: db.execute("INSERT INTO `mailbox_sides`"
db.execute("UPDATE `mailboxes` SET `crowded`=?" " (`mailbox_id`, `opened`, `side`, `added`)"
" WHERE `app_id`=? AND `id`=?", " VALUES(?,?,?,?)",
(True, self._app_id, self._mailbox_id)) (self._mailbox_id, True, side, when))
db.commit() self._touch(when)
raise db.commit() # XXX: reconcile the need for this with the comment above
if sr.changed:
db.execute("UPDATE `mailboxes` SET" def _touch(self, when):
" `side1`=?, `side2`=?, `second`=?" self._db.execute("UPDATE `mailboxes` SET `updated`=? WHERE `id`=?",
" WHERE `app_id`=? AND `id`=?", (when, self._mailbox_id))
(sr.side1, sr.side2, when,
self._app_id, self._mailbox_id))
def get_messages(self): def get_messages(self):
messages = [] messages = []
@ -97,12 +62,15 @@ class Mailbox:
return messages return messages
def add_listener(self, handle, send_f, stop_f): def add_listener(self, handle, send_f, stop_f):
# TODO: update 'updated' #log.msg("add_listener", self._mailbox_id, handle)
self._listeners[handle] = (send_f, stop_f) self._listeners[handle] = (send_f, stop_f)
#log.msg(" added", len(self._listeners))
return self.get_messages() return self.get_messages()
def remove_listener(self, handle): def remove_listener(self, handle):
self._listeners.pop(handle) #log.msg("remove_listener", self._mailbox_id, handle)
self._listeners.pop(handle, None)
#log.msg(" removed", len(self._listeners))
def has_listeners(self): def has_listeners(self):
return bool(self._listeners) return bool(self._listeners)
@ -118,6 +86,7 @@ class Mailbox:
" VALUES (?,?,?,?,?, ?,?)", " VALUES (?,?,?,?,?, ?,?)",
(self._app_id, self._mailbox_id, sm.side, (self._app_id, self._mailbox_id, sm.side,
sm.phase, sm.body, sm.server_rx, sm.msg_id)) sm.phase, sm.body, sm.server_rx, sm.msg_id))
self._touch(sm.server_rx)
self._db.commit() self._db.commit()
def add_message(self, sm): def add_message(self, sm):
@ -133,43 +102,46 @@ class Mailbox:
(self._app_id, self._mailbox_id)).fetchone() (self._app_id, self._mailbox_id)).fetchone()
if not row: if not row:
return return
sr = remove_side(row, side) for_nameplate = row["for_nameplate"]
if sr.empty:
self._app._summarize_mailbox_and_store(self._mailbox_id, row,
mood, when, pruned=False)
self._delete()
db.commit()
elif sr.changed:
db.execute("UPDATE `mailboxes`"
" SET `side1`=?, `side2`=?, `first_mood`=?"
" WHERE `app_id`=? AND `id`=?",
(sr.side1, sr.side2, mood,
self._app_id, self._mailbox_id))
db.commit()
def _delete(self): row = db.execute("SELECT * FROM `mailbox_sides`"
# requires caller to db.commit() " WHERE `mailbox_id`=? AND `side`=?",
self._db.execute("DELETE FROM `mailboxes`" (self._mailbox_id, side)).fetchone()
" WHERE `app_id`=? AND `id`=?", if not row:
(self._app_id, self._mailbox_id)) return
self._db.execute("DELETE FROM `messages`" db.execute("UPDATE `mailbox_sides` SET `opened`=?, `mood`=?"
" WHERE `app_id`=? AND `mailbox_id`=?", " WHERE `mailbox_id`=? AND `side`=?",
(self._app_id, self._mailbox_id)) (False, mood, self._mailbox_id, side))
db.commit()
# are any sides still open?
side_rows = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=?",
(self._mailbox_id,)).fetchall()
if any([sr["opened"] for sr in side_rows]):
return
# nope. delete and summarize
db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?",
(self._mailbox_id,))
db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?",
(self._mailbox_id,))
db.execute("DELETE FROM `mailboxes` WHERE `id`=?", (self._mailbox_id,))
self._app._summarize_mailbox_and_store(for_nameplate, side_rows,
when, pruned=False)
db.commit()
# Shut down any listeners, just in case they're still lingering # Shut down any listeners, just in case they're still lingering
# around. # around.
for (send_f, stop_f) in self._listeners.values(): for (send_f, stop_f) in self._listeners.values():
stop_f() stop_f()
self._listeners = {}
self._app.free_mailbox(self._mailbox_id) self._app.free_mailbox(self._mailbox_id)
def is_active(self):
return bool(self._listeners)
def _shutdown(self): def _shutdown(self):
# used at test shutdown to accelerate client disconnects # used at test shutdown to accelerate client disconnects
for (send_f, stop_f) in self._listeners.values(): for (send_f, stop_f) in self._listeners.values():
stop_f() stop_f()
self._listeners = {}
class AppNamespace: class AppNamespace:
def __init__(self, db, blur_usage, log_requests, app_id): def __init__(self, db, blur_usage, log_requests, app_id):
@ -178,22 +150,15 @@ class AppNamespace:
self._log_requests = log_requests self._log_requests = log_requests
self._app_id = app_id self._app_id = app_id
self._mailboxes = {} self._mailboxes = {}
self._nameplate_counts = collections.defaultdict(int)
def is_active(self): self._mailbox_counts = collections.defaultdict(int)
# An idle AppNamespace does not need to be kept in memory: it can be
# reconstructed from the DB if needed. And active one must be kept
# alive.
for mb in self._mailboxes.values():
if mb.is_active():
return True
return False
def get_nameplate_ids(self): def get_nameplate_ids(self):
db = self._db db = self._db
# TODO: filter this to numeric ids? # TODO: filter this to numeric ids?
c = db.execute("SELECT DISTINCT `id` FROM `nameplates`" c = db.execute("SELECT DISTINCT `name` FROM `nameplates`"
" WHERE `app_id`=?", (self._app_id,)) " WHERE `app_id`=?", (self._app_id,))
return set([row["id"] for row in c.fetchall()]) return set([row["name"] for row in c.fetchall()])
def _find_available_nameplate_id(self): def _find_available_nameplate_id(self):
claimed = self.get_nameplate_ids() claimed = self.get_nameplate_ids()
@ -219,127 +184,157 @@ class AppNamespace:
del mailbox_id # ignored, they'll learn it from claim() del mailbox_id # ignored, they'll learn it from claim()
return nameplate_id return nameplate_id
def claim_nameplate(self, nameplate_id, side, when, _test_mailbox_id=None): def claim_nameplate(self, name, side, when):
# when we're done: # when we're done:
# * there will be one row for the nameplate # * there will be one row for the nameplate
# * side1 or side2 will be populated # * there will be one 'side' attached to it, with claimed=True
# * started or second will be populated # * a mailbox id and mailbox row will be created
# * a mailbox id will be created, but not a mailbox row # * a mailbox 'side' will be attached, with opened=True
# (ids are randomly unique, so we can defer creation until 'open') assert isinstance(name, type("")), type(name)
assert isinstance(nameplate_id, type("")), type(nameplate_id)
assert isinstance(side, type("")), type(side) assert isinstance(side, type("")), type(side)
db = self._db db = self._db
row = db.execute("SELECT * FROM `nameplates`" row = db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=? AND `id`=?", " WHERE `app_id`=? AND `name`=?",
(self._app_id, nameplate_id)).fetchone() (self._app_id, name)).fetchone()
if row: if not row:
mailbox_id = row["mailbox_id"]
try:
sr = add_side(row, side)
except CrowdedError:
db.execute("UPDATE `nameplates` SET `crowded`=?"
" WHERE `app_id`=? AND `id`=?",
(True, self._app_id, nameplate_id))
db.commit()
raise
if sr.changed:
db.execute("UPDATE `nameplates` SET"
" `side1`=?, `side2`=?, `updated`=?, `second`=?"
" WHERE `app_id`=? AND `id`=?",
(sr.side1, sr.side2, when, when,
self._app_id, nameplate_id))
else:
if self._log_requests: if self._log_requests:
log.msg("creating nameplate#%s for app_id %s" % log.msg("creating nameplate#%s for app_id %s" %
(nameplate_id, self._app_id)) (name, self._app_id))
if _test_mailbox_id is not None: # for unit tests mailbox_id = generate_mailbox_id()
mailbox_id = _test_mailbox_id self._add_mailbox(mailbox_id, True, side, when) # ensure row exists
else: sql = ("INSERT INTO `nameplates`"
mailbox_id = generate_mailbox_id() " (`app_id`, `name`, `mailbox_id`)"
db.execute("INSERT INTO `nameplates`" " VALUES(?,?,?)")
" (`app_id`, `id`, `mailbox_id`, `side1`, `crowded`," npid = db.execute(sql, (self._app_id, name, mailbox_id)
" `updated`, `started`)" ).lastrowid
" VALUES(?,?,?,?,?, ?,?)", else:
(self._app_id, nameplate_id, mailbox_id, side, False, npid = row["id"]
when, when)) mailbox_id = row["mailbox_id"]
row = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=? AND `side`=?",
(npid, side)).fetchone()
if not row:
db.execute("INSERT INTO `nameplate_sides`"
" (`nameplates_id`, `claimed`, `side`, `added`)"
" VALUES(?,?,?,?)",
(npid, True, side, when))
db.commit() db.commit()
self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError
rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?", (npid,)).fetchall()
if len(rows) > 2:
# this line will probably never get hit: any crowding is noticed
# on mailbox_sides first, inside open_mailbox()
raise CrowdedError("too many sides have claimed this nameplate")
return mailbox_id return mailbox_id
def release_nameplate(self, nameplate_id, side, when): def release_nameplate(self, name, side, when):
# when we're done: # when we're done:
# * in the nameplate row, side1 or side2 will be removed # * the 'claimed' flag will be cleared on the nameplate_sides row
# * if the nameplate is now unused: # * if the nameplate is now unused (no claimed sides):
# * mailbox.nameplate_closed will be populated # * mailbox.nameplate_closed will be populated
# * the nameplate row will be removed # * the nameplate row will be removed
assert isinstance(nameplate_id, type("")), type(nameplate_id) # * the nameplate sides will be removed
assert isinstance(name, type("")), type(name)
assert isinstance(side, type("")), type(side) assert isinstance(side, type("")), type(side)
db = self._db db = self._db
row = db.execute("SELECT * FROM `nameplates`" np_row = db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=? AND `id`=?", " WHERE `app_id`=? AND `name`=?",
(self._app_id, nameplate_id)).fetchone() (self._app_id, name)).fetchone()
if not np_row:
return
npid = np_row["id"]
row = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=? AND `side`=?",
(npid, side)).fetchone()
if not row: if not row:
return return
sr = remove_side(row, side) db.execute("UPDATE `nameplate_sides` SET `claimed`=?"
if sr.empty: " WHERE `nameplates_id`=? AND `side`=?",
db.execute("DELETE FROM `nameplates`" (False, npid, side))
" WHERE `app_id`=? AND `id`=?", db.commit()
(self._app_id, nameplate_id))
self._summarize_nameplate_and_store(row, when, pruned=False)
db.commit()
elif sr.changed:
db.execute("UPDATE `nameplates`"
" SET `side1`=?, `side2`=?, `updated`=?"
" WHERE `app_id`=? AND `id`=?",
(sr.side1, sr.side2, when,
self._app_id, nameplate_id))
db.commit()
def _summarize_nameplate_and_store(self, row, delete_time, pruned): # now, are there any remaining claims?
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
claims = [1 for sr in side_rows if sr["claimed"]]
if claims:
return
# delete and summarize
db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?",
(npid,))
db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,))
self._summarize_nameplate_and_store(side_rows, when, pruned=False)
db.commit()
def _summarize_nameplate_and_store(self, side_rows, delete_time, pruned):
# requires caller to db.commit() # requires caller to db.commit()
u = self._summarize_nameplate_usage(row, delete_time, pruned) u = self._summarize_nameplate_usage(side_rows, delete_time, pruned)
self._db.execute("INSERT INTO `nameplate_usage`" self._db.execute("INSERT INTO `nameplate_usage`"
" (`app_id`," " (`app_id`,"
" `started`, `total_time`, `waiting_time`, `result`)" " `started`, `total_time`, `waiting_time`, `result`)"
" VALUES (?, ?,?,?,?)", " VALUES (?, ?,?,?,?)",
(self._app_id, (self._app_id,
u.started, u.total_time, u.waiting_time, u.result)) u.started, u.total_time, u.waiting_time, u.result))
self._nameplate_counts[u.result] += 1
def _summarize_nameplate_usage(self, row, delete_time, pruned): def _summarize_nameplate_usage(self, side_rows, delete_time, pruned):
started = row["started"] times = sorted([row["added"] for row in side_rows])
started = times[0]
if self._blur_usage: if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage) started = self._blur_usage * (started // self._blur_usage)
waiting_time = None waiting_time = None
if row["second"]: if len(times) > 1:
waiting_time = row["second"] - row["started"] waiting_time = times[1] - times[0]
total_time = delete_time - row["started"] total_time = delete_time - times[0]
result = "lonely" result = "lonely"
if row["second"]: if len(times) == 2:
result = "happy" result = "happy"
if pruned: if pruned:
result = "pruney" result = "pruney"
if row["crowded"]: if len(times) > 2:
result = "crowded" result = "crowded"
return Usage(started=started, waiting_time=waiting_time, return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result) total_time=total_time, result=result)
def open_mailbox(self, mailbox_id, side, when): def _add_mailbox(self, mailbox_id, for_nameplate, side, when):
assert isinstance(mailbox_id, type("")), type(mailbox_id) assert isinstance(mailbox_id, type("")), type(mailbox_id)
db = self._db db = self._db
if not mailbox_id in self._mailboxes: row = db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, mailbox_id)).fetchone()
if not row:
self._db.execute("INSERT INTO `mailboxes`"
" (`app_id`, `id`, `for_nameplate`, `updated`)"
" VALUES(?,?,?,?)",
(self._app_id, mailbox_id, for_nameplate, when))
# we don't need a commit here, because mailbox.open() only
# does SELECT FROM `mailbox_sides`, not from `mailboxes`
def open_mailbox(self, mailbox_id, side, when):
assert isinstance(mailbox_id, type("")), type(mailbox_id)
self._add_mailbox(mailbox_id, False, side, when) # ensure row exists
db = self._db
if not mailbox_id in self._mailboxes: # ensure Mailbox object exists
if self._log_requests: if self._log_requests:
log.msg("spawning #%s for app_id %s" % (mailbox_id, log.msg("spawning #%s for app_id %s" % (mailbox_id,
self._app_id)) self._app_id))
db.execute("INSERT INTO `mailboxes`"
" (`app_id`, `id`, `side1`, `crowded`, `started`)"
" VALUES(?,?,?,?,?)",
(self._app_id, mailbox_id, side, False, when))
db.commit() # XXX
# mailbox.open() does a SELECT to find the old sides
self._mailboxes[mailbox_id] = Mailbox(self, self._db, self._mailboxes[mailbox_id] = Mailbox(self, self._db,
self._app_id, mailbox_id) self._app_id, mailbox_id)
mailbox = self._mailboxes[mailbox_id] mailbox = self._mailboxes[mailbox_id]
# delegate to mailbox.open() to add a row to mailbox_sides, and
# update the mailbox.updated timestamp
mailbox.open(side, when) mailbox.open(side, when)
db.commit() db.commit()
rows = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=?",
(mailbox_id,)).fetchall()
if len(rows) > 2:
raise CrowdedError("too many sides have opened this mailbox")
return mailbox return mailbox
def free_mailbox(self, mailbox_id): def free_mailbox(self, mailbox_id):
@ -352,32 +347,29 @@ class AppNamespace:
# log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" % # log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" %
# (mailbox_id, len(self.get_claimed()), len(self._mailboxes))) # (mailbox_id, len(self.get_claimed()), len(self._mailboxes)))
def _summarize_mailbox_and_store(self, mailbox_id, row, def _summarize_mailbox_and_store(self, for_nameplate, side_rows,
second_mood, delete_time, pruned): delete_time, pruned):
db = self._db db = self._db
rows = db.execute("SELECT DISTINCT(`side`) FROM `messages`" u = self._summarize_mailbox(side_rows, delete_time, pruned)
" WHERE `app_id`=? AND `mailbox_id`=?",
(self._app_id, mailbox_id)).fetchall()
num_sides = len(rows)
u = self._summarize_mailbox(row, num_sides, second_mood, delete_time,
pruned)
db.execute("INSERT INTO `mailbox_usage`" db.execute("INSERT INTO `mailbox_usage`"
" (`app_id`," " (`app_id`, `for_nameplate`,"
" `started`, `total_time`, `waiting_time`, `result`)" " `started`, `total_time`, `waiting_time`, `result`)"
" VALUES (?, ?,?,?,?)", " VALUES (?,?, ?,?,?,?)",
(self._app_id, (self._app_id, for_nameplate,
u.started, u.total_time, u.waiting_time, u.result)) u.started, u.total_time, u.waiting_time, u.result))
self._mailbox_counts[u.result] += 1
def _summarize_mailbox(self, row, num_sides, second_mood, delete_time, def _summarize_mailbox(self, side_rows, delete_time, pruned):
pruned): times = sorted([row["added"] for row in side_rows])
started = row["started"] started = times[0]
if self._blur_usage: if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage) started = self._blur_usage * (started // self._blur_usage)
waiting_time = None waiting_time = None
if row["second"]: if len(times) > 1:
waiting_time = row["second"] - row["started"] waiting_time = times[1] - times[0]
total_time = delete_time - row["started"] total_time = delete_time - times[0]
num_sides = len(times)
if num_sides == 0: if num_sides == 0:
result = "quiet" result = "quiet"
elif num_sides == 1: elif num_sides == 1:
@ -385,7 +377,8 @@ class AppNamespace:
else: else:
result = "happy" result = "happy"
moods = set([row["first_mood"], second_mood]) # "mood" is only recorded at close()
moods = [row["mood"] for row in side_rows if row.get("mood")]
if "lonely" in moods: if "lonely" in moods:
result = "lonely" result = "lonely"
if "errory" in moods: if "errory" in moods:
@ -394,13 +387,24 @@ class AppNamespace:
result = "scary" result = "scary"
if pruned: if pruned:
result = "pruney" result = "pruney"
if row["crowded"]: if num_sides > 2:
result = "crowded" result = "crowded"
return Usage(started=started, waiting_time=waiting_time, return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result) total_time=total_time, result=result)
def prune(self, now, old): def prune(self, now, old):
# The pruning check runs every 10 minutes, and "old" is defined to be
# 11 minutes ago (unit tests can use different values). The client is
# allowed to disconnect for up to 9 minutes without losing the
# channel (nameplate, mailbox, and messages).
# Each time a client does something, the mailbox.updated field is
# updated with the current timestamp. If a client is subscribed to
# the mailbox when pruning check runs, the "updated" field is also
# updated. After that check, if the "updated" field is "old", the
# channel is deleted.
# For now, pruning is logged even if log_requests is False, to debug # For now, pruning is logged even if log_requests is False, to debug
# the pruning process, and since pruning is triggered by a timer # the pruning process, and since pruning is triggered by a timer
# instead of by user action. It does reveal which mailboxes were # instead of by user action. It does reveal which mailboxes were
@ -409,122 +413,74 @@ class AppNamespace:
log.msg(" prune begins (%s)" % self._app_id) log.msg(" prune begins (%s)" % self._app_id)
db = self._db db = self._db
modified = False modified = False
# for all `mailboxes`: classify as new or old
OLD = 0; NEW = 1 for mailbox in self._mailboxes.values():
all_mailboxes = {} if mailbox.has_listeners():
all_mailbox_rows = {} log.msg("touch %s because listeners" % mailbox._mailbox_id)
for row in db.execute("SELECT * FROM `mailboxes`" mailbox._touch(now)
" WHERE `app_id`=?", db.commit() # make sure the updates are visible below
new_mailboxes = set()
old_mailboxes = set()
for row in db.execute("SELECT * FROM `mailboxes` WHERE `app_id`=?",
(self._app_id,)).fetchall(): (self._app_id,)).fetchall():
mailbox_id = row["id"] mailbox_id = row["id"]
all_mailbox_rows[mailbox_id] = row log.msg(" 1: age=%s, old=%s, %s" %
if row["started"] > old: (now - row["updated"], now - old, mailbox_id))
which = NEW
elif row["second"] and row["second"] > old:
which = NEW
else:
which = OLD
all_mailboxes[mailbox_id] = which
#log.msg(" 2: all_mailboxes", all_mailboxes, all_mailbox_rows)
# for all mailbox ids used by `messages`:
# if there is no matching mailbox: delete the messages
# if there is at least one new message (select when>old limit 1):
# classify the mailbox as new
for row in db.execute("SELECT DISTINCT(`mailbox_id`)"
" FROM `messages`"
" WHERE `app_id`=?",
(self._app_id,)).fetchall():
mailbox_id = row["mailbox_id"]
if mailbox_id not in all_mailboxes:
log.msg(" deleting orphan messages", mailbox_id)
db.execute("DELETE FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?",
(self._app_id, mailbox_id))
modified = True
else:
new_msgs = db.execute("SELECT * FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?"
" AND `server_rx` > ?"
" LIMIT 1",
(self._app_id, mailbox_id, old)
).fetchall()
if new_msgs:
#log.msg(" 3-: saved by new messages", new_msgs)
all_mailboxes[mailbox_id] = NEW
#log.msg(" 4: all_mailboxes", all_mailboxes)
# for all mailbox objects with active listeners:
# classify the mailbox as new
for mailbox_id in self._mailboxes:
#log.msg(" -5: checking", mailbox_id, self._mailboxes[mailbox_id])
if self._mailboxes[mailbox_id].has_listeners():
all_mailboxes[mailbox_id] = NEW
#log.msg(" 5: all_mailboxes", all_mailboxes)
# for all `nameplates`:
# classify as new or old
# if the linked mailbox exists:
# if it is new:
# classify nameplate as new
# if it is old:
# if the nameplate is new:
# classify mailbox as new
all_nameplates = {}
all_nameplate_rows = {}
for row in db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=?",
(self._app_id,)).fetchall():
nameplate_id = row["id"]
all_nameplate_rows[nameplate_id] = row
if row["updated"] > old: if row["updated"] > old:
which = NEW new_mailboxes.add(mailbox_id)
else: else:
which = OLD old_mailboxes.add(mailbox_id)
log.msg(" 2: mailboxes:", new_mailboxes, old_mailboxes)
old_nameplates = set()
for row in db.execute("SELECT * FROM `nameplates` WHERE `app_id`=?",
(self._app_id,)).fetchall():
npid = row["id"]
mailbox_id = row["mailbox_id"] mailbox_id = row["mailbox_id"]
if mailbox_id in all_mailboxes: if mailbox_id in old_mailboxes:
if all_mailboxes[mailbox_id] == NEW: old_nameplates.add(npid)
which = NEW log.msg(" 3: old_nameplates", old_nameplates)
else:
if which == NEW:
all_mailboxes[mailbox_id] = NEW
all_nameplates[nameplate_id] = which
#log.msg(" 6: all_nameplates", all_nameplates, all_nameplate_rows)
# delete all old nameplates for npid in old_nameplates:
# invariant check: if there is a linked mailbox, it is old log.msg(" deleting nameplate", npid)
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
for nameplate_id, which in all_nameplates.items(): " WHERE `nameplates_id`=?",
if which == OLD: (npid,)).fetchall()
log.msg(" deleting nameplate", nameplate_id) db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?",
row = all_nameplate_rows[nameplate_id] (npid,))
self._summarize_nameplate_and_store(row, now, pruned=True) db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,))
db.execute("DELETE FROM `nameplates`" self._summarize_nameplate_and_store(side_rows, now, pruned=True)
" WHERE `app_id`=? AND `id`=?", modified = True
(self._app_id, nameplate_id))
modified = True
# delete all messages for old mailboxes # delete all messages for old mailboxes
# delete all old mailboxes # delete all old mailboxes
for mailbox_id, which in all_mailboxes.items(): for mailbox_id in old_mailboxes:
if which == OLD: log.msg(" deleting mailbox", mailbox_id)
log.msg(" deleting mailbox", mailbox_id) row = db.execute("SELECT * FROM `mailboxes`"
self._summarize_mailbox_and_store(mailbox_id, " WHERE `id`=?", (mailbox_id,)).fetchone()
all_mailbox_rows[mailbox_id], for_nameplate = row["for_nameplate"]
"pruney", now, pruned=True) side_rows = db.execute("SELECT * FROM `mailbox_sides`"
db.execute("DELETE FROM `messages`" " WHERE `mailbox_id`=?",
" WHERE `app_id`=? AND `mailbox_id`=?", (mailbox_id,)).fetchall()
(self._app_id, mailbox_id)) db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?",
db.execute("DELETE FROM `mailboxes`" (mailbox_id,))
" WHERE `app_id`=? AND `id`=?", db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?",
(self._app_id, mailbox_id)) (mailbox_id,))
modified = True db.execute("DELETE FROM `mailboxes` WHERE `id`=?",
(mailbox_id,))
self._summarize_mailbox_and_store(for_nameplate, side_rows,
now, pruned=True)
modified = True
if modified: if modified:
db.commit() db.commit()
log.msg(" prune complete, modified:", modified) log.msg(" prune complete, modified:", modified)
def get_counts(self):
return (self._nameplate_counts, self._mailbox_counts)
def _shutdown(self): def _shutdown(self):
for channel in self._mailboxes.values(): for channel in self._mailboxes.values():
channel._shutdown() channel._shutdown()
@ -538,8 +494,6 @@ class Rendezvous(service.MultiService):
log_requests = blur_usage is None log_requests = blur_usage is None
self._log_requests = log_requests self._log_requests = log_requests
self._apps = {} self._apps = {}
t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.prune)
t.setServiceParent(self)
def get_welcome(self): def get_welcome(self):
return self._welcome return self._welcome
@ -569,19 +523,94 @@ class Rendezvous(service.MultiService):
apps.add(row["app_id"]) apps.add(row["app_id"])
return apps return apps
def prune(self, now=None, old=None): def prune_all_apps(self, now, old):
# As with AppNamespace.prune_old_mailboxes, we log for now. # As with AppNamespace.prune_old_mailboxes, we log for now.
log.msg("beginning app prune") log.msg("beginning app prune")
now = now or time.time()
old = old or (now - CHANNEL_EXPIRATION_TIME)
for app_id in sorted(self.get_all_apps()): for app_id in sorted(self.get_all_apps()):
log.msg(" app prune checking %r" % (app_id,)) log.msg(" app prune checking %r" % (app_id,))
app = self.get_app(app_id) app = self.get_app(app_id)
app.prune(now, old) app.prune(now, old)
if not app.is_active(): # meaning no websockets log.msg("app prune ends, %d apps" % len(self._apps))
log.msg(" pruning idle app", app_id)
self._apps.pop(app_id) def get_stats(self):
log.msg("app prune ends, %d remaining apps" % len(self._apps)) stats = {}
# current status: expected to be zero most of the time
c = stats["active"] = {}
c["apps"] = len(self.get_all_apps())
def q(query, values=()):
row = self._db.execute(query, values).fetchone()
return list(row.values())[0]
c["nameplates_total"] = q("SELECT COUNT() FROM `nameplates`")
# TODO: nameplates with only one side (most of them)
# TODO: nameplates with two sides (very fleeting)
# TODO: nameplates with three or more sides (crowded, unlikely)
c["mailboxes_total"] = q("SELECT COUNT() FROM `mailboxes`")
# TODO: mailboxes with only one side (most of them)
# TODO: mailboxes with two sides (somewhat fleeting, in-transit)
# TODO: mailboxes with three or more sides (unlikely)
c["messages_total"] = q("SELECT COUNT() FROM `messages`")
# usage since last reboot
nameplate_counts = collections.defaultdict(int)
mailbox_counts = collections.defaultdict(int)
for app in self._apps.values():
nc, mc = app.get_counts()
for result, count in nc.items():
nameplate_counts[result] += count
for result, count in mc.items():
mailbox_counts[result] += count
urb = stats["since_reboot"] = {}
urb["nameplate_moods"] = {}
for result, count in nameplate_counts.items():
urb["nameplate_moods"][result] = count
urb["nameplates_total"] = sum(nameplate_counts.values())
urb["mailbox_moods"] = {}
for result, count in mailbox_counts.items():
urb["mailbox_moods"][result] = count
urb["mailboxes_total"] = sum(mailbox_counts.values())
# historical usage (all-time)
u = stats["all_time"] = {}
un = u["nameplate_moods"] = {}
# TODO: there's probably a single SQL query for all this
un["happy"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='happy'")
un["lonely"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='lonely'")
un["pruney"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='pruney'")
un["crowded"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='crowded'")
u["nameplates_total"] = q("SELECT COUNT() FROM `nameplate_usage`")
um = u["mailbox_moods"] = {}
um["happy"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='happy'")
um["scary"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='scary'")
um["lonely"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='lonely'")
um["quiet"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='quiet'")
um["errory"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='errory'")
um["pruney"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='pruney'")
um["crowded"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='crowded'")
u["mailboxes_total"] = q("SELECT COUNT() FROM `mailbox_usage`")
u["mailboxes_standalone"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `for_nameplate`=0")
# recent timings (last 100 operations)
# TODO: median/etc of nameplate.total_time
# TODO: median/etc of mailbox.waiting_time (should be the same)
# TODO: median/etc of mailbox.total_time
# other
# TODO: mailboxes without nameplates (needs new DB schema)
return stats
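For orientation, here is a sketch of the dictionary shape that get_stats() assembles above. The key names are taken from the code, but every number is invented purely for illustration:

example_rendezvous_stats = {
    "active": {"apps": 1,
               "nameplates_total": 0, "mailboxes_total": 0,
               "messages_total": 0},
    "since_reboot": {"nameplate_moods": {"happy": 3, "pruney": 1},
                     "nameplates_total": 4,
                     "mailbox_moods": {"happy": 3, "lonely": 1},
                     "mailboxes_total": 4},
    "all_time": {"nameplate_moods": {"happy": 20, "lonely": 2,
                                     "pruney": 3, "crowded": 0},
                 "nameplates_total": 25,
                 "mailbox_moods": {"happy": 20, "scary": 0, "lonely": 2,
                                   "quiet": 1, "errory": 1, "pruney": 3,
                                   "crowded": 0},
                 "mailboxes_total": 27,
                 "mailboxes_standalone": 2},
}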
def stopService(self):
# This forcibly boots any clients that are still connected, which

View File

@ -88,6 +88,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
self._app = None
self._side = None
self._did_allocate = False # only one allocate() per websocket
self._listening = False
self._nameplate_id = None
self._mailbox = None
@ -203,6 +204,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
body=sm.body, server_rx=sm.server_rx, id=sm.msg_id)
def _stop():
pass
self._listening = True
for old_sm in self._mailbox.add_listener(self, _send, _stop):
_send(old_sm)
@ -222,6 +224,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
def handle_close(self, msg, server_rx):
if not self._mailbox:
raise Error("must open mailbox before closing")
if self._listening:
self._mailbox.remove_listener(self)
self._listening = False
self._mailbox.close(self._side, msg.get("mood"), server_rx)
self._mailbox = None
self.send("closed")
@ -233,7 +238,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol):
self.sendMessage(payload, False)
def onClose(self, wasClean, code, reason):
#log.msg("onClose", self, self._mailbox, self._listening)
if self._mailbox and self._listening:
self._mailbox.remove_listener(self)
class WebSocketRendezvousFactory(websocket.WebSocketServerFactory):

View File

@ -1,9 +1,10 @@
# NO unicode_literals or static.Data() will break, because it demands
# a str on Python 2
from __future__ import print_function
import os, time, json
from twisted.python import log
from twisted.internet import reactor, endpoints
from twisted.application import service, internet
from twisted.web import server, static, resource
from autobahn.twisted.resource import WebSocketResource
from .endpoint_service import ServerEndpointService
@ -13,6 +14,12 @@ from .rendezvous import Rendezvous
from .rendezvous_websocket import WebSocketRendezvousFactory
from .transit_server import Transit
SECONDS = 1.0
MINUTE = 60*SECONDS
CHANNEL_EXPIRATION_TIME = 11*MINUTE
EXPIRATION_CHECK_PERIOD = 10*MINUTE
class Root(resource.Resource):
# child_FOO is a nevow thing, not a twisted.web.resource thing
def __init__(self):
@ -28,7 +35,7 @@ class PrivacyEnhancedSite(server.Site):
class RelayServer(service.MultiService):
def __init__(self, rendezvous_web_port, transit_port,
advertise_version, db_url=":memory:", blur_usage=None,
signal_error=None, stats_file=None):
service.MultiService.__init__(self)
self._blur_usage = blur_usage
@ -52,11 +59,11 @@ class RelayServer(service.MultiService):
if signal_error:
welcome["error"] = signal_error
self._rendezvous = Rendezvous(db, welcome, blur_usage)
self._rendezvous.setServiceParent(self) # for the pruning timer
root = Root()
wsrf = WebSocketRendezvousFactory(None, self._rendezvous)
root.putChild(b"v1", WebSocketResource(wsrf))
site = PrivacyEnhancedSite(root)
@ -74,12 +81,21 @@ class RelayServer(service.MultiService):
transit_service = ServerEndpointService(t, transit)
transit_service.setServiceParent(self)
self._stats_file = stats_file
if self._stats_file and os.path.exists(self._stats_file):
os.unlink(self._stats_file)
# this will be regenerated immediately, but if something goes
# wrong in dump_stats(), it's better to have a missing file than
# a stale one
t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.timer)
t.setServiceParent(self)
# make some things accessible for tests
self._db = db
self._root = root
self._rendezvous_web_service = rendezvous_web_service
self._rendezvous_websocket = wsrf
self._transit = None
if transit_port:
self._transit = transit
self._transit_service = transit_service
@ -93,3 +109,28 @@ class RelayServer(service.MultiService):
log.msg("not logging HTTP requests") log.msg("not logging HTTP requests")
else: else:
log.msg("not blurring access times") log.msg("not blurring access times")
def timer(self):
now = time.time()
old = now - CHANNEL_EXPIRATION_TIME
self._rendezvous.prune_all_apps(now, old)
self.dump_stats(now, validity=EXPIRATION_CHECK_PERIOD+60)
def dump_stats(self, now, validity):
if not self._stats_file:
return
tmpfn = self._stats_file + ".tmp"
data = {}
data["created"] = now
data["valid_until"] = now + validity
start = time.time()
data["rendezvous"] = self._rendezvous.get_stats()
data["transit"] = self._transit.get_stats()
log.msg("get_stats took:", time.time() - start)
with open(tmpfn, "wb") as f:
json.dump(data, f, indent=1)
f.write("\n")
os.rename(tmpfn, self._stats_file)
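The .tmp-then-os.rename() pair above is the usual atomic-publish pattern: on a POSIX filesystem the rename swaps the file into place in one step, so a reader (such as the munin plugins) sees either the previous complete stats.json or the new one, never a half-written document. A minimal standalone sketch of the same pattern, with invented names, not part of the diff:

import json, os

def publish_json(path, obj):
    # write to a sibling temp file, then atomically swap it into place
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(obj, f, indent=1)
        f.write("\n")
    os.rename(tmp, path)  # readers see either the old or the new file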

View File

@ -1,5 +1,5 @@
from __future__ import print_function, unicode_literals
import re, time, collections
from twisted.python import log
from twisted.internet import protocol
from twisted.application import service
@ -166,6 +166,8 @@ class Transit(protocol.ServerFactory, service.MultiService):
self._blur_usage = blur_usage
self._pending_requests = {} # token -> TransitConnection
self._active_connections = set() # TransitConnection
self._counts = collections.defaultdict(int)
self._count_bytes = 0
def connection_got_token(self, token, p):
if token in self._pending_requests:
@ -193,6 +195,8 @@ class Transit(protocol.ServerFactory, service.MultiService):
(started, total_time, waiting_time,
total_bytes, result))
self._db.commit()
self._counts[result] += 1
self._count_bytes += total_bytes
def transitFinished(self, p, token, description):
for token,tc in self._pending_requests.items():
@ -205,3 +209,36 @@ class Transit(protocol.ServerFactory, service.MultiService):
def transitFailed(self, p):
log.msg("transitFailed %r" % p)
pass
def get_stats(self):
stats = {}
def q(query, values=()):
row = self._db.execute(query, values).fetchone()
return list(row.values())[0]
# current status: expected to be zero most of the time
c = stats["active"] = {}
c["connected"] = len(self._active_connections) / 2
c["waiting"] = len(self._pending_requests)
# usage since last reboot
rb = stats["since_reboot"] = {}
rb["bytes"] = self._count_bytes
rb["total"] = sum(self._counts.values(), 0)
rbm = rb["moods"] = {}
for result, count in self._counts.items():
rbm[result] = count
# historical usage (all-time)
u = stats["all_time"] = {}
u["total"] = q("SELECT COUNT() FROM `transit_usage`")
u["bytes"] = q("SELECT SUM(`total_bytes`) FROM `transit_usage`") or 0
um = u["moods"] = {}
um["happy"] = q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='happy'")
um["lonely"] = q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='lonely'")
um["errory"] = q("SELECT COUNT() FROM `transit_usage`"
" WHERE `result`='errory'")
return stats

View File

@ -50,4 +50,3 @@ class DB(unittest.TestCase):
with open("new.sql","w") as f: f.write(latest_text) with open("new.sql","w") as f: f.write(latest_text)
# check with "diff -u _trial_temp/up.sql _trial_temp/new.sql" # check with "diff -u _trial_temp/up.sql _trial_temp/new.sql"
self.assertEqual(dbA_text, latest_text) self.assertEqual(dbA_text, latest_text)
test_upgrade.skip = "disabled until at least one upgrader is written"

View File

@ -427,6 +427,9 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase):
self.failUnlessEqual(modes[i],
stat.S_IMODE(os.stat(fn).st_mode))
# check server stats
self._rendezvous.get_stats()
def test_text(self):
return self._do_test()
def test_text_subprocess(self):

View File

@ -1,5 +1,5 @@
from __future__ import print_function, unicode_literals
import json, itertools, time
from binascii import hexlify
import mock
from twisted.trial import unittest
@ -12,7 +12,7 @@ from autobahn.twisted import websocket
from .. import __version__
from .common import ServerBase
from ..server import rendezvous, transit_server
from ..server.rendezvous import Usage, SidedMessage
from ..server.database import get_db
class Server(ServerBase, unittest.TestCase):
@ -48,165 +48,173 @@ class Server(ServerBase, unittest.TestCase):
biggest = max(nids)
self.assert_(1000 <= biggest < 1000000, biggest)
def _nameplate(self, app, name):
np_row = app._db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`='appid' AND `name`=?",
(name,)).fetchone()
if not np_row:
return None, None
npid = np_row["id"]
side_rows = app._db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
return np_row, side_rows
def test_nameplate(self):
app = self._rendezvous.get_app("appid")
name = app.allocate_nameplate("side1", 0)
self.assertEqual(type(name), type(""))
nid = int(name)
self.assert_(0 < nid < 10, nid)
self.assertEqual(app.get_nameplate_ids(), set([name]))
# allocate also does a claim
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["side"], "side1")
self.assertEqual(side_rows[0]["added"], 0)
# duplicate claims by the same side are combined
mailbox_id = app.claim_nameplate(name, "side1", 1)
self.assertEqual(type(mailbox_id), type(""))
self.assertEqual(mailbox_id, np_row["mailbox_id"])
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["added"], 0)
self.assertEqual(mailbox_id, np_row["mailbox_id"])
# and they don't update the 'added' time
mailbox_id2 = app.claim_nameplate(name, "side1", 2)
self.assertEqual(mailbox_id, mailbox_id2)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["added"], 0)
# claim by the second side is new
mailbox_id3 = app.claim_nameplate(name, "side2", 3)
self.assertEqual(mailbox_id, mailbox_id3)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 2)
self.assertEqual(sorted([row["side"] for row in side_rows]),
sorted(["side1", "side2"]))
self.assertIn(("side2", 3),
[(row["side"], row["added"]) for row in side_rows])
# a third claim marks the nameplate as "crowded", and adds a third
# claim (which must be released later), but leaves the two existing
# claims alone
self.assertRaises(rendezvous.CrowdedError,
app.claim_nameplate, name, "side3", 4)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 3)
# releasing a non-existent nameplate is ignored
app.release_nameplate(name+"not", "side4", 0)
# releasing a side that never claimed the nameplate is ignored
app.release_nameplate(name, "side4", 0)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 3)
# releasing one side leaves the second claim
app.release_nameplate(name, "side1", 5)
np_row, side_rows = self._nameplate(app, name)
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertIn(("side1", False), claims)
self.assertIn(("side2", True), claims)
self.assertIn(("side3", True), claims)
# releasing one side multiple times is ignored
app.release_nameplate(name, "side1", 5)
np_row, side_rows = self._nameplate(app, name)
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertIn(("side1", False), claims)
self.assertIn(("side2", True), claims)
self.assertIn(("side3", True), claims)
# release the second side
app.release_nameplate(name, "side2", 6)
np_row, side_rows = self._nameplate(app, name)
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertIn(("side1", False), claims)
self.assertIn(("side2", False), claims)
self.assertIn(("side3", True), claims)
# releasing the third side frees the nameplate, and adds usage
app.release_nameplate(name, "side3", 7)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(np_row, None)
usage = app._db.execute("SELECT * FROM `nameplate_usage`").fetchone()
self.assertEqual(usage["app_id"], "appid")
self.assertEqual(usage["started"], 0)
self.assertEqual(usage["waiting_time"], 3)
self.assertEqual(usage["total_time"], 7)
self.assertEqual(usage["result"], "crowded")
def _mailbox(self, app, mailbox_id):
mb_row = app._db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`='appid' AND `id`=?",
(mailbox_id,)).fetchone()
if not mb_row:
return None, None
side_rows = app._db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=?",
(mailbox_id,)).fetchall()
return mb_row, side_rows
def test_mailbox(self):
app = self._rendezvous.get_app("appid")
mailbox_id = "mid"
m1 = app.open_mailbox(mailbox_id, "side1", 0)
mb_row, side_rows = self._mailbox(app, mailbox_id)
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["side"], "side1")
self.assertEqual(side_rows[0]["added"], 0)
# opening the same mailbox twice, by the same side, gets the same
# object, and does not update the "added" timestamp
self.assertIdentical(m1, app.open_mailbox(mailbox_id, "side1", 1))
mb_row, side_rows = self._mailbox(app, mailbox_id)
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["side"], "side1")
self.assertEqual(side_rows[0]["added"], 0)
# opening a second side gets the same object, and adds a new claim
self.assertIdentical(m1, app.open_mailbox(mailbox_id, "side2", 2))
mb_row, side_rows = self._mailbox(app, mailbox_id)
self.assertEqual(len(side_rows), 2)
adds = [(row["side"], row["added"]) for row in side_rows]
self.assertIn(("side1", 0), adds)
self.assertIn(("side2", 2), adds)
# a third open marks it as crowded
self.assertRaises(rendezvous.CrowdedError,
app.open_mailbox, mailbox_id, "side3", 3)
mb_row, side_rows = self._mailbox(app, mailbox_id)
self.assertEqual(len(side_rows), 3)
m1.close("side3", "company", 4)
# closing a side that never claimed the mailbox is ignored
m1.close("side4", "mood", 4)
mb_row, side_rows = self._mailbox(app, mailbox_id)
self.assertEqual(len(side_rows), 3)
# closing one side leaves the second claim
m1.close("side1", "mood", 5)
mb_row, side_rows = self._mailbox(app, mailbox_id)
sides = [(row["side"], row["opened"], row["mood"]) for row in side_rows]
self.assertIn(("side1", False, "mood"), sides)
self.assertIn(("side2", True, None), sides)
self.assertIn(("side3", False, "company"), sides)
# closing one side multiple times is ignored
m1.close("side1", "mood", 6)
mb_row, side_rows = self._mailbox(app, mailbox_id)
sides = [(row["side"], row["opened"], row["mood"]) for row in side_rows]
self.assertIn(("side1", False, "mood"), sides)
self.assertIn(("side2", True, None), sides)
self.assertIn(("side3", False, "company"), sides)
l1 = []; stop1 = []; stop1_f = lambda: stop1.append(True)
m1.add_listener("handle1", l1.append, stop1_f)
@ -215,8 +223,8 @@ class Server(ServerBase, unittest.TestCase):
m1.close("side2", "mood", 7) m1.close("side2", "mood", 7)
self.assertEqual(stop1, [True]) self.assertEqual(stop1, [True])
row = self._mailbox(app, mailbox_id) mb_row, side_rows = self._mailbox(app, mailbox_id)
self.assertEqual(row, None) self.assertEqual(mb_row, None)
usage = app._db.execute("SELECT * FROM `mailbox_usage`").fetchone() usage = app._db.execute("SELECT * FROM `mailbox_usage`").fetchone()
self.assertEqual(usage["app_id"], "appid") self.assertEqual(usage["app_id"], "appid")
self.assertEqual(usage["started"], 0) self.assertEqual(usage["started"], 0)
@ -287,32 +295,36 @@ class Server(ServerBase, unittest.TestCase):
class Prune(unittest.TestCase):
def _get_mailbox_updated(self, app, mbox_id):
row = app._db.execute("SELECT * FROM `mailboxes` WHERE"
" `app_id`=? AND `id`=?",
(app._app_id, mbox_id)).fetchone()
return row["updated"]
def test_update(self):
db = get_db(":memory:")
rv = rendezvous.Rendezvous(db, None, None)
app = rv.get_app("appid")
mbox_id = "mbox1"
app.open_mailbox(mbox_id, "side1", 1)
self.assertEqual(self._get_mailbox_updated(app, mbox_id), 1)
mb = app.open_mailbox(mbox_id, "side2", 2)
self.assertEqual(self._get_mailbox_updated(app, mbox_id), 2)
sm = SidedMessage("side1", "phase", "body", 3, "msgid")
mb.add_message(sm)
self.assertEqual(self._get_mailbox_updated(app, mbox_id), 3)
def test_apps(self):
rv = rendezvous.Rendezvous(get_db(":memory:"), None, None)
app = rv.get_app("appid")
app.allocate_nameplate("side", 121)
app.prune = mock.Mock()
rv.prune_all_apps(now=123, old=122)
self.assertEqual(app.prune.mock_calls, [mock.call(123, 122)])
def test_nameplates(self):
db = get_db(":memory:")
rv = rendezvous.Rendezvous(db, None, 3600)
@ -320,14 +332,11 @@ class Prune(unittest.TestCase):
#OLD = "old"; NEW = "new" #OLD = "old"; NEW = "new"
#when = {OLD: 1, NEW: 60} #when = {OLD: 1, NEW: 60}
new_nameplates = set() new_nameplates = set()
new_mailboxes = set()
new_messages = set()
APPID = "appid" APPID = "appid"
app = rv.get_app(APPID) app = rv.get_app(APPID)
# Exercise the first-vs-second newness tests. These nameplates have # Exercise the first-vs-second newness tests
# no mailbox.
app.claim_nameplate("np-1", "side1", 1) app.claim_nameplate("np-1", "side1", 1)
app.claim_nameplate("np-2", "side1", 1) app.claim_nameplate("np-2", "side1", 1)
app.claim_nameplate("np-2", "side2", 2) app.claim_nameplate("np-2", "side2", 2)
@ -340,7 +349,28 @@ class Prune(unittest.TestCase):
app.claim_nameplate("np-5", "side2", 61) app.claim_nameplate("np-5", "side2", 61)
new_nameplates.add("np-5") new_nameplates.add("np-5")
# same for mailboxes rv.prune_all_apps(now=123, old=50)
nameplates = set([row["name"] for row in
db.execute("SELECT * FROM `nameplates`").fetchall()])
self.assertEqual(new_nameplates, nameplates)
mailboxes = set([row["id"] for row in
db.execute("SELECT * FROM `mailboxes`").fetchall()])
self.assertEqual(len(new_nameplates), len(mailboxes))
def test_mailboxes(self):
db = get_db(":memory:")
rv = rendezvous.Rendezvous(db, None, 3600)
# timestamps <=50 are "old", >=51 are "new"
#OLD = "old"; NEW = "new"
#when = {OLD: 1, NEW: 60}
new_mailboxes = set()
APPID = "appid"
app = rv.get_app(APPID)
# Exercise the first-vs-second newness tests
app.open_mailbox("mb-11", "side1", 1) app.open_mailbox("mb-11", "side1", 1)
app.open_mailbox("mb-12", "side1", 1) app.open_mailbox("mb-12", "side1", 1)
app.open_mailbox("mb-12", "side2", 2) app.open_mailbox("mb-12", "side2", 2)
@ -353,37 +383,26 @@ class Prune(unittest.TestCase):
app.open_mailbox("mb-15", "side2", 61) app.open_mailbox("mb-15", "side2", 61)
new_mailboxes.add("mb-15") new_mailboxes.add("mb-15")
rv.prune(now=123, old=50) rv.prune_all_apps(now=123, old=50)
nameplates = set([row["id"] for row in
db.execute("SELECT * FROM `nameplates`").fetchall()])
self.assertEqual(new_nameplates, nameplates)
mailboxes = set([row["id"] for row in mailboxes = set([row["id"] for row in
db.execute("SELECT * FROM `mailboxes`").fetchall()]) db.execute("SELECT * FROM `mailboxes`").fetchall()])
self.assertEqual(new_mailboxes, mailboxes) self.assertEqual(new_mailboxes, mailboxes)
messages = set([row["msg_id"] for row in
db.execute("SELECT * FROM `messages`").fetchall()])
self.assertEqual(new_messages, messages)
def test_lots(self):
OLD = "old"; NEW = "new"
for nameplate in [False, True]:
for mailbox in [OLD, NEW]:
for has_listeners in [False, True]:
self.one(nameplate, mailbox, has_listeners)
def test_one(self):
# to debug specific problems found by test_lots
self.one(None, "new", False)
def one(self, nameplate, mailbox, has_listeners):
desc = ("nameplate=%s, mailbox=%s, has_listeners=%s" %
(nameplate, mailbox, has_listeners))
log.msg(desc)
db = get_db(":memory:")
@ -396,50 +415,30 @@ class Prune(unittest.TestCase):
when = {OLD: 1, NEW: 60}
nameplate_survives = False
mailbox_survives = False
messages_survive = False
mbid = "mbid"
if nameplate:
mbid = app.claim_nameplate("npid", "side1", when[mailbox])
mb = app.open_mailbox(mbid, "side1", when[mailbox])
# the pruning algorithm doesn't care about the age of messages,
# because mailbox.updated is always updated each time we add a
# message
sm = SidedMessage("side1", "phase", "body", when[mailbox], "msgid")
mb.add_message(sm)
if has_listeners:
mb.add_listener("handle", None, None)
if (mailbox == NEW or has_listeners):
if nameplate:
nameplate_survives = True
mailbox_survives = True
messages_survive = mailbox_survives
rv.prune_all_apps(now=123, old=50)
nameplates = set([row["name"] for row in
db.execute("SELECT * FROM `nameplates`").fetchall()])
self.assertEqual(nameplate_survives, bool(nameplates),
("nameplate", nameplate_survives, nameplates, desc))
@ -485,6 +484,14 @@ class WSClient(websocket.WebSocketClientProtocol):
return
self.events.append(event)
def close(self):
self.d = defer.Deferred()
self.transport.loseConnection()
return self.d
def onClose(self, wasClean, code, reason):
if self.d:
self.d.callback((wasClean, code, reason))
def next_event(self):
assert not self.d
if self.events:
@ -695,6 +702,18 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
nids.add(n["id"]) nids.add(n["id"])
self.assertEqual(nids, set([nameplate_id1, "np2"])) self.assertEqual(nids, set([nameplate_id1, "np2"]))
def _nameplate(self, app, name):
np_row = app._db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`='appid' AND `name`=?",
(name,)).fetchone()
if not np_row:
return None, None
npid = np_row["id"]
side_rows = app._db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
return np_row, side_rows
@inlineCallbacks
def test_allocate(self):
c1 = yield self.make_client()
@ -710,11 +729,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
c1.send("allocate") c1.send("allocate")
m = yield c1.next_non_ack() m = yield c1.next_non_ack()
self.assertEqual(m["type"], "allocated") self.assertEqual(m["type"], "allocated")
nameplate_id = m["nameplate"] name = m["nameplate"]
nids = app.get_nameplate_ids() nids = app.get_nameplate_ids()
self.assertEqual(len(nids), 1) self.assertEqual(len(nids), 1)
self.assertEqual(nameplate_id, list(nids)[0]) self.assertEqual(name, list(nids)[0])
c1.send("allocate") c1.send("allocate")
err = yield c1.next_non_ack() err = yield c1.next_non_ack()
@ -722,13 +741,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
self.assertEqual(err["error"], self.assertEqual(err["error"],
"you already allocated one, don't be greedy") "you already allocated one, don't be greedy")
c1.send("claim", nameplate=nameplate_id) # allocate+claim is ok c1.send("claim", nameplate=name) # allocate+claim is ok
yield c1.sync() yield c1.sync()
row = app._db.execute("SELECT * FROM `nameplates`" np_row, side_rows = self._nameplate(app, name)
" WHERE `app_id`='appid' AND `id`=?", self.assertEqual(len(side_rows), 1)
(nameplate_id,)).fetchone() self.assertEqual(side_rows[0]["side"], "side")
self.assertEqual(row["side1"], "side")
self.assertEqual(row["side2"], None)
@inlineCallbacks @inlineCallbacks
def test_claim(self): def test_claim(self):
@ -751,12 +768,15 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
nids = app.get_nameplate_ids()
self.assertEqual(len(nids), 1)
self.assertEqual("np1", list(nids)[0])
np_row, side_rows = self._nameplate(app, "np1")
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["side"], "side")
# claiming a nameplate assigns a random mailbox id and creates the
# mailbox row
mailboxes = app._db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`='appid'").fetchall()
self.assertEqual(len(mailboxes), 1)
@inlineCallbacks
def test_claim_crowded(self):
@ -796,10 +816,10 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
m = yield c1.next_non_ack()
self.assertEqual(m["type"], "released")
np_row, side_rows = self._nameplate(app, "np1")
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertIn(("side", False), claims)
self.assertIn(("side2", True), claims)
c1.send("release") # no longer claimed
err = yield c1.next_non_ack()
@ -828,6 +848,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
m = yield c1.next_non_ack()
self.assertEqual(m["type"], "message")
self.assertEqual(m["body"], "body")
self.assertTrue(mb1.has_listeners())
mb1.add_message(SidedMessage(side="side2", phase="phase2",
body="body2", server_rx=0,
@ -881,6 +902,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
c1 = yield self.make_client()
yield c1.next_non_ack()
c1.send("bind", appid="appid", side="side")
app = self._rendezvous.get_app("appid")
c1.send("close", mood="mood") # must open first
err = yield c1.next_non_ack()
@ -888,72 +910,103 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
self.assertEqual(err["error"], "must open mailbox before closing") self.assertEqual(err["error"], "must open mailbox before closing")
c1.send("open", mailbox="mb1") c1.send("open", mailbox="mb1")
yield c1.sync()
mb1 = app._mailboxes["mb1"]
self.assertTrue(mb1.has_listeners())
c1.send("close", mood="mood") c1.send("close", mood="mood")
m = yield c1.next_non_ack() m = yield c1.next_non_ack()
self.assertEqual(m["type"], "closed") self.assertEqual(m["type"], "closed")
self.assertFalse(mb1.has_listeners())
c1.send("close", mood="mood") # already closed c1.send("close", mood="mood") # already closed
err = yield c1.next_non_ack() err = yield c1.next_non_ack()
self.assertEqual(err["type"], "error") self.assertEqual(err["type"], "error")
self.assertEqual(err["error"], "must open mailbox before closing") self.assertEqual(err["error"], "must open mailbox before closing")
@inlineCallbacks
def test_disconnect(self):
c1 = yield self.make_client()
yield c1.next_non_ack()
c1.send("bind", appid="appid", side="side")
app = self._rendezvous.get_app("appid")
c1.send("open", mailbox="mb1")
yield c1.sync()
mb1 = app._mailboxes["mb1"]
self.assertTrue(mb1.has_listeners())
yield c1.close()
# wait for the server to notice the socket has closed
started = time.time()
while mb1.has_listeners() and (time.time()-started < 5.0):
d = defer.Deferred()
reactor.callLater(0.01, d.callback, None)
yield d
self.assertFalse(mb1.has_listeners())
class Summary(unittest.TestCase):
def test_mailbox(self):
app = rendezvous.AppNamespace(None, None, False, None)
# starts at time 1, maybe gets second open at time 3, closes at 5
def s(rows, pruned=False):
return app._summarize_mailbox(rows, 5, pruned)
rows = [dict(added=1)]
self.assertEqual(s(rows), Usage(1, None, 4, "lonely"))
rows = [dict(added=1, mood="lonely")]
self.assertEqual(s(rows), Usage(1, None, 4, "lonely"))
rows = [dict(added=1, mood="errory")]
self.assertEqual(s(rows), Usage(1, None, 4, "errory"))
rows = [dict(added=1, mood=None)]
self.assertEqual(s(rows, pruned=True), Usage(1, None, 4, "pruney"))
rows = [dict(added=1, mood="happy")]
self.assertEqual(s(rows, pruned=True), Usage(1, None, 4, "pruney"))
rows = [dict(added=1, mood="happy"), dict(added=3, mood="happy")]
self.assertEqual(s(rows), Usage(1, 2, 4, "happy"))
rows = [dict(added=1, mood="errory"), dict(added=3, mood="happy")]
self.assertEqual(s(rows), Usage(1, 2, 4, "errory"))
rows = [dict(added=1, mood="happy"), dict(added=3, mood="errory")]
self.assertEqual(s(rows), Usage(1, 2, 4, "errory"))
rows = [dict(added=1, mood="scary"), dict(added=3, mood="happy")]
self.assertEqual(s(rows), Usage(1, 2, 4, "scary"))
rows = [dict(added=1, mood="scary"), dict(added=3, mood="errory")]
self.assertEqual(s(rows), Usage(1, 2, 4, "scary"))
rows = [dict(added=1, mood="happy"), dict(added=3, mood=None)]
self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney"))
rows = [dict(added=1, mood="happy"), dict(added=3, mood="happy")]
self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney"))
rows = [dict(added=1), dict(added=3), dict(added=4)]
self.assertEqual(s(rows), Usage(1, 2, 4, "crowded"))
rows = [dict(added=1), dict(added=3), dict(added=4)]
self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "crowded"))
def test_nameplate(self):
a = rendezvous.AppNamespace(None, None, False, None)
# starts at time 1, maybe gets second open at time 3, closes at 5
def s(rows, pruned=False):
return a._summarize_nameplate_usage(rows, 5, pruned)
rows = [dict(added=1)]
self.assertEqual(s(rows), Usage(1, None, 4, "lonely"))
rows = [dict(added=1), dict(added=3)]
self.assertEqual(s(rows), Usage(1, 2, 4, "happy"))
rows = [dict(added=1), dict(added=3)]
self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney"))
rows = [dict(added=1), dict(added=3), dict(added=4)]
self.assertEqual(s(rows), Usage(1, 2, 4, "crowded"))
def test_blur(self):
db = get_db(":memory:")
@ -961,14 +1014,14 @@ class Summary(unittest.TestCase):
APPID = "appid" APPID = "appid"
app = rv.get_app(APPID) app = rv.get_app(APPID)
app.claim_nameplate("npid", "side1", 10) # start time is 10 app.claim_nameplate("npid", "side1", 10) # start time is 10
rv.prune(now=123, old=50) rv.prune_all_apps(now=123, old=50)
# start time should be rounded to top of the hour (blur_usage=3600) # start time should be rounded to top of the hour (blur_usage=3600)
row = db.execute("SELECT * FROM `nameplate_usage`").fetchone() row = db.execute("SELECT * FROM `nameplate_usage`").fetchone()
self.assertEqual(row["started"], 0) self.assertEqual(row["started"], 0)
app = rv.get_app(APPID) app = rv.get_app(APPID)
app.open_mailbox("mbid", "side1", 20) # start time is 20 app.open_mailbox("mbid", "side1", 20) # start time is 20
rv.prune(now=123, old=50) rv.prune_all_apps(now=123, old=50)
row = db.execute("SELECT * FROM `mailbox_usage`").fetchone() row = db.execute("SELECT * FROM `mailbox_usage`").fetchone()
self.assertEqual(row["started"], 0) self.assertEqual(row["started"], 0)
@ -978,13 +1031,15 @@ class Summary(unittest.TestCase):
APPID = "appid" APPID = "appid"
app = rv.get_app(APPID) app = rv.get_app(APPID)
app.claim_nameplate("npid", "side1", 10) # start time is 10 app.claim_nameplate("npid", "side1", 10) # start time is 10
rv.prune(now=123, old=50) rv.prune_all_apps(now=123, old=50)
row = db.execute("SELECT * FROM `nameplate_usage`").fetchone() row = db.execute("SELECT * FROM `nameplate_usage`").fetchone()
self.assertEqual(row["started"], 10) self.assertEqual(row["started"], 10)
db.execute("DELETE FROM `mailbox_usage`")
db.commit()
app = rv.get_app(APPID) app = rv.get_app(APPID)
app.open_mailbox("mbid", "side1", 20) # start time is 20 app.open_mailbox("mbid", "side1", 20) # start time is 20
rv.prune(now=123, old=50) rv.prune_all_apps(now=123, old=50)
row = db.execute("SELECT * FROM `mailbox_usage`").fetchone() row = db.execute("SELECT * FROM `mailbox_usage`").fetchone()
self.assertEqual(row["started"], 20) self.assertEqual(row["started"], 20)