magic-wormhole/src/wormhole/server/rendezvous.py

498 lines
20 KiB
Python
Raw Normal View History

from __future__ import print_function
2016-05-20 01:09:17 +00:00
import os, time, random, base64
from collections import namedtuple
from twisted.python import log
from twisted.application import service, internet
2015-02-12 02:13:54 +00:00
# Time units, all expressed as float seconds.
SECONDS = 1.0
MINUTE = SECONDS * 60
HOUR = MINUTE * 60
DAY = HOUR * 24

# One decimal megabyte.
MB = 1000 ** 2

# Age after which an idle mailbox's newest message counts as stale, and the
# interval at which the Rendezvous service runs its pruning timer.
CHANNEL_EXPIRATION_TIME = DAY * 3
EXPIRATION_CHECK_PERIOD = HOUR * 2
2016-05-18 07:16:46 +00:00
def get_sides(row):
    """Return the set of non-empty side ids stored in ``row``.

    ``row`` is indexed with the keys "side1" and "side2"; falsy values
    (None, empty string) are left out.
    """
    present = set()
    for key in ("side1", "side2"):
        value = row[key]
        if value:
            present.add(value)
    return present
2016-05-19 21:18:49 +00:00
def make_sides(sides):
    """Return ``sides`` as a list padded with None up to two entries.

    Inputs that already have two or more entries are returned unpadded.
    """
    padding = 2 - len(sides)
    result = list(sides)
    result.extend([None] * padding)
    return result
def generate_mailbox_id():
    """Return a fresh random mailbox id as a text string.

    Eight bytes from os.urandom are base32-encoded, lowercased and stripped
    of "=" padding, which yields 13 characters from [a-z2-7].
    """
    random_bytes = os.urandom(8)
    token = base64.b32encode(random_bytes).lower()
    token = token.strip(b"=")
    return token.decode("ascii")
2016-05-18 07:16:46 +00:00
2016-05-20 01:09:17 +00:00
# Outcome of add_side()/remove_side(): whether the stored sides changed,
# whether no sides remain, and the (side1, side2) values to write back.
SideResult = namedtuple("SideResult", "changed empty side1 side2")
# Returned when the requested operation leaves the stored sides untouched.
Unchanged = SideResult(False, False, None, None)
class CrowdedError(Exception):
    """Raised by add_side() when a third distinct side tries to join a
    nameplate or mailbox row that already holds two sides."""
def add_side(row, new_side):
    """Compute the effect of adding ``new_side`` to ``row``'s sides.

    ``row`` must already hold at least one side (in "side1"/"side2").
    Returns ``Unchanged`` if ``new_side`` is already present; otherwise a
    changed SideResult with the existing side first and ``new_side`` second.
    Raises CrowdedError if two other sides are already present.
    """
    existing = list(filter(None, (row["side1"], row["side2"])))
    assert existing
    if new_side in existing:
        return Unchanged
    if len(existing) == 2:
        raise CrowdedError("too many sides for this thing")
    first_side = existing[0]
    return SideResult(changed=True, empty=False,
                      side1=first_side, side2=new_side)
def remove_side(row, side):
    """Compute the effect of removing ``side`` from ``row``'s sides.

    Returns ``Unchanged`` if ``side`` is not present. Otherwise returns a
    changed SideResult: ``empty=True`` when no side remains, else the one
    remaining side in ``side1``.
    """
    current = list(filter(None, (row["side1"], row["side2"])))
    if side not in current:
        return Unchanged
    current.remove(side)
    if not current:
        return SideResult(changed=True, empty=True, side1=None, side2=None)
    return SideResult(changed=True, empty=False, side1=current[0],
                      side2=None)
# Summary of one nameplate or mailbox lifetime, written to the usage tables.
Usage = namedtuple("Usage", "started waiting_time total_time result")
# Like Usage plus a byte count (presumably for transit-relay accounting;
# not used in this module).
TransitUsage = namedtuple("TransitUsage",
                          "started waiting_time total_time"
                          " total_bytes result")
