db: use 'nameplate_sides' instead of cols in 'nameplates'

This commit is contained in:
Brian Warner 2016-06-23 16:26:53 -07:00
parent 355ece3e47
commit 6c725e4a86
3 changed files with 209 additions and 162 deletions

View File

@ -13,23 +13,27 @@ CREATE TABLE `version`
-- nameplates, but the protocol and server allow can use arbitrary strings.
CREATE TABLE `nameplates`
(
`id` INTEGER PRIMARY KEY AUTOINCREMENT,
`app_id` VARCHAR,
`name` VARCHAR,
`mailbox_id` VARCHAR, -- really a foreign key
`side1` VARCHAR, -- side name, or NULL
`side2` VARCHAR, -- side name, or NULL
`request_id` VARCHAR, -- from 'allocate' message, for future deduplication
`crowded` BOOLEAN, -- at some point, three or more sides were involved
`updated` INTEGER, -- time of last activity, used for pruning
-- timing data
`started` INTEGER, -- time when nameplace was opened
`second` INTEGER -- time when second side opened
`updated` INTEGER -- time of last activity, used for pruning
);
CREATE INDEX `nameplates_idx` ON `nameplates` (`app_id`, `name`);
CREATE INDEX `nameplates_updated_idx` ON `nameplates` (`app_id`, `updated`);
CREATE INDEX `nameplates_mailbox_idx` ON `nameplates` (`app_id`, `mailbox_id`);
CREATE INDEX `nameplates_request_idx` ON `nameplates` (`app_id`, `request_id`);
CREATE TABLE `nameplate_sides`
(
`nameplates_id` REFERENCES `nameplates`(`id`),
`claimed` BOOLEAN, -- True after claim(), False after release()
`side` VARCHAR,
`added` INTEGER -- time when this side first claimed the nameplate
);
-- Clients exchange messages through a "mailbox", which has a long (randomly
-- unique) identifier and a queue of messages.
CREATE TABLE `mailboxes`

View File

