magic-wormhole/src/wormhole/server/rendezvous.py
2016-06-26 17:49:36 -07:00

625 lines
26 KiB
Python

from __future__ import print_function, unicode_literals
import os, random, base64, collections
from collections import namedtuple
from twisted.python import log
from twisted.application import service
def generate_mailbox_id():
return base64.b32encode(os.urandom(8)).lower().strip(b"=").decode("ascii")
class CrowdedError(Exception):
pass
Usage = namedtuple("Usage", ["started", "waiting_time", "total_time", "result"])
TransitUsage = namedtuple("TransitUsage",
["started", "waiting_time", "total_time",
"total_bytes", "result"])
SidedMessage = namedtuple("SidedMessage", ["side", "phase", "body",
"server_rx", "msg_id"])
class Mailbox:
def __init__(self, app, db, app_id, mailbox_id):
self._app = app
self._db = db
self._app_id = app_id
self._mailbox_id = mailbox_id
self._listeners = {} # handle -> (send_f, stop_f)
# "handle" is a hashable object, for deregistration
# send_f() takes a JSONable object, stop_f() has no args
def open(self, side, when):
# requires caller to db.commit()
assert isinstance(side, type("")), type(side)
db = self._db
already = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=? AND `side`=?",
(self._mailbox_id, side)).fetchone()
if not already:
db.execute("INSERT INTO `mailbox_sides`"
" (`mailbox_id`, `opened`, `side`, `added`)"
" VALUES(?,?,?,?)",
(self._mailbox_id, True, side, when))
self._touch(when)
db.commit() # XXX: reconcile the need for this with the comment above
def _touch(self, when):
self._db.execute("UPDATE `mailboxes` SET `updated`=? WHERE `id`=?",
(when, self._mailbox_id))
def get_messages(self):
messages = []
db = self._db
for row in db.execute("SELECT * FROM `messages`"
" WHERE `app_id`=? AND `mailbox_id`=?"
" ORDER BY `server_rx` ASC",
(self._app_id, self._mailbox_id)).fetchall():
sm = SidedMessage(side=row["side"], phase=row["phase"],
body=row["body"], server_rx=row["server_rx"],
msg_id=row["msg_id"])
messages.append(sm)
return messages
def add_listener(self, handle, send_f, stop_f):
#log.msg("add_listener", self._mailbox_id, handle)
self._listeners[handle] = (send_f, stop_f)
#log.msg(" added", len(self._listeners))
return self.get_messages()
def remove_listener(self, handle):
#log.msg("remove_listener", self._mailbox_id, handle)
self._listeners.pop(handle, None)
#log.msg(" removed", len(self._listeners))
def has_listeners(self):
return bool(self._listeners)
def broadcast_message(self, sm):
for (send_f, stop_f) in self._listeners.values():
send_f(sm)
def _add_message(self, sm):
self._db.execute("INSERT INTO `messages`"
" (`app_id`, `mailbox_id`, `side`, `phase`, `body`,"
" `server_rx`, `msg_id`)"
" VALUES (?,?,?,?,?, ?,?)",
(self._app_id, self._mailbox_id, sm.side,
sm.phase, sm.body, sm.server_rx, sm.msg_id))
self._touch(sm.server_rx)
self._db.commit()
def add_message(self, sm):
assert isinstance(sm, SidedMessage)
self._add_message(sm)
self.broadcast_message(sm)
def close(self, side, mood, when):
assert isinstance(side, type("")), type(side)
db = self._db
row = db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, self._mailbox_id)).fetchone()
if not row:
return
for_nameplate = row["for_nameplate"]
row = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=? AND `side`=?",
(self._mailbox_id, side)).fetchone()
if not row:
return
db.execute("UPDATE `mailbox_sides` SET `opened`=?, `mood`=?"
" WHERE `mailbox_id`=? AND `side`=?",
(False, mood, self._mailbox_id, side))
db.commit()
# are any sides still open?
side_rows = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=?",
(self._mailbox_id,)).fetchall()
if any([sr["opened"] for sr in side_rows]):
return
# nope. delete and summarize
db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?",
(self._mailbox_id,))
db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?",
(self._mailbox_id,))
db.execute("DELETE FROM `mailboxes` WHERE `id`=?", (self._mailbox_id,))
self._app._summarize_mailbox_and_store(for_nameplate, side_rows,
when, pruned=False)
db.commit()
# Shut down any listeners, just in case they're still lingering
# around.
for (send_f, stop_f) in self._listeners.values():
stop_f()
self._listeners = {}
self._app.free_mailbox(self._mailbox_id)
def _shutdown(self):
# used at test shutdown to accelerate client disconnects
for (send_f, stop_f) in self._listeners.values():
stop_f()
self._listeners = {}
class AppNamespace:
def __init__(self, db, blur_usage, log_requests, app_id):
self._db = db
self._blur_usage = blur_usage
self._log_requests = log_requests
self._app_id = app_id
self._mailboxes = {}
self._nameplate_counts = collections.defaultdict(int)
self._mailbox_counts = collections.defaultdict(int)
def get_nameplate_ids(self):
db = self._db
# TODO: filter this to numeric ids?
c = db.execute("SELECT DISTINCT `name` FROM `nameplates`"
" WHERE `app_id`=?", (self._app_id,))
return set([row["name"] for row in c.fetchall()])
def _find_available_nameplate_id(self):
claimed = self.get_nameplate_ids()
for size in range(1,4): # stick to 1-999 for now
available = set()
for id_int in range(10**(size-1), 10**size):
id = "%d" % id_int
if id not in claimed:
available.add(id)
if available:
return random.choice(list(available))
# ouch, 999 currently claimed. Try random ones for a while.
for tries in range(1000):
id_int = random.randrange(1000, 1000*1000)
id = "%d" % id_int
if id not in claimed:
return id
raise ValueError("unable to find a free nameplate-id")
def allocate_nameplate(self, side, when):
nameplate_id = self._find_available_nameplate_id()
mailbox_id = self.claim_nameplate(nameplate_id, side, when)
del mailbox_id # ignored, they'll learn it from claim()
return nameplate_id
def claim_nameplate(self, name, side, when):
# when we're done:
# * there will be one row for the nameplate
# * there will be one 'side' attached to it, with claimed=True
# * a mailbox id and mailbox row will be created
# * a mailbox 'side' will be attached, with opened=True
assert isinstance(name, type("")), type(name)
assert isinstance(side, type("")), type(side)
db = self._db
row = db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=? AND `name`=?",
(self._app_id, name)).fetchone()
if not row:
if self._log_requests:
log.msg("creating nameplate#%s for app_id %s" %
(name, self._app_id))
mailbox_id = generate_mailbox_id()
self._add_mailbox(mailbox_id, True, side, when) # ensure row exists
sql = ("INSERT INTO `nameplates`"
" (`app_id`, `name`, `mailbox_id`)"
" VALUES(?,?,?)")
npid = db.execute(sql, (self._app_id, name, mailbox_id)
).lastrowid
else:
npid = row["id"]
mailbox_id = row["mailbox_id"]
row = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=? AND `side`=?",
(npid, side)).fetchone()
if not row:
db.execute("INSERT INTO `nameplate_sides`"
" (`nameplates_id`, `claimed`, `side`, `added`)"
" VALUES(?,?,?,?)",
(npid, True, side, when))
db.commit()
self.open_mailbox(mailbox_id, side, when) # may raise CrowdedError
rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?", (npid,)).fetchall()
if len(rows) > 2:
# this line will probably never get hit: any crowding is noticed
# on mailbox_sides first, inside open_mailbox()
raise CrowdedError("too many sides have claimed this nameplate")
return mailbox_id
def release_nameplate(self, name, side, when):
# when we're done:
# * the 'claimed' flag will be cleared on the nameplate_sides row
# * if the nameplate is now unused (no claimed sides):
# * mailbox.nameplate_closed will be populated
# * the nameplate row will be removed
# * the nameplate sides will be removed
assert isinstance(name, type("")), type(name)
assert isinstance(side, type("")), type(side)
db = self._db
np_row = db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=? AND `name`=?",
(self._app_id, name)).fetchone()
if not np_row:
return
npid = np_row["id"]
row = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=? AND `side`=?",
(npid, side)).fetchone()
if not row:
return
db.execute("UPDATE `nameplate_sides` SET `claimed`=?"
" WHERE `nameplates_id`=? AND `side`=?",
(False, npid, side))
db.commit()
# now, are there any remaining claims?
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
claims = [1 for sr in side_rows if sr["claimed"]]
if claims:
return
# delete and summarize
db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?",
(npid,))
db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,))
self._summarize_nameplate_and_store(side_rows, when, pruned=False)
db.commit()
def _summarize_nameplate_and_store(self, side_rows, delete_time, pruned):
# requires caller to db.commit()
u = self._summarize_nameplate_usage(side_rows, delete_time, pruned)
self._db.execute("INSERT INTO `nameplate_usage`"
" (`app_id`,"
" `started`, `total_time`, `waiting_time`, `result`)"
" VALUES (?, ?,?,?,?)",
(self._app_id,
u.started, u.total_time, u.waiting_time, u.result))
self._nameplate_counts[u.result] += 1
def _summarize_nameplate_usage(self, side_rows, delete_time, pruned):
times = sorted([row["added"] for row in side_rows])
started = times[0]
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
waiting_time = None
if len(times) > 1:
waiting_time = times[1] - times[0]
total_time = delete_time - times[0]
result = "lonely"
if len(times) == 2:
result = "happy"
if pruned:
result = "pruney"
if len(times) > 2:
result = "crowded"
return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result)
def _add_mailbox(self, mailbox_id, for_nameplate, side, when):
assert isinstance(mailbox_id, type("")), type(mailbox_id)
db = self._db
row = db.execute("SELECT * FROM `mailboxes`"
" WHERE `app_id`=? AND `id`=?",
(self._app_id, mailbox_id)).fetchone()
if not row:
self._db.execute("INSERT INTO `mailboxes`"
" (`app_id`, `id`, `for_nameplate`, `updated`)"
" VALUES(?,?,?,?)",
(self._app_id, mailbox_id, for_nameplate, when))
# we don't need a commit here, because mailbox.open() only
# does SELECT FROM `mailbox_sides`, not from `mailboxes`
def open_mailbox(self, mailbox_id, side, when):
assert isinstance(mailbox_id, type("")), type(mailbox_id)
self._add_mailbox(mailbox_id, False, side, when) # ensure row exists
db = self._db
if not mailbox_id in self._mailboxes: # ensure Mailbox object exists
if self._log_requests:
log.msg("spawning #%s for app_id %s" % (mailbox_id,
self._app_id))
self._mailboxes[mailbox_id] = Mailbox(self, self._db,
self._app_id, mailbox_id)
mailbox = self._mailboxes[mailbox_id]
# delegate to mailbox.open() to add a row to mailbox_sides, and
# update the mailbox.updated timestamp
mailbox.open(side, when)
db.commit()
rows = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=?",
(mailbox_id,)).fetchall()
if len(rows) > 2:
raise CrowdedError("too many sides have opened this mailbox")
return mailbox
def free_mailbox(self, mailbox_id):
# called from Mailbox.delete_and_summarize(), which deletes any
# messages
if mailbox_id in self._mailboxes:
self._mailboxes.pop(mailbox_id)
#if self._log_requests:
# log.msg("freed+killed #%s, now have %d DB mailboxes, %d live" %
# (mailbox_id, len(self.get_claimed()), len(self._mailboxes)))
def _summarize_mailbox_and_store(self, for_nameplate, side_rows,
delete_time, pruned):
db = self._db
u = self._summarize_mailbox(side_rows, delete_time, pruned)
db.execute("INSERT INTO `mailbox_usage`"
" (`app_id`, `for_nameplate`,"
" `started`, `total_time`, `waiting_time`, `result`)"
" VALUES (?,?, ?,?,?,?)",
(self._app_id, for_nameplate,
u.started, u.total_time, u.waiting_time, u.result))
self._mailbox_counts[u.result] += 1
def _summarize_mailbox(self, side_rows, delete_time, pruned):
times = sorted([row["added"] for row in side_rows])
started = times[0]
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
waiting_time = None
if len(times) > 1:
waiting_time = times[1] - times[0]
total_time = delete_time - times[0]
num_sides = len(times)
if num_sides == 0:
result = "quiet"
elif num_sides == 1:
result = "lonely"
else:
result = "happy"
# "mood" is only recorded at close()
moods = [row["mood"] for row in side_rows if row.get("mood")]
if "lonely" in moods:
result = "lonely"
if "errory" in moods:
result = "errory"
if "scary" in moods:
result = "scary"
if pruned:
result = "pruney"
if num_sides > 2:
result = "crowded"
return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result)
def prune(self, now, old):
# The pruning check runs every 10 minutes, and "old" is defined to be
# 11 minutes ago (unit tests can use different values). The client is
# allowed to disconnect for up to 9 minutes without losing the
# channel (nameplate, mailbox, and messages).
# Each time a client does something, the mailbox.updated field is
# updated with the current timestamp. If a client is subscribed to
# the mailbox when pruning check runs, the "updated" field is also
# updated. After that check, if the "updated" field is "old", the
# channel is deleted.
# For now, pruning is logged even if log_requests is False, to debug
# the pruning process, and since pruning is triggered by a timer
# instead of by user action. It does reveal which mailboxes were
# present when the pruning process began, though, so in the log run
# it should do less logging.
log.msg(" prune begins (%s)" % self._app_id)
db = self._db
modified = False
for mailbox in self._mailboxes.values():
if mailbox.has_listeners():
log.msg("touch %s because listeners" % mailbox._mailbox_id)
mailbox._touch(now)
db.commit() # make sure the updates are visible below
new_mailboxes = set()
old_mailboxes = set()
for row in db.execute("SELECT * FROM `mailboxes` WHERE `app_id`=?",
(self._app_id,)).fetchall():
mailbox_id = row["id"]
log.msg(" 1: age=%s, old=%s, %s" %
(now - row["updated"], now - old, mailbox_id))
if row["updated"] > old:
new_mailboxes.add(mailbox_id)
else:
old_mailboxes.add(mailbox_id)
log.msg(" 2: mailboxes:", new_mailboxes, old_mailboxes)
old_nameplates = set()
for row in db.execute("SELECT * FROM `nameplates` WHERE `app_id`=?",
(self._app_id,)).fetchall():
npid = row["id"]
mailbox_id = row["mailbox_id"]
if mailbox_id in old_mailboxes:
old_nameplates.add(npid)
log.msg(" 3: old_nameplates", old_nameplates)
for npid in old_nameplates:
log.msg(" deleting nameplate", npid)
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?",
(npid,))
db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,))
self._summarize_nameplate_and_store(side_rows, now, pruned=True)
modified = True
# delete all messages for old mailboxes
# delete all old mailboxes
for mailbox_id in old_mailboxes:
log.msg(" deleting mailbox", mailbox_id)
row = db.execute("SELECT * FROM `mailboxes`"
" WHERE `id`=?", (mailbox_id,)).fetchone()
for_nameplate = row["for_nameplate"]
side_rows = db.execute("SELECT * FROM `mailbox_sides`"
" WHERE `mailbox_id`=?",
(mailbox_id,)).fetchall()
db.execute("DELETE FROM `messages` WHERE `mailbox_id`=?",
(mailbox_id,))
db.execute("DELETE FROM `mailbox_sides` WHERE `mailbox_id`=?",
(mailbox_id,))
db.execute("DELETE FROM `mailboxes` WHERE `id`=?",
(mailbox_id,))
self._summarize_mailbox_and_store(for_nameplate, side_rows,
now, pruned=True)
modified = True
if modified:
db.commit()
log.msg(" prune complete, modified:", modified)
def get_counts(self):
return (self._nameplate_counts, self._mailbox_counts)
def _shutdown(self):
for channel in self._mailboxes.values():
channel._shutdown()
class Rendezvous(service.MultiService):
def __init__(self, db, welcome, blur_usage):
service.MultiService.__init__(self)
self._db = db
self._welcome = welcome
self._blur_usage = blur_usage
log_requests = blur_usage is None
self._log_requests = log_requests
self._apps = {}
def get_welcome(self):
return self._welcome
def get_log_requests(self):
return self._log_requests
def get_app(self, app_id):
assert isinstance(app_id, type(""))
if not app_id in self._apps:
if self._log_requests:
log.msg("spawning app_id %s" % (app_id,))
self._apps[app_id] = AppNamespace(self._db,
self._blur_usage,
self._log_requests, app_id)
return self._apps[app_id]
def get_all_apps(self):
apps = set()
for row in self._db.execute("SELECT DISTINCT `app_id`"
" FROM `nameplates`").fetchall():
apps.add(row["app_id"])
for row in self._db.execute("SELECT DISTINCT `app_id`"
" FROM `mailboxes`").fetchall():
apps.add(row["app_id"])
for row in self._db.execute("SELECT DISTINCT `app_id`"
" FROM `messages`").fetchall():
apps.add(row["app_id"])
return apps
def prune_all_apps(self, now, old):
# As with AppNamespace.prune_old_mailboxes, we log for now.
log.msg("beginning app prune")
for app_id in sorted(self.get_all_apps()):
log.msg(" app prune checking %r" % (app_id,))
app = self.get_app(app_id)
app.prune(now, old)
log.msg("app prune ends, %d apps" % len(self._apps))
def get_stats(self):
stats = {}
# current status: expected to be zero most of the time
c = stats["active"] = {}
c["apps"] = len(self.get_all_apps())
def q(query, values=()):
row = self._db.execute(query, values).fetchone()
return list(row.values())[0]
c["nameplates_total"] = q("SELECT COUNT() FROM `nameplates`")
# TODO: nameplates with only one side (most of them)
# TODO: nameplates with two sides (very fleeting)
# TODO: nameplates with three or more sides (crowded, unlikely)
c["mailboxes_total"] = q("SELECT COUNT() FROM `mailboxes`")
# TODO: mailboxes with only one side (most of them)
# TODO: mailboxes with two sides (somewhat fleeting, in-transit)
# TODO: mailboxes with three or more sides (unlikely)
c["messages_total"] = q("SELECT COUNT() FROM `messages`")
# usage since last reboot
nameplate_counts = collections.defaultdict(int)
mailbox_counts = collections.defaultdict(int)
for app in self._apps.values():
nc, mc = app.get_counts()
for result, count in nc.items():
nameplate_counts[result] += count
for result, count in mc.items():
mailbox_counts[result] += count
urb = stats["since_reboot"] = {}
urb["nameplate_moods"] = {}
for result, count in nameplate_counts.items():
urb["nameplate_moods"][result] = count
urb["nameplates_total"] = sum(nameplate_counts.values())
urb["mailbox_moods"] = {}
for result, count in mailbox_counts.items():
urb["mailbox_moods"][result] = count
urb["mailboxes_total"] = sum(mailbox_counts.values())
# historical usage (all-time)
u = stats["all_time"] = {}
un = u["nameplate_moods"] = {}
# TODO: there's probably a single SQL query for all this
un["happy"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='happy'")
un["lonely"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='lonely'")
un["pruney"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='pruney'")
un["crowded"] = q("SELECT COUNT() FROM `nameplate_usage`"
" WHERE `result`='crowded'")
u["nameplates_total"] = q("SELECT COUNT() FROM `nameplate_usage`")
um = u["mailbox_moods"] = {}
um["happy"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='happy'")
um["scary"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='scary'")
um["lonely"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='lonely'")
um["quiet"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='quiet'")
um["errory"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='errory'")
um["pruney"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='pruney'")
um["crowded"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `result`='crowded'")
u["mailboxes_total"] = q("SELECT COUNT() FROM `mailbox_usage`")
u["mailboxes_standalone"] = q("SELECT COUNT() FROM `mailbox_usage`"
" WHERE `for_nameplate`=0")
# recent timings (last 100 operations)
# TODO: median/etc of nameplate.total_time
# TODO: median/etc of mailbox.waiting_time (should be the same)
# TODO: median/etc of mailbox.total_time
# other
# TODO: mailboxes without nameplates (needs new DB schema)
return stats
def stopService(self):
# This forcibly boots any clients that are still connected, which
# helps with unit tests that use threads for both clients. One client
# hits an exception, which terminates the test (and .tearDown calls
# stopService on the relay), but the other client (in its thread) is
# still waiting for a message. By killing off all connections, that
# other client gets an error, and exits promptly.
for app in self._apps.values():
app._shutdown()
return service.MultiService.stopService(self)