2016-04-19 00:57:27 +00:00
|
|
|
import json, time
|
|
|
|
from twisted.internet import reactor
|
|
|
|
from twisted.python import log
|
|
|
|
from autobahn.twisted import websocket
|
|
|
|
|
2016-05-13 01:01:56 +00:00
|
|
|
# Each WebSocket connection is bound to one "appid", one "side", and zero or
|
|
|
|
# more "channelids". The connection's appid and side are set by the "bind"
|
|
|
|
# message (which must be the first message on the connection). Both must be
|
|
|
|
# set before any other message (list, allocate, claim, watch, add, release) will
|
|
|
|
# be accepted. Short channel IDs can be obtained from the server with an
|
|
|
|
# "allocate" message. Longer ones can be selected independently by the
|
|
|
|
# client. Channels are maintained (saved from deletion) by a "claim" message
|
|
|
|
# (and also incidentally by "allocate"). Channels are deleted when the last
|
|
|
|
# claim is released with "release".
|
2016-04-19 00:57:27 +00:00
|
|
|
|
|
|
|
# All websocket messages are JSON-encoded. The client can send us "inbound"
|
|
|
|
# messages (marked as "->" below), which may (or may not) provoke immediate
|
|
|
|
# (or delayed) "outbound" messages (marked as "<-"). There is no guaranteed
|
|
|
|
# correlation between requests and responses. In this list, "A -> B" means
|
|
|
|
# that some time after A is received, at least one message of type B will be
|
2016-05-13 01:01:56 +00:00
|
|
|
# sent out (probably).
|
2016-04-19 00:57:27 +00:00
|
|
|
|
2016-05-13 00:12:04 +00:00
|
|
|
# All outbound messages include a "server_tx" key, which is a float (seconds
|
|
|
|
# since epoch) with the server clock just before the outbound message was
|
2016-05-13 01:01:56 +00:00
|
|
|
# written to the socket. Unrecognized keys will be ignored.
|
2016-04-19 00:57:27 +00:00
|
|
|
|
|
|
|
# connection -> welcome
|
|
|
|
# <- {type: "welcome", welcome: {}} # .welcome keys are all optional:
|
|
|
|
# current_version: out-of-date clients display a warning
|
|
|
|
# motd: all clients display message, then continue normally
|
|
|
|
# error: all clients display message, then terminate with error
|
|
|
|
# -> {type: "bind", appid:, side:}
|
2016-05-13 01:01:56 +00:00
|
|
|
#
|
2016-04-19 06:49:11 +00:00
|
|
|
# -> {type: "list"} -> channelids
|
|
|
|
# <- {type: "channelids", channelids: [int..]}
|
2016-04-19 00:57:27 +00:00
|
|
|
# -> {type: "allocate"} -> allocated
|
|
|
|
# <- {type: "allocated", channelid: str}
|
|
|
|
# -> {type: "claim", channelid: str}
|
2016-05-13 00:48:26 +00:00
|
|
|
#
|
2016-05-13 01:01:56 +00:00
|
|
|
# -> {type: "watch", channelid: str} -> message
|
|
|
|
# sends old messages and more in future
|
|
|
|
# <- {type: "message", channelid: str, message: {phase:, body:}} # body is hex
|
|
|
|
# -> {type: "add", channelid: str, phase: str, body: hex} # will send echo
|
|
|
|
#
|
|
|
|
# -> {type: "release", channelid: str, mood: str} -> released
|
|
|
|
# <- {type: "released", status: waiting|deleted}
|
2016-05-13 00:48:26 +00:00
|
|
|
#
|
2016-04-19 00:57:27 +00:00
|
|
|
# <- {type: "error", error: str, orig: {}} # in response to malformed msgs
|
|
|
|
|
|
|
|
# for tests that need to know when a message has been processed:
|
|
|
|
# -> {type: "ping", ping: int} -> pong (does not require bind/claim)
|
|
|
|
# <- {type: "pong", pong: int}
|
|
|
|
|
|
|
|
class Error(Exception):
    """Protocol-level failure that is reported back to the client.

    The explanation is kept on ``_explain``; the message dispatcher reads it
    to fill the "error" key of the outbound error message.
    """

    def __init__(self, explain):
        # Human-readable reason, sent verbatim to the client.
        self._explain = explain
