From 6cfabba31a8532a7e30ffc475ae95dd2c3bd6e5b Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sat, 30 Jun 2018 14:15:06 -0700 Subject: [PATCH] add reactor/cooperator to Wormhole and Boss calls --- src/wormhole/_boss.py | 2 ++ src/wormhole/test/test_machines.py | 5 ++++- src/wormhole/wormhole.py | 9 ++++++--- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/wormhole/_boss.py b/src/wormhole/_boss.py index 686335f..373c0f4 100644 --- a/src/wormhole/_boss.py +++ b/src/wormhole/_boss.py @@ -38,6 +38,8 @@ class Boss(object): _versions = attrib(validator=instance_of(dict)) _client_version = attrib(validator=instance_of(tuple)) _reactor = attrib() + _eventual_queue = attrib() + _cooperator = attrib() _journal = attrib(validator=provides(_interfaces.IJournal)) _tor = attrib(validator=optional(provides(_interfaces.ITorManager))) _timing = attrib(validator=provides(_interfaces.ITiming)) diff --git a/src/wormhole/test/test_machines.py b/src/wormhole/test/test_machines.py index 8a17b76..5b5e9f1 100644 --- a/src/wormhole/test/test_machines.py +++ b/src/wormhole/test/test_machines.py @@ -1286,11 +1286,14 @@ class Boss(unittest.TestCase): "closed") versions = {"app": "version1"} reactor = None + eq = None + cooperator = None journal = ImmediateJournal() tor_manager = None client_version = ("python", __version__) b = MockBoss(wormhole, "side", "url", "appid", versions, - client_version, reactor, journal, tor_manager, + client_version, reactor, eq, cooperator, journal, + tor_manager, timing.DebugTiming()) b._T = Dummy("t", events, ITerminator, "close") b._S = Dummy("s", events, ISend, "send") diff --git a/src/wormhole/wormhole.py b/src/wormhole/wormhole.py index 610069c..c02aa60 100644 --- a/src/wormhole/wormhole.py +++ b/src/wormhole/wormhole.py @@ -5,6 +5,7 @@ import sys from attr import attrib, attrs from twisted.python import failure +from twisted.internet.task import Cooperator from zope.interface import implementer from ._boss import Boss @@ -122,7 +123,8 @@ class _DelegatedWormhole(object): @implementer(IWormhole, IDeferredWormhole) class _DeferredWormhole(object): - def __init__(self, eq): + def __init__(self, reactor, eq): + self._reactor = reactor self._welcome_observer = OneShotObserver(eq) self._code_observer = OneShotObserver(eq) self._key = None @@ -258,10 +260,11 @@ def create( side = bytes_to_hexstr(os.urandom(5)) journal = journal or ImmediateJournal() eq = _eventual_queue or EventualQueue(reactor) + cooperator = Cooperator(scheduler=eq.eventually) if delegate: w = _DelegatedWormhole(delegate) else: - w = _DeferredWormhole(eq) + w = _DeferredWormhole(reactor, eq) wormhole_versions = {} # will be used to indicate Wormhole capabilities wormhole_versions["app_versions"] = versions # app-specific capabilities v = __version__ @@ -269,7 +272,7 @@ def create( v = v.decode("utf-8", errors="replace") client_version = ("python", v) b = Boss(w, side, relay_url, appid, wormhole_versions, client_version, - reactor, journal, tor, timing) + reactor, eq, cooperator, journal, tor, timing) w._set_boss(b) b.start() return w