diff --git a/src/wormhole/_boss.py b/src/wormhole/_boss.py index 686335f..373c0f4 100644 --- a/src/wormhole/_boss.py +++ b/src/wormhole/_boss.py @@ -38,6 +38,8 @@ class Boss(object): _versions = attrib(validator=instance_of(dict)) _client_version = attrib(validator=instance_of(tuple)) _reactor = attrib() + _eventual_queue = attrib() + _cooperator = attrib() _journal = attrib(validator=provides(_interfaces.IJournal)) _tor = attrib(validator=optional(provides(_interfaces.ITorManager))) _timing = attrib(validator=provides(_interfaces.ITiming)) diff --git a/src/wormhole/test/test_machines.py b/src/wormhole/test/test_machines.py index 8a17b76..5b5e9f1 100644 --- a/src/wormhole/test/test_machines.py +++ b/src/wormhole/test/test_machines.py @@ -1286,11 +1286,14 @@ class Boss(unittest.TestCase): "closed") versions = {"app": "version1"} reactor = None + eq = None + cooperator = None journal = ImmediateJournal() tor_manager = None client_version = ("python", __version__) b = MockBoss(wormhole, "side", "url", "appid", versions, - client_version, reactor, journal, tor_manager, + client_version, reactor, eq, cooperator, journal, + tor_manager, timing.DebugTiming()) b._T = Dummy("t", events, ITerminator, "close") b._S = Dummy("s", events, ISend, "send") diff --git a/src/wormhole/wormhole.py b/src/wormhole/wormhole.py index 610069c..c02aa60 100644 --- a/src/wormhole/wormhole.py +++ b/src/wormhole/wormhole.py @@ -5,6 +5,7 @@ import sys from attr import attrib, attrs from twisted.python import failure +from twisted.internet.task import Cooperator from zope.interface import implementer from ._boss import Boss @@ -122,7 +123,8 @@ class _DelegatedWormhole(object): @implementer(IWormhole, IDeferredWormhole) class _DeferredWormhole(object): - def __init__(self, eq): + def __init__(self, reactor, eq): + self._reactor = reactor self._welcome_observer = OneShotObserver(eq) self._code_observer = OneShotObserver(eq) self._key = None @@ -258,10 +260,11 @@ def create( side = bytes_to_hexstr(os.urandom(5)) journal = journal or ImmediateJournal() eq = _eventual_queue or EventualQueue(reactor) + cooperator = Cooperator(scheduler=eq.eventually) if delegate: w = _DelegatedWormhole(delegate) else: - w = _DeferredWormhole(eq) + w = _DeferredWormhole(reactor, eq) wormhole_versions = {} # will be used to indicate Wormhole capabilities wormhole_versions["app_versions"] = versions # app-specific capabilities v = __version__ @@ -269,7 +272,7 @@ def create( v = v.decode("utf-8", errors="replace") client_version = ("python", v) b = Boss(w, side, relay_url, appid, wormhole_versions, client_version, - reactor, journal, tor, timing) + reactor, eq, cooperator, journal, tor, timing) w._set_boss(b) b.start() return w