From 43965d5289a6e0da3c662a69a4361d8a32f0099a Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 25 Feb 2018 22:37:00 -0800 Subject: [PATCH] add eventual-send queue We defer starting a new timer until we've completely emptied the queue, since we know we'll get to any new events added inside one of our callbacks. The old design in Foolscap (which copied the list, cleared the original, then fired everything in the copy) didn't look at these new events. OTOH, this pop(0)-until-empty approach makes it easier to get into an infinite loop (any callback which queues a new callback will get priority over anything else). But the code is simpler. --- src/wormhole/eventual.py | 50 ++++++++++++++++++++++++++ src/wormhole/test/test_eventual.py | 57 ++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 src/wormhole/eventual.py create mode 100644 src/wormhole/test/test_eventual.py diff --git a/src/wormhole/eventual.py b/src/wormhole/eventual.py new file mode 100644 index 0000000..4fc8731 --- /dev/null +++ b/src/wormhole/eventual.py @@ -0,0 +1,50 @@ +# inspired-by/adapted-from Foolscap's eventual.py, which Glyph wrote for me +# years ago. + +from twisted.internet.defer import Deferred +from twisted.internet.interfaces import IReactorTime +from twisted.python import log + +class EventualQueue(object): + def __init__(self, clock): + # pass clock=reactor unless you're testing + self._clock = IReactorTime(clock) + self._calls = [] + self._flush_d = None + self._timer = None + + def eventually(self, f, *args, **kwargs): + self._calls.append( (f, args, kwargs) ) + if not self._timer: + self._timer = self._clock.callLater(0, self._turn) + + def fire_eventually(self, value=None): + d = Deferred() + self.eventually(d.callback, value) + return d + + def _turn(self): + while self._calls: + (f, args, kwargs) = self._calls.pop(0) + try: + f(*args, **kwargs) + except: + log.err() + self._timer = None + d, self._flush_d = self._flush_d, None + if d: + d.callback(None) + + def flush_sync(self): + # if you have control over the Clock, this will synchronously flush the + # queue + assert self._clock.advance, "needs clock=twisted.internet.task.Clock()" + while self._calls: + self._clock.advance(0) + + def flush(self): + # this is for unit tests, not application code + assert not self._flush_d, "only one flush at a time" + self._flush_d = Deferred() + self.eventually(lambda: None) + return self._flush_d diff --git a/src/wormhole/test/test_eventual.py b/src/wormhole/test/test_eventual.py new file mode 100644 index 0000000..4813198 --- /dev/null +++ b/src/wormhole/test/test_eventual.py @@ -0,0 +1,57 @@ +from __future__ import print_function, unicode_literals +import mock +from twisted.trial import unittest +from twisted.internet import reactor +from twisted.internet.task import Clock +from twisted.internet.defer import Deferred, inlineCallbacks +from ..eventual import EventualQueue + +class IntentionalError(Exception): + pass + +class Eventual(unittest.TestCase, object): + def test_eventually(self): + c = Clock() + eq = EventualQueue(c) + c1 = mock.Mock() + eq.eventually(c1, "arg1", "arg2", kwarg1="kw1") + eq.eventually(c1, "arg3", "arg4", kwarg5="kw5") + d2 = eq.fire_eventually() + d3 = eq.fire_eventually("value") + self.assertEqual(c1.mock_calls, []) + self.assertNoResult(d2) + self.assertNoResult(d3) + + eq.flush_sync() + self.assertEqual(c1.mock_calls, + [mock.call("arg1", "arg2", kwarg1="kw1"), + mock.call("arg3", "arg4", kwarg5="kw5")]) + self.assertEqual(self.successResultOf(d2), None) + self.assertEqual(self.successResultOf(d3), "value") + + def test_error(self): + c = Clock() + eq = EventualQueue(c) + c1 = mock.Mock(side_effect=IntentionalError) + eq.eventually(c1, "arg1", "arg2", kwarg1="kw1") + self.assertEqual(c1.mock_calls, []) + + eq.flush_sync() + self.assertEqual(c1.mock_calls, + [mock.call("arg1", "arg2", kwarg1="kw1")]) + + self.flushLoggedErrors(IntentionalError) + + @inlineCallbacks + def test_flush(self): + eq = EventualQueue(reactor) + d1 = eq.fire_eventually() + d2 = Deferred() + def _more(res): + eq.eventually(d2.callback, None) + d1.addCallback(_more) + yield eq.flush() + # d1 will fire, which will queue d2 to fire, and the flush() ought to + # wait for d2 too + self.successResultOf(d2) +