2016-05-18 07:16:46 +00:00
class Mailbox:
    """A single rendezvous mailbox inside one app_id.

    Persistent state lives in the `mailboxes` and `messages` tables, keyed
    by (app_id, mailbox_id). At most two sides may open the mailbox; a third
    distinct side marks it crowded. Every added message is stored and pushed
    to all currently registered listeners.
    """

    def __init__(self, app, db, blur_usage, log_requests, app_id, mailbox_id):
        self._app = app  # owner; its free_mailbox() is called from _delete()
        self._db = db
        self._blur_usage = blur_usage  # rounding step for stored `started`
        self._log_requests = log_requests
        self._app_id = app_id
        self._mailbox_id = mailbox_id
        self._listeners = {}  # handle -> (send_f, stop_f)
        # "handle" is a hashable object, for deregistration
        # send_f() takes a JSONable object, stop_f() has no args

    def open(self, side, when):
        """Add ``side`` to this mailbox, recording ``when`` as `second`.

        Requires the caller to db.commit() afterwards. If two other sides
        are already present, the row is flagged `crowded` (committed here)
        and CrowdedError propagates to the caller.
        """
        assert isinstance(side, type(u"")), type(side)
        db = self._db
        # NOTE(review): assumes the `mailboxes` row already exists
        # (AppNamespace.open_mailbox inserts it before calling us).
        row = db.execute("SELECT * FROM `mailboxes`"
                         " WHERE `app_id`=? AND `id`=?",
                         (self._app_id, self._mailbox_id)).fetchone()
        try:
            sr = add_side(row, side)
        except CrowdedError:
            db.execute("UPDATE `mailboxes` SET `crowded`=?"
                       " WHERE `app_id`=? AND `id`=?",
                       (True, self._app_id, self._mailbox_id))
            db.commit()
            raise
        if sr.changed:
            db.execute("UPDATE `mailboxes` SET"
                       " `side1`=?, `side2`=?, `second`=?"
                       " WHERE `app_id`=? AND `id`=?",
                       (sr.side1, sr.side2, when,
                        self._app_id, self._mailbox_id))

    def get_messages(self):
        """Return every stored message, ordered by `server_rx` ascending.

        Each message is a dict with keys "phase", "body", "server_rx", "id".
        """
        messages = []
        db = self._db
        for row in db.execute("SELECT * FROM `messages`"
                              " WHERE `app_id`=? AND `mailbox_id`=?"
                              " ORDER BY `server_rx` ASC",
                              (self._app_id, self._mailbox_id)).fetchall():
            messages.append({"phase": row["phase"], "body": row["body"],
                             "server_rx": row["server_rx"],
                             "id": row["msg_id"]})
        return messages

    def add_listener(self, handle, send_f, stop_f):
        """Register a listener and return the messages stored so far."""
        self._listeners[handle] = (send_f, stop_f)
        return self.get_messages()

    def remove_listener(self, handle):
        """Deregister a listener; raises KeyError if it is not registered."""
        self._listeners.pop(handle)

    def broadcast_message(self, phase, body, server_rx, msgid):
        """Push one message to every registered listener's send_f()."""
        # Iterate over a snapshot: a callback may (de)register listeners.
        for (send_f, _stop_f) in list(self._listeners.values()):
            send_f({"phase": phase, "body": body,
                    "server_rx": server_rx, "id": msgid})

    def _add_message(self, side, phase, body, server_rx, msgid):
        """Insert one message row and commit."""
        db = self._db
        db.execute("INSERT INTO `messages`"
                   " (`app_id`, `mailbox_id`, `side`, `phase`, `body`,"
                   " `server_rx`, `msg_id`)"
                   " VALUES (?,?,?,?,?, ?,?)",
                   (self._app_id, self._mailbox_id, side, phase, body,
                    server_rx, msgid))
        db.commit()

    def add_message(self, side, phase, body, server_rx, msgid):
        """Store a message, broadcast it, and return all stored messages."""
        self._add_message(side, phase, body, server_rx, msgid)
        self.broadcast_message(phase, body, server_rx, msgid)
        return self.get_messages()  # for rendezvous_web.py POST /add

    def close(self, side, mood, when):
        """Remove ``side`` from this mailbox and commit.

        When the last side leaves, a usage summary is stored (``mood`` is
        used as the second mood) and the mailbox and its messages are
        deleted. When one side remains, ``mood`` is saved as `first_mood`.
        Does nothing if the mailbox row is gone or ``side`` was not present.
        """
        assert isinstance(side, type(u"")), type(side)
        db = self._db
        row = db.execute("SELECT * FROM `mailboxes`"
                         " WHERE `app_id`=? AND `id`=?",
                         (self._app_id, self._mailbox_id)).fetchone()
        if not row:
            return
        sr = remove_side(row, side)
        if sr.empty:
            # number of distinct sides that ever sent a message
            rows = db.execute("SELECT DISTINCT(`side`) FROM `messages`"
                              " WHERE `app_id`=? AND `mailbox_id`=?",
                              (self._app_id, self._mailbox_id)).fetchall()
            num_sides = len(rows)
            self._summarize_and_store(row, num_sides, mood, when,
                                      pruned=False)
            self._delete()
            db.commit()
        elif sr.changed:
            db.execute("UPDATE `mailboxes`"
                       " SET `side1`=?, `side2`=?, `first_mood`=?"
                       " WHERE `app_id`=? AND `id`=?",
                       (sr.side1, sr.side2, mood,
                        self._app_id, self._mailbox_id))
            db.commit()

    def _delete(self):
        """Delete this mailbox's rows, stop listeners, notify the owner.

        Requires the caller to db.commit().
        """
        self._db.execute("DELETE FROM `mailboxes`"
                         " WHERE `app_id`=? AND `id`=?",
                         (self._app_id, self._mailbox_id))
        self._db.execute("DELETE FROM `messages`"
                         " WHERE `app_id`=? AND `mailbox_id`=?",
                         (self._app_id, self._mailbox_id))
        # Shut down any listeners, just in case they're still lingering
        # around. Iterate over a snapshot, since stop_f() may call
        # remove_listener() and mutate the dict.
        for (_send_f, stop_f) in list(self._listeners.values()):
            stop_f()
        self._app.free_mailbox(self._mailbox_id)

    def _summarize_and_store(self, row, num_sides, second_mood, delete_time,
                             pruned):
        """Insert a `mailbox_usage` row summarizing ``row``; caller commits."""
        u = self._summarize(row, num_sides, second_mood, delete_time, pruned)
        self._db.execute("INSERT INTO `mailbox_usage`"
                         " (`app_id`, "
                         " `started`, `total_time`, `waiting_time`, `result`)"
                         " VALUES (?, ?,?,?,?)",
                         (self._app_id,
                          u.started, u.total_time, u.waiting_time, u.result))

    def _summarize(self, row, num_sides, second_mood, delete_time, pruned):
        """Build a Usage record for a mailbox row.

        `started` is rounded down to a multiple of blur_usage when set.
        waiting_time is second-started (None without a second side) and
        total_time is delete_time-started, both from the unblurred start.
        The result starts as quiet/lonely/happy from ``num_sides``; each
        later rule overrides earlier ones: lonely mood, errory mood, scary
        mood, pruned, crowded.
        """
        started = row["started"]
        if self._blur_usage:
            started = self._blur_usage * (started // self._blur_usage)
        waiting_time = None
        if row["second"]:
            waiting_time = row["second"] - row["started"]
        total_time = delete_time - row["started"]

        if num_sides == 0:
            result = u"quiet"
        elif num_sides == 1:
            result = u"lonely"
        else:
            result = u"happy"

        moods = set([row["first_mood"], second_mood])
        if u"lonely" in moods:
            result = u"lonely"
        if u"errory" in moods:
            result = u"errory"
        if u"scary" in moods:
            result = u"scary"
        if pruned:
            result = u"pruney"
        if row["crowded"]:
            result = u"crowded"

        return Usage(started=started, waiting_time=waiting_time,
                     total_time=total_time, result=result)

    def is_idle(self):
        """True if nobody is listening and the newest message (if any) is
        older than CHANNEL_EXPIRATION_TIME."""
        if self._listeners:
            return False
        c = self._db.execute("SELECT `server_rx` FROM `messages`"
                             " WHERE `app_id`=? AND `mailbox_id`=?"
                             " ORDER BY `server_rx` DESC LIMIT 1",
                             (self._app_id, self._mailbox_id))
        rows = c.fetchall()
        if not rows:
            return True
        old = time.time() - CHANNEL_EXPIRATION_TIME
        return rows[0]["server_rx"] < old

    def _shutdown(self):
        """Stop every listener (used at test shutdown to speed disconnects)."""
        for (_send_f, stop_f) in list(self._listeners.values()):
            stop_f()
class AppNamespace:
    """All nameplates and mailboxes belonging to a single app_id.

    Nameplate state lives only in the database; Mailbox objects are kept in
    ``self._mailboxes`` (mailbox_id -> Mailbox) while they are in use.
    """

    def __init__(self, db, welcome, blur_usage, log_requests, app_id):
        self._db = db
        self._welcome = welcome
        self._blur_usage = blur_usage
        self._log_requests = log_requests
        self._app_id = app_id
        self._mailboxes = {}  # mailbox_id -> Mailbox

    def get_nameplate_ids(self):
        """Return the set of nameplate ids that exist for this app."""
        db = self._db
        # TODO: filter this to numeric ids?
        c = db.execute("SELECT DISTINCT `id` FROM `nameplates`"
                       " WHERE `app_id`=?", (self._app_id,))
        return set([row["id"] for row in c.fetchall()])

    def _find_available_nameplate_id(self):
        """Pick an unclaimed nameplate id, preferring the shortest.

        Chooses randomly among free 1-digit ids, then 2-digit, then 3-digit.
        If all of 1-999 are taken, makes up to 1000 random guesses in
        1000-999999. Raises ValueError if no free id is found.
        """
        claimed = self.get_nameplate_ids()
        for size in range(1, 4):  # stick to 1-999 for now
            candidates = (u"%d" % n for n in range(10**(size-1), 10**size))
            available = [cand for cand in candidates if cand not in claimed]
            if available:
                return random.choice(available)
        # ouch, 999 currently claimed. Try random ones for a while.
        for _ in range(1000):
            nameplate_id = u"%d" % random.randrange(1000, 1000*1000)
            if nameplate_id not in claimed:
                return nameplate_id
        raise ValueError("unable to find a free nameplate-id")

    def allocate_nameplate(self, side, when):
        """Claim a fresh nameplate for ``side`` and return its id."""
        nameplate_id = self._find_available_nameplate_id()
        mailbox_id = self.claim_nameplate(nameplate_id, side, when)
        del mailbox_id  # ignored, they'll learn it from claim()
        return nameplate_id

    def claim_nameplate(self, nameplate_id, side, when):
        """Add ``side`` to a nameplate (creating it if needed); commits.

        Returns the nameplate's mailbox id. When we're done:
         * there will be one row for the nameplate
         * side1 or side2 will be populated
         * started or second will be populated
         * a mailbox id will be created, but not a mailbox row
           (ids are randomly unique, so we can defer creation until 'open')
        If two other sides already hold the nameplate, it is flagged
        `crowded` (committed) and CrowdedError propagates.
        """
        assert isinstance(nameplate_id, type(u"")), type(nameplate_id)
        assert isinstance(side, type(u"")), type(side)
        db = self._db
        row = db.execute("SELECT * FROM `nameplates`"
                         " WHERE `app_id`=? AND `id`=?",
                         (self._app_id, nameplate_id)).fetchone()
        if row:
            mailbox_id = row["mailbox_id"]
            try:
                sr = add_side(row, side)
            except CrowdedError:
                db.execute("UPDATE `nameplates` SET `crowded`=?"
                           " WHERE `app_id`=? AND `id`=?",
                           (True, self._app_id, nameplate_id))
                db.commit()
                raise
            if sr.changed:
                db.execute("UPDATE `nameplates` SET"
                           " `side1`=?, `side2`=?, `updated`=?, `second`=?"
                           " WHERE `app_id`=? AND `id`=?",
                           (sr.side1, sr.side2, when, when,
                            self._app_id, nameplate_id))
        else:
            if self._log_requests:
                log.msg("creating nameplate#%s for app_id %s" %
                        (nameplate_id, self._app_id))
            mailbox_id = generate_mailbox_id()
            db.execute("INSERT INTO `nameplates`"
                       " (`app_id`, `id`, `mailbox_id`, `side1`, `crowded`,"
                       " `updated`, `started`)"
                       " VALUES(?,?,?,?,?, ?,?)",
                       (self._app_id, nameplate_id, mailbox_id, side, False,
                        when, when))
        db.commit()
        return mailbox_id

    def release_nameplate(self, nameplate_id, side, when):
        """Remove ``side`` from a nameplate; commits.

        When we're done:
         * in the nameplate row, side1 or side2 will be removed
         * if the nameplate is now unused:
           * the nameplate row will be removed
           * a `nameplate_usage` summary row will be written
        Does nothing if the nameplate or the side is not present.
        """
        assert isinstance(nameplate_id, type(u"")), type(nameplate_id)
        assert isinstance(side, type(u"")), type(side)
        db = self._db
        row = db.execute("SELECT * FROM `nameplates`"
                         " WHERE `app_id`=? AND `id`=?",
                         (self._app_id, nameplate_id)).fetchone()
        if not row:
            return
        sr = remove_side(row, side)
        if sr.empty:
            db.execute("DELETE FROM `nameplates`"
                       " WHERE `app_id`=? AND `id`=?",
                       (self._app_id, nameplate_id))
            self._summarize_nameplate_and_store(row, when, pruned=False)
            db.commit()
        elif sr.changed:
            db.execute("UPDATE `nameplates`"
                       " SET `side1`=?, `side2`=?, `updated`=?"
                       " WHERE `app_id`=? AND `id`=?",
                       (sr.side1, sr.side2, when,
                        self._app_id, nameplate_id))
            db.commit()

    def _summarize_nameplate_and_store(self, row, delete_time, pruned):
        """Insert a `nameplate_usage` row for ``row``; caller commits."""
        u = self._summarize_nameplate_usage(row, delete_time, pruned)
        self._db.execute("INSERT INTO `nameplate_usage`"
                         " (`app_id`,"
                         " `started`, `total_time`, `waiting_time`, `result`)"
                         " VALUES (?, ?,?,?,?)",
                         (self._app_id,
                          u.started, u.total_time, u.waiting_time, u.result))

    def _summarize_nameplate_usage(self, row, delete_time, pruned):
        """Build a Usage record for a nameplate row.

        `started` is rounded down to a multiple of blur_usage when set. The
        result is lonely or happy (second side seen), overridden by pruney
        and then by crowded.
        """
        started = row["started"]
        if self._blur_usage:
            started = self._blur_usage * (started // self._blur_usage)
        waiting_time = None
        if row["second"]:
            waiting_time = row["second"] - row["started"]
        total_time = delete_time - row["started"]
        result = u"lonely"
        if row["second"]:
            result = u"happy"
        if pruned:
            result = u"pruney"
        if row["crowded"]:
            result = u"crowded"
        return Usage(started=started, waiting_time=waiting_time,
                     total_time=total_time, result=result)

    def _prune_nameplate(self, row, delete_time):
        """Delete one nameplate row and record it as pruned; caller commits."""
        db = self._db
        db.execute("DELETE FROM `nameplates` WHERE `app_id`=? AND `id`=?",
                   (self._app_id, row["id"]))
        self._summarize_nameplate_and_store(row, delete_time, pruned=True)

    # TODO: make a Nameplate object, keep track of when there's a
    # websocket that's watching it, don't prune a nameplate that someone
    # is watching, even if they started watching a long time ago
    def prune_nameplates(self, old, now=None):
        """Delete this app's nameplates last updated before ``old``.

        Each pruned nameplate is summarized with ``now`` (default: the
        current time) as its delete time. Commits, and returns how many of
        this app's nameplates remain.
        """
        if now is None:
            now = time.time()
        db = self._db
        # Restrict to this app: _prune_nameplate() deletes and records
        # usage under self._app_id.
        for row in db.execute("SELECT * FROM `nameplates`"
                              " WHERE `app_id`=? AND `updated` < ?",
                              (self._app_id, old)).fetchall():
            self._prune_nameplate(row, now)
        db.commit()
        count = db.execute("SELECT COUNT(*) FROM `nameplates`"
                           " WHERE `app_id`=?",
                           (self._app_id,)).fetchone()[0]
        return count

    def open_mailbox(self, mailbox_id, side, when):
        """Open ``mailbox_id`` for ``side`` and return its Mailbox; commits.

        On first use in this process, inserts the `mailboxes` row and
        creates the Mailbox object.
        """
        assert isinstance(mailbox_id, type(u"")), type(mailbox_id)
        db = self._db
        if mailbox_id not in self._mailboxes:
            if self._log_requests:
                log.msg("spawning #%s for app_id %s" % (mailbox_id,
                                                       self._app_id))
            db.execute("INSERT INTO `mailboxes`"
                       " (`app_id`, `id`, `side1`, `crowded`, `started`)"
                       " VALUES(?,?,?,?,?)",
                       (self._app_id, mailbox_id, side, False, when))
            db.commit()  # XXX
            # mailbox.open() does a SELECT to find the old sides
            self._mailboxes[mailbox_id] = Mailbox(self, self._db,
                                                  self._blur_usage,
                                                  self._log_requests,
                                                  self._app_id, mailbox_id)
        mailbox = self._mailboxes[mailbox_id]
        mailbox.open(side, when)
        db.commit()
        return mailbox

    def free_mailbox(self, mailbox_id):
        """Forget the in-memory Mailbox for ``mailbox_id``, if any.

        Called from Mailbox._delete(), which has already deleted the
        mailbox's rows and messages.
        """
        self._mailboxes.pop(mailbox_id, None)

    def prune_mailboxes(self, old, now=None):
        """Summarize (as pruned) and delete every idle mailbox of this app.

        Candidates are in-memory mailboxes plus any mailbox with a DB row or
        stored messages. Idleness is decided by Mailbox.is_idle(), which
        uses CHANNEL_EXPIRATION_TIME; ``old`` is currently unused. Commits,
        and returns True if any in-memory mailboxes remain.
        """
        # For now, pruning is logged even if log_requests is False, to debug
        # the pruning process, and since pruning is triggered by a timer
        # instead of by user action. It does reveal which mailboxes were
        # present when the pruning process began, though, so in the long
        # run it should do less logging.
        log.msg(" channel prune begins")
        if now is None:
            now = time.time()
        db = self._db
        mailbox_ids = set(self._mailboxes)  # these might have listeners
        for row in db.execute("SELECT `id` FROM `mailboxes`"
                              " WHERE `app_id`=?",
                              (self._app_id,)).fetchall():
            mailbox_ids.add(row["id"])
        for row in db.execute("SELECT DISTINCT `mailbox_id` FROM `messages`"
                              " WHERE `app_id`=?",
                              (self._app_id,)).fetchall():
            mailbox_ids.add(row["mailbox_id"])  # these have messages
        for mailbox_id in sorted(mailbox_ids):
            log.msg(" channel prune checking %s" % (mailbox_id,))
            mailbox = self._mailboxes.get(mailbox_id)
            if mailbox is None:
                # not open in this process; wrap it just to inspect/delete
                mailbox = Mailbox(self, db, self._blur_usage,
                                  self._log_requests, self._app_id,
                                  mailbox_id)
            if mailbox.is_idle():
                log.msg(" channel prune expiring %s" % (mailbox_id,))
                self._prune_mailbox(mailbox, mailbox_id, now)
        db.commit()
        log.msg(" channel prune done, %r left"
                % (list(self._mailboxes.keys()),))
        return bool(self._mailboxes)

    def _prune_mailbox(self, mailbox, mailbox_id, delete_time):
        """Record one mailbox as pruned and delete it; caller commits."""
        db = self._db
        row = db.execute("SELECT * FROM `mailboxes`"
                         " WHERE `app_id`=? AND `id`=?",
                         (self._app_id, mailbox_id)).fetchone()
        if row:
            rows = db.execute("SELECT DISTINCT(`side`) FROM `messages`"
                              " WHERE `app_id`=? AND `mailbox_id`=?",
                              (self._app_id, mailbox_id)).fetchall()
            mailbox._summarize_and_store(row, len(rows), None, delete_time,
                                         pruned=True)
        mailbox._delete()  # also calls self.free_mailbox(mailbox_id)

    def _shutdown(self):
        """Stop the listeners of every in-memory mailbox."""
        for mailbox in list(self._mailboxes.values()):
            mailbox._shutdown()
class Rendezvous(service.MultiService):
    """Top-level rendezvous service.

    Holds the database handle, hands out per-app AppNamespace objects, and
    runs a TimerService that calls prune() every EXPIRATION_CHECK_PERIOD.
    """

    def __init__(self, db, welcome, blur_usage):
        service.MultiService.__init__(self)
        self._db = db
        self._welcome = welcome
        # Passed to every AppNamespace/Mailbox so stored usage `started`
        # times are rounded down to this granularity. (Previously this was
        # hard-coded to None, silently disabling blurring.)
        self._blur_usage = blur_usage
        # Per-request logging is enabled only when usage is not blurred.
        log_requests = blur_usage is None
        self._log_requests = log_requests
        self._apps = {}  # app_id -> AppNamespace
        t = internet.TimerService(EXPIRATION_CHECK_PERIOD, self.prune)
        t.setServiceParent(self)

    def get_welcome(self):
        return self._welcome

    def get_log_requests(self):
        return self._log_requests

    def get_app(self, app_id):
        """Return the AppNamespace for ``app_id``, creating it on demand."""
        assert isinstance(app_id, type(u""))
        if app_id not in self._apps:
            if self._log_requests:
                log.msg("spawning app_id %s" % (app_id,))
            self._apps[app_id] = AppNamespace(self._db, self._welcome,
                                              self._blur_usage,
                                              self._log_requests, app_id)
        return self._apps[app_id]

    def prune(self, old=None):
        """Prune stale nameplates and mailboxes in every known app.

        ``old`` defaults to now - CHANNEL_EXPIRATION_TIME. Apps with nothing
        left afterwards are dropped from memory.
        """
        # As with AppNamespace.prune_mailboxes, we log for now.
        log.msg("beginning app prune")
        if old is None:
            old = time.time() - CHANNEL_EXPIRATION_TIME
        apps = set(self._apps)  # these might have listeners
        # Any app with DB state may need pruning, not just those with
        # messages (e.g. an app with only abandoned nameplates).
        for query in ("SELECT DISTINCT `app_id` FROM `nameplates`",
                      "SELECT DISTINCT `app_id` FROM `mailboxes`",
                      "SELECT DISTINCT `app_id` FROM `messages`"):
            for row in self._db.execute(query).fetchall():
                apps.add(row["app_id"])
        for app_id in apps:
            log.msg(" app prune checking %r" % (app_id,))
            app = self.get_app(app_id)
            still_active = app.prune_nameplates(old) + app.prune_mailboxes(old)
            if not still_active:
                log.msg("prune pops app %r" % (app_id,))
                self._apps.pop(app_id)
        # Pruning helpers leave the commit to their caller.
        self._db.commit()
        log.msg("app prune ends, %d remaining apps" % len(self._apps))

    def stopService(self):
        # This forcibly boots any clients that are still connected, which
        # helps with unit tests that use threads for both clients. One client
        # hits an exception, which terminates the test (and .tearDown calls
        # stopService on the relay), but the other client (in its thread) is
        # still waiting for a message. By killing off all connections, that
        # other client gets an error, and exits promptly.
        for app in list(self._apps.values()):
            app._shutdown()
        return service.MultiService.stopService(self)