@ -219,84 +219,96 @@ class AppNamespace:
del mailbox_id # ignored, they'll learn it from claim()
return nameplate_id
def claim_nameplate(self, nameplate_id, side, when, _test_mailbox_id=None):
def claim_nameplate(self, name, side, when, _test_mailbox_id=None):
# when we're done:
# * there will be one row for the nameplate
# * side1 or side2 will be populated
# * started or second will be populated
# * there will be one 'side' attached to it, with claimed=True
# * a mailbox id will be created, but not a mailbox row
# (ids are randomly unique, so we can defer creation until 'open')
assert isinstance(nameplate_id, type("")), type(nameplate_id)
assert isinstance(name, type("")), type(name)
assert isinstance(side, type("")), type(side)
db = self._db
row = db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=? AND `name`=?",
(self._app_id, nameplate_id)).fetchone()
if row:
mailbox_id = row["mailbox_id"]
try:
sr = add_side(row, side)
except CrowdedError:
db.execute("UPDATE `nameplates` SET `crowded`=?"
" WHERE `app_id`=? AND `name`=?",
(True, self._app_id, nameplate_id))
db.commit()
raise
if sr.changed:
db.execute("UPDATE `nameplates` SET"
" `side1`=?, `side2`=?, `updated`=?, `second`=?"
" WHERE `app_id`=? AND `name`=?",
(sr.side1, sr.side2, when, when,
self._app_id, nameplate_id))
else:
(self._app_id, name)).fetchone()
if not row:
if self._log_requests:
log.msg("creating nameplate#%s for app_id %s" %
(nameplate_id, self._app_id))
(name, self._app_id))
if _test_mailbox_id is not None: # for unit tests
mailbox_id = _test_mailbox_id
else:
mailbox_id = generate_mailbox_id()
db.execute("INSERT INTO `nameplates`"
" (`app_id`, `name`, `mailbox_id`, `side1`, `crowded`,"
" `updated`, `started`)"
" VALUES(?,?,?,?,?, ?,?)",
(self._app_id, nameplate_id, mailbox_id, side, False,
when, when))
sql = ("INSERT INTO `nameplates`"
" (`app_id`, `name`, `mailbox_id`, `updated`)"
" VALUES(?,?,?,?)")
npid = db.execute(sql,
(self._app_id, name, mailbox_id, when)
).lastrowid
else:
npid = row["id"]
mailbox_id = row["mailbox_id"]
row = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=? AND `side`=?",
(npid, side)).fetchone()
if not row:
db.execute("INSERT INTO `nameplate_sides`"
" (`nameplates_id`, `claimed`, `side`, `added`)"
" VALUES(?,?,?,?)",
(npid, True, side, when))
db.execute("UPDATE `nameplates` SET `updated`=? WHERE `id`=?",
(when, npid))
db.commit()
rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?", (npid,)).fetchall()
if len(rows) > 2:
raise CrowdedError("too many sides have claimed this nameplate")
return mailbox_id
def release_nameplate(self, nameplate_id, side, when):
def release_nameplate(self, name, side, when):
# when we're done:
# * in the nameplate row, side1 or side2 will be removed
# * if the nameplate is now unused:
# * the 'claimed' flag will be cleared on the nameplate_sides row
# * if the nameplate is now unused (no claimed sides):
# * mailbox.nameplate_closed will be populated
# * the nameplate row will be removed
assert isinstance(nameplate_id, type("")), type(nameplate_id)
# * the nameplate sides will be removed
assert isinstance(name, type("")), type(name)
assert isinstance(side, type("")), type(side)
db = self._db
row = db.execute("SELECT * FROM `nameplates`"
np_row = db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=? AND `name`=?",
(self._app_id, nameplate_id)).fetchone()
(self._app_id, name)).fetchone()
if not np_row:
return
npid = np_row["id"]
row = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=? AND `side`=?",
(npid, side)).fetchone()
if not row:
return
sr = remove_side(row, side)
if sr.empty:
db.execute("DELETE FROM `nameplates`"
" WHERE `app_id`=? AND `name`=?",
(self._app_id, nameplate_id))
self._summarize_nameplate_and_store(row, when, pruned=False)
db.commit()
elif sr.changed:
db.execute("UPDATE `nameplates`"
" SET `side1`=?, `side2`=?, `updated`=?"
" WHERE `app_id`=? AND `name`=?",
(sr.side1, sr.side2, when,
self._app_id, nameplate_id))
db.execute("UPDATE `nameplate_sides` SET `claimed`=?"
" WHERE `nameplates_id`=? AND `side`=?",
(False, npid, side))
db.commit()
def _summarize_nameplate_and_store(self, row, delete_time, pruned):
# now, are there any remaining claims?
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
claims = [1 for sr in side_rows if sr["claimed"]]
if claims:
return
# delete and summarize
db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?",
(npid,))
db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,))
self._summarize_nameplate_and_store(side_rows, when, pruned=False)
db.commit()
def _summarize_nameplate_and_store(self, side_rows, delete_time, pruned):
# requires caller to db.commit()
u = self._summarize_nameplate_usage(row, delete_time, pruned)
u = self._summarize_nameplate_usage(side_rows, delete_time, pruned)
self._db.execute("INSERT INTO `nameplate_usage`"
" (`app_id`,"
" `started`, `total_time`, `waiting_time`, `result`)"
@ -304,20 +316,21 @@ class AppNamespace:
(self._app_id,
u.started, u.total_time, u.waiting_time, u.result))
def _summarize_nameplate_usage(self, row, delete_time, pruned):
started = row["started"]
def _summarize_nameplate_usage(self, side_rows, delete_time, pruned):
times = sorted([row["added"] for row in side_rows])
started = times[0]
if self._blur_usage:
started = self._blur_usage * (started // self._blur_usage)
waiting_time = None
if row["second"]:
waiting_time = row["second"] - row["started"]
total_time = delete_time - row["started"]
if len(times) > 1:
waiting_time = times[1] - times[0]
total_time = delete_time - times[0]
result = "lonely"
if row["second"]:
if len(times) == 2:
result = "happy"
if pruned:
result = "pruney"
if row["crowded"]:
if len(times) > 2:
result = "crowded"
return Usage(started=started, waiting_time=waiting_time,
total_time=total_time, result=result)
@ -470,13 +483,11 @@ class AppNamespace:
# if it is old:
# if the nameplate is new:
# classify mailbox as new
all_nameplates = {}
all_nameplate_rows = {}
old_nameplate_ids = []
for row in db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`=?",
(self._app_id,)).fetchall():
nameplate_id = row["name"]
all_nameplate_rows[nameplate_id] = row
npid = row["id"]
if row["updated"] > old:
which = NEW
else:
@ -488,20 +499,22 @@ class AppNamespace:
else:
if which == NEW:
all_mailboxes[mailbox_id] = NEW
all_nameplates[nameplate_id] = which
#log.msg(" 6: all_nameplates", all_nameplates, all_nameplate_rows)
if which == OLD:
old_nameplate_ids.append(npid)
#log.msg(" 6: old_nameplate_ids", old_nameplate_ids)
# delete all old nameplates
# invariant check: if there is a linked mailbox, it is old
for nameplate_id, which in all_nameplates.items():
if which == OLD:
log.msg(" deleting nameplate", nameplate_id)
row = all_nameplate_rows[nameplate_id]
self._summarize_nameplate_and_store(row, now, pruned=True)
db.execute("DELETE FROM `nameplates`"
" WHERE `app_id`=? AND `name`=?",
(self._app_id, nameplate_id))
for npid in old_nameplate_ids:
log.msg(" deleting nameplate", npid)
side_rows = db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
db.execute("DELETE FROM `nameplate_sides` WHERE `nameplates_id`=?",
(npid,))
db.execute("DELETE FROM `nameplates` WHERE `id`=?", (npid,))
self._summarize_nameplate_and_store(side_rows, now, pruned=True)
modified = True
# delete all messages for old mailboxes

View File

@ -48,90 +48,106 @@ class Server(ServerBase, unittest.TestCase):
biggest = max(nids)
self.assert_(1000 <= biggest < 1000000, biggest)
def _nameplate(self, app, nameplate_id):
return app._db.execute("SELECT * FROM `nameplates`"
def _nameplate(self, app, name):
np_row = app._db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`='appid' AND `name`=?",
(nameplate_id,)).fetchone()
(name,)).fetchone()
if not np_row:
return None, None
npid = np_row["id"]
side_rows = app._db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
return np_row, side_rows
def test_nameplate(self):
app = self._rendezvous.get_app("appid")
nameplate_id = app.allocate_nameplate("side1", 0)
self.assertEqual(type(nameplate_id), type(""))
nid = int(nameplate_id)
name = app.allocate_nameplate("side1", 0)
self.assertEqual(type(name), type(""))
nid = int(name)
self.assert_(0 < nid < 10, nid)
self.assertEqual(app.get_nameplate_ids(), set([nameplate_id]))
self.assertEqual(app.get_nameplate_ids(), set([name]))
# allocate also does a claim
row = self._nameplate(app, nameplate_id)
self.assertEqual(row["side1"], "side1")
self.assertEqual(row["side2"], None)
self.assertEqual(row["crowded"], False)
self.assertEqual(row["started"], 0)
self.assertEqual(row["second"], None)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["side"], "side1")
self.assertEqual(side_rows[0]["added"], 0)
mailbox_id = app.claim_nameplate(nameplate_id, "side1", 1)
self.assertEqual(type(mailbox_id), type(""))
# duplicate claims by the same side are combined
row = self._nameplate(app, nameplate_id)
self.assertEqual(row["side1"], "side1")
self.assertEqual(row["side2"], None)
mailbox_id = app.claim_nameplate(name, "side1", 1)
self.assertEqual(type(mailbox_id), type(""))
self.assertEqual(mailbox_id, np_row["mailbox_id"])
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["added"], 0)
self.assertEqual(mailbox_id, np_row["mailbox_id"])
mailbox_id2 = app.claim_nameplate(nameplate_id, "side1", 2)
# and they don't updated the 'added' time
mailbox_id2 = app.claim_nameplate(name, "side1", 2)
self.assertEqual(mailbox_id, mailbox_id2)
row = self._nameplate(app, nameplate_id)
self.assertEqual(row["side1"], "side1")
self.assertEqual(row["side2"], None)
self.assertEqual(row["started"], 0)
self.assertEqual(row["second"], None)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["added"], 0)
# claim by the second side is new
mailbox_id3 = app.claim_nameplate(nameplate_id, "side2", 3)
mailbox_id3 = app.claim_nameplate(name, "side2", 3)
self.assertEqual(mailbox_id, mailbox_id3)
row = self._nameplate(app, nameplate_id)
self.assertEqual(row["side1"], "side1")
self.assertEqual(row["side2"], "side2")
self.assertEqual(row["crowded"], False)
self.assertEqual(row["started"], 0)
self.assertEqual(row["second"], 3)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 2)
self.assertEqual(sorted([row["side"] for row in side_rows]),
sorted(["side1", "side2"]))
self.assertIn(("side2", 3),
[(row["side"], row["added"]) for row in side_rows])
# a third claim marks the nameplate as "crowded", but leaves the two
# existing claims alone
# a third claim marks the nameplate as "crowded", and adds a third
# claim (which must be released later), but leaves the two existing
# claims alone
self.assertRaises(rendezvous.CrowdedError,
app.claim_nameplate, nameplate_id, "side3", 0)
row = self._nameplate(app, nameplate_id)
self.assertEqual(row["side1"], "side1")
self.assertEqual(row["side2"], "side2")
self.assertEqual(row["crowded"], True)
app.claim_nameplate, name, "side3", 4)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 3)
# releasing a non-existent nameplate is ignored
app.release_nameplate(nameplate_id+"not", "side4", 0)
app.release_nameplate(name+"not", "side4", 0)
# releasing a side that never claimed the nameplate is ignored
app.release_nameplate(nameplate_id, "side4", 0)
row = self._nameplate(app, nameplate_id)
self.assertEqual(row["side1"], "side1")
self.assertEqual(row["side2"], "side2")
app.release_nameplate(name, "side4", 0)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 3)
# releasing one side leaves the second claim
app.release_nameplate(nameplate_id, "side1", 5)
row = self._nameplate(app, nameplate_id)
self.assertEqual(row["side1"], "side2")
self.assertEqual(row["side2"], None)
app.release_nameplate(name, "side1", 5)
np_row, side_rows = self._nameplate(app, name)
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertIn(("side1", False), claims)
self.assertIn(("side2", True), claims)
self.assertIn(("side3", True), claims)
# releasing one side multiple times is ignored
app.release_nameplate(nameplate_id, "side1", 5)
row = self._nameplate(app, nameplate_id)
self.assertEqual(row["side1"], "side2")
self.assertEqual(row["side2"], None)
app.release_nameplate(name, "side1", 5)
np_row, side_rows = self._nameplate(app, name)
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertIn(("side1", False), claims)
self.assertIn(("side2", True), claims)
self.assertIn(("side3", True), claims)
# releasing the second side frees the nameplate, and adds usage
app.release_nameplate(nameplate_id, "side2", 6)
row = self._nameplate(app, nameplate_id)
self.assertEqual(row, None)
# release the second side
app.release_nameplate(name, "side2", 6)
np_row, side_rows = self._nameplate(app, name)
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertIn(("side1", False), claims)
self.assertIn(("side2", False), claims)
self.assertIn(("side3", True), claims)
# releasing the third side frees the nameplate, and adds usage
app.release_nameplate(name, "side3", 7)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(np_row, None)
usage = app._db.execute("SELECT * FROM `nameplate_usage`").fetchone()
self.assertEqual(usage["app_id"], "appid")
self.assertEqual(usage["started"], 0)
self.assertEqual(usage["waiting_time"], 3)
self.assertEqual(usage["total_time"], 6)
self.assertEqual(usage["total_time"], 7)
self.assertEqual(usage["result"], "crowded")
@ -695,6 +711,18 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
nids.add(n["id"])
self.assertEqual(nids, set([nameplate_id1, "np2"]))
def _nameplate(self, app, name):
np_row = app._db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`='appid' AND `name`=?",
(name,)).fetchone()
if not np_row:
return None, None
npid = np_row["id"]
side_rows = app._db.execute("SELECT * FROM `nameplate_sides`"
" WHERE `nameplates_id`=?",
(npid,)).fetchall()
return np_row, side_rows
@inlineCallbacks
def test_allocate(self):
c1 = yield self.make_client()
@ -710,11 +738,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
c1.send("allocate")
m = yield c1.next_non_ack()
self.assertEqual(m["type"], "allocated")
nameplate_id = m["nameplate"]
name = m["nameplate"]
nids = app.get_nameplate_ids()
self.assertEqual(len(nids), 1)
self.assertEqual(nameplate_id, list(nids)[0])
self.assertEqual(name, list(nids)[0])
c1.send("allocate")
err = yield c1.next_non_ack()
@ -722,13 +750,11 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
self.assertEqual(err["error"],
"you already allocated one, don't be greedy")
c1.send("claim", nameplate=nameplate_id) # allocate+claim is ok
c1.send("claim", nameplate=name) # allocate+claim is ok
yield c1.sync()
row = app._db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`='appid' AND `name`=?",
(nameplate_id,)).fetchone()
self.assertEqual(row["side1"], "side")
self.assertEqual(row["side2"], None)
np_row, side_rows = self._nameplate(app, name)
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["side"], "side")
@inlineCallbacks
def test_claim(self):
@ -751,6 +777,9 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
nids = app.get_nameplate_ids()
self.assertEqual(len(nids), 1)
self.assertEqual("np1", list(nids)[0])
np_row, side_rows = self._nameplate(app, "np1")
self.assertEqual(len(side_rows), 1)
self.assertEqual(side_rows[0]["side"], "side")
# claiming a nameplate will assign a random mailbox id, but won't
# create the mailbox itself
@ -796,10 +825,10 @@ class WebSocketAPI(ServerBase, unittest.TestCase):
m = yield c1.next_non_ack()
self.assertEqual(m["type"], "released")
row = app._db.execute("SELECT * FROM `nameplates`"
" WHERE `app_id`='appid' AND `name`='np1'").fetchone()
self.assertEqual(row["side1"], "side2")
self.assertEqual(row["side2"], None)
np_row, side_rows = self._nameplate(app, "np1")
claims = [(row["side"], row["claimed"]) for row in side_rows]
self.assertIn(("side", False), claims)
self.assertIn(("side2", True), claims)
c1.send("release") # no longer claimed
err = yield c1.next_non_ack()
@ -941,19 +970,20 @@ class Summary(unittest.TestCase):
def test_nameplate(self):
a = rendezvous.AppNamespace(None, None, False, None)
# starts at time 1, maybe gets second open at time 3, closes at 5
base_row = {"started": 1, "second": None, "crowded": False}
def summ(num_sides, pruned=False, **kwargs):
row = base_row.copy()
row.update(kwargs)
return a._summarize_nameplate_usage(row, 5, pruned)
def s(rows, pruned=False):
return a._summarize_nameplate_usage(rows, 5, pruned)
self.assertEqual(summ(1), Usage(1, None, 4, "lonely"))
self.assertEqual(summ(1, crowded=True), Usage(1, None, 4, "crowded"))
rows = [dict(added=1)]
self.assertEqual(s(rows), Usage(1, None, 4, "lonely"))
rows = [dict(added=1), dict(added=3)]
self.assertEqual(s(rows), Usage(1, 2, 4, "happy"))
self.assertEqual(summ(2, second=3), Usage(1, 2, 4, "happy"))
rows = [dict(added=1), dict(added=3)]
self.assertEqual(s(rows, pruned=True), Usage(1, 2, 4, "pruney"))
rows = [dict(added=1), dict(added=3), dict(added=4)]
self.assertEqual(s(rows), Usage(1, 2, 4, "crowded"))
self.assertEqual(summ(2, second=3, pruned=True),
Usage(1, 2, 4, "pruney"))
def test_blur(self):
db = get_db(":memory:")