server: add "watch" endpoint, deprecate non-ES "get"
I'm planning to leave non-EventSource "/get" in until after 0.6.0, then remove it. I think it's cleaner for the logs to have the two forms (EventSource and immediate) use different URLs.
This commit is contained in:
parent
82cdadae80
commit
c482c248ff
|
@ -51,12 +51,14 @@ class EventsProtocol:
|
||||||
# note: no versions of IE (including the current IE11) support EventSource
|
# note: no versions of IE (including the current IE11) support EventSource
|
||||||
|
|
||||||
# relay URLs are as follows: (MESSAGES=[{phase:,body:}..])
|
# relay URLs are as follows: (MESSAGES=[{phase:,body:}..])
|
||||||
|
# ("-" indicates a deprecated URL)
|
||||||
# GET /list?appid= -> {channelids: [INT..]}
|
# GET /list?appid= -> {channelids: [INT..]}
|
||||||
# POST /allocate {appid:,side:} -> {channelid: INT}
|
# POST /allocate {appid:,side:} -> {channelid: INT}
|
||||||
# these return all messages (base64) for appid=/channelid= :
|
# these return all messages (base64) for appid=/channelid= :
|
||||||
# POST /add {appid:,channelid:,side:,phase:,body:} -> {messages: MESSAGES}
|
# POST /add {appid:,channelid:,side:,phase:,body:} -> {messages: MESSAGES}
|
||||||
# GET /get?appid=&channelid= (no-eventsource) -> {messages: MESSAGES}
|
# GET /get?appid=&channelid= (no-eventsource) -> {messages: MESSAGES}
|
||||||
# GET /get?appid=&channelid= (eventsource) -> {phase:, body:}..
|
#- GET /get?appid=&channelid= (eventsource) -> {phase:, body:}..
|
||||||
|
# GET /watch?appid=&channelid= (eventsource) -> {phase:, body:}..
|
||||||
# POST /deallocate {appid:,channelid:,side:} -> {status: waiting | deleted}
|
# POST /deallocate {appid:,channelid:,side:} -> {status: waiting | deleted}
|
||||||
# all JSON responses include a "welcome:{..}" key
|
# all JSON responses include a "welcome:{..}" key
|
||||||
|
|
||||||
|
@ -141,7 +143,7 @@ class Adder(RelayResource):
|
||||||
# 'welcome' and 'messages'
|
# 'welcome' and 'messages'
|
||||||
return json_response(request, response)
|
return json_response(request, response)
|
||||||
|
|
||||||
class Getter(RelayResource):
|
class GetterOrWatcher(RelayResource):
|
||||||
def render_GET(self, request):
|
def render_GET(self, request):
|
||||||
appid = request.args[b"appid"][0].decode("utf-8")
|
appid = request.args[b"appid"][0].decode("utf-8")
|
||||||
channelid = int(request.args[b"channelid"][0])
|
channelid = int(request.args[b"channelid"][0])
|
||||||
|
@ -163,6 +165,25 @@ class Getter(RelayResource):
|
||||||
ep.sendEvent(old_event)
|
ep.sendEvent(old_event)
|
||||||
return server.NOT_DONE_YET
|
return server.NOT_DONE_YET
|
||||||
|
|
||||||
|
class Watcher(RelayResource):
|
||||||
|
def render_GET(self, request):
|
||||||
|
appid = request.args[b"appid"][0].decode("utf-8")
|
||||||
|
channelid = int(request.args[b"channelid"][0])
|
||||||
|
app = self._relay.get_app(appid)
|
||||||
|
channel = app.get_channel(channelid)
|
||||||
|
if b"text/event-stream" not in (request.getHeader(b"accept") or b""):
|
||||||
|
raise TypeError("/watch is for EventSource only")
|
||||||
|
|
||||||
|
request.setHeader(b"content-type", b"text/event-stream; charset=utf-8")
|
||||||
|
ep = EventsProtocol(request)
|
||||||
|
ep.sendEvent(json.dumps(self._welcome), name="welcome")
|
||||||
|
old_events = channel.add_listener(ep.sendEvent)
|
||||||
|
request.notifyFinish().addErrback(lambda f:
|
||||||
|
channel.remove_listener(ep.sendEvent))
|
||||||
|
for old_event in old_events:
|
||||||
|
ep.sendEvent(old_event)
|
||||||
|
return server.NOT_DONE_YET
|
||||||
|
|
||||||
class Deallocator(RelayResource):
|
class Deallocator(RelayResource):
|
||||||
def render_POST(self, request):
|
def render_POST(self, request):
|
||||||
content = request.content.read()
|
content = request.content.read()
|
||||||
|
@ -436,7 +457,8 @@ class Relay(resource.Resource, service.MultiService):
|
||||||
self.putChild(b"list", ChannelLister(self, welcome))
|
self.putChild(b"list", ChannelLister(self, welcome))
|
||||||
self.putChild(b"allocate", Allocator(self, welcome))
|
self.putChild(b"allocate", Allocator(self, welcome))
|
||||||
self.putChild(b"add", Adder(self, welcome))
|
self.putChild(b"add", Adder(self, welcome))
|
||||||
self.putChild(b"get", Getter(self, welcome))
|
self.putChild(b"get", GetterOrWatcher(self, welcome))
|
||||||
|
self.putChild(b"watch", Watcher(self, welcome))
|
||||||
self.putChild(b"deallocate", Deallocator(self, welcome))
|
self.putChild(b"deallocate", Deallocator(self, welcome))
|
||||||
|
|
||||||
def getChild(self, path, req):
|
def getChild(self, path, req):
|
||||||
|
|
|
@ -273,13 +273,21 @@ class API(ServerBase, unittest.TestCase):
|
||||||
|
|
||||||
def test_watch_message(self):
|
def test_watch_message(self):
|
||||||
# exercise GET /get (the EventSource version)
|
# exercise GET /get (the EventSource version)
|
||||||
|
# this API is scheduled to be removed after 0.6.0
|
||||||
|
return self._do_watch("get")
|
||||||
|
|
||||||
|
def test_watch(self):
|
||||||
|
# exercise GET /watch (the EventSource version)
|
||||||
|
return self._do_watch("watch")
|
||||||
|
|
||||||
|
def _do_watch(self, endpoint_name):
|
||||||
if sys.version_info[0] >= 3:
|
if sys.version_info[0] >= 3:
|
||||||
raise unittest.SkipTest("twisted vs py3")
|
raise unittest.SkipTest("twisted vs py3")
|
||||||
|
|
||||||
d = self.post("allocate", {"appid": "app1", "side": "abc"})
|
d = self.post("allocate", {"appid": "app1", "side": "abc"})
|
||||||
def _allocated(data):
|
def _allocated(data):
|
||||||
self.cid = data["channelid"]
|
self.cid = data["channelid"]
|
||||||
url = self.build_url("get", "app1", self.cid)
|
url = self.build_url(endpoint_name, "app1", self.cid)
|
||||||
self.o = OneEventAtATime(url, parser=json.loads)
|
self.o = OneEventAtATime(url, parser=json.loads)
|
||||||
return self.o.wait_for_connection()
|
return self.o.wait_for_connection()
|
||||||
d.addCallback(_allocated)
|
d.addCallback(_allocated)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user