diff --git a/misc/dump-stats.py b/misc/dump-stats.py new file mode 100755 index 0000000..878a399 --- /dev/null +++ b/misc/dump-stats.py @@ -0,0 +1,19 @@ +from __future__ import print_function +import time, json + +# Run this as 'watch python misc/dump-stats.py' against a 'wormhole-server +# start --stats-file=stats.json' + +with open("stats.json") as f: + data_s = f.read() + +now = time.time() +data = json.loads(data_s) +if now < data["valid_until"]: + valid = "valid" +else: + valid = "EXPIRED" +age = now - data["created"] + +print("age: %d (%s)" % (age, valid)) +print(data_s) diff --git a/misc/munin/wormhole_active b/misc/munin/wormhole_active new file mode 100755 index 0000000..a9d1db7 --- /dev/null +++ b/misc/munin/wormhole_active @@ -0,0 +1,50 @@ +#! /usr/bin/env python + +""" +Use the following in /etc/munin/plugin-conf.d/wormhole : + +[wormhole_*] +env.serverdir /path/to/your/wormhole/server +""" + +import os, sys, time, json + +CONFIG = """\ +graph_title Magic-Wormhole Active Channels +graph_vlabel Channels +graph_category network +nameplates.label Nameplates +nameplates.draw LINE2 +nameplates.type GAUGE +mailboxes.label Mailboxes +mailboxes.draw LINE2 +mailboxes.type GAUGE +messages.label Messages +messages.draw LINE1 +messages.type GAUGE +transit_waiting.label Transit Waiting +transit_waiting.draw LINE1 +transit_waiting.type GAUGE +transit_connected.label Transit Connected +transit_connected.draw LINE1 +transit_connected.type GAUGE +""" + +if len(sys.argv) > 1 and sys.argv[1] == "config": + print CONFIG.rstrip() + sys.exit(0) + +serverdir = os.environ["serverdir"] +fn = os.path.join(serverdir, "stats.json") +with open(fn) as f: + data = json.load(f) +if time.time() > data["valid_until"]: + sys.exit(1) # expired + +ra = data["rendezvous"]["active"] +print "nameplates.value", ra["nameplates_total"] +print "mailboxes.value", ra["mailboxes_total"] +print "messages.value", ra["messages_total"] +ta = data["transit"]["active"] +print "transit_waiting.value", ta["waiting"] +print "transit_connected.value", ta["connected"] diff --git a/misc/munin/wormhole_channels b/misc/munin/wormhole_channels deleted file mode 100755 index 4b306b2..0000000 --- a/misc/munin/wormhole_channels +++ /dev/null @@ -1,85 +0,0 @@ -#! /usr/bin/env python - -""" -Use the following in /etc/munin/plugin-conf.d/wormhole : - -[wormhole_*] -env.serverdir /path/to/your/wormhole/server -""" - -import os, sys, time, sqlite3 - -def count_events(): - serverdir = os.environ["serverdir"] - dbfile = os.path.join(serverdir, "relay.sqlite") - if not os.path.exists(dbfile): - print "cannot find relay.sqlite, please set env.serverdir" - sys.exit(1) - db = sqlite3.connect(dbfile) - db.row_factory = sqlite3.Row - - c_list = [] - c_dict = {} - def add(key, value): - c_list.append((key, value)) - c_dict[key] = value - def q(query, values=()): - return db.execute(query, values).fetchone()[0] - OLD = time.time() - 10*60 - add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplates`")) - - add("total nameplates", q("SELECT COUNT() FROM `nameplates`")) - add("waiting nameplates", q("SELECT COUNT() FROM `nameplates`" - " WHERE `second` is null")) - add("connected nameplates", q("SELECT COUNT() FROM `nameplates`" - " WHERE `second` is not null")) - add("stale nameplates", q("SELECT COUNT() FROM `nameplates`" - " where `updated` < ?", (OLD,))) - - add("total mailboxes", q("SELECT COUNT() FROM `mailboxes`")) - add("waiting mailboxes", q("SELECT COUNT() FROM `mailboxes`" - " WHERE `second` is null")) - add("connected mailboxes", q("SELECT COUNT() FROM `mailboxes`" - " WHERE `second` is not null")) - - stale_mailboxes = 0 - for mbox_row in db.execute("SELECT * FROM `mailboxes`").fetchall(): - newest = db.execute("SELECT `server_rx` FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?" - " ORDER BY `server_rx` DESC LIMIT 1", - (mbox_row["app_id"], mbox_row["id"])).fetchone() - if newest and newest[0] < OLD: - stale_mailboxes += 1 - add("stale mailboxes", stale_mailboxes) - - add("messages", q("SELECT COUNT() FROM `messages`")) - - return c_dict - -CONFIG = """\ -graph_title Magic-Wormhole Active Channels -graph_vlabel Channels -graph_category network -nameplates.label Total Nameplates -nameplates.draw LINE2 -nameplates.type GAUGE -waiting_nameplates.label Waiting Nameplates -waiting_nameplates.draw LINE2 -waiting_nameplates.type GAUGE -mailboxes.label Total Mailboxes -mailboxes.draw LINE2 -mailboxes.type GAUGE -waiting_mailboxes.label Waiting Mailboxes -waiting_mailboxes.draw LINE2 -waiting_mailboxes.type GAUGE -""" - -if len(sys.argv) > 1 and sys.argv[1] == "config": - print CONFIG.rstrip() - sys.exit(0) - -c = count_events() -print "nameplates.value", c["total nameplates"] -print "waiting_nameplates.value", c["waiting nameplates"] -print "mailboxes.value", c["total mailboxes"] -print "waiting_mailboxes.value", c["waiting mailboxes"] diff --git a/misc/munin/wormhole_errors b/misc/munin/wormhole_errors index 02b832d..05c3cf5 100755 --- a/misc/munin/wormhole_errors +++ b/misc/munin/wormhole_errors @@ -7,88 +7,41 @@ Use the following in /etc/munin/plugin-conf.d/wormhole : env.serverdir /path/to/your/wormhole/server """ -import os, sys, sqlite3 - -def count_events(): - serverdir = os.environ["serverdir"] - dbfile = os.path.join(serverdir, "relay.sqlite") - if not os.path.exists(dbfile): - print "cannot find relay.sqlite, please set env.serverdir" - sys.exit(1) - db = sqlite3.connect(dbfile) - - c_list = [] - c_dict = {} - def add(key, value): - c_list.append((key, value)) - c_dict[key] = value - def q(query, values=()): - return db.execute(query, values).fetchone()[0] - - add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplate_usage`")) - - add("total nameplates", q("SELECT COUNT() FROM `nameplate_usage`")) - add("happy nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='happy'")) - add("lonely nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='lonely'")) - add("pruney nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='pruney'")) - add("crowded nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='crowded'")) - - add("total mailboxes", q("SELECT COUNT() FROM `mailbox_usage`")) - add("happy mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='happy'")) - add("scary mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='scary'")) - add("lonely mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='lonely'")) - add("errory mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='errory'")) - add("pruney mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='pruney'")) - add("crowded mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='crowded'")) - - add("total transit", q("SELECT COUNT() FROM `transit_usage`")) - add("happy transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='happy'")) - add("lonely transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='lonely'")) - add("errory transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='errory'")) - - add("transit bytes", q("SELECT SUM(`total_bytes`) FROM `transit_usage`")) - - return c_dict +import os, sys, time, json CONFIG = """\ graph_title Magic-Wormhole Server Errors -graph_vlabel Events per Hour +graph_vlabel Events Since Reboot graph_category network -nameplates.label Nameplates +nameplates.label Nameplate Errors (total) nameplates.draw LINE1 -nameplates.type DERIVE -nameplates.min 0 -nameplates.cdef nameplates,3600,* -mailboxes.label Mailboxes +nameplates.type GAUGE +mailboxes.label Mailboxes (total) mailboxes.draw LINE1 -mailboxes.type DERIVE -mailboxes.min 0 -mailboxes.cdef mailboxes,3600,* +mailboxes.type GAUGE +mailboxes_scary.label Mailboxes (scary) +mailboxes_scary.draw LINE1 +mailboxes_scary.type GAUGE transit.label Transit transit.draw LINE1 -transit.type DERIVE -transit.min 0 -transit.cdef transit,3600,* +transit.type GAUGE """ if len(sys.argv) > 1 and sys.argv[1] == "config": print CONFIG.rstrip() sys.exit(0) -c = count_events() -print "nameplates.value", c["total nameplates"] - c["happy nameplates"] -print "mailboxes.value", c["total mailboxes"] - c["happy mailboxes"] -print "transit.value", c["total transit"] - c["happy transit"] +serverdir = os.environ["serverdir"] +fn = os.path.join(serverdir, "stats.json") +with open(fn) as f: + data = json.load(f) +if time.time() > data["valid_until"]: + sys.exit(1) # expired + +r = data["rendezvous"]["since_reboot"] +print "nameplates.value", (r["nameplates_total"] + - r["nameplate_moods"].get("happy", 0)) +print "mailboxes.value", (r["mailboxes_total"] + - r["mailbox_moods"].get("happy", 0)) +t = data["transit"]["since_reboot"] +print "transit.value", (t["total"] - t["moods"].get("happy", 0)) diff --git a/misc/munin/wormhole_event_rate b/misc/munin/wormhole_event_rate new file mode 100755 index 0000000..1fb9420 --- /dev/null +++ b/misc/munin/wormhole_event_rate @@ -0,0 +1,50 @@ +#! /usr/bin/env python + +""" +Use the following in /etc/munin/plugin-conf.d/wormhole : + +[wormhole_*] +env.serverdir /path/to/your/wormhole/server +""" + +import os, sys, time, json + +CONFIG = """\ +graph_title Magic-Wormhole Server Events +graph_vlabel Events per Hour +graph_category network +happy.label Happy +happy.draw LINE +happy.type DERIVE +happy.min 0 +happy.max 60 +happy.cdef happy,3600,* +incomplete.label Incomplete +incomplete.draw LINE +incomplete.type DERIVE +incomplete.min 0 +incomplete.max 60 +incomplete.cdef happy,3600,* +scary.label Scary +scary.draw LINE +scary.type DERIVE +scary.min 0 +scary.max 60 +scary.cdef happy,3600,* +""" + +if len(sys.argv) > 1 and sys.argv[1] == "config": + print CONFIG.rstrip() + sys.exit(0) + +serverdir = os.environ["serverdir"] +fn = os.path.join(serverdir, "stats.json") +with open(fn) as f: + data = json.load(f) +if time.time() > data["valid_until"]: + sys.exit(1) # expired + +atm = data["rendezvous"]["all_time"]["mailbox_moods"] +print "happy.value", atm.get("happy", 0) +print "incomplete.value", (atm.get("pruney", 0) + atm.get("lonely", 0)) +print "scary.value", atm.get("scary", 0) diff --git a/misc/munin/wormhole_events b/misc/munin/wormhole_events index 3ca5d82..b40ca75 100755 --- a/misc/munin/wormhole_events +++ b/misc/munin/wormhole_events @@ -7,88 +7,48 @@ Use the following in /etc/munin/plugin-conf.d/wormhole : env.serverdir /path/to/your/wormhole/server """ -import os, sys, sqlite3 - -def count_events(): - serverdir = os.environ["serverdir"] - dbfile = os.path.join(serverdir, "relay.sqlite") - if not os.path.exists(dbfile): - print "cannot find relay.sqlite, please set env.serverdir" - sys.exit(1) - db = sqlite3.connect(dbfile) - - c_list = [] - c_dict = {} - def add(key, value): - c_list.append((key, value)) - c_dict[key] = value - def q(query, values=()): - return db.execute(query, values).fetchone()[0] - - add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplate_usage`")) - - add("total nameplates", q("SELECT COUNT() FROM `nameplate_usage`")) - add("happy nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='happy'")) - add("lonely nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='lonely'")) - add("pruney nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='pruney'")) - add("crowded nameplates", q("SELECT COUNT() FROM `nameplate_usage`" - " WHERE `result`='crowded'")) - - add("total mailboxes", q("SELECT COUNT() FROM `mailbox_usage`")) - add("happy mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='happy'")) - add("scary mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='scary'")) - add("lonely mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='lonely'")) - add("errory mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='errory'")) - add("pruney mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='pruney'")) - add("crowded mailboxes", q("SELECT COUNT() FROM `mailbox_usage`" - " WHERE `result`='crowded'")) - - add("total transit", q("SELECT COUNT() FROM `transit_usage`")) - add("happy transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='happy'")) - add("lonely transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='lonely'")) - add("errory transit", q("SELECT COUNT() FROM `transit_usage`" - " WHERE `result`='errory'")) - - add("transit bytes", q("SELECT SUM(`total_bytes`) FROM `transit_usage`")) - - return c_dict +import os, sys, time, json CONFIG = """\ -graph_title Magic-Wormhole Server Events -graph_vlabel Events per Hour +graph_title Magic-Wormhole Mailbox Events +graph_vlabel Events Since Reboot graph_category network -nameplates.label Nameplates -nameplates.draw LINE -nameplates.type DERIVE -nameplates.min 0 -nameplates.cdef nameplates,3600,* -mailboxes.label Mailboxes -mailboxes.draw LINE -mailboxes.type DERIVE -mailboxes.min 0 -mailboxes.cdef mailboxes,3600,* -transit.label Transit -transit.draw LINE -transit.type DERIVE -transit.min 0 -transit.cdef transit,3600,* +total.label Total +total.draw LINE1 +total.type GAUGE +happy.label Happy +happy.draw LINE2 +happy.type GAUGE +pruney.label Pruney +pruney.draw LINE1 +pruney.type GAUGE +incomplete.label Incomplete (pruned/lonely) +incomplete.draw LINE2 +incomplete.type GAUGE +scary.label Scary +scary.draw LINE1 +scary.type GAUGE +errory.label Errory +errory.draw LINE1 +errory.type GAUGE """ if len(sys.argv) > 1 and sys.argv[1] == "config": print CONFIG.rstrip() sys.exit(0) -c = count_events() -print "nameplates.value", c["total nameplates"] -print "mailboxes.value", c["total mailboxes"] -print "transit.value", c["total transit"] +serverdir = os.environ["serverdir"] +fn = os.path.join(serverdir, "stats.json") +with open(fn) as f: + data = json.load(f) +if time.time() > data["valid_until"]: + sys.exit(1) # expired + +r = data["rendezvous"]["since_reboot"] +print "total.value", r["mailboxes_total"] +print "happy.value", r["mailbox_moods"].get("happy", 0) +print "pruney.value", r["mailbox_moods"].get("pruney", 0) +print "incomplete.value", (r["mailbox_moods"].get("pruney", 0) + + r["mailbox_moods"].get("lonely", 0)) +print "scary.value", r["mailbox_moods"].get("scary", 0) +print "errory.value", r["mailbox_moods"].get("errory", 0) diff --git a/misc/munin/wormhole_messages b/misc/munin/wormhole_messages deleted file mode 100755 index 3dc6acd..0000000 --- a/misc/munin/wormhole_messages +++ /dev/null @@ -1,73 +0,0 @@ -#! /usr/bin/env python - -""" -Use the following in /etc/munin/plugin-conf.d/wormhole : - -[wormhole_*] -env.serverdir /path/to/your/wormhole/server -""" - -import os, sys, time, sqlite3 - -def count_events(): - serverdir = os.environ["serverdir"] - dbfile = os.path.join(serverdir, "relay.sqlite") - if not os.path.exists(dbfile): - print "cannot find relay.sqlite, please set env.serverdir" - sys.exit(1) - db = sqlite3.connect(dbfile) - db.row_factory = sqlite3.Row - - c_list = [] - c_dict = {} - def add(key, value): - c_list.append((key, value)) - c_dict[key] = value - def q(query, values=()): - return db.execute(query, values).fetchone()[0] - OLD = time.time() - 10*60 - add("apps", q("SELECT COUNT(DISTINCT(`app_id`)) FROM `nameplates`")) - - add("total nameplates", q("SELECT COUNT() FROM `nameplates`")) - add("waiting nameplates", q("SELECT COUNT() FROM `nameplates`" - " WHERE `second` is null")) - add("connected nameplates", q("SELECT COUNT() FROM `nameplates`" - " WHERE `second` is not null")) - add("stale nameplates", q("SELECT COUNT() FROM `nameplates`" - " where `updated` < ?", (OLD,))) - - add("total mailboxes", q("SELECT COUNT() FROM `mailboxes`")) - add("waiting mailboxes", q("SELECT COUNT() FROM `mailboxes`" - " WHERE `second` is null")) - add("connected mailboxes", q("SELECT COUNT() FROM `mailboxes`" - " WHERE `second` is not null")) - - stale_mailboxes = 0 - for mbox_row in db.execute("SELECT * FROM `mailboxes`").fetchall(): - newest = db.execute("SELECT `server_rx` FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?" - " ORDER BY `server_rx` DESC LIMIT 1", - (mbox_row["app_id"], mbox_row["id"])).fetchone() - if newest and newest[0] < OLD: - stale_mailboxes += 1 - add("stale mailboxes", stale_mailboxes) - - add("messages", q("SELECT COUNT() FROM `messages`")) - - return c_dict - -CONFIG = """\ -graph_title Magic-Wormhole Queued Messages -graph_vlabel Messages -graph_category network -messages.label Total Messages -messages.draw LINE2 -messages.type GAUGE -""" - -if len(sys.argv) > 1 and sys.argv[1] == "config": - print CONFIG.rstrip() - sys.exit(0) - -c = count_events() -print "messages.value", c["messages"] diff --git a/misc/munin/wormhole_transit b/misc/munin/wormhole_transit index e5211a3..92a0d01 100755 --- a/misc/munin/wormhole_transit +++ b/misc/munin/wormhole_transit @@ -7,7 +7,7 @@ Use the following in /etc/munin/plugin-conf.d/wormhole : env.serverdir /path/to/your/wormhole/server """ -import os, sys, sqlite3 +import os, sys, time, json def count_events(): serverdir = os.environ["serverdir"] @@ -65,18 +65,23 @@ def count_events(): CONFIG = """\ graph_title Magic-Wormhole Transit Usage -graph_vlabel Bytes per Hour +graph_vlabel Bytes Since Reboot graph_category network bytes.label Transit Bytes bytes.draw LINE1 -bytes.type DERIVE -bytes.min 0 -bytes.cdef bytes,3600,* +bytes.type GAUGE """ if len(sys.argv) > 1 and sys.argv[1] == "config": print CONFIG.rstrip() sys.exit(0) -c = count_events() -print "bytes.value", c["transit bytes"] +serverdir = os.environ["serverdir"] +fn = os.path.join(serverdir, "stats.json") +with open(fn) as f: + data = json.load(f) +if time.time() > data["valid_until"]: + sys.exit(1) # expired + +t = data["transit"]["since_reboot"] +print "bytes.value", t["bytes"] diff --git a/src/wormhole/server/cli.py b/src/wormhole/server/cli.py index 2408114..ae99760 100644 --- a/src/wormhole/server/cli.py +++ b/src/wormhole/server/cli.py @@ -48,8 +48,13 @@ def server(ctx): "--signal-error", is_flag=True, help="force all clients to fail with a message", ) +@click.option( + "--stats-file", default=None, type=type(u""), metavar="stats.json", + help="periodically write stats to a file for monitoring tools like Munin", +) @click.pass_obj -def start(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit, rendezvous): +def start(cfg, signal_error, no_daemon, blur_usage, advertise_version, + transit, rendezvous, stats_file): """ Start a relay server """ @@ -60,6 +65,7 @@ def start(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit, cfg.transit = str(transit) cfg.rendezvous = str(rendezvous) cfg.signal_error = signal_error + cfg.stats_file = stats_file start_server(cfg) @@ -92,8 +98,13 @@ def start(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit, "--signal-error", is_flag=True, help="force all clients to fail with a message", ) +@click.option( + "--stats-file", default=None, type=type(u""), metavar="stats.json", + help="periodically write stats to a file for monitoring tools like Munin", +) @click.pass_obj -def restart(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit, rendezvous): +def restart(cfg, signal_error, no_daemon, blur_usage, advertise_version, + transit, rendezvous, stats_file): """ Re-start a relay server """ @@ -104,6 +115,7 @@ def restart(cfg, signal_error, no_daemon, blur_usage, advertise_version, transit cfg.transit = str(transit) cfg.rendezvous = str(rendezvous) cfg.signal_error = signal_error + cfg.stats_file = stats_file restart_server(cfg) diff --git a/src/wormhole/server/cmd_server.py b/src/wormhole/server/cmd_server.py index f4fdac5..19035c4 100644 --- a/src/wormhole/server/cmd_server.py +++ b/src/wormhole/server/cmd_server.py @@ -15,6 +15,7 @@ class MyPlugin: self.args.advertise_version, "relay.sqlite", self.args.blur_usage, signal_error=self.args.signal_error, + stats_file=self.args.stats_file, ) class MyTwistdConfig(twistd.ServerOptions): diff --git a/src/wormhole/server/database.py b/src/wormhole/server/database.py index d2589ad..c772af7 100644 --- a/src/wormhole/server/database.py +++ b/src/wormhole/server/database.py @@ -16,7 +16,7 @@ def get_upgrader(new_version): "db-schemas/upgrade-to-v%d.sql" % new_version) return schema_bytes.decode("utf-8") -TARGET_VERSION = 2 +TARGET_VERSION = 3 def dict_factory(cursor, row): d = {} @@ -35,6 +35,10 @@ def get_db(dbfile, target_version=TARGET_VERSION, stderr=sys.stderr): except (EnvironmentError, sqlite3.OperationalError) as e: raise DBError("Unable to create/open db file %s: %s" % (dbfile, e)) db.row_factory = dict_factory + db.execute("PRAGMA foreign_keys = ON") + problems = db.execute("PRAGMA foreign_key_check").fetchall() + if problems: + raise DBError("failed foreign key check: %s" % (problems,)) if must_create: schema = get_schema(target_version) diff --git a/src/wormhole/server/db-schemas/upgrade-to-v3.sql b/src/wormhole/server/db-schemas/upgrade-to-v3.sql new file mode 100644 index 0000000..69bb255 --- /dev/null +++ b/src/wormhole/server/db-schemas/upgrade-to-v3.sql @@ -0,0 +1,68 @@ +DROP TABLE `nameplates`; +DROP TABLE `messages`; +DROP TABLE `mailboxes`; + + +-- Wormhole codes use a "nameplate": a short name which is only used to +-- reference a specific (long-named) mailbox. The codes only use numeric +-- nameplates, but the protocol and server allow can use arbitrary strings. +CREATE TABLE `nameplates` +( + `id` INTEGER PRIMARY KEY AUTOINCREMENT, + `app_id` VARCHAR, + `name` VARCHAR, + `mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`), + `request_id` VARCHAR -- from 'allocate' message, for future deduplication +); +CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`); +CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); +CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); + +CREATE TABLE `nameplate_sides` +( + `nameplates_id` REFERENCES `nameplates`(`id`), + `claimed` BOOLEAN, -- True after claim(), False after release() + `side` VARCHAR, + `added` INTEGER -- time when this side first claimed the nameplate +); + + +-- Clients exchange messages through a "mailbox", which has a long (randomly +-- unique) identifier and a queue of messages. +-- `id` is randomly-generated and unique across all apps. +CREATE TABLE `mailboxes` +( + `app_id` VARCHAR, + `id` VARCHAR PRIMARY KEY, + `updated` INTEGER, -- time of last activity, used for pruning + `for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone +); +CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); + +CREATE TABLE `mailbox_sides` +( + `mailbox_id` REFERENCES `mailboxes`(`id`), + `opened` BOOLEAN, -- True after open(), False after close() + `side` VARCHAR, + `added` INTEGER, -- time when this side first opened the mailbox + `mood` VARCHAR +); + +CREATE TABLE `messages` +( + `app_id` VARCHAR, + `mailbox_id` VARCHAR, + `side` VARCHAR, + `phase` VARCHAR, -- numeric or string + `body` VARCHAR, + `server_rx` INTEGER, + `msg_id` VARCHAR +); +CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`); + +ALTER TABLE `mailbox_usage` ADD COLUMN `for_nameplate` BOOLEAN; +CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`); +CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`); + +DELETE FROM `version`; +INSERT INTO `version` (`version`) VALUES (3); diff --git a/src/wormhole/server/db-schemas/v3.sql b/src/wormhole/server/db-schemas/v3.sql new file mode 100644 index 0000000..e447744 --- /dev/null +++ b/src/wormhole/server/db-schemas/v3.sql @@ -0,0 +1,115 @@ + +-- note: anything which isn't an boolean, integer, or human-readable unicode +-- string, (i.e. binary strings) will be stored as hex + +CREATE TABLE `version` +( + `version` INTEGER -- contains one row, set to 3 +); + + +-- Wormhole codes use a "nameplate": a short name which is only used to +-- reference a specific (long-named) mailbox. The codes only use numeric +-- nameplates, but the protocol and server allow can use arbitrary strings. +CREATE TABLE `nameplates` +( + `id` INTEGER PRIMARY KEY AUTOINCREMENT, + `app_id` VARCHAR, + `name` VARCHAR, + `mailbox_id` VARCHAR REFERENCES `mailboxes`(`id`), + `request_id` VARCHAR -- from 'allocate' message, for future deduplication +); +CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`); +CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`); +CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`); + +CREATE TABLE `nameplate_sides` +( + `nameplates_id` REFERENCES `nameplates`(`id`), + `claimed` BOOLEAN, -- True after claim(), False after release() + `side` VARCHAR, + `added` INTEGER -- time when this side first claimed the nameplate +); + + +-- Clients exchange messages through a "mailbox", which has a long (randomly +-- unique) identifier and a queue of messages. +-- `id` is randomly-generated and unique across all apps. +CREATE TABLE `mailboxes` +( + `app_id` VARCHAR, + `id` VARCHAR PRIMARY KEY, + `updated` INTEGER, -- time of last activity, used for pruning + `for_nameplate` BOOLEAN -- allocated for a nameplate, not standalone +); +CREATE INDEX `mailboxes_idx` ON `mailboxes` (`app_id`, `id`); + +CREATE TABLE `mailbox_sides` +( + `mailbox_id` REFERENCES `mailboxes`(`id`), + `opened` BOOLEAN, -- True after open(), False after close() + `side` VARCHAR, + `added` INTEGER, -- time when this side first opened the mailbox + `mood` VARCHAR +); + +CREATE TABLE `messages` +( + `app_id` VARCHAR, + `mailbox_id` VARCHAR, + `side` VARCHAR, + `phase` VARCHAR, -- numeric or string + `body` VARCHAR, + `server_rx` INTEGER, + `msg_id` VARCHAR +); +CREATE INDEX `messages_idx` ON `messages` (`app_id`, `mailbox_id`); + +CREATE TABLE `nameplate_usage` +( + `app_id` VARCHAR, + `started` INTEGER, -- seconds since epoch, rounded to "blur time" + `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None + `total_time` INTEGER, -- seconds from open to last close/prune + `result` VARCHAR -- happy, lonely, pruney, crowded + -- nameplate moods: + -- "happy": two sides open and close + -- "lonely": one side opens and closes (no response from 2nd side) + -- "pruney": channels which get pruned for inactivity + -- "crowded": three or more sides were involved +); +CREATE INDEX `nameplate_usage_idx` ON `nameplate_usage` (`app_id`, `started`); + +CREATE TABLE `mailbox_usage` +( + `app_id` VARCHAR, + `for_nameplate` BOOLEAN, -- allocated for a nameplate, not standalone + `started` INTEGER, -- seconds since epoch, rounded to "blur time" + `total_time` INTEGER, -- seconds from open to last close + `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None + `result` VARCHAR -- happy, scary, lonely, errory, pruney + -- rendezvous moods: + -- "happy": both sides close with mood=happy + -- "scary": any side closes with mood=scary (bad MAC, probably wrong pw) + -- "lonely": any side closes with mood=lonely (no response from 2nd side) + -- "errory": any side closes with mood=errory (other errors) + -- "pruney": channels which get pruned for inactivity + -- "crowded": three or more sides were involved +); +CREATE INDEX `mailbox_usage_idx` ON `mailbox_usage` (`app_id`, `started`); +CREATE INDEX `mailbox_usage_result_idx` ON `mailbox_usage` (`result`); + +CREATE TABLE `transit_usage` +( + `started` INTEGER, -- seconds since epoch, rounded to "blur time" + `total_time` INTEGER, -- seconds from open to last close + `waiting_time` INTEGER, -- seconds from start to 2nd side appearing, or None + `total_bytes` INTEGER, -- total bytes relayed (both directions) + `result` VARCHAR -- happy, scary, lonely, errory, pruney + -- transit moods: + -- "errory": one side gave the wrong handshake + -- "lonely": good handshake, but the other side never showed up + -- "happy": both sides gave correct handshake +); +CREATE INDEX `transit_usage_idx` ON `transit_usage` (`started`); +CREATE INDEX `transit_usage_result_idx` ON `transit_usage` (`result`); diff --git a/src/wormhole/server/rendezvous.py b/src/wormhole/server/rendezvous.py index 0f5ac82..b13a0d2 100644 --- a/src/wormhole/server/rendezvous.py +++ b/src/wormhole/server/rendezvous.py @@ -1,48 +1,15 @@ from __future__ import print_function, unicode_literals -import os, time, random, base64 +import os, random, base64, collections from collections import namedtuple from twisted.python import log -from twisted.application import service, internet - -SECONDS = 1.0 -MINUTE = 60*SECONDS -HOUR = 60*MINUTE -DAY = 24*HOUR -MB = 1000*1000 - -CHANNEL_EXPIRATION_TIME = 2*HOUR -EXPIRATION_CHECK_PERIOD = 1*HOUR +from twisted.application import service def generate_mailbox_id(): return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii") - -SideResult = namedtuple("SideResult", ["changed", "empty", "side1", "side2"]) -Unchanged = SideResult(changed=False, empty=False, side1=None, side2=None) class CrowdedError(Exception): pass -def add_side(row, new_side): - old_sides = [s for s in [row["side1"], row["side2"]] if s] - assert old_sides - if new_side in old_sides: - return Unchanged - if len(old_sides) == 2: - raise CrowdedError("too many sides for this thing") - return SideResult(changed=True, empty=False, - side1=old_sides[0], side2=new_side) - -def remove_side(row, side): - old_sides = [s for s in [row["side1"], row["side2"]] if s] - if side not in old_sides: - return Unchanged - remaining_sides = old_sides[:] - remaining_sides.remove(side) - if remaining_sides: - return SideResult(changed=True, empty=False, side1=remaining_sides[0], - side2=None) - return SideResult(changed=True, empty=True, side1=None, side2=None) - Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"]) TransitUsage = namedtuple("TransitUsage", ["started", "waiting_time", "total_time", @@ -65,23 +32,21 @@ class Mailbox: # requires caller to db.commit() assert isinstance(side, type("")), type(side) db = self._db - row = db.execute("SELECT * FROM `mailboxes`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, self._mailbox_id)).fetchone() - try: - sr = add_side(row, side) - except CrowdedError: - db.execute("UPDATE `mailboxes` SET `crowded`=?" - " WHERE `app_id`=? AND `id`=?", - (True, self._app_id, self._mailbox_id)) - db.commit() - raise - if sr.changed: - db.execute("UPDATE `mailboxes` SET" - " `side1`=?, `side2`=?, `second`=?" - " WHERE `app_id`=? AND `id`=?", - (sr.side1, sr.side2, when, - self._app_id, self._mailbox_id)) + + already = db.execute("SELECT * FROM `mailbox_sides`" + " WHERE `mailbox_id`=? AND `side`=?", + (self._mailbox_id, side)).fetchone() + if not already: + db.execute("INSERT INTO `mailbox_sides`" + " (`mailbox_id`, `opened`, `side`, `added`)" + " VALUES(?,?,?,?)", + (self._mailbox_id, True, side, when)) + self._touch(when) + db.commit() # XXX: reconcile the need for this with the comment above + + def _touch(self, when): + self._db.execute("UPDATE `mailboxes` SET `updated`=? WHERE `id`=?", + (when, self._mailbox_id)) def get_messages(self): messages = [] @@ -97,12 +62,15 @@ class Mailbox: return messages def add_listener(self, handle, send_f, stop_f): - # TODO: update 'updated' + #log.msg("add_listener", self._mailbox_id, handle) self._listeners[handle] = (send_f, stop_f) + #log.msg(" added", len(self._listeners)) return self.get_messages() def remove_listener(self, handle): - self._listeners.pop(handle) + #log.msg("remove_listener", self._mailbox_id, handle) + self._listeners.pop(handle, None) + #log.msg(" removed", len(self._listeners)) def has_listeners(self): return bool(self._listeners) @@ -118,6 +86,7 @@ class Mailbox: " VALUES (?,?,?,?,?, ?,?)", (self._app_id, self._mailbox_id, sm.side, sm.phase, sm.body, sm.server_rx, sm.msg_id)) + self._touch(sm.server_rx) self._db.commit() def add_message(self, sm): @@ -133,43 +102,46 @@ class Mailbox: (self._app_id, self._mailbox_id)).fetchone() if not row: return - sr = remove_side(row, side) - if sr.empty: - self._app._summarize_mailbox_and_store(self._mailbox_id, row, - mood, when, pruned=False) - self._delete() - db.commit() - elif sr.changed: - db.execute("UPDATE `mailboxes`" - " SET `side1`=?, `side2`=?, `first_mood`=?" - " WHERE `app_id`=? AND `id`=?", - (sr.side1, sr.side2, mood, - self._app_id, self._mailbox_id)) - db.commit() + for_nameplate = row["for_nameplate"] - def _delete(self): - # requires caller to db.commit() - self._db.execute("DELETE FROM `mailboxes`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, self._mailbox_id)) - self._db.execute("DELETE FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?", - (self._app_id, self._mailbox_id)) + row = db.execute("SELECT * FROM `mailbox_sides`" + " WHERE `mailbox_id`=? AND `side`=?", + (self._mailbox_id, side)).fetchone() + if not row: + return + db.execute("UPDATE `mailbox_sides` SET `opened`=?, `mood`=?" + " WHERE `mailbox_id`=? AND `side`=?", + (False, mood, self._mailbox_id, side)) + db.commit() + # are any sides still open? + side_rows = db.execute("SELECT * FROM `mailbox_sides`" + " WHERE `mailbox_id`=?", + (self._mailbox_id,)).fetchall() + if any([sr["opened"] for sr in side_rows]): + return + + # nope. delete and summarize + db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?", + (self._mailbox_id,)) + db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?", + (self._mailbox_id,)) + db.execute("DELETE FROM `mailboxes` WHERE `id`=?", (self._mailbox_id,)) + self._app._summarize_mailbox_and_store(for_nameplate, side_rows, + when, pruned=False) + db.commit() # Shut down any listeners, just in case they're still lingering # around. for (send_f, stop_f) in self._listeners.values(): stop_f() - + self._listeners = {} self._app.free_mailbox(self._mailbox_id) - def is_active(self): - return bool(self._listeners) - def _shutdown(self): # used at test shutdown to accelerate client disconnects for (send_f, stop_f) in self._listeners.values(): stop_f() + self._listeners = {} class AppNamespace: def __init__(self, db, blur_usage, log_requests, app_id): @@ -178,22 +150,15 @@ class AppNamespace: self._log_requests = log_requests self._app_id = app_id self._mailboxes = {} - - def is_active(self): - # An idle AppNamespace does not need to be kept in memory: it can be - # reconstructed from the DB if needed. And active one must be kept - # alive. - for mb in self._mailboxes.values(): - if mb.is_active(): - return True - return False + self._nameplate_counts = collections.defaultdict(int) + self._mailbox_counts = collections.defaultdict(int) def get_nameplate_ids(self): db = self._db # TODO: filter this to numeric ids? - c = db.execute("SELECT DISTINCT `id` FROM `nameplates`" + c = db.execute("SELECT DISTINCT `name` FROM `nameplates`" " WHERE `app_id`=?", (self._app_id,)) - return set([row["id"] for row in c.fetchall()]) + return set([row["name"] for row in c.fetchall()]) def _find_available_nameplate_id(self): claimed = self.get_nameplate_ids() @@ -219,127 +184,157 @@ class AppNamespace: del mailbox_id # ignored, they'll learn it from claim() return nameplate_id - def claim_nameplate(self, nameplate_id, side, when, _test_mailbox_id=None): + def claim_nameplate(self, name, side, when): # when we're done: # * there will be one row for the nameplate - # * side1 or side2 will be populated - # * started or second will be populated - # * a mailbox id will be created, but not a mailbox row - # (ids are randomly unique, so we can defer creation until 'open') - assert isinstance(nameplate_id, type("")), type(nameplate_id) + # * there will be one 'side' attached to it, with claimed=True + # * a mailbox id and mailbox row will be created + # * a mailbox 'side' will be attached, with opened=True + assert isinstance(name, type("")), type(name) assert isinstance(side, type("")), type(side) db = self._db row = db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, nameplate_id)).fetchone() - if row: - mailbox_id = row["mailbox_id"] - try: - sr = add_side(row, side) - except CrowdedError: - db.execute("UPDATE `nameplates` SET `crowded`=?" - " WHERE `app_id`=? AND `id`=?", - (True, self._app_id, nameplate_id)) - db.commit() - raise - if sr.changed: - db.execute("UPDATE `nameplates` SET" - " `side1`=?, `side2`=?, `updated`=?, `second`=?" - " WHERE `app_id`=? AND `id`=?", - (sr.side1, sr.side2, when, when, - self._app_id, nameplate_id)) - else: + " WHERE `app_id`=? AND `name`=?", + (self._app_id, name)).fetchone() + if not row: if self._log_requests: log.msg("creating nameplate#%s for app_id %s" % - (nameplate_id, self._app_id)) - if _test_mailbox_id is not None: # for unit tests - mailbox_id = _test_mailbox_id - else: - mailbox_id = generate_mailbox_id() - db.execute("INSERT INTO `nameplates`" - " (`app_id`, `id`, `mailbox_id`, `side1`, `crowded`," - " `updated`, `started`)" - " VALUES(?,?,?,?,?, ?,?)", - (self._app_id, nameplate_id, mailbox_id, side, False, - when, when)) + (name, self._app_id)) + mailbox_id = generate_mailbox_id() + self._add_mailbox(mailbox_id, True, side, when) # ensure row exists + sql = ("INSERT INTO `nameplates`" + " (`app_id`, `name`, `mailbox_id`)" + " VALUES(?,?,?)") + npid = db.execute(sql, (self._app_id, name, mailbox_id) + ).lastrowid + else: + npid = row["id"] + mailbox_id = row["mailbox_id"] + + row = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=? AND `side`=?", + (npid, side)).fetchone() + if not row: + db.execute("INSERT INTO `nameplate_sides`" + " (`nameplates_id`, `claimed`, `side`, `added`)" + " VALUES(?,?,?,?)", + (npid, True, side, when)) db.commit() + + self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError + rows = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", (npid,)).fetchall() + if len(rows) > 2: + # this line will probably never get hit: any crowding is noticed + # on mailbox_sides first, inside open_mailbox() + raise CrowdedError("too many sides have claimed this nameplate") return mailbox_id - def release_nameplate(self, nameplate_id, side, when): + def release_nameplate(self, name, side, when): # when we're done: - # * in the nameplate row, side1 or side2 will be removed - # * if the nameplate is now unused: + # * the 'claimed' flag will be cleared on the nameplate_sides row + # * if the nameplate is now unused (no claimed sides): # * mailbox.nameplate_closed will be populated # * the nameplate row will be removed - assert isinstance(nameplate_id, type("")), type(nameplate_id) + # * the nameplate sides will be removed + assert isinstance(name, type("")), type(name) assert isinstance(side, type("")), type(side) db = self._db - row = db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, nameplate_id)).fetchone() + np_row = db.execute("SELECT * FROM `nameplates`" + " WHERE `app_id`=? AND `name`=?", + (self._app_id, name)).fetchone() + if not np_row: + return + npid = np_row["id"] + row = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=? AND `side`=?", + (npid, side)).fetchone() if not row: return - sr = remove_side(row, side) - if sr.empty: - db.execute("DELETE FROM `nameplates`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, nameplate_id)) - self._summarize_nameplate_and_store(row, when, pruned=False) - db.commit() - elif sr.changed: - db.execute("UPDATE `nameplates`" - " SET `side1`=?, `side2`=?, `updated`=?" - " WHERE `app_id`=? AND `id`=?", - (sr.side1, sr.side2, when, - self._app_id, nameplate_id)) - db.commit() + db.execute("UPDATE `nameplate_sides` SET `claimed`=?" + " WHERE `nameplates_id`=? AND `side`=?", + (False, npid, side)) + db.commit() - def _summarize_nameplate_and_store(self, row, delete_time, pruned): + # now, are there any remaining claims? + side_rows = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", + (npid,)).fetchall() + claims = [1 for sr in side_rows if sr["claimed"]] + if claims: + return + # delete and summarize + db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?", + (npid,)) + db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,)) + self._summarize_nameplate_and_store(side_rows, when, pruned=False) + db.commit() + + def _summarize_nameplate_and_store(self, side_rows, delete_time, pruned): # requires caller to db.commit() - u = self._summarize_nameplate_usage(row, delete_time, pruned) + u = self._summarize_nameplate_usage(side_rows, delete_time, pruned) self._db.execute("INSERT INTO `nameplate_usage`" " (`app_id`," " `started`, `total_time`, `waiting_time`, `result`)" " VALUES (?, ?,?,?,?)", (self._app_id, u.started, u.total_time, u.waiting_time, u.result)) + self._nameplate_counts[u.result] += 1 - def _summarize_nameplate_usage(self, row, delete_time, pruned): - started = row["started"] + def _summarize_nameplate_usage(self, side_rows, delete_time, pruned): + times = sorted([row["added"] for row in side_rows]) + started = times[0] if self._blur_usage: started = self._blur_usage * (started // self._blur_usage) waiting_time = None - if row["second"]: - waiting_time = row["second"] - row["started"] - total_time = delete_time - row["started"] + if len(times) > 1: + waiting_time = times[1] - times[0] + total_time = delete_time - times[0] result = "lonely" - if row["second"]: + if len(times) == 2: result = "happy" if pruned: result = "pruney" - if row["crowded"]: + if len(times) > 2: result = "crowded" return Usage(started=started, waiting_time=waiting_time, total_time=total_time, result=result) - def open_mailbox(self, mailbox_id, side, when): + def _add_mailbox(self, mailbox_id, for_nameplate, side, when): assert isinstance(mailbox_id, type("")), type(mailbox_id) db = self._db - if not mailbox_id in self._mailboxes: + row = db.execute("SELECT * FROM `mailboxes`" + " WHERE `app_id`=? AND `id`=?", + (self._app_id, mailbox_id)).fetchone() + if not row: + self._db.execute("INSERT INTO `mailboxes`" + " (`app_id`, `id`, `for_nameplate`, `updated`)" + " VALUES(?,?,?,?)", + (self._app_id, mailbox_id, for_nameplate, when)) + # we don't need a commit here, because mailbox.open() only + # does SELECT FROM `mailbox_sides`, not from `mailboxes` + + def open_mailbox(self, mailbox_id, side, when): + assert isinstance(mailbox_id, type("")), type(mailbox_id) + self._add_mailbox(mailbox_id, False, side, when) # ensure row exists + db = self._db + if not mailbox_id in self._mailboxes: # ensure Mailbox object exists if self._log_requests: log.msg("spawning #%s for app_id %s" % (mailbox_id, self._app_id)) - db.execute("INSERT INTO `mailboxes`" - " (`app_id`, `id`, `side1`, `crowded`, `started`)" - " VALUES(?,?,?,?,?)", - (self._app_id, mailbox_id, side, False, when)) - db.commit() # XXX - # mailbox.open() does a SELECT to find the old sides self._mailboxes[mailbox_id] = Mailbox(self, self._db, self._app_id, mailbox_id) mailbox = self._mailboxes[mailbox_id] + + # delegate to mailbox.open() to add a row to mailbox_sides, and + # update the mailbox.updated timestamp mailbox.open(side, when) db.commit() + rows = db.execute("SELECT * FROM `mailbox_sides`" + " WHERE `mailbox_id`=?", + (mailbox_id,)).fetchall() + if len(rows) > 2: + raise CrowdedError("too many sides have opened this mailbox") return mailbox def free_mailbox(self, mailbox_id): @@ -352,32 +347,29 @@ class AppNamespace: # log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" % # (mailbox_id, len(self.get_claimed()), len(self._mailboxes))) - def _summarize_mailbox_and_store(self, mailbox_id, row, - second_mood, delete_time, pruned): + def _summarize_mailbox_and_store(self, for_nameplate, side_rows, + delete_time, pruned): db = self._db - rows = db.execute("SELECT DISTINCT(`side`) FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?", - (self._app_id, mailbox_id)).fetchall() - num_sides = len(rows) - u = self._summarize_mailbox(row, num_sides, second_mood, delete_time, - pruned) + u = self._summarize_mailbox(side_rows, delete_time, pruned) db.execute("INSERT INTO `mailbox_usage`" - " (`app_id`," + " (`app_id`, `for_nameplate`," " `started`, `total_time`, `waiting_time`, `result`)" - " VALUES (?, ?,?,?,?)", - (self._app_id, + " VALUES (?,?, ?,?,?,?)", + (self._app_id, for_nameplate, u.started, u.total_time, u.waiting_time, u.result)) + self._mailbox_counts[u.result] += 1 - def _summarize_mailbox(self, row, num_sides, second_mood, delete_time, - pruned): - started = row["started"] + def _summarize_mailbox(self, side_rows, delete_time, pruned): + times = sorted([row["added"] for row in side_rows]) + started = times[0] if self._blur_usage: started = self._blur_usage * (started // self._blur_usage) waiting_time = None - if row["second"]: - waiting_time = row["second"] - row["started"] - total_time = delete_time - row["started"] + if len(times) > 1: + waiting_time = times[1] - times[0] + total_time = delete_time - times[0] + num_sides = len(times) if num_sides == 0: result = "quiet" elif num_sides == 1: @@ -385,7 +377,8 @@ class AppNamespace: else: result = "happy" - moods = set([row["first_mood"], second_mood]) + # "mood" is only recorded at close() + moods = [row["mood"] for row in side_rows if row.get("mood")] if "lonely" in moods: result = "lonely" if "errory" in moods: @@ -394,13 +387,24 @@ class AppNamespace: result = "scary" if pruned: result = "pruney" - if row["crowded"]: + if num_sides > 2: result = "crowded" return Usage(started=started, waiting_time=waiting_time, total_time=total_time, result=result) def prune(self, now, old): + # The pruning check runs every 10 minutes, and "old" is defined to be + # 11 minutes ago (unit tests can use different values). The client is + # allowed to disconnect for up to 9 minutes without losing the + # channel (nameplate, mailbox, and messages). + + # Each time a client does something, the mailbox.updated field is + # updated with the current timestamp. If a client is subscribed to + # the mailbox when pruning check runs, the "updated" field is also + # updated. After that check, if the "updated" field is "old", the + # channel is deleted. + # For now, pruning is logged even if log_requests is False, to debug # the pruning process, and since pruning is triggered by a timer # instead of by user action. It does reveal which mailboxes were @@ -409,122 +413,74 @@ class AppNamespace: log.msg(" prune begins (%s)" % self._app_id) db = self._db modified = False - # for all `mailboxes`: classify as new or old - OLD = 0; NEW = 1 - all_mailboxes = {} - all_mailbox_rows = {} - for row in db.execute("SELECT * FROM `mailboxes`" - " WHERE `app_id`=?", + + for mailbox in self._mailboxes.values(): + if mailbox.has_listeners(): + log.msg("touch %s because listeners" % mailbox._mailbox_id) + mailbox._touch(now) + db.commit() # make sure the updates are visible below + + new_mailboxes = set() + old_mailboxes = set() + for row in db.execute("SELECT * FROM `mailboxes` WHERE `app_id`=?", (self._app_id,)).fetchall(): mailbox_id = row["id"] - all_mailbox_rows[mailbox_id] = row - if row["started"] > old: - which = NEW - elif row["second"] and row["second"] > old: - which = NEW - else: - which = OLD - all_mailboxes[mailbox_id] = which - #log.msg(" 2: all_mailboxes", all_mailboxes, all_mailbox_rows) - - # for all mailbox ids used by `messages`: - # if there is no matching mailbox: delete the messages - # if there is at least one new message (select when>old limit 1): - # classify the mailbox as new - for row in db.execute("SELECT DISTINCT(`mailbox_id`)" - " FROM `messages`" - " WHERE `app_id`=?", - (self._app_id,)).fetchall(): - mailbox_id = row["mailbox_id"] - if mailbox_id not in all_mailboxes: - log.msg(" deleting orphan messages", mailbox_id) - db.execute("DELETE FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?", - (self._app_id, mailbox_id)) - modified = True - else: - new_msgs = db.execute("SELECT * FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?" - " AND `server_rx` > ?" - " LIMIT 1", - (self._app_id, mailbox_id, old) - ).fetchall() - if new_msgs: - #log.msg(" 3-: saved by new messages", new_msgs) - all_mailboxes[mailbox_id] = NEW - #log.msg(" 4: all_mailboxes", all_mailboxes) - - # for all mailbox objects with active listeners: - # classify the mailbox as new - for mailbox_id in self._mailboxes: - #log.msg(" -5: checking", mailbox_id, self._mailboxes[mailbox_id]) - if self._mailboxes[mailbox_id].has_listeners(): - all_mailboxes[mailbox_id] = NEW - #log.msg(" 5: all_mailboxes", all_mailboxes) - - # for all `nameplates`: - # classify as new or old - # if the linked mailbox exists: - # if it is new: - # classify nameplate as new - # if it is old: - # if the nameplate is new: - # classify mailbox as new - all_nameplates = {} - all_nameplate_rows = {} - for row in db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`=?", - (self._app_id,)).fetchall(): - nameplate_id = row["id"] - all_nameplate_rows[nameplate_id] = row + log.msg(" 1: age=%s, old=%s, %s" % + (now - row["updated"], now - old, mailbox_id)) if row["updated"] > old: - which = NEW + new_mailboxes.add(mailbox_id) else: - which = OLD + old_mailboxes.add(mailbox_id) + log.msg(" 2: mailboxes:", new_mailboxes, old_mailboxes) + + old_nameplates = set() + for row in db.execute("SELECT * FROM `nameplates` WHERE `app_id`=?", + (self._app_id,)).fetchall(): + npid = row["id"] mailbox_id = row["mailbox_id"] - if mailbox_id in all_mailboxes: - if all_mailboxes[mailbox_id] == NEW: - which = NEW - else: - if which == NEW: - all_mailboxes[mailbox_id] = NEW - all_nameplates[nameplate_id] = which - #log.msg(" 6: all_nameplates", all_nameplates, all_nameplate_rows) + if mailbox_id in old_mailboxes: + old_nameplates.add(npid) + log.msg(" 3: old_nameplates", old_nameplates) - # delete all old nameplates - # invariant check: if there is a linked mailbox, it is old - - for nameplate_id, which in all_nameplates.items(): - if which == OLD: - log.msg(" deleting nameplate", nameplate_id) - row = all_nameplate_rows[nameplate_id] - self._summarize_nameplate_and_store(row, now, pruned=True) - db.execute("DELETE FROM `nameplates`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, nameplate_id)) - modified = True + for npid in old_nameplates: + log.msg(" deleting nameplate", npid) + side_rows = db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", + (npid,)).fetchall() + db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?", + (npid,)) + db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,)) + self._summarize_nameplate_and_store(side_rows, now, pruned=True) + modified = True # delete all messages for old mailboxes # delete all old mailboxes - for mailbox_id, which in all_mailboxes.items(): - if which == OLD: - log.msg(" deleting mailbox", mailbox_id) - self._summarize_mailbox_and_store(mailbox_id, - all_mailbox_rows[mailbox_id], - "pruney", now, pruned=True) - db.execute("DELETE FROM `messages`" - " WHERE `app_id`=? AND `mailbox_id`=?", - (self._app_id, mailbox_id)) - db.execute("DELETE FROM `mailboxes`" - " WHERE `app_id`=? AND `id`=?", - (self._app_id, mailbox_id)) - modified = True + for mailbox_id in old_mailboxes: + log.msg(" deleting mailbox", mailbox_id) + row = db.execute("SELECT * FROM `mailboxes`" + " WHERE `id`=?", (mailbox_id,)).fetchone() + for_nameplate = row["for_nameplate"] + side_rows = db.execute("SELECT * FROM `mailbox_sides`" + " WHERE `mailbox_id`=?", + (mailbox_id,)).fetchall() + db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?", + (mailbox_id,)) + db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?", + (mailbox_id,)) + db.execute("DELETE FROM `mailboxes` WHERE `id`=?", + (mailbox_id,)) + self._summarize_mailbox_and_store(for_nameplate, side_rows, + now, pruned=True) + modified = True if modified: db.commit() log.msg(" prune complete, modified:", modified) + def get_counts(self): + return (self._nameplate_counts, self._mailbox_counts) + def _shutdown(self): for channel in self._mailboxes.values(): channel._shutdown() @@ -538,8 +494,6 @@ class Rendezvous(service.MultiService): log_requests = blur_usage is None self._log_requests = log_requests self._apps = {} - t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.prune) - t.setServiceParent(self) def get_welcome(self): return self._welcome @@ -569,19 +523,94 @@ class Rendezvous(service.MultiService): apps.add(row["app_id"]) return apps - def prune(self, now=None, old=None): + def prune_all_apps(self, now, old): # As with AppNamespace.prune_old_mailboxes, we log for now. log.msg("beginning app prune") - now = now or time.time() - old = old or (now - CHANNEL_EXPIRATION_TIME) for app_id in sorted(self.get_all_apps()): log.msg(" app prune checking %r" % (app_id,)) app = self.get_app(app_id) app.prune(now, old) - if not app.is_active(): # meaning no websockets - log.msg(" pruning idle app", app_id) - self._apps.pop(app_id) - log.msg("app prune ends, %d remaining apps" % len(self._apps)) + log.msg("app prune ends, %d apps" % len(self._apps)) + + def get_stats(self): + stats = {} + + # current status: expected to be zero most of the time + c = stats["active"] = {} + c["apps"] = len(self.get_all_apps()) + def q(query, values=()): + row = self._db.execute(query, values).fetchone() + return list(row.values())[0] + c["nameplates_total"] = q("SELECT COUNT() FROM `nameplates`") + # TODO: nameplates with only one side (most of them) + # TODO: nameplates with two sides (very fleeting) + # TODO: nameplates with three or more sides (crowded, unlikely) + c["mailboxes_total"] = q("SELECT COUNT() FROM `mailboxes`") + # TODO: mailboxes with only one side (most of them) + # TODO: mailboxes with two sides (somewhat fleeting, in-transit) + # TODO: mailboxes with three or more sides (unlikely) + c["messages_total"] = q("SELECT COUNT() FROM `messages`") + + # usage since last reboot + nameplate_counts = collections.defaultdict(int) + mailbox_counts = collections.defaultdict(int) + for app in self._apps.values(): + nc, mc = app.get_counts() + for result, count in nc.items(): + nameplate_counts[result] += count + for result, count in mc.items(): + mailbox_counts[result] += count + urb = stats["since_reboot"] = {} + urb["nameplate_moods"] = {} + for result, count in nameplate_counts.items(): + urb["nameplate_moods"][result] = count + urb["nameplates_total"] = sum(nameplate_counts.values()) + urb["mailbox_moods"] = {} + for result, count in mailbox_counts.items(): + urb["mailbox_moods"][result] = count + urb["mailboxes_total"] = sum(mailbox_counts.values()) + + # historical usage (all-time) + u = stats["all_time"] = {} + un = u["nameplate_moods"] = {} + # TODO: there's probably a single SQL query for all this + un["happy"] = q("SELECT COUNT() FROM `nameplate_usage`" + " WHERE `result`='happy'") + un["lonely"] = q("SELECT COUNT() FROM `nameplate_usage`" + " WHERE `result`='lonely'") + un["pruney"] = q("SELECT COUNT() FROM `nameplate_usage`" + " WHERE `result`='pruney'") + un["crowded"] = q("SELECT COUNT() FROM `nameplate_usage`" + " WHERE `result`='crowded'") + u["nameplates_total"] = q("SELECT COUNT() FROM `nameplate_usage`") + um = u["mailbox_moods"] = {} + um["happy"] = q("SELECT COUNT() FROM `mailbox_usage`" + " WHERE `result`='happy'") + um["scary"] = q("SELECT COUNT() FROM `mailbox_usage`" + " WHERE `result`='scary'") + um["lonely"] = q("SELECT COUNT() FROM `mailbox_usage`" + " WHERE `result`='lonely'") + um["quiet"] = q("SELECT COUNT() FROM `mailbox_usage`" + " WHERE `result`='quiet'") + um["errory"] = q("SELECT COUNT() FROM `mailbox_usage`" + " WHERE `result`='errory'") + um["pruney"] = q("SELECT COUNT() FROM `mailbox_usage`" + " WHERE `result`='pruney'") + um["crowded"] = q("SELECT COUNT() FROM `mailbox_usage`" + " WHERE `result`='crowded'") + u["mailboxes_total"] = q("SELECT COUNT() FROM `mailbox_usage`") + u["mailboxes_standalone"] = q("SELECT COUNT() FROM `mailbox_usage`" + " WHERE `for_nameplate`=0") + + # recent timings (last 100 operations) + # TODO: median/etc of nameplate.total_time + # TODO: median/etc of mailbox.waiting_time (should be the same) + # TODO: median/etc of mailbox.total_time + + # other + # TODO: mailboxes without nameplates (needs new DB schema) + + return stats def stopService(self): # This forcibly boots any clients that are still connected, which diff --git a/src/wormhole/server/rendezvous_websocket.py b/src/wormhole/server/rendezvous_websocket.py index 2ab2643..a6289a7 100644 --- a/src/wormhole/server/rendezvous_websocket.py +++ b/src/wormhole/server/rendezvous_websocket.py @@ -88,6 +88,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): self._app = None self._side = None self._did_allocate = False # only one allocate() per websocket + self._listening = False self._nameplate_id = None self._mailbox = None @@ -203,6 +204,7 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): body=sm.body, server_rx=sm.server_rx, id=sm.msg_id) def _stop(): pass + self._listening = True for old_sm in self._mailbox.add_listener(self, _send, _stop): _send(old_sm) @@ -222,6 +224,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): def handle_close(self, msg, server_rx): if not self._mailbox: raise Error("must open mailbox before closing") + if self._listening: + self._mailbox.remove_listener(self) + self._listening = False self._mailbox.close(self._side, msg.get("mood"), server_rx) self._mailbox = None self.send("closed") @@ -233,7 +238,9 @@ class WebSocketRendezvous(websocket.WebSocketServerProtocol): self.sendMessage(payload, False) def onClose(self, wasClean, code, reason): - pass + #log.msg("onClose", self, self._mailbox, self._listening) + if self._mailbox and self._listening: + self._mailbox.remove_listener(self) class WebSocketRendezvousFactory(websocket.WebSocketServerFactory): diff --git a/src/wormhole/server/server.py b/src/wormhole/server/server.py index efd8c78..bddf4a4 100644 --- a/src/wormhole/server/server.py +++ b/src/wormhole/server/server.py @@ -1,9 +1,10 @@ # NO unicode_literals or static.Data() will break, because it demands # a str on Python 2 from __future__ import print_function +import os, time, json from twisted.python import log from twisted.internet import reactor, endpoints -from twisted.application import service +from twisted.application import service, internet from twisted.web import server, static, resource from autobahn.twisted.resource import WebSocketResource from .endpoint_service import ServerEndpointService @@ -13,6 +14,12 @@ from .rendezvous import Rendezvous from .rendezvous_websocket import WebSocketRendezvousFactory from .transit_server import Transit +SECONDS = 1.0 +MINUTE = 60*SECONDS + +CHANNEL_EXPIRATION_TIME = 11*MINUTE +EXPIRATION_CHECK_PERIOD = 10*MINUTE + class Root(resource.Resource): # child_FOO is a nevow thing, not a twisted.web.resource thing def __init__(self): @@ -28,7 +35,7 @@ class PrivacyEnhancedSite(server.Site): class RelayServer(service.MultiService): def __init__(self, rendezvous_web_port, transit_port, advertise_version, db_url=":memory:", blur_usage=None, - signal_error=None): + signal_error=None, stats_file=None): service.MultiService.__init__(self) self._blur_usage = blur_usage @@ -52,11 +59,11 @@ class RelayServer(service.MultiService): if signal_error: welcome["error"] = signal_error - rendezvous = Rendezvous(db, welcome, blur_usage) - rendezvous.setServiceParent(self) # for the pruning timer + self._rendezvous = Rendezvous(db, welcome, blur_usage) + self._rendezvous.setServiceParent(self) # for the pruning timer root = Root() - wsrf = WebSocketRendezvousFactory(None, rendezvous) + wsrf = WebSocketRendezvousFactory(None, self._rendezvous) root.putChild(b"v1", WebSocketResource(wsrf)) site = PrivacyEnhancedSite(root) @@ -74,12 +81,21 @@ class RelayServer(service.MultiService): transit_service = ServerEndpointService(t, transit) transit_service.setServiceParent(self) + self._stats_file = stats_file + if self._stats_file and os.path.exists(self._stats_file): + os.unlink(self._stats_file) + # this will be regenerated immediately, but if something goes + # wrong in dump_stats(), it's better to have a missing file than + # a stale one + t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.timer) + t.setServiceParent(self) + # make some things accessible for tests self._db = db - self._rendezvous = rendezvous self._root = root self._rendezvous_web_service = rendezvous_web_service self._rendezvous_websocket = wsrf + self._transit = None if transit_port: self._transit = transit self._transit_service = transit_service @@ -93,3 +109,28 @@ class RelayServer(service.MultiService): log.msg("not logging HTTP requests") else: log.msg("not blurring access times") + + def timer(self): + now = time.time() + old = now - CHANNEL_EXPIRATION_TIME + self._rendezvous.prune_all_apps(now, old) + self.dump_stats(now, validity=EXPIRATION_CHECK_PERIOD+60) + + def dump_stats(self, now, validity): + if not self._stats_file: + return + tmpfn = self._stats_file + ".tmp" + + data = {} + data["created"] = now + data["valid_until"] = now + validity + + start = time.time() + data["rendezvous"] = self._rendezvous.get_stats() + data["transit"] = self._transit.get_stats() + log.msg("get_stats took:", time.time() - start) + + with open(tmpfn, "wb") as f: + json.dump(data, f, indent=1) + f.write("\n") + os.rename(tmpfn, self._stats_file) diff --git a/src/wormhole/server/transit_server.py b/src/wormhole/server/transit_server.py index 17d42a0..c99f204 100644 --- a/src/wormhole/server/transit_server.py +++ b/src/wormhole/server/transit_server.py @@ -1,5 +1,5 @@ from __future__ import print_function, unicode_literals -import re, time +import re, time, collections from twisted.python import log from twisted.internet import protocol from twisted.application import service @@ -166,6 +166,8 @@ class Transit(protocol.ServerFactory, service.MultiService): self._blur_usage = blur_usage self._pending_requests = {} # token -> TransitConnection self._active_connections = set() # TransitConnection + self._counts = collections.defaultdict(int) + self._count_bytes = 0 def connection_got_token(self, token, p): if token in self._pending_requests: @@ -193,6 +195,8 @@ class Transit(protocol.ServerFactory, service.MultiService): (started, total_time, waiting_time, total_bytes, result)) self._db.commit() + self._counts[result] += 1 + self._count_bytes += total_bytes def transitFinished(self, p, token, description): for token,tc in self._pending_requests.items(): @@ -205,3 +209,36 @@ class Transit(protocol.ServerFactory, service.MultiService): def transitFailed(self, p): log.msg("transitFailed %r" % p) pass + + def get_stats(self): + stats = {} + def q(query, values=()): + row = self._db.execute(query, values).fetchone() + return list(row.values())[0] + + # current status: expected to be zero most of the time + c = stats["active"] = {} + c["connected"] = len(self._active_connections) / 2 + c["waiting"] = len(self._pending_requests) + + # usage since last reboot + rb = stats["since_reboot"] = {} + rb["bytes"] = self._count_bytes + rb["total"] = sum(self._counts.values(), 0) + rbm = rb["moods"] = {} + for result, count in self._counts.items(): + rbm[result] = count + + # historical usage (all-time) + u = stats["all_time"] = {} + u["total"] = q("SELECT COUNT() FROM `transit_usage`") + u["bytes"] = q("SELECT SUM(`total_bytes`) FROM `transit_usage`") or 0 + um = u["moods"] = {} + um["happy"] = q("SELECT COUNT() FROM `transit_usage`" + " WHERE `result`='happy'") + um["lonely"] = q("SELECT COUNT() FROM `transit_usage`" + " WHERE `result`='lonely'") + um["errory"] = q("SELECT COUNT() FROM `transit_usage`" + " WHERE `result`='errory'") + + return stats diff --git a/src/wormhole/test/test_database.py b/src/wormhole/test/test_database.py index 460508e..7a7b491 100644 --- a/src/wormhole/test/test_database.py +++ b/src/wormhole/test/test_database.py @@ -50,4 +50,3 @@ class DB(unittest.TestCase): with open("new.sql","w") as f: f.write(latest_text) # check with "diff -u _trial_temp/up.sql _trial_temp/new.sql" self.assertEqual(dbA_text, latest_text) - test_upgrade.skip = "disabled until at least one upgrader is written" diff --git a/src/wormhole/test/test_scripts.py b/src/wormhole/test/test_scripts.py index 02fea26..ce4d499 100644 --- a/src/wormhole/test/test_scripts.py +++ b/src/wormhole/test/test_scripts.py @@ -427,6 +427,9 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase): self.failUnlessEqual(modes[i], stat.S_IMODE(os.stat(fn).st_mode)) + # check server stats + self._rendezvous.get_stats() + def test_text(self): return self._do_test() def test_text_subprocess(self): diff --git a/src/wormhole/test/test_server.py b/src/wormhole/test/test_server.py index 6a9066b..c79ebf0 100644 --- a/src/wormhole/test/test_server.py +++ b/src/wormhole/test/test_server.py @@ -1,5 +1,5 @@ from __future__ import print_function, unicode_literals -import json, itertools +import json, itertools, time from binascii import hexlify import mock from twisted.trial import unittest @@ -12,7 +12,7 @@ from autobahn.twisted import websocket from .. import __version__ from .common import ServerBase from ..server import rendezvous, transit_server -from ..server.rendezvous import Usage, SidedMessage, Mailbox +from ..server.rendezvous import Usage, SidedMessage from ..server.database import get_db class Server(ServerBase, unittest.TestCase): @@ -48,165 +48,173 @@ class Server(ServerBase, unittest.TestCase): biggest = max(nids) self.assert_(1000 <= biggest < 1000000, biggest) - def _nameplate(self, app, nameplate_id): - return app._db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`='appid' AND `id`=?", - (nameplate_id,)).fetchone() + def _nameplate(self, app, name): + np_row = app._db.execute("SELECT * FROM `nameplates`" + " WHERE `app_id`='appid' AND `name`=?", + (name,)).fetchone() + if not np_row: + return None, None + npid = np_row["id"] + side_rows = app._db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", + (npid,)).fetchall() + return np_row, side_rows def test_nameplate(self): app = self._rendezvous.get_app("appid") - nameplate_id = app.allocate_nameplate("side1", 0) - self.assertEqual(type(nameplate_id), type("")) - nid = int(nameplate_id) + name = app.allocate_nameplate("side1", 0) + self.assertEqual(type(name), type("")) + nid = int(name) self.assert_(0 < nid < 10, nid) - self.assertEqual(app.get_nameplate_ids(), set([nameplate_id])) + self.assertEqual(app.get_nameplate_ids(), set([name])) # allocate also does a claim - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], None) - self.assertEqual(row["crowded"], False) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], None) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["side"], "side1") + self.assertEqual(side_rows[0]["added"], 0) - mailbox_id = app.claim_nameplate(nameplate_id, "side1", 1) - self.assertEqual(type(mailbox_id), type("")) # duplicate claims by the same side are combined - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], None) + mailbox_id = app.claim_nameplate(name, "side1", 1) + self.assertEqual(type(mailbox_id), type("")) + self.assertEqual(mailbox_id, np_row["mailbox_id"]) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["added"], 0) + self.assertEqual(mailbox_id, np_row["mailbox_id"]) - mailbox_id2 = app.claim_nameplate(nameplate_id, "side1", 2) + # and they don't updated the 'added' time + mailbox_id2 = app.claim_nameplate(name, "side1", 2) self.assertEqual(mailbox_id, mailbox_id2) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], None) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], None) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["added"], 0) # claim by the second side is new - mailbox_id3 = app.claim_nameplate(nameplate_id, "side2", 3) + mailbox_id3 = app.claim_nameplate(name, "side2", 3) self.assertEqual(mailbox_id, mailbox_id3) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], "side2") - self.assertEqual(row["crowded"], False) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], 3) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 2) + self.assertEqual(sorted([row["side"] for row in side_rows]), + sorted(["side1", "side2"])) + self.assertIn(("side2", 3), + [(row["side"], row["added"]) for row in side_rows]) - # a third claim marks the nameplate as "crowded", but leaves the two - # existing claims alone + # a third claim marks the nameplate as "crowded", and adds a third + # claim (which must be released later), but leaves the two existing + # claims alone self.assertRaises(rendezvous.CrowdedError, - app.claim_nameplate, nameplate_id, "side3", 0) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], "side2") - self.assertEqual(row["crowded"], True) + app.claim_nameplate, name, "side3", 4) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 3) # releasing a non-existent nameplate is ignored - app.release_nameplate(nameplate_id+"not", "side4", 0) + app.release_nameplate(name+"not", "side4", 0) # releasing a side that never claimed the nameplate is ignored - app.release_nameplate(nameplate_id, "side4", 0) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], "side2") + app.release_nameplate(name, "side4", 0) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 3) # releasing one side leaves the second claim - app.release_nameplate(nameplate_id, "side1", 5) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side2") - self.assertEqual(row["side2"], None) + app.release_nameplate(name, "side1", 5) + np_row, side_rows = self._nameplate(app, name) + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertIn(("side1", False), claims) + self.assertIn(("side2", True), claims) + self.assertIn(("side3", True), claims) # releasing one side multiple times is ignored - app.release_nameplate(nameplate_id, "side1", 5) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row["side1"], "side2") - self.assertEqual(row["side2"], None) + app.release_nameplate(name, "side1", 5) + np_row, side_rows = self._nameplate(app, name) + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertIn(("side1", False), claims) + self.assertIn(("side2", True), claims) + self.assertIn(("side3", True), claims) - # releasing the second side frees the nameplate, and adds usage - app.release_nameplate(nameplate_id, "side2", 6) - row = self._nameplate(app, nameplate_id) - self.assertEqual(row, None) + # release the second side + app.release_nameplate(name, "side2", 6) + np_row, side_rows = self._nameplate(app, name) + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertIn(("side1", False), claims) + self.assertIn(("side2", False), claims) + self.assertIn(("side3", True), claims) + + # releasing the third side frees the nameplate, and adds usage + app.release_nameplate(name, "side3", 7) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(np_row, None) usage = app._db.execute("SELECT * FROM `nameplate_usage`").fetchone() self.assertEqual(usage["app_id"], "appid") self.assertEqual(usage["started"], 0) self.assertEqual(usage["waiting_time"], 3) - self.assertEqual(usage["total_time"], 6) + self.assertEqual(usage["total_time"], 7) self.assertEqual(usage["result"], "crowded") def _mailbox(self, app, mailbox_id): - return app._db.execute("SELECT * FROM `mailboxes`" - " WHERE `app_id`='appid' AND `id`=?", - (mailbox_id,)).fetchone() + mb_row = app._db.execute("SELECT * FROM `mailboxes`" + " WHERE `app_id`='appid' AND `id`=?", + (mailbox_id,)).fetchone() + if not mb_row: + return None, None + side_rows = app._db.execute("SELECT * FROM `mailbox_sides`" + " WHERE `mailbox_id`=?", + (mailbox_id,)).fetchall() + return mb_row, side_rows def test_mailbox(self): app = self._rendezvous.get_app("appid") mailbox_id = "mid" m1 = app.open_mailbox(mailbox_id, "side1", 0) - row = self._mailbox(app, mailbox_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], None) - self.assertEqual(row["crowded"], False) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], None) + mb_row, side_rows = self._mailbox(app, mailbox_id) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["side"], "side1") + self.assertEqual(side_rows[0]["added"], 0) # opening the same mailbox twice, by the same side, gets the same - # object + # object, and does not update the "added" timestamp self.assertIdentical(m1, app.open_mailbox(mailbox_id, "side1", 1)) - row = self._mailbox(app, mailbox_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], None) - self.assertEqual(row["crowded"], False) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], None) + mb_row, side_rows = self._mailbox(app, mailbox_id) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["side"], "side1") + self.assertEqual(side_rows[0]["added"], 0) # opening a second side gets the same object, and adds a new claim self.assertIdentical(m1, app.open_mailbox(mailbox_id, "side2", 2)) - row = self._mailbox(app, mailbox_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], "side2") - self.assertEqual(row["crowded"], False) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], 2) + mb_row, side_rows = self._mailbox(app, mailbox_id) + self.assertEqual(len(side_rows), 2) + adds = [(row["side"], row["added"]) for row in side_rows] + self.assertIn(("side1", 0), adds) + self.assertIn(("side2", 2), adds) # a third open marks it as crowded self.assertRaises(rendezvous.CrowdedError, app.open_mailbox, mailbox_id, "side3", 3) - row = self._mailbox(app, mailbox_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], "side2") - self.assertEqual(row["crowded"], True) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], 2) + mb_row, side_rows = self._mailbox(app, mailbox_id) + self.assertEqual(len(side_rows), 3) + m1.close("side3", "company", 4) # closing a side that never claimed the mailbox is ignored m1.close("side4", "mood", 4) - row = self._mailbox(app, mailbox_id) - self.assertEqual(row["side1"], "side1") - self.assertEqual(row["side2"], "side2") - self.assertEqual(row["crowded"], True) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], 2) + mb_row, side_rows = self._mailbox(app, mailbox_id) + self.assertEqual(len(side_rows), 3) # closing one side leaves the second claim m1.close("side1", "mood", 5) - row = self._mailbox(app, mailbox_id) - self.assertEqual(row["side1"], "side2") - self.assertEqual(row["side2"], None) - self.assertEqual(row["crowded"], True) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], 2) + mb_row, side_rows = self._mailbox(app, mailbox_id) + sides = [(row["side"], row["opened"], row["mood"]) for row in side_rows] + self.assertIn(("side1", False, "mood"), sides) + self.assertIn(("side2", True, None), sides) + self.assertIn(("side3", False, "company"), sides) - # closing one side multiple is ignored + # closing one side multiple times is ignored m1.close("side1", "mood", 6) - row = self._mailbox(app, mailbox_id) - self.assertEqual(row["side1"], "side2") - self.assertEqual(row["side2"], None) - self.assertEqual(row["crowded"], True) - self.assertEqual(row["started"], 0) - self.assertEqual(row["second"], 2) + mb_row, side_rows = self._mailbox(app, mailbox_id) + sides = [(row["side"], row["opened"], row["mood"]) for row in side_rows] + self.assertIn(("side1", False, "mood"), sides) + self.assertIn(("side2", True, None), sides) + self.assertIn(("side3", False, "company"), sides) l1 = []; stop1 = []; stop1_f = lambda: stop1.append(True) m1.add_listener("handle1", l1.append, stop1_f) @@ -215,8 +223,8 @@ class Server(ServerBase, unittest.TestCase): m1.close("side2", "mood", 7) self.assertEqual(stop1, [True]) - row = self._mailbox(app, mailbox_id) - self.assertEqual(row, None) + mb_row, side_rows = self._mailbox(app, mailbox_id) + self.assertEqual(mb_row, None) usage = app._db.execute("SELECT * FROM `mailbox_usage`").fetchone() self.assertEqual(usage["app_id"], "appid") self.assertEqual(usage["started"], 0) @@ -287,32 +295,36 @@ class Server(ServerBase, unittest.TestCase): class Prune(unittest.TestCase): + def _get_mailbox_updated(self, app, mbox_id): + row = app._db.execute("SELECT * FROM `mailboxes` WHERE" + " `app_id`=? AND `id`=?", + (app._app_id, mbox_id)).fetchone() + return row["updated"] + + def test_update(self): + db = get_db(":memory:") + rv = rendezvous.Rendezvous(db, None, None) + app = rv.get_app("appid") + mbox_id = "mbox1" + app.open_mailbox(mbox_id, "side1", 1) + self.assertEqual(self._get_mailbox_updated(app, mbox_id), 1) + + mb = app.open_mailbox(mbox_id, "side2", 2) + self.assertEqual(self._get_mailbox_updated(app, mbox_id), 2) + + sm = SidedMessage("side1", "phase", "body", 3, "msgid") + mb.add_message(sm) + self.assertEqual(self._get_mailbox_updated(app, mbox_id), 3) + def test_apps(self): rv = rendezvous.Rendezvous(get_db(":memory:"), None, None) app = rv.get_app("appid") app.allocate_nameplate("side", 121) app.prune = mock.Mock() - rv.prune(now=123, old=122) + rv.prune_all_apps(now=123, old=122) self.assertEqual(app.prune.mock_calls, [mock.call(123, 122)]) - def test_active(self): - rv = rendezvous.Rendezvous(get_db(":memory:"), None, None) - app = rv.get_app("appid1") - self.assertFalse(app.is_active()) - - mb = app.open_mailbox("mbid", "side1", 0) - self.assertFalse(mb.is_active()) - self.assertFalse(app.is_active()) - - mb.add_listener("handle", None, None) - self.assertTrue(mb.is_active()) - self.assertTrue(app.is_active()) - - mb.remove_listener("handle") - self.assertFalse(mb.is_active()) - self.assertFalse(app.is_active()) - - def test_basic(self): + def test_nameplates(self): db = get_db(":memory:") rv = rendezvous.Rendezvous(db, None, 3600) @@ -320,14 +332,11 @@ class Prune(unittest.TestCase): #OLD = "old"; NEW = "new" #when = {OLD: 1, NEW: 60} new_nameplates = set() - new_mailboxes = set() - new_messages = set() APPID = "appid" app = rv.get_app(APPID) - # Exercise the first-vs-second newness tests. These nameplates have - # no mailbox. + # Exercise the first-vs-second newness tests app.claim_nameplate("np-1", "side1", 1) app.claim_nameplate("np-2", "side1", 1) app.claim_nameplate("np-2", "side2", 2) @@ -340,7 +349,28 @@ class Prune(unittest.TestCase): app.claim_nameplate("np-5", "side2", 61) new_nameplates.add("np-5") - # same for mailboxes + rv.prune_all_apps(now=123, old=50) + + nameplates = set([row["name"] for row in + db.execute("SELECT * FROM `nameplates`").fetchall()]) + self.assertEqual(new_nameplates, nameplates) + mailboxes = set([row["id"] for row in + db.execute("SELECT * FROM `mailboxes`").fetchall()]) + self.assertEqual(len(new_nameplates), len(mailboxes)) + + def test_mailboxes(self): + db = get_db(":memory:") + rv = rendezvous.Rendezvous(db, None, 3600) + + # timestamps <=50 are "old", >=51 are "new" + #OLD = "old"; NEW = "new" + #when = {OLD: 1, NEW: 60} + new_mailboxes = set() + + APPID = "appid" + app = rv.get_app(APPID) + + # Exercise the first-vs-second newness tests app.open_mailbox("mb-11", "side1", 1) app.open_mailbox("mb-12", "side1", 1) app.open_mailbox("mb-12", "side2", 2) @@ -353,37 +383,26 @@ class Prune(unittest.TestCase): app.open_mailbox("mb-15", "side2", 61) new_mailboxes.add("mb-15") - rv.prune(now=123, old=50) + rv.prune_all_apps(now=123, old=50) - nameplates = set([row["id"] for row in - db.execute("SELECT * FROM `nameplates`").fetchall()]) - self.assertEqual(new_nameplates, nameplates) mailboxes = set([row["id"] for row in db.execute("SELECT * FROM `mailboxes`").fetchall()]) self.assertEqual(new_mailboxes, mailboxes) - messages = set([row["msg_id"] for row in - db.execute("SELECT * FROM `messages`").fetchall()]) - self.assertEqual(new_messages, messages) def test_lots(self): OLD = "old"; NEW = "new" - for nameplate in [None, OLD, NEW]: - for mailbox in [None, OLD, NEW]: - listeners = [False] - if mailbox is not None: - listeners = [False, True] - for has_listeners in listeners: - for messages in [None, OLD, NEW]: - self.one(nameplate, mailbox, has_listeners, messages) + for nameplate in [False, True]: + for mailbox in [OLD, NEW]: + for has_listeners in [False, True]: + self.one(nameplate, mailbox, has_listeners) - # def test_one(self): - # # to debug specific problems found by test_lots - # self.one(None, "old", False, 'new') + def test_one(self): + # to debug specific problems found by test_lots + self.one(None, "new", False) - def one(self, nameplate, mailbox, has_listeners, messages): - desc = ("nameplate=%s, mailbox=%s, has_listeners=%s," - " messages=%s" % - (nameplate, mailbox, has_listeners, messages)) + def one(self, nameplate, mailbox, has_listeners): + desc = ("nameplate=%s, mailbox=%s, has_listeners=%s" % + (nameplate, mailbox, has_listeners)) log.msg(desc) db = get_db(":memory:") @@ -396,50 +415,30 @@ class Prune(unittest.TestCase): when = {OLD: 1, NEW: 60} nameplate_survives = False mailbox_survives = False - messages_survive = False mbid = "mbid" - if nameplate is not None: - app.claim_nameplate("npid", "side1", when[nameplate], - _test_mailbox_id=mbid) - if mailbox is not None: - mb = app.open_mailbox(mbid, "side1", when[mailbox]) - else: - # We might want a Mailbox, because that's the easiest way to add - # a "messages" row, but we can't use app.open_mailbox() because - # that modifies both the "mailboxes" table and app._mailboxes, - # and sometimes we're testing what happens when there are - # messages but not a mailbox - mb = Mailbox(app, db, APPID, mbid) - # we need app._mailboxes to know about this, because that's - # where it looks to find listeners - app._mailboxes[mbid] = mb + if nameplate: + mbid = app.claim_nameplate("npid", "side1", when[mailbox]) + mb = app.open_mailbox(mbid, "side1", when[mailbox]) - if messages is not None: - sm = SidedMessage("side1", "phase", "body", when[messages], - "msgid") - mb.add_message(sm) + # the pruning algorithm doesn't care about the age of messages, + # because mailbox.updated is always updated each time we add a + # message + sm = SidedMessage("side1", "phase", "body", when[mailbox], "msgid") + mb.add_message(sm) if has_listeners: mb.add_listener("handle", None, None) - if mailbox is None and messages is not None: - # orphaned messages, even new ones, can't keep a nameplate alive - messages = None - messages_survive = False - - if (nameplate == NEW or mailbox == NEW - or has_listeners or messages == NEW): - if nameplate is not None: + if (mailbox == NEW or has_listeners): + if nameplate: nameplate_survives = True - if mailbox is not None: - mailbox_survives = True - if messages is not None: - messages_survive = True + mailbox_survives = True + messages_survive = mailbox_survives - rv.prune(now=123, old=50) + rv.prune_all_apps(now=123, old=50) - nameplates = set([row["id"] for row in + nameplates = set([row["name"] for row in db.execute("SELECT * FROM `nameplates`").fetchall()]) self.assertEqual(nameplate_survives, bool(nameplates), ("nameplate", nameplate_survives, nameplates, desc)) @@ -485,6 +484,14 @@ class WSClient(websocket.WebSocketClientProtocol): return self.events.append(event) + def close(self): + self.d = defer.Deferred() + self.transport.loseConnection() + return self.d + def onClose(self, wasClean, code, reason): + if self.d: + self.d.callback((wasClean, code, reason)) + def next_event(self): assert not self.d if self.events: @@ -695,6 +702,18 @@ class WebSocketAPI(ServerBase, unittest.TestCase): nids.add(n["id"]) self.assertEqual(nids, set([nameplate_id1, "np2"])) + def _nameplate(self, app, name): + np_row = app._db.execute("SELECT * FROM `nameplates`" + " WHERE `app_id`='appid' AND `name`=?", + (name,)).fetchone() + if not np_row: + return None, None + npid = np_row["id"] + side_rows = app._db.execute("SELECT * FROM `nameplate_sides`" + " WHERE `nameplates_id`=?", + (npid,)).fetchall() + return np_row, side_rows + @inlineCallbacks def test_allocate(self): c1 = yield self.make_client() @@ -710,11 +729,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase): c1.send("allocate") m = yield c1.next_non_ack() self.assertEqual(m["type"], "allocated") - nameplate_id = m["nameplate"] + name = m["nameplate"] nids = app.get_nameplate_ids() self.assertEqual(len(nids), 1) - self.assertEqual(nameplate_id, list(nids)[0]) + self.assertEqual(name, list(nids)[0]) c1.send("allocate") err = yield c1.next_non_ack() @@ -722,13 +741,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase): self.assertEqual(err["error"], "you already allocated one, don't be greedy") - c1.send("claim", nameplate=nameplate_id) # allocate+claim is ok + c1.send("claim", nameplate=name) # allocate+claim is ok yield c1.sync() - row = app._db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`='appid' AND `id`=?", - (nameplate_id,)).fetchone() - self.assertEqual(row["side1"], "side") - self.assertEqual(row["side2"], None) + np_row, side_rows = self._nameplate(app, name) + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["side"], "side") @inlineCallbacks def test_claim(self): @@ -751,12 +768,15 @@ class WebSocketAPI(ServerBase, unittest.TestCase): nids = app.get_nameplate_ids() self.assertEqual(len(nids), 1) self.assertEqual("np1", list(nids)[0]) + np_row, side_rows = self._nameplate(app, "np1") + self.assertEqual(len(side_rows), 1) + self.assertEqual(side_rows[0]["side"], "side") - # claiming a nameplate will assign a random mailbox id, but won't - # create the mailbox itself + # claiming a nameplate assigns a random mailbox id and creates the + # mailbox row mailboxes = app._db.execute("SELECT * FROM `mailboxes`" " WHERE `app_id`='appid'").fetchall() - self.assertEqual(len(mailboxes), 0) + self.assertEqual(len(mailboxes), 1) @inlineCallbacks def test_claim_crowded(self): @@ -796,10 +816,10 @@ class WebSocketAPI(ServerBase, unittest.TestCase): m = yield c1.next_non_ack() self.assertEqual(m["type"], "released") - row = app._db.execute("SELECT * FROM `nameplates`" - " WHERE `app_id`='appid' AND `id`='np1'").fetchone() - self.assertEqual(row["side1"], "side2") - self.assertEqual(row["side2"], None) + np_row, side_rows = self._nameplate(app, "np1") + claims = [(row["side"], row["claimed"]) for row in side_rows] + self.assertIn(("side", False), claims) + self.assertIn(("side2", True), claims) c1.send("release") # no longer claimed err = yield c1.next_non_ack() @@ -828,6 +848,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase): m = yield c1.next_non_ack() self.assertEqual(m["type"], "message") self.assertEqual(m["body"], "body") + self.assertTrue(mb1.has_listeners()) mb1.add_message(SidedMessage(side="side2", phase="phase2", body="body2", server_rx=0, @@ -881,6 +902,7 @@ class WebSocketAPI(ServerBase, unittest.TestCase): c1 = yield self.make_client() yield c1.next_non_ack() c1.send("bind", appid="appid", side="side") + app = self._rendezvous.get_app("appid") c1.send("close", mood="mood") # must open first err = yield c1.next_non_ack() @@ -888,72 +910,103 @@ class WebSocketAPI(ServerBase, unittest.TestCase): self.assertEqual(err["error"], "must open mailbox before closing") c1.send("open", mailbox="mb1") + yield c1.sync() + mb1 = app._mailboxes["mb1"] + self.assertTrue(mb1.has_listeners()) + c1.send("close", mood="mood") m = yield c1.next_non_ack() self.assertEqual(m["type"], "closed") + self.assertFalse(mb1.has_listeners()) c1.send("close", mood="mood") # already closed err = yield c1.next_non_ack() self.assertEqual(err["type"], "error") self.assertEqual(err["error"], "must open mailbox before closing") + @inlineCallbacks + def test_disconnect(self): + c1 = yield self.make_client() + yield c1.next_non_ack() + c1.send("bind", appid="appid", side="side") + app = self._rendezvous.get_app("appid") + + c1.send("open", mailbox="mb1") + yield c1.sync() + mb1 = app._mailboxes["mb1"] + self.assertTrue(mb1.has_listeners()) + + yield c1.close() + # wait for the server to notice the socket has closed + started = time.time() + while mb1.has_listeners() and (time.time()-started < 5.0): + d = defer.Deferred() + reactor.callLater(0.01, d.callback, None) + yield d + self.assertFalse(mb1.has_listeners()) + class Summary(unittest.TestCase): def test_mailbox(self): app = rendezvous.AppNamespace(None, None, False, None) # starts at time 1, maybe gets second open at time 3, closes at 5 - base_row = {"started": 1, "second": None, - "first_mood": None, "crowded": False} - def summ(num_sides, second_mood=None, pruned=False, **kwargs): - row = base_row.copy() - row.update(kwargs) - return app._summarize_mailbox(row, num_sides, second_mood, 5, - pruned) + def s(rows, pruned=False): + return app._summarize_mailbox(rows, 5, pruned) - self.assertEqual(summ(1), Usage(1, None, 4, "lonely")) - self.assertEqual(summ(1, "lonely"), Usage(1, None, 4, "lonely")) - self.assertEqual(summ(1, "errory"), Usage(1, None, 4, "errory")) - self.assertEqual(summ(1, crowded=True), Usage(1, None, 4, "crowded")) + rows = [dict(added=1)] + self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) + rows = [dict(added=1, mood="lonely")] + self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) + rows = [dict(added=1, mood="errory")] + self.assertEqual(s(rows), Usage(1, None, 4, "errory")) + rows = [dict(added=1, mood=None)] + self.assertEqual(s(rows, pruned=True), Usage(1, None, 4, "pruney")) + rows = [dict(added=1, mood="happy")] + self.assertEqual(s(rows, pruned=True), Usage(1, None, 4, "pruney")) - self.assertEqual(summ(2, first_mood="happy", - second=3, second_mood="happy"), - Usage(1, 2, 4, "happy")) + rows = [dict(added=1, mood="happy"), dict(added=3, mood="happy")] + self.assertEqual(s(rows), Usage(1, 2, 4, "happy")) - self.assertEqual(summ(2, first_mood="errory", - second=3, second_mood="happy"), - Usage(1, 2, 4, "errory")) + rows = [dict(added=1, mood="errory"), dict(added=3, mood="happy")] + self.assertEqual(s(rows), Usage(1, 2, 4, "errory")) - self.assertEqual(summ(2, first_mood="happy", - second=3, second_mood="errory"), - Usage(1, 2, 4, "errory")) + rows = [dict(added=1, mood="happy"), dict(added=3, mood="errory")] + self.assertEqual(s(rows), Usage(1, 2, 4, "errory")) - self.assertEqual(summ(2, first_mood="scary", - second=3, second_mood="happy"), - Usage(1, 2, 4, "scary")) + rows = [dict(added=1, mood="scary"), dict(added=3, mood="happy")] + self.assertEqual(s(rows), Usage(1, 2, 4, "scary")) - self.assertEqual(summ(2, first_mood="scary", - second=3, second_mood="errory"), - Usage(1, 2, 4, "scary")) + rows = [dict(added=1, mood="scary"), dict(added=3, mood="errory")] + self.assertEqual(s(rows), Usage(1, 2, 4, "scary")) - self.assertEqual(summ(2, first_mood="happy", second=3, pruned=True), - Usage(1, 2, 4, "pruney")) + rows = [dict(added=1, mood="happy"), dict(added=3, mood=None)] + self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) + rows = [dict(added=1, mood="happy"), dict(added=3, mood="happy")] + self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) + + rows = [dict(added=1), dict(added=3), dict(added=4)] + self.assertEqual(s(rows), Usage(1, 2, 4, "crowded")) + + rows = [dict(added=1), dict(added=3), dict(added=4)] + self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "crowded")) def test_nameplate(self): a = rendezvous.AppNamespace(None, None, False, None) # starts at time 1, maybe gets second open at time 3, closes at 5 - base_row = {"started": 1, "second": None, "crowded": False} - def summ(num_sides, pruned=False, **kwargs): - row = base_row.copy() - row.update(kwargs) - return a._summarize_nameplate_usage(row, 5, pruned) + def s(rows, pruned=False): + return a._summarize_nameplate_usage(rows, 5, pruned) - self.assertEqual(summ(1), Usage(1, None, 4, "lonely")) - self.assertEqual(summ(1, crowded=True), Usage(1, None, 4, "crowded")) + rows = [dict(added=1)] + self.assertEqual(s(rows), Usage(1, None, 4, "lonely")) + rows = [dict(added=1), dict(added=3)] + self.assertEqual(s(rows), Usage(1, 2, 4, "happy")) - self.assertEqual(summ(2, second=3), Usage(1, 2, 4, "happy")) + rows = [dict(added=1), dict(added=3)] + self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney")) + + rows = [dict(added=1), dict(added=3), dict(added=4)] + self.assertEqual(s(rows), Usage(1, 2, 4, "crowded")) - self.assertEqual(summ(2, second=3, pruned=True), - Usage(1, 2, 4, "pruney")) def test_blur(self): db = get_db(":memory:") @@ -961,14 +1014,14 @@ class Summary(unittest.TestCase): APPID = "appid" app = rv.get_app(APPID) app.claim_nameplate("npid", "side1", 10) # start time is 10 - rv.prune(now=123, old=50) + rv.prune_all_apps(now=123, old=50) # start time should be rounded to top of the hour (blur_usage=3600) row = db.execute("SELECT * FROM `nameplate_usage`").fetchone() self.assertEqual(row["started"], 0) app = rv.get_app(APPID) app.open_mailbox("mbid", "side1", 20) # start time is 20 - rv.prune(now=123, old=50) + rv.prune_all_apps(now=123, old=50) row = db.execute("SELECT * FROM `mailbox_usage`").fetchone() self.assertEqual(row["started"], 0) @@ -978,13 +1031,15 @@ class Summary(unittest.TestCase): APPID = "appid" app = rv.get_app(APPID) app.claim_nameplate("npid", "side1", 10) # start time is 10 - rv.prune(now=123, old=50) + rv.prune_all_apps(now=123, old=50) row = db.execute("SELECT * FROM `nameplate_usage`").fetchone() self.assertEqual(row["started"], 10) + db.execute("DELETE FROM `mailbox_usage`") + db.commit() app = rv.get_app(APPID) app.open_mailbox("mbid", "side1", 20) # start time is 20 - rv.prune(now=123, old=50) + rv.prune_all_apps(now=123, old=50) row = db.execute("SELECT * FROM `mailbox_usage`").fetchone() self.assertEqual(row["started"], 20)