diff --git a/src/wormhole/blocking/eventsource.py b/src/wormhole/blocking/eventsource.py new file mode 100644 index 0000000..28bc70f --- /dev/null +++ b/src/wormhole/blocking/eventsource.py @@ -0,0 +1,41 @@ +import requests + +class EventSourceFollower: + def __init__(self, url, timeout): + self.resp = requests.get(url, + headers={"accept": "text/event-stream"}, + stream=True, + timeout=timeout) + self.resp.raise_for_status() + + def close(self): + self.resp.close() + + def _get_fields(self, lines): + while True: + first_line = lines.next() # raises StopIteration when closed + fieldname, data = first_line.split(": ", 1) + data_lines = [data] + while True: + next_line = lines.next() + if not next_line: # empty string, original was "\n" + yield (fieldname, "\n".join(data_lines)) + break + data_lines.append(next_line) + + def iter_events(self): + # I think Request.iter_lines and .iter_content use chunk_size= in a + # funny way, and nothing happens until at least that much data has + # arrived. So unless we set chunk_size=1, we won't hear about lines + # for a long time. I'd prefer that chunk_size behaved like + # read(size), and gave you 1<=x<=size bytes in response. + eventtype = "message" + lines_iter = self.resp.iter_lines(chunk_size=1) + for (fieldname, data) in self._get_fields(lines_iter): + if fieldname == "data": + yield (eventtype, data) + eventtype = "message" + elif fieldname == "event": + eventtype = data + else: + print("weird fieldname", fieldname, data) diff --git a/src/wormhole/blocking/transcribe.py b/src/wormhole/blocking/transcribe.py index dbf95a3..eb76576 100644 --- a/src/wormhole/blocking/transcribe.py +++ b/src/wormhole/blocking/transcribe.py @@ -5,6 +5,7 @@ from spake2 import SPAKE2_A, SPAKE2_B from nacl.secret import SecretBox from nacl.exceptions import CryptoError from nacl import utils +from .eventsource import EventSourceFollower from .. import __version__ from .. import codes from ..errors import ServerError @@ -48,46 +49,6 @@ class ReceiverWrongPasswordError(WrongPasswordError): # GET /CHANNEL-ID/SIDE/poll/MSGNUM (eventsource) -> STR, STR, .. # POST /CHANNEL-ID/SIDE/deallocate -> waiting | deleted -class EventSourceFollower: - def __init__(self, url, timeout): - self.resp = requests.get(url, - headers={"accept": "text/event-stream"}, - stream=True, - timeout=timeout) - self.resp.raise_for_status() - - def close(self): - self.resp.close() - - def _get_fields(self, lines): - while True: - first_line = lines.next() # raises StopIteration when closed - fieldname, data = first_line.split(": ", 1) - data_lines = [data] - while True: - next_line = lines.next() - if not next_line: # empty string, original was "\n" - yield (fieldname, "\n".join(data_lines)) - break - data_lines.append(next_line) - - def iter_events(self): - # I think Request.iter_lines and .iter_content use chunk_size= in a - # funny way, and nothing happens until at least that much data has - # arrived. So unless we set chunk_size=1, we won't hear about lines - # for a long time. I'd prefer that chunk_size behaved like - # read(size), and gave you 1<=x<=size bytes in response. - eventtype = "message" - lines_iter = self.resp.iter_lines(chunk_size=1) - for (fieldname, data) in self._get_fields(lines_iter): - if fieldname == "data": - yield (eventtype, data) - eventtype = "message" - elif fieldname == "event": - eventtype = data - else: - print("weird fieldname", fieldname, data) - class Common: def url(self, verb, msgnum=None): url = "%s%d/%s/%s" % (self.relay, self.channel_id, self.side, verb)