eventsource: add Agent, deliver eventtype correctly
import eventual.py from the right place
This commit is contained in:
parent
5241c07b8c
commit
951da1a59b
|
@ -4,7 +4,7 @@ from twisted.application import service
|
||||||
from twisted.protocols import basic
|
from twisted.protocols import basic
|
||||||
from twisted.web.client import Agent, ResponseDone
|
from twisted.web.client import Agent, ResponseDone
|
||||||
from twisted.web.http_headers import Headers
|
from twisted.web.http_headers import Headers
|
||||||
from .eventual import eventually
|
from ..util.eventual import eventually
|
||||||
|
|
||||||
class EventSourceParser(basic.LineOnlyReceiver):
|
class EventSourceParser(basic.LineOnlyReceiver):
|
||||||
delimiter = "\n"
|
delimiter = "\n"
|
||||||
|
@ -14,6 +14,7 @@ class EventSourceParser(basic.LineOnlyReceiver):
|
||||||
self.current_lines = []
|
self.current_lines = []
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
self.done_deferred = defer.Deferred()
|
self.done_deferred = defer.Deferred()
|
||||||
|
self.eventtype = "message"
|
||||||
|
|
||||||
def connectionLost(self, why):
|
def connectionLost(self, why):
|
||||||
if why.check(ResponseDone):
|
if why.check(ResponseDone):
|
||||||
|
@ -45,8 +46,17 @@ class EventSourceParser(basic.LineOnlyReceiver):
|
||||||
else:
|
else:
|
||||||
self.current_lines.append(line)
|
self.current_lines.append(line)
|
||||||
|
|
||||||
def fieldReceived(self, name, data):
|
def fieldReceived(self, fieldname, data):
|
||||||
self.handler(name, data)
|
if fieldname == "event":
|
||||||
|
self.eventtype = data
|
||||||
|
elif fieldname == "data":
|
||||||
|
self.eventReceived(self.eventtype, data)
|
||||||
|
self.eventtype = "message"
|
||||||
|
else:
|
||||||
|
log.msg("weird fieldname", fieldname, data)
|
||||||
|
|
||||||
|
def eventReceived(self, eventtype, data):
|
||||||
|
self.handler(eventtype, data)
|
||||||
|
|
||||||
class EventSourceError(Exception):
|
class EventSourceError(Exception):
|
||||||
pass
|
pass
|
||||||
|
@ -56,19 +66,21 @@ class EventSourceError(Exception):
|
||||||
# es.cancel()
|
# es.cancel()
|
||||||
|
|
||||||
class EventSource: # TODO: service.Service
|
class EventSource: # TODO: service.Service
|
||||||
def __init__(self, url, handler, when_connected=None):
|
def __init__(self, url, handler, when_connected=None, agent=None):
|
||||||
self.url = url
|
self.url = url
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
self.when_connected = when_connected
|
self.when_connected = when_connected
|
||||||
self.started = False
|
self.started = False
|
||||||
self.cancelled = False
|
self.cancelled = False
|
||||||
self.proto = EventSourceParser(self.handler)
|
self.proto = EventSourceParser(self.handler)
|
||||||
|
if not agent:
|
||||||
|
agent = Agent(reactor)
|
||||||
|
self.agent = agent
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
assert not self.started, "single-use"
|
assert not self.started, "single-use"
|
||||||
self.started = True
|
self.started = True
|
||||||
a = Agent(reactor)
|
d = self.agent.request("GET", self.url,
|
||||||
d = a.request("GET", self.url,
|
|
||||||
Headers({"accept": ["text/event-stream"]}))
|
Headers({"accept": ["text/event-stream"]}))
|
||||||
d.addCallback(self._connected)
|
d.addCallback(self._connected)
|
||||||
return d
|
return d
|
||||||
|
@ -125,7 +137,7 @@ class Connector:
|
||||||
|
|
||||||
class ReconnectingEventSource(service.MultiService,
|
class ReconnectingEventSource(service.MultiService,
|
||||||
protocol.ReconnectingClientFactory):
|
protocol.ReconnectingClientFactory):
|
||||||
def __init__(self, baseurl, connection_starting, handler):
|
def __init__(self, baseurl, connection_starting, handler, agent=None):
|
||||||
service.MultiService.__init__(self)
|
service.MultiService.__init__(self)
|
||||||
# we don't use any of the basic Factory/ClientFactory methods of
|
# we don't use any of the basic Factory/ClientFactory methods of
|
||||||
# this, just the ReconnectingClientFactory.retry, stopTrying, and
|
# this, just the ReconnectingClientFactory.retry, stopTrying, and
|
||||||
|
@ -134,6 +146,7 @@ class ReconnectingEventSource(service.MultiService,
|
||||||
self.baseurl = baseurl
|
self.baseurl = baseurl
|
||||||
self.connection_starting = connection_starting
|
self.connection_starting = connection_starting
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
|
self.agent = agent
|
||||||
# IService provides self.running, toggled by {start,stop}Service.
|
# IService provides self.running, toggled by {start,stop}Service.
|
||||||
# self.active is toggled by {,de}activate. If both .running and
|
# self.active is toggled by {,de}activate. If both .running and
|
||||||
# .active are True, then we want to have an outstanding EventSource
|
# .active are True, then we want to have an outstanding EventSource
|
||||||
|
@ -172,7 +185,8 @@ class ReconnectingEventSource(service.MultiService,
|
||||||
return
|
return
|
||||||
self.continueTrying = True
|
self.continueTrying = True
|
||||||
url = self.connection_starting()
|
url = self.connection_starting()
|
||||||
self.es = EventSource(url, self.handler, self.resetDelay)
|
self.es = EventSource(url, self.handler, self.resetDelay,
|
||||||
|
agent=self.agent)
|
||||||
d = self.es.start()
|
d = self.es.start()
|
||||||
d.addBoth(self._stopped)
|
d.addBoth(self._stopped)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user