move blocking/eventsource out to a separate file

This commit is contained in:
Brian Warner 2015-07-17 16:55:29 -07:00
parent f6eeaab0e4
commit bc54a0bbca
2 changed files with 42 additions and 40 deletions

View File

@ -0,0 +1,41 @@
import requests
class EventSourceFollower:
def __init__(self, url, timeout):
self.resp = requests.get(url,
headers={"accept": "text/event-stream"},
stream=True,
timeout=timeout)
self.resp.raise_for_status()
def close(self):
self.resp.close()
def _get_fields(self, lines):
while True:
first_line = lines.next() # raises StopIteration when closed
fieldname, data = first_line.split(": ", 1)
data_lines = [data]
while True:
next_line = lines.next()
if not next_line: # empty string, original was "\n"
yield (fieldname, "\n".join(data_lines))
break
data_lines.append(next_line)
def iter_events(self):
# I think Request.iter_lines and .iter_content use chunk_size= in a
# funny way, and nothing happens until at least that much data has
# arrived. So unless we set chunk_size=1, we won't hear about lines
# for a long time. I'd prefer that chunk_size behaved like
# read(size), and gave you 1<=x<=size bytes in response.
eventtype = "message"
lines_iter = self.resp.iter_lines(chunk_size=1)
for (fieldname, data) in self._get_fields(lines_iter):
if fieldname == "data":
yield (eventtype, data)
eventtype = "message"
elif fieldname == "event":
eventtype = data
else:
print("weird fieldname", fieldname, data)

View File

@ -5,6 +5,7 @@ from spake2 import SPAKE2_A, SPAKE2_B
from nacl.secret import SecretBox
from nacl.exceptions import CryptoError
from nacl import utils
from .eventsource import EventSourceFollower
from .. import __version__
from .. import codes
from ..errors import ServerError
@ -48,46 +49,6 @@ class ReceiverWrongPasswordError(WrongPasswordError):
# GET /CHANNEL-ID/SIDE/poll/MSGNUM (eventsource) -> STR, STR, ..
# POST /CHANNEL-ID/SIDE/deallocate -> waiting | deleted
class EventSourceFollower:
def __init__(self, url, timeout):
self.resp = requests.get(url,
headers={"accept": "text/event-stream"},
stream=True,
timeout=timeout)
self.resp.raise_for_status()
def close(self):
self.resp.close()
def _get_fields(self, lines):
while True:
first_line = lines.next() # raises StopIteration when closed
fieldname, data = first_line.split(": ", 1)
data_lines = [data]
while True:
next_line = lines.next()
if not next_line: # empty string, original was "\n"
yield (fieldname, "\n".join(data_lines))
break
data_lines.append(next_line)
def iter_events(self):
# I think Request.iter_lines and .iter_content use chunk_size= in a
# funny way, and nothing happens until at least that much data has
# arrived. So unless we set chunk_size=1, we won't hear about lines
# for a long time. I'd prefer that chunk_size behaved like
# read(size), and gave you 1<=x<=size bytes in response.
eventtype = "message"
lines_iter = self.resp.iter_lines(chunk_size=1)
for (fieldname, data) in self._get_fields(lines_iter):
if fieldname == "data":
yield (eventtype, data)
eventtype = "message"
elif fieldname == "event":
eventtype = data
else:
print("weird fieldname", fieldname, data)
class Common:
def url(self, verb, msgnum=None):
url = "%s%d/%s/%s" % (self.relay, self.channel_id, self.side, verb)