diff --git a/src/wormhole/eventual.py b/src/wormhole/eventual.py new file mode 100644 index 0000000..4fc8731 --- /dev/null +++ b/src/wormhole/eventual.py @@ -0,0 +1,50 @@ +# inspired-by/adapted-from Foolscap's eventual.py, which Glyph wrote for me +# years ago. + +from twisted.internet.defer import Deferred +from twisted.internet.interfaces import IReactorTime +from twisted.python import log + +class EventualQueue(object): + def __init__(self, clock): + # pass clock=reactor unless you're testing + self._clock = IReactorTime(clock) + self._calls = [] + self._flush_d = None + self._timer = None + + def eventually(self, f, *args, **kwargs): + self._calls.append( (f, args, kwargs) ) + if not self._timer: + self._timer = self._clock.callLater(0, self._turn) + + def fire_eventually(self, value=None): + d = Deferred() + self.eventually(d.callback, value) + return d + + def _turn(self): + while self._calls: + (f, args, kwargs) = self._calls.pop(0) + try: + f(*args, **kwargs) + except: + log.err() + self._timer = None + d, self._flush_d = self._flush_d, None + if d: + d.callback(None) + + def flush_sync(self): + # if you have control over the Clock, this will synchronously flush the + # queue + assert self._clock.advance, "needs clock=twisted.internet.task.Clock()" + while self._calls: + self._clock.advance(0) + + def flush(self): + # this is for unit tests, not application code + assert not self._flush_d, "only one flush at a time" + self._flush_d = Deferred() + self.eventually(lambda: None) + return self._flush_d diff --git a/src/wormhole/test/test_eventual.py b/src/wormhole/test/test_eventual.py new file mode 100644 index 0000000..4813198 --- /dev/null +++ b/src/wormhole/test/test_eventual.py @@ -0,0 +1,57 @@ +from __future__ import print_function, unicode_literals +import mock +from twisted.trial import unittest +from twisted.internet import reactor +from twisted.internet.task import Clock +from twisted.internet.defer import Deferred, inlineCallbacks +from ..eventual import EventualQueue + +class IntentionalError(Exception): + pass + +class Eventual(unittest.TestCase, object): + def test_eventually(self): + c = Clock() + eq = EventualQueue(c) + c1 = mock.Mock() + eq.eventually(c1, "arg1", "arg2", kwarg1="kw1") + eq.eventually(c1, "arg3", "arg4", kwarg5="kw5") + d2 = eq.fire_eventually() + d3 = eq.fire_eventually("value") + self.assertEqual(c1.mock_calls, []) + self.assertNoResult(d2) + self.assertNoResult(d3) + + eq.flush_sync() + self.assertEqual(c1.mock_calls, + [mock.call("arg1", "arg2", kwarg1="kw1"), + mock.call("arg3", "arg4", kwarg5="kw5")]) + self.assertEqual(self.successResultOf(d2), None) + self.assertEqual(self.successResultOf(d3), "value") + + def test_error(self): + c = Clock() + eq = EventualQueue(c) + c1 = mock.Mock(side_effect=IntentionalError) + eq.eventually(c1, "arg1", "arg2", kwarg1="kw1") + self.assertEqual(c1.mock_calls, []) + + eq.flush_sync() + self.assertEqual(c1.mock_calls, + [mock.call("arg1", "arg2", kwarg1="kw1")]) + + self.flushLoggedErrors(IntentionalError) + + @inlineCallbacks + def test_flush(self): + eq = EventualQueue(reactor) + d1 = eq.fire_eventually() + d2 = Deferred() + def _more(res): + eq.eventually(d2.callback, None) + d1.addCallback(_more) + yield eq.flush() + # d1 will fire, which will queue d2 to fire, and the flush() ought to + # wait for d2 too + self.successResultOf(d2) +