|
|
|
|
|
|
|
|
class WebSocketRendezvous(websocket.WebSocketServerProtocol):
    """Rendezvous-server protocol for a single WebSocket connection.

    Every inbound frame is expected to be a JSON object with a "type" key;
    onMessage() dispatches it to the matching handle_*() method. Protocol
    violations are raised as Error inside the handlers and reported to the
    client as an "error" message that echoes the offending request.
    """

    def __init__(self):
        websocket.WebSocketServerProtocol.__init__(self)
        self._app = None  # set by handle_bind()
        self._side = None  # set by handle_bind()
        self._did_allocate = False  # only one allocate() per websocket
        self._channels = {}  # channel-id -> Channel (claimed)

    def onConnect(self, request):
        """Optionally log the connecting peer and capture the reactor."""
        rv = self.factory.rendezvous
        if rv.get_log_requests():
            log.msg("ws client connecting: %s" % (request.peer,))
        # Taken from the factory so tests can substitute their own reactor.
        self._reactor = self.factory.reactor

    def onOpen(self):
        """Greet the client with the server's "welcome" message."""
        rv = self.factory.rendezvous
        self.send("welcome", welcome=rv.get_welcome())

    def onMessage(self, payload, isBinary):
        """Decode one inbound frame and dispatch it by its "type".

        payload is the raw frame (UTF-8 encoded JSON). Frames that cannot be
        decoded or parsed, and any Error raised while handling the message,
        are answered with an "error" message instead of being allowed to
        escape from this callback.
        """
        server_rx = time.time()
        try:
            msg = json.loads(payload.decode("utf-8"))
        except ValueError:
            # Covers undecodable bytes (UnicodeDecodeError) as well as
            # invalid JSON. There is no parsed message to echo, so send
            # back the (lossily decoded) text instead.
            self.send("error", error="unparseable JSON",
                      orig=payload.decode("utf-8", "replace"))
            return

        try:
            if not isinstance(msg, dict):
                # e.g. a bare number or list: the key lookups below would
                # otherwise fail with TypeError.
                raise Error("message must be a JSON object")
            if "type" not in msg:
                raise Error("missing 'type'")
            if "id" in msg:
                # Only ack clients modern enough to include [id]. Older ones
                # won't recognize the message, then they'll abort.
                self.send("ack", id=msg["id"])

            mtype = msg["type"]
            # "ping" and "bind" are the only types accepted before binding.
            if mtype == "ping":
                return self.handle_ping(msg)
            if mtype == "bind":
                return self.handle_bind(msg)

            if not self._app:
                raise Error("Must bind first")
            if mtype == "list":
                return self.handle_list()
            if mtype == "allocate":
                return self.handle_allocate()
            if mtype == "claim":
                return self.handle_claim(msg)
            if mtype == "watch":
                return self.handle_watch(msg)
            if mtype == "add":
                return self.handle_add(msg, server_rx)
            if mtype == "release":
                return self.handle_release(msg)

            raise Error("Unknown type")
        except Error as e:
            self.send("error", error=e._explain, orig=msg)

    def _get_channelid(self, msg, verb):
        """Return the validated "channelid" of msg.

        verb names the request type and is only used in the error text.
        Raises Error if the key is missing or is not a text string, so bad
        client input is reported rather than tripping an assert or a
        KeyError/TypeError further down.
        """
        if "channelid" not in msg:
            raise Error("%s requires 'channelid'" % verb)
        channelid = msg["channelid"]
        if not isinstance(channelid, type(u"")):
            raise Error("'channelid' must be a string")
        return channelid

    def handle_ping(self, msg):
        """Echo the "ping" value back in a "pong" message."""
        if "ping" not in msg:
            raise Error("ping requires 'ping'")
        self.send("pong", pong=msg["ping"])

    def handle_bind(self, msg):
        """Bind this connection to an app and a side (allowed once)."""
        if self._app or self._side:
            raise Error("already bound")
        if "appid" not in msg:
            raise Error("bind requires 'appid'")
        if "side" not in msg:
            raise Error("bind requires 'side'")
        self._app = self.factory.rendezvous.get_app(msg["appid"])
        self._side = msg["side"]

    def handle_list(self):
        """Send the sorted claimed channel ids of the bound app."""
        channelids = sorted(self._app.get_claimed())
        self.send("channelids", channelids=channelids)

    def handle_allocate(self):
        """Allocate and claim a fresh channel (at most once per socket)."""
        if self._did_allocate:
            raise Error("You already allocated one channel, don't be greedy")
        channelid = self._app.find_available_channelid()
        # Server-generated value: this is an internal invariant, not input
        # validation, so an assert is appropriate here.
        assert isinstance(channelid, type(u""))
        self._did_allocate = True
        channel = self._app.claim_channel(channelid, self._side)
        self._channels[channelid] = channel
        self.send("allocated", channelid=channelid)

    def handle_claim(self, msg):
        """Claim msg["channelid"] for our side unless already claimed here."""
        channelid = self._get_channelid(msg, "claim")
        if channelid not in self._channels:
            channel = self._app.claim_channel(channelid, self._side)
            self._channels[channelid] = channel

    def handle_watch(self, msg):
        """Subscribe to a claimed channel, replaying its old messages."""
        channelid = self._get_channelid(msg, "watch")
        if channelid not in self._channels:
            raise Error("must claim channel before watching")
        channel = self._channels[channelid]

        def _send(event):
            self.send("message", channelid=channelid, message=event)

        def _stop():
            # Drop the connection on a later reactor turn rather than
            # synchronously from inside the channel's callback.
            self._reactor.callLater(0, self.transport.loseConnection)

        # add_listener() yields the messages to replay to this listener.
        for old_message in channel.add_listener(self, _send, _stop):
            _send(old_message)

    def handle_add(self, msg, server_rx):
        """Add a message (phase + body) to a claimed channel.

        server_rx is the time.time() reading taken when the frame arrived.
        """
        channelid = self._get_channelid(msg, "add")
        if channelid not in self._channels:
            raise Error("must claim channel before adding")
        channel = self._channels[channelid]
        if "phase" not in msg:
            raise Error("missing 'phase'")
        if "body" not in msg:
            raise Error("missing 'body'")
        msgid = msg.get("id")  # optional
        channel.add_message(self._side, msg["phase"], msg["body"],
                            server_rx, msgid)

    def handle_release(self, msg):
        """Release our claim on a channel and report the outcome."""
        channelid = self._get_channelid(msg, "release")
        if channelid not in self._channels:
            raise Error("must claim channel before releasing")
        channel = self._channels[channelid]
        deleted = channel.release(self._side, msg.get("mood"))
        del self._channels[channelid]
        self.send("released", status="deleted" if deleted else "waiting")

    def send(self, mtype, **kwargs):
        """JSON-encode and send one outbound message of type mtype.

        The keyword arguments become the message keys; "type" and
        "server_tx" (current time.time()) are added before encoding.
        """
        kwargs["type"] = mtype
        kwargs["server_tx"] = time.time()
        payload = json.dumps(kwargs).encode("utf-8")
        self.sendMessage(payload, False)

    def onClose(self, wasClean, code, reason):
        """Connection closed; nothing is cleaned up here.

        NOTE(review): listeners registered in handle_watch() are not removed
        on close -- confirm the channel handles dead listeners.
        """
        pass
|
|
|
|
|
|
|
|
|
|
|
|
class WebSocketRendezvousFactory(websocket.WebSocketServerFactory):
    """Factory that builds WebSocketRendezvous protocol instances.

    The protocol reads two attributes from its factory: ``rendezvous`` (the
    object handed to the constructor) and ``reactor``.
    """

    protocol = WebSocketRendezvous

    def __init__(self, url, rendezvous):
        websocket.WebSocketServerFactory.__init__(self, url)
        self.rendezvous = rendezvous
        # Stored as an attribute so tests can swap in a controllable reactor.
        self.reactor = reactor
|