fix EventSource (server and parsers)
I was really confused about the Server-Sent Events syntax. This new one is compatible with actual web browsers and the spec: http://www.w3.org/TR/eventsource/
This commit is contained in:
parent
e77b39313a
commit
df3aee2a86
|
@ -1,30 +1,18 @@
|
||||||
from __future__ import print_function
|
from __future__ import print_function, unicode_literals
|
||||||
import six
|
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
class EventSourceFollower:
|
class EventSourceFollower:
|
||||||
def __init__(self, url, timeout):
|
def __init__(self, url, timeout):
|
||||||
self.resp = requests.get(url,
|
self._resp = requests.get(url,
|
||||||
headers={"accept": "text/event-stream"},
|
headers={"accept": "text/event-stream"},
|
||||||
stream=True,
|
stream=True,
|
||||||
timeout=timeout)
|
timeout=timeout)
|
||||||
self.resp.raise_for_status()
|
self._resp.raise_for_status()
|
||||||
|
self._lines_iter = self._resp.iter_lines(chunk_size=1,
|
||||||
|
decode_unicode=True)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.resp.close()
|
self._resp.close()
|
||||||
|
|
||||||
def _get_fields(self, lines):
|
|
||||||
while True:
|
|
||||||
first_line = next(lines) # raises StopIteration when closed
|
|
||||||
assert isinstance(first_line, type(six.u(""))), type(first_line)
|
|
||||||
fieldname, data = first_line.split(": ", 1)
|
|
||||||
data_lines = [data]
|
|
||||||
while True:
|
|
||||||
next_line = next(lines)
|
|
||||||
if not next_line: # empty string, original was "\n"
|
|
||||||
yield (fieldname, "\n".join(data_lines))
|
|
||||||
break
|
|
||||||
data_lines.append(next_line)
|
|
||||||
|
|
||||||
def iter_events(self):
|
def iter_events(self):
|
||||||
# I think Request.iter_lines and .iter_content use chunk_size= in a
|
# I think Request.iter_lines and .iter_content use chunk_size= in a
|
||||||
|
@ -33,16 +21,29 @@ class EventSourceFollower:
|
||||||
# for a long time. I'd prefer that chunk_size behaved like
|
# for a long time. I'd prefer that chunk_size behaved like
|
||||||
# read(size), and gave you 1<=x<=size bytes in response.
|
# read(size), and gave you 1<=x<=size bytes in response.
|
||||||
eventtype = "message"
|
eventtype = "message"
|
||||||
lines_iter = self.resp.iter_lines(chunk_size=1, decode_unicode=True)
|
current_lines = []
|
||||||
for (fieldname, data) in self._get_fields(lines_iter):
|
for line in self._lines_iter:
|
||||||
# fieldname/data are unicode on both py2 and py3. On py2, where
|
assert isinstance(line, type(u"")), type(line)
|
||||||
# ("abc"==u"abc" is True), this compares unicode against str,
|
if not line:
|
||||||
# which matches. On py3, where (b"abc"=="abc" is False), this
|
# blank line ends the field: deliver event, reset for next
|
||||||
# compares unicode against unicode, which matches.
|
yield (eventtype, "\n".join(current_lines))
|
||||||
if fieldname == "data":
|
|
||||||
yield (eventtype, data)
|
|
||||||
eventtype = "message"
|
eventtype = "message"
|
||||||
elif fieldname == "event":
|
current_lines[:] = []
|
||||||
eventtype = data
|
continue
|
||||||
|
if ":" in line:
|
||||||
|
fieldname, data = line.split(":", 1)
|
||||||
|
if data.startswith(" "):
|
||||||
|
data = data[1:]
|
||||||
else:
|
else:
|
||||||
print("weird fieldname", fieldname, type(fieldname), data)
|
fieldname = line
|
||||||
|
data = ""
|
||||||
|
if fieldname == "event":
|
||||||
|
eventtype = data
|
||||||
|
elif fieldname == "data":
|
||||||
|
current_lines.append(data)
|
||||||
|
elif fieldname in ("id", "retry"):
|
||||||
|
# documented but unhandled
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
#log.msg("weird fieldname", fieldname, data)
|
||||||
|
pass
|
||||||
|
|
|
@ -34,13 +34,10 @@ class EventsProtocol:
|
||||||
# e.g. if name=foo, then the client web page should do:
|
# e.g. if name=foo, then the client web page should do:
|
||||||
# (new EventSource(url)).addEventListener("foo", handlerfunc)
|
# (new EventSource(url)).addEventListener("foo", handlerfunc)
|
||||||
# Note that this basically defaults to "message".
|
# Note that this basically defaults to "message".
|
||||||
self.request.write(b"\n")
|
|
||||||
if id:
|
if id:
|
||||||
self.request.write(b"id: " + id.encode("utf-8") + b"\n")
|
self.request.write(b"id: " + id.encode("utf-8") + b"\n")
|
||||||
self.request.write(b"\n")
|
|
||||||
if retry:
|
if retry:
|
||||||
self.request.write(b"retry: " + retry + b"\n") # milliseconds
|
self.request.write(b"retry: " + retry + b"\n") # milliseconds
|
||||||
self.request.write(b"\n")
|
|
||||||
for line in data.splitlines():
|
for line in data.splitlines():
|
||||||
self.request.write(b"data: " + line.encode("utf-8") + b"\n")
|
self.request.write(b"data: " + line.encode("utf-8") + b"\n")
|
||||||
self.request.write(b"\n")
|
self.request.write(b"\n")
|
||||||
|
|
|
@ -4,6 +4,7 @@ from twisted.trial import unittest
|
||||||
from twisted.internet.defer import gatherResults, succeed
|
from twisted.internet.defer import gatherResults, succeed
|
||||||
from twisted.internet.threads import deferToThread
|
from twisted.internet.threads import deferToThread
|
||||||
from ..blocking.transcribe import Wormhole, UsageError, ChannelManager
|
from ..blocking.transcribe import Wormhole, UsageError, ChannelManager
|
||||||
|
from ..blocking.eventsource import EventSourceFollower
|
||||||
from .common import ServerBase
|
from .common import ServerBase
|
||||||
|
|
||||||
APPID = u"appid"
|
APPID = u"appid"
|
||||||
|
@ -297,3 +298,34 @@ class Blocking(ServerBase, unittest.TestCase):
|
||||||
d.addCallback(_done)
|
d.addCallback(_done)
|
||||||
return d
|
return d
|
||||||
test_serialize.skip = "not yet implemented for the blocking flavor"
|
test_serialize.skip = "not yet implemented for the blocking flavor"
|
||||||
|
|
||||||
|
data1 = u"""\
|
||||||
|
event: welcome
|
||||||
|
data: one and a
|
||||||
|
data: two
|
||||||
|
data:.
|
||||||
|
|
||||||
|
data: three
|
||||||
|
|
||||||
|
: this line is ignored
|
||||||
|
event: e2
|
||||||
|
: this line is ignored too
|
||||||
|
i am a dataless field name
|
||||||
|
data: four
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
class NoNetworkESF(EventSourceFollower):
|
||||||
|
def __init__(self, text):
|
||||||
|
self._lines_iter = iter(text.splitlines())
|
||||||
|
|
||||||
|
class EventSourceClient(unittest.TestCase):
|
||||||
|
def test_parser(self):
|
||||||
|
events = []
|
||||||
|
f = NoNetworkESF(data1)
|
||||||
|
events = list(f.iter_events())
|
||||||
|
self.failUnlessEqual(events,
|
||||||
|
[(u"welcome", u"one and a\ntwo\n."),
|
||||||
|
(u"message", u"three"),
|
||||||
|
(u"e2", u"four"),
|
||||||
|
])
|
||||||
|
|
|
@ -3,6 +3,7 @@ import sys, json
|
||||||
from twisted.trial import unittest
|
from twisted.trial import unittest
|
||||||
from twisted.internet.defer import gatherResults, succeed
|
from twisted.internet.defer import gatherResults, succeed
|
||||||
from ..twisted.transcribe import Wormhole, UsageError, ChannelManager
|
from ..twisted.transcribe import Wormhole, UsageError, ChannelManager
|
||||||
|
from ..twisted.eventsource_twisted import EventSourceParser
|
||||||
from .common import ServerBase
|
from .common import ServerBase
|
||||||
|
|
||||||
APPID = u"appid"
|
APPID = u"appid"
|
||||||
|
@ -284,9 +285,41 @@ class Basic(ServerBase, unittest.TestCase):
|
||||||
d.addCallback(_done)
|
d.addCallback(_done)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
data1 = u"""\
|
||||||
|
event: welcome
|
||||||
|
data: one and a
|
||||||
|
data: two
|
||||||
|
data:.
|
||||||
|
|
||||||
|
data: three
|
||||||
|
|
||||||
|
: this line is ignored
|
||||||
|
event: e2
|
||||||
|
: this line is ignored too
|
||||||
|
i am a dataless field name
|
||||||
|
data: four
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
class FakeTransport:
|
||||||
|
disconnecting = False
|
||||||
|
|
||||||
|
class EventSourceClient(unittest.TestCase):
|
||||||
|
def test_parser(self):
|
||||||
|
events = []
|
||||||
|
p = EventSourceParser(lambda t,d: events.append((t,d)))
|
||||||
|
p.transport = FakeTransport()
|
||||||
|
p.dataReceived(data1)
|
||||||
|
self.failUnlessEqual(events,
|
||||||
|
[(u"welcome", u"one and a\ntwo\n."),
|
||||||
|
(u"message", u"three"),
|
||||||
|
(u"e2", u"four"),
|
||||||
|
])
|
||||||
|
|
||||||
if sys.version_info[0] >= 3:
|
if sys.version_info[0] >= 3:
|
||||||
Channel.skip = "twisted is not yet sufficiently ported to py3"
|
Channel.skip = "twisted is not yet sufficiently ported to py3"
|
||||||
Basic.skip = "twisted is not yet sufficiently ported to py3"
|
Basic.skip = "twisted is not yet sufficiently ported to py3"
|
||||||
|
EventSourceClient.skip = "twisted is not yet sufficiently ported to py3"
|
||||||
# as of 15.4.0, Twisted is still missing:
|
# as of 15.4.0, Twisted is still missing:
|
||||||
# * web.client.Agent (for all non-EventSource POSTs in transcribe.py)
|
# * web.client.Agent (for all non-EventSource POSTs in transcribe.py)
|
||||||
# * python.logfile (to allow daemonization of 'wormhole server')
|
# * python.logfile (to allow daemonization of 'wormhole server')
|
||||||
|
|
|
@ -14,6 +14,7 @@ from ..util.eventual import eventually
|
||||||
# to_unicode = str
|
# to_unicode = str
|
||||||
|
|
||||||
class EventSourceParser(basic.LineOnlyReceiver):
|
class EventSourceParser(basic.LineOnlyReceiver):
|
||||||
|
# http://www.w3.org/TR/eventsource/
|
||||||
delimiter = "\n"
|
delimiter = "\n"
|
||||||
|
|
||||||
def __init__(self, handler):
|
def __init__(self, handler):
|
||||||
|
@ -21,7 +22,7 @@ class EventSourceParser(basic.LineOnlyReceiver):
|
||||||
self.current_lines = []
|
self.current_lines = []
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
self.done_deferred = defer.Deferred()
|
self.done_deferred = defer.Deferred()
|
||||||
self.eventtype = "message"
|
self.eventtype = u"message"
|
||||||
self.encoding = "utf-8"
|
self.encoding = "utf-8"
|
||||||
|
|
||||||
def set_encoding(self, encoding):
|
def set_encoding(self, encoding):
|
||||||
|
@ -44,27 +45,28 @@ class EventSourceParser(basic.LineOnlyReceiver):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def lineReceived(self, line):
|
def lineReceived(self, line):
|
||||||
|
#line = to_unicode(line, self.encoding)
|
||||||
|
line = line.decode(self.encoding)
|
||||||
if not line:
|
if not line:
|
||||||
# blank line ends the field
|
# blank line ends the field: deliver event, reset for next
|
||||||
self.fieldReceived(self.current_field,
|
self.eventReceived(self.eventtype, "\n".join(self.current_lines))
|
||||||
"\n".join(self.current_lines))
|
self.eventtype = u"message"
|
||||||
self.current_field = None
|
|
||||||
self.current_lines[:] = []
|
self.current_lines[:] = []
|
||||||
return
|
return
|
||||||
line = line.decode(self.encoding)
|
if u":" in line:
|
||||||
#line = to_unicode(line, self.encoding)
|
fieldname, data = line.split(u":", 1)
|
||||||
if self.current_field is None:
|
if data.startswith(u" "):
|
||||||
self.current_field, data = line.split(": ", 1)
|
data = data[1:]
|
||||||
self.current_lines.append(data)
|
|
||||||
else:
|
else:
|
||||||
self.current_lines.append(line)
|
fieldname = line
|
||||||
|
data = u""
|
||||||
def fieldReceived(self, fieldname, data):
|
if fieldname == u"event":
|
||||||
if fieldname == "event":
|
|
||||||
self.eventtype = data
|
self.eventtype = data
|
||||||
elif fieldname == "data":
|
elif fieldname == u"data":
|
||||||
self.eventReceived(self.eventtype, data)
|
self.current_lines.append(data)
|
||||||
self.eventtype = "message"
|
elif fieldname in (u"id", u"retry"):
|
||||||
|
# documented but unhandled
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
log.msg("weird fieldname", fieldname, data)
|
log.msg("weird fieldname", fieldname, data)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user