diff --git a/src/wormhole/twisted/eventsource.py b/src/wormhole/twisted/eventsource.py index 0630fe5..521ac76 100644 --- a/src/wormhole/twisted/eventsource.py +++ b/src/wormhole/twisted/eventsource.py @@ -4,7 +4,7 @@ from twisted.application import service from twisted.protocols import basic from twisted.web.client import Agent, ResponseDone from twisted.web.http_headers import Headers -from .eventual import eventually +from ..util.eventual import eventually class EventSourceParser(basic.LineOnlyReceiver): delimiter = "\n" @@ -14,6 +14,7 @@ class EventSourceParser(basic.LineOnlyReceiver): self.current_lines = [] self.handler = handler self.done_deferred = defer.Deferred() + self.eventtype = "message" def connectionLost(self, why): if why.check(ResponseDone): @@ -45,8 +46,17 @@ class EventSourceParser(basic.LineOnlyReceiver): else: self.current_lines.append(line) - def fieldReceived(self, name, data): - self.handler(name, data) + def fieldReceived(self, fieldname, data): + if fieldname == "event": + self.eventtype = data + elif fieldname == "data": + self.eventReceived(self.eventtype, data) + self.eventtype = "message" + else: + log.msg("weird fieldname", fieldname, data) + + def eventReceived(self, eventtype, data): + self.handler(eventtype, data) class EventSourceError(Exception): pass @@ -56,20 +66,22 @@ class EventSourceError(Exception): # es.cancel() class EventSource: # TODO: service.Service - def __init__(self, url, handler, when_connected=None): + def __init__(self, url, handler, when_connected=None, agent=None): self.url = url self.handler = handler self.when_connected = when_connected self.started = False self.cancelled = False self.proto = EventSourceParser(self.handler) + if not agent: + agent = Agent(reactor) + self.agent = agent def start(self): assert not self.started, "single-use" self.started = True - a = Agent(reactor) - d = a.request("GET", self.url, - Headers({"accept": ["text/event-stream"]})) + d = self.agent.request("GET", self.url, + Headers({"accept": ["text/event-stream"]})) d.addCallback(self._connected) return d @@ -125,7 +137,7 @@ class Connector: class ReconnectingEventSource(service.MultiService, protocol.ReconnectingClientFactory): - def __init__(self, baseurl, connection_starting, handler): + def __init__(self, baseurl, connection_starting, handler, agent=None): service.MultiService.__init__(self) # we don't use any of the basic Factory/ClientFactory methods of # this, just the ReconnectingClientFactory.retry, stopTrying, and @@ -134,6 +146,7 @@ class ReconnectingEventSource(service.MultiService, self.baseurl = baseurl self.connection_starting = connection_starting self.handler = handler + self.agent = agent # IService provides self.running, toggled by {start,stop}Service. # self.active is toggled by {,de}activate. If both .running and # .active are True, then we want to have an outstanding EventSource @@ -172,7 +185,8 @@ class ReconnectingEventSource(service.MultiService, return self.continueTrying = True url = self.connection_starting() - self.es = EventSource(url, self.handler, self.resetDelay) + self.es = EventSource(url, self.handler, self.resetDelay, + agent=self.agent) d = self.es.start() d.addBoth(self._stopped)