diff --git a/src/wormhole/observer.py b/src/wormhole/observer.py new file mode 100644 index 0000000..99a22a0 --- /dev/null +++ b/src/wormhole/observer.py @@ -0,0 +1,69 @@ +from __future__ import unicode_literals, print_function +from twisted.internet.defer import Deferred +from twisted.python.failure import Failure + +NoResult = object() + +class OneShotObserver(object): + def __init__(self, eventual_queue): + self._eq = eventual_queue + self._result = NoResult + self._observers = [] # list of Deferreds + + def when_fired(self): + d = Deferred() + self._observers.append(d) + self._maybe_call_observers() + return d + + def fire(self, result): + assert self._result is NoResult + self._result = result + self._maybe_call_observers() + + def _maybe_call_observers(self): + if self._result is NoResult: + return + observers, self._observers = self._observers, [] + for d in observers: + self._eq.eventually(d.callback, self._result) + + def error(self, f): + # errors will override an existing result + assert isinstance(f, Failure) + self._result = f + self._maybe_call_observers() + + def fire_if_not_fired(self, result): + if self._result is NoResult: + self.fire(result) + +class SequenceObserver(object): + def __init__(self, eventual_queue): + self._eq = eventual_queue + self._error = None + self._results = [] + self._observers = [] + + def when_next_event(self): + d = Deferred() + if self._error: + self._eq.eventually(d.errback, self._error) + elif self._results: + result = self._results.pop(0) + self._eq.eventually(d.callback, result) + else: + self._observers.append(d) + return d + + def fire(self, result): + if isinstance(result, Failure): + self._error = result + for d in self._observers: + self._eq.eventually(d.errback, self._error) + self._observers = [] + else: + self._results.append(result) + if self._observers: + d = self._observers.pop(0) + self._eq.eventually(d.callback, self._results.pop(0)) diff --git a/src/wormhole/test/test_observer.py b/src/wormhole/test/test_observer.py new file mode 100644 index 0000000..d974cc5 --- /dev/null +++ b/src/wormhole/test/test_observer.py @@ -0,0 +1,122 @@ +from twisted.trial import unittest +from twisted.internet.task import Clock +from twisted.python.failure import Failure +from ..eventual import EventualQueue +from ..observer import OneShotObserver, SequenceObserver + +class OneShot(unittest.TestCase): + def test_fire(self): + c = Clock() + eq = EventualQueue(c) + o = OneShotObserver(eq) + res = object() + d1 = o.when_fired() + eq.flush_sync() + self.assertNoResult(d1) + o.fire(res) + eq.flush_sync() + self.assertIdentical(self.successResultOf(d1), res) + d2 = o.when_fired() + eq.flush_sync() + self.assertIdentical(self.successResultOf(d2), res) + o.fire_if_not_fired(object()) + eq.flush_sync() + + def test_fire_if_not_fired(self): + c = Clock() + eq = EventualQueue(c) + o = OneShotObserver(eq) + res1 = object() + res2 = object() + d1 = o.when_fired() + eq.flush_sync() + self.assertNoResult(d1) + o.fire_if_not_fired(res1) + o.fire_if_not_fired(res2) + eq.flush_sync() + self.assertIdentical(self.successResultOf(d1), res1) + + def test_error_before_firing(self): + c = Clock() + eq = EventualQueue(c) + o = OneShotObserver(eq) + f = Failure(ValueError("oops")) + d1 = o.when_fired() + eq.flush_sync() + self.assertNoResult(d1) + o.error(f) + eq.flush_sync() + self.assertIdentical(self.failureResultOf(d1), f) + d2 = o.when_fired() + eq.flush_sync() + self.assertIdentical(self.failureResultOf(d2), f) + + def test_error_after_firing(self): + c = Clock() + eq = EventualQueue(c) + o = OneShotObserver(eq) + res = object() + f = Failure(ValueError("oops")) + + o.fire(res) + eq.flush_sync() + d1 = o.when_fired() + o.error(f) + d2 = o.when_fired() + eq.flush_sync() + self.assertIdentical(self.successResultOf(d1), res) + self.assertIdentical(self.failureResultOf(d2), f) + + +class Sequence(unittest.TestCase): + def test_fire(self): + c = Clock() + eq = EventualQueue(c) + o = SequenceObserver(eq) + d1 = o.when_next_event() + eq.flush_sync() + self.assertNoResult(d1) + d2 = o.when_next_event() + eq.flush_sync() + self.assertNoResult(d1) + self.assertNoResult(d2) + + ev1 = object() + ev2 = object() + o.fire(ev1) + eq.flush_sync() + self.assertIdentical(self.successResultOf(d1), ev1) + self.assertNoResult(d2) + + o.fire(ev2) + eq.flush_sync() + self.assertIdentical(self.successResultOf(d2), ev2) + + ev3 = object() + ev4 = object() + o.fire(ev3) + o.fire(ev4) + + d3 = o.when_next_event() + eq.flush_sync() + self.assertIdentical(self.successResultOf(d3), ev3) + + d4 = o.when_next_event() + eq.flush_sync() + self.assertIdentical(self.successResultOf(d4), ev4) + + def test_error(self): + c = Clock() + eq = EventualQueue(c) + o = SequenceObserver(eq) + d1 = o.when_next_event() + eq.flush_sync() + self.assertNoResult(d1) + f = Failure(ValueError("oops")) + o.fire(f) + eq.flush_sync() + self.assertIdentical(self.failureResultOf(d1), f) + d2 = o.when_next_event() + eq.flush_sync() + self.assertIdentical(self.failureResultOf(d2), f) +