add eventual-send queue

We defer starting a new timer until we've completely emptied the queue, since
we know we'll get to any new events added inside one of our callbacks. The
old design in Foolscap (which copied the list, cleared the original, then
fired everything in the copy) didn't look at these new events. OTOH, this
pop(0)-until-empty approach makes it easier to get into an infinite loop (any
callback which queues a new callback will get priority over anything else).
But the code is simpler.
This commit is contained in:
Brian Warner 2018-02-25 22:37:00 -08:00
parent 323044e9f5
commit 43965d5289
2 changed files with 107 additions and 0 deletions

50
src/wormhole/eventual.py Normal file
View File

@ -0,0 +1,50 @@
# inspired-by/adapted-from Foolscap's eventual.py, which Glyph wrote for me
# years ago.
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IReactorTime
from twisted.python import log
class EventualQueue(object):
def __init__(self, clock):
# pass clock=reactor unless you're testing
self._clock = IReactorTime(clock)
self._calls = []
self._flush_d = None
self._timer = None
def eventually(self, f, *args, **kwargs):
self._calls.append( (f, args, kwargs) )
if not self._timer:
self._timer = self._clock.callLater(0, self._turn)
def fire_eventually(self, value=None):
d = Deferred()
self.eventually(d.callback, value)
return d
def _turn(self):
while self._calls:
(f, args, kwargs) = self._calls.pop(0)
try:
f(*args, **kwargs)
except:
log.err()
self._timer = None
d, self._flush_d = self._flush_d, None
if d:
d.callback(None)
def flush_sync(self):
# if you have control over the Clock, this will synchronously flush the
# queue
assert self._clock.advance, "needs clock=twisted.internet.task.Clock()"
while self._calls:
self._clock.advance(0)
def flush(self):
# this is for unit tests, not application code
assert not self._flush_d, "only one flush at a time"
self._flush_d = Deferred()
self.eventually(lambda: None)
return self._flush_d

View File

@ -0,0 +1,57 @@
from __future__ import print_function, unicode_literals
import mock
from twisted.trial import unittest
from twisted.internet import reactor
from twisted.internet.task import Clock
from twisted.internet.defer import Deferred, inlineCallbacks
from ..eventual import EventualQueue
class IntentionalError(Exception):
pass
class Eventual(unittest.TestCase, object):
def test_eventually(self):
c = Clock()
eq = EventualQueue(c)
c1 = mock.Mock()
eq.eventually(c1, "arg1", "arg2", kwarg1="kw1")
eq.eventually(c1, "arg3", "arg4", kwarg5="kw5")
d2 = eq.fire_eventually()
d3 = eq.fire_eventually("value")
self.assertEqual(c1.mock_calls, [])
self.assertNoResult(d2)
self.assertNoResult(d3)
eq.flush_sync()
self.assertEqual(c1.mock_calls,
[mock.call("arg1", "arg2", kwarg1="kw1"),
mock.call("arg3", "arg4", kwarg5="kw5")])
self.assertEqual(self.successResultOf(d2), None)
self.assertEqual(self.successResultOf(d3), "value")
def test_error(self):
c = Clock()
eq = EventualQueue(c)
c1 = mock.Mock(side_effect=IntentionalError)
eq.eventually(c1, "arg1", "arg2", kwarg1="kw1")
self.assertEqual(c1.mock_calls, [])
eq.flush_sync()
self.assertEqual(c1.mock_calls,
[mock.call("arg1", "arg2", kwarg1="kw1")])
self.flushLoggedErrors(IntentionalError)
@inlineCallbacks
def test_flush(self):
eq = EventualQueue(reactor)
d1 = eq.fire_eventually()
d2 = Deferred()
def _more(res):
eq.eventually(d2.callback, None)
d1.addCallback(_more)
yield eq.flush()
# d1 will fire, which will queue d2 to fire, and the flush() ought to
# wait for d2 too
self.successResultOf(d2)