relay: track allocations through DB
This commit is contained in:
parent
043392ee2a
commit
a03fb3900e
|
@ -15,4 +15,11 @@ CREATE TABLE `messages`
|
||||||
`message` VARCHAR,
|
`message` VARCHAR,
|
||||||
`when` INTEGER
|
`when` INTEGER
|
||||||
);
|
);
|
||||||
CREATE INDEX `lookup` ON `messages` (`channel_id`, `side`, `msgnum`);
|
CREATE INDEX `messages_idx` ON `messages` (`channel_id`, `side`, `msgnum`);
|
||||||
|
|
||||||
|
CREATE TABLE `allocations`
|
||||||
|
(
|
||||||
|
`channel_id` INTEGER,
|
||||||
|
`side` VARCHAR
|
||||||
|
);
|
||||||
|
CREATE INDEX `allocations_idx` ON `allocations` (`channel_id`);
|
||||||
|
|
|
@ -66,8 +66,6 @@ class Channel(resource.Resource):
|
||||||
self.db = db
|
self.db = db
|
||||||
self.welcome = welcome
|
self.welcome = welcome
|
||||||
self.expire_at = time.time() + CHANNEL_EXPIRATION_TIME
|
self.expire_at = time.time() + CHANNEL_EXPIRATION_TIME
|
||||||
self.sides = set()
|
|
||||||
self.messages = [] # (side, msgnum, str)
|
|
||||||
self.event_channels = set() # (side, msgnum, ep)
|
self.event_channels = set() # (side, msgnum, ep)
|
||||||
|
|
||||||
|
|
||||||
|
@ -87,8 +85,12 @@ class Channel(resource.Resource):
|
||||||
handle = (their_side, their_msgnum, ep)
|
handle = (their_side, their_msgnum, ep)
|
||||||
self.event_channels.add(handle)
|
self.event_channels.add(handle)
|
||||||
request.notifyFinish().addErrback(self._shutdown, handle)
|
request.notifyFinish().addErrback(self._shutdown, handle)
|
||||||
for (msg_side, msg_msgnum, msg_str) in self.messages:
|
for row in self.db.execute("SELECT * FROM `messages`"
|
||||||
self.message_added(msg_side, msg_msgnum, msg_str, channels=[handle])
|
" WHERE `channel_id`=?"
|
||||||
|
" ORDER BY `when` ASC",
|
||||||
|
(self.channel_id,)).fetchall():
|
||||||
|
self.message_added(row["side"], row["msgnum"], row["message"],
|
||||||
|
channels=[handle])
|
||||||
return server.NOT_DONE_YET
|
return server.NOT_DONE_YET
|
||||||
|
|
||||||
def _shutdown(self, _, handle):
|
def _shutdown(self, _, handle):
|
||||||
|
@ -107,12 +109,17 @@ class Channel(resource.Resource):
|
||||||
def render_POST(self, request):
|
def render_POST(self, request):
|
||||||
# rest of URL is: SIDE/(MSGNUM|deallocate)/(post|poll)
|
# rest of URL is: SIDE/(MSGNUM|deallocate)/(post|poll)
|
||||||
side = request.postpath[0]
|
side = request.postpath[0]
|
||||||
self.sides.add(side)
|
|
||||||
verb = request.postpath[1]
|
verb = request.postpath[1]
|
||||||
|
|
||||||
if verb == "deallocate":
|
if verb == "deallocate":
|
||||||
self.sides.remove(side)
|
self.db.execute("DELETE FROM `allocations`"
|
||||||
if self.sides:
|
" WHERE `channel_id`=? AND `side`=?",
|
||||||
|
(self.channel_id, side))
|
||||||
|
self.db.commit()
|
||||||
|
remaining = self.db.execute("SELECT COUNT(*) FROM `allocations`"
|
||||||
|
" WHERE `channel_id`=?",
|
||||||
|
(self.channel_id,)).fetchone()[0]
|
||||||
|
if remaining:
|
||||||
return "waiting\n"
|
return "waiting\n"
|
||||||
self.relay.free_child(self.channel_id)
|
self.relay.free_child(self.channel_id)
|
||||||
return "deleted\n"
|
return "deleted\n"
|
||||||
|
@ -124,13 +131,25 @@ class Channel(resource.Resource):
|
||||||
msgnum = request.postpath[2]
|
msgnum = request.postpath[2]
|
||||||
|
|
||||||
other_messages = []
|
other_messages = []
|
||||||
for (msg_side, msg_msgnum, msg_str) in self.messages:
|
for row in self.db.execute("SELECT `message` FROM `messages`"
|
||||||
if msg_side != side and msg_msgnum == msgnum:
|
" WHERE `channel_id`=? AND `side`!=?"
|
||||||
other_messages.append(msg_str)
|
" AND `msgnum`=?"
|
||||||
|
" ORDER BY `when` ASC",
|
||||||
|
(self.channel_id, side, msgnum)).fetchall():
|
||||||
|
other_messages.append(row["message"])
|
||||||
|
|
||||||
if verb == "post":
|
if verb == "post":
|
||||||
data = json.load(request.content)
|
data = json.load(request.content)
|
||||||
self.messages.append( (side, msgnum, data["message"]) )
|
self.db.execute("INSERT INTO `messages`"
|
||||||
|
" (`channel_id`, `side`, `msgnum`, `message`, `when`)"
|
||||||
|
" VALUES (?,?,?,?,?)",
|
||||||
|
(self.channel_id, side, msgnum, data["message"],
|
||||||
|
time.time()))
|
||||||
|
self.db.execute("INSERT INTO `allocations`"
|
||||||
|
" (`channel_id`, `side`)"
|
||||||
|
" VALUES (?,?)",
|
||||||
|
(self.channel_id, side))
|
||||||
|
self.db.commit()
|
||||||
self.message_added(side, msgnum, data["message"])
|
self.message_added(side, msgnum, data["message"])
|
||||||
|
|
||||||
request.setHeader("content-type", "application/json; charset=utf-8")
|
request.setHeader("content-type", "application/json; charset=utf-8")
|
||||||
|
@ -213,6 +232,9 @@ class Relay(resource.Resource):
|
||||||
return self.channels[channel_id]
|
return self.channels[channel_id]
|
||||||
|
|
||||||
def free_child(self, channel_id):
|
def free_child(self, channel_id):
|
||||||
|
self.db.execute("DELETE FROM `messages` WHERE `channel_id`=?",
|
||||||
|
(channel_id,))
|
||||||
|
self.db.commit()
|
||||||
self.channels.pop(channel_id)
|
self.channels.pop(channel_id)
|
||||||
log.msg("freed #%d, now have %d channels" %
|
log.msg("freed #%d, now have %d channels" %
|
||||||
(channel_id, len(self.channels)))
|
(channel_id, len(self.channels)))
|
||||||
|
|
Loading…
Reference in New Issue
Block a user