diff --git a/src/wormhole/blocking/eventsource.py b/src/wormhole/blocking/eventsource.py index bdd272e..fd7a4a0 100644 --- a/src/wormhole/blocking/eventsource.py +++ b/src/wormhole/blocking/eventsource.py @@ -1,30 +1,18 @@ -from __future__ import print_function -import six +from __future__ import print_function, unicode_literals import requests class EventSourceFollower: def __init__(self, url, timeout): - self.resp = requests.get(url, - headers={"accept": "text/event-stream"}, - stream=True, - timeout=timeout) - self.resp.raise_for_status() + self._resp = requests.get(url, + headers={"accept": "text/event-stream"}, + stream=True, + timeout=timeout) + self._resp.raise_for_status() + self._lines_iter = self._resp.iter_lines(chunk_size=1, + decode_unicode=True) def close(self): - self.resp.close() - - def _get_fields(self, lines): - while True: - first_line = next(lines) # raises StopIteration when closed - assert isinstance(first_line, type(six.u(""))), type(first_line) - fieldname, data = first_line.split(": ", 1) - data_lines = [data] - while True: - next_line = next(lines) - if not next_line: # empty string, original was "\n" - yield (fieldname, "\n".join(data_lines)) - break - data_lines.append(next_line) + self._resp.close() def iter_events(self): # I think Request.iter_lines and .iter_content use chunk_size= in a @@ -33,16 +21,29 @@ class EventSourceFollower: # for a long time. I'd prefer that chunk_size behaved like # read(size), and gave you 1<=x<=size bytes in response. eventtype = "message" - lines_iter = self.resp.iter_lines(chunk_size=1, decode_unicode=True) - for (fieldname, data) in self._get_fields(lines_iter): - # fieldname/data are unicode on both py2 and py3. On py2, where - # ("abc"==u"abc" is True), this compares unicode against str, - # which matches. On py3, where (b"abc"=="abc" is False), this - # compares unicode against unicode, which matches. - if fieldname == "data": - yield (eventtype, data) + current_lines = [] + for line in self._lines_iter: + assert isinstance(line, type(u"")), type(line) + if not line: + # blank line ends the field: deliver event, reset for next + yield (eventtype, "\n".join(current_lines)) eventtype = "message" - elif fieldname == "event": - eventtype = data + current_lines[:] = [] + continue + if ":" in line: + fieldname, data = line.split(":", 1) + if data.startswith(" "): + data = data[1:] else: - print("weird fieldname", fieldname, type(fieldname), data) + fieldname = line + data = "" + if fieldname == "event": + eventtype = data + elif fieldname == "data": + current_lines.append(data) + elif fieldname in ("id", "retry"): + # documented but unhandled + pass + else: + #log.msg("weird fieldname", fieldname, data) + pass diff --git a/src/wormhole/servers/relay_server.py b/src/wormhole/servers/relay_server.py index e9884ce..c038172 100644 --- a/src/wormhole/servers/relay_server.py +++ b/src/wormhole/servers/relay_server.py @@ -34,13 +34,10 @@ class EventsProtocol: # e.g. if name=foo, then the client web page should do: # (new EventSource(url)).addEventListener("foo", handlerfunc) # Note that this basically defaults to "message". - self.request.write(b"\n") if id: self.request.write(b"id: " + id.encode("utf-8") + b"\n") - self.request.write(b"\n") if retry: self.request.write(b"retry: " + retry + b"\n") # milliseconds - self.request.write(b"\n") for line in data.splitlines(): self.request.write(b"data: " + line.encode("utf-8") + b"\n") self.request.write(b"\n") diff --git a/src/wormhole/test/test_blocking.py b/src/wormhole/test/test_blocking.py index ca4be3c..64f2ffd 100644 --- a/src/wormhole/test/test_blocking.py +++ b/src/wormhole/test/test_blocking.py @@ -4,6 +4,7 @@ from twisted.trial import unittest from twisted.internet.defer import gatherResults, succeed from twisted.internet.threads import deferToThread from ..blocking.transcribe import Wormhole, UsageError, ChannelManager +from ..blocking.eventsource import EventSourceFollower from .common import ServerBase APPID = u"appid" @@ -297,3 +298,34 @@ class Blocking(ServerBase, unittest.TestCase): d.addCallback(_done) return d test_serialize.skip = "not yet implemented for the blocking flavor" + +data1 = u"""\ +event: welcome +data: one and a +data: two +data:. + +data: three + +: this line is ignored +event: e2 +: this line is ignored too +i am a dataless field name +data: four + +""" + +class NoNetworkESF(EventSourceFollower): + def __init__(self, text): + self._lines_iter = iter(text.splitlines()) + +class EventSourceClient(unittest.TestCase): + def test_parser(self): + events = [] + f = NoNetworkESF(data1) + events = list(f.iter_events()) + self.failUnlessEqual(events, + [(u"welcome", u"one and a\ntwo\n."), + (u"message", u"three"), + (u"e2", u"four"), + ]) diff --git a/src/wormhole/test/test_twisted.py b/src/wormhole/test/test_twisted.py index f0d0f3d..ec789b5 100644 --- a/src/wormhole/test/test_twisted.py +++ b/src/wormhole/test/test_twisted.py @@ -3,6 +3,7 @@ import sys, json from twisted.trial import unittest from twisted.internet.defer import gatherResults, succeed from ..twisted.transcribe import Wormhole, UsageError, ChannelManager +from ..twisted.eventsource_twisted import EventSourceParser from .common import ServerBase APPID = u"appid" @@ -284,9 +285,41 @@ class Basic(ServerBase, unittest.TestCase): d.addCallback(_done) return d +data1 = u"""\ +event: welcome +data: one and a +data: two +data:. + +data: three + +: this line is ignored +event: e2 +: this line is ignored too +i am a dataless field name +data: four + +""" + +class FakeTransport: + disconnecting = False + +class EventSourceClient(unittest.TestCase): + def test_parser(self): + events = [] + p = EventSourceParser(lambda t,d: events.append((t,d))) + p.transport = FakeTransport() + p.dataReceived(data1) + self.failUnlessEqual(events, + [(u"welcome", u"one and a\ntwo\n."), + (u"message", u"three"), + (u"e2", u"four"), + ]) + if sys.version_info[0] >= 3: Channel.skip = "twisted is not yet sufficiently ported to py3" Basic.skip = "twisted is not yet sufficiently ported to py3" + EventSourceClient.skip = "twisted is not yet sufficiently ported to py3" # as of 15.4.0, Twisted is still missing: # * web.client.Agent (for all non-EventSource POSTs in transcribe.py) # * python.logfile (to allow daemonization of 'wormhole server') diff --git a/src/wormhole/twisted/eventsource_twisted.py b/src/wormhole/twisted/eventsource_twisted.py index b2f37fc..af90819 100644 --- a/src/wormhole/twisted/eventsource_twisted.py +++ b/src/wormhole/twisted/eventsource_twisted.py @@ -14,6 +14,7 @@ from ..util.eventual import eventually # to_unicode = str class EventSourceParser(basic.LineOnlyReceiver): + # http://www.w3.org/TR/eventsource/ delimiter = "\n" def __init__(self, handler): @@ -21,7 +22,7 @@ class EventSourceParser(basic.LineOnlyReceiver): self.current_lines = [] self.handler = handler self.done_deferred = defer.Deferred() - self.eventtype = "message" + self.eventtype = u"message" self.encoding = "utf-8" def set_encoding(self, encoding): @@ -44,27 +45,28 @@ class EventSourceParser(basic.LineOnlyReceiver): raise def lineReceived(self, line): + #line = to_unicode(line, self.encoding) + line = line.decode(self.encoding) if not line: - # blank line ends the field - self.fieldReceived(self.current_field, - "\n".join(self.current_lines)) - self.current_field = None + # blank line ends the field: deliver event, reset for next + self.eventReceived(self.eventtype, "\n".join(self.current_lines)) + self.eventtype = u"message" self.current_lines[:] = [] return - line = line.decode(self.encoding) - #line = to_unicode(line, self.encoding) - if self.current_field is None: - self.current_field, data = line.split(": ", 1) - self.current_lines.append(data) + if u":" in line: + fieldname, data = line.split(u":", 1) + if data.startswith(u" "): + data = data[1:] else: - self.current_lines.append(line) - - def fieldReceived(self, fieldname, data): - if fieldname == "event": + fieldname = line + data = u"" + if fieldname == u"event": self.eventtype = data - elif fieldname == "data": - self.eventReceived(self.eventtype, data) - self.eventtype = "message" + elif fieldname == u"data": + self.current_lines.append(data) + elif fieldname in (u"id", u"retry"): + # documented but unhandled + pass else: log.msg("weird fieldname", fieldname, data)