add OneShotObserver and SequenceObserver, with tests

This factors out the various "give me a Deferred for an value that may or may
not eventually be successfully generated" routines in _DeferredWormhole. It
uses an eventual-send to fire the Deferreds to avoid plan-coordination
hazards when the attached callbacks then call back into the Wormhole object
before the rest of the state transition has finished.
This commit is contained in:
Brian Warner 2018-02-24 13:09:54 -08:00
parent 43965d5289
commit caabb3510c
2 changed files with 191 additions and 0 deletions

69
src/wormhole/observer.py Normal file
View File

@ -0,0 +1,69 @@
from __future__ import unicode_literals, print_function
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
NoResult = object()
class OneShotObserver(object):
def __init__(self, eventual_queue):
self._eq = eventual_queue
self._result = NoResult
self._observers = [] # list of Deferreds
def when_fired(self):
d = Deferred()
self._observers.append(d)
self._maybe_call_observers()
return d
def fire(self, result):
assert self._result is NoResult
self._result = result
self._maybe_call_observers()
def _maybe_call_observers(self):
if self._result is NoResult:
return
observers, self._observers = self._observers, []
for d in observers:
self._eq.eventually(d.callback, self._result)
def error(self, f):
# errors will override an existing result
assert isinstance(f, Failure)
self._result = f
self._maybe_call_observers()
def fire_if_not_fired(self, result):
if self._result is NoResult:
self.fire(result)
class SequenceObserver(object):
def __init__(self, eventual_queue):
self._eq = eventual_queue
self._error = None
self._results = []
self._observers = []
def when_next_event(self):
d = Deferred()
if self._error:
self._eq.eventually(d.errback, self._error)
elif self._results:
result = self._results.pop(0)
self._eq.eventually(d.callback, result)
else:
self._observers.append(d)
return d
def fire(self, result):
if isinstance(result, Failure):
self._error = result
for d in self._observers:
self._eq.eventually(d.errback, self._error)
self._observers = []
else:
self._results.append(result)
if self._observers:
d = self._observers.pop(0)
self._eq.eventually(d.callback, self._results.pop(0))

View File

@ -0,0 +1,122 @@
from twisted.trial import unittest
from twisted.internet.task import Clock
from twisted.python.failure import Failure
from ..eventual import EventualQueue
from ..observer import OneShotObserver, SequenceObserver
class OneShot(unittest.TestCase):
def test_fire(self):
c = Clock()
eq = EventualQueue(c)
o = OneShotObserver(eq)
res = object()
d1 = o.when_fired()
eq.flush_sync()
self.assertNoResult(d1)
o.fire(res)
eq.flush_sync()
self.assertIdentical(self.successResultOf(d1), res)
d2 = o.when_fired()
eq.flush_sync()
self.assertIdentical(self.successResultOf(d2), res)
o.fire_if_not_fired(object())
eq.flush_sync()
def test_fire_if_not_fired(self):
c = Clock()
eq = EventualQueue(c)
o = OneShotObserver(eq)
res1 = object()
res2 = object()
d1 = o.when_fired()
eq.flush_sync()
self.assertNoResult(d1)
o.fire_if_not_fired(res1)
o.fire_if_not_fired(res2)
eq.flush_sync()
self.assertIdentical(self.successResultOf(d1), res1)
def test_error_before_firing(self):
c = Clock()
eq = EventualQueue(c)
o = OneShotObserver(eq)
f = Failure(ValueError("oops"))
d1 = o.when_fired()
eq.flush_sync()
self.assertNoResult(d1)
o.error(f)
eq.flush_sync()
self.assertIdentical(self.failureResultOf(d1), f)
d2 = o.when_fired()
eq.flush_sync()
self.assertIdentical(self.failureResultOf(d2), f)
def test_error_after_firing(self):
c = Clock()
eq = EventualQueue(c)
o = OneShotObserver(eq)
res = object()
f = Failure(ValueError("oops"))
o.fire(res)
eq.flush_sync()
d1 = o.when_fired()
o.error(f)
d2 = o.when_fired()
eq.flush_sync()
self.assertIdentical(self.successResultOf(d1), res)
self.assertIdentical(self.failureResultOf(d2), f)
class Sequence(unittest.TestCase):
def test_fire(self):
c = Clock()
eq = EventualQueue(c)
o = SequenceObserver(eq)
d1 = o.when_next_event()
eq.flush_sync()
self.assertNoResult(d1)
d2 = o.when_next_event()
eq.flush_sync()
self.assertNoResult(d1)
self.assertNoResult(d2)
ev1 = object()
ev2 = object()
o.fire(ev1)
eq.flush_sync()
self.assertIdentical(self.successResultOf(d1), ev1)
self.assertNoResult(d2)
o.fire(ev2)
eq.flush_sync()
self.assertIdentical(self.successResultOf(d2), ev2)
ev3 = object()
ev4 = object()
o.fire(ev3)
o.fire(ev4)
d3 = o.when_next_event()
eq.flush_sync()
self.assertIdentical(self.successResultOf(d3), ev3)
d4 = o.when_next_event()
eq.flush_sync()
self.assertIdentical(self.successResultOf(d4), ev4)
def test_error(self):
c = Clock()
eq = EventualQueue(c)
o = SequenceObserver(eq)
d1 = o.when_next_event()
eq.flush_sync()
self.assertNoResult(d1)
f = Failure(ValueError("oops"))
o.fire(f)
eq.flush_sync()
self.assertIdentical(self.failureResultOf(d1), f)
d2 = o.when_next_event()
eq.flush_sync()
self.assertIdentical(self.failureResultOf(d2), f)