diff --git a/src/wormhole/eventual.py b/src/wormhole/eventual.py new file mode 100644 index 0000000..02fbdd5 --- /dev/null +++ b/src/wormhole/eventual.py @@ -0,0 +1,76 @@ +# -*- test-case-name: foolscap.test.test_eventual -*- + +from twisted.internet import reactor, defer +from twisted.python import log + +class _SimpleCallQueue(object): + # XXX TODO: merge epsilon.cooperator in, and make this more complete. + def __init__(self): + self._events = [] + self._flushObservers = [] + self._timer = None + + def append(self, cb, args, kwargs): + self._events.append((cb, args, kwargs)) + if not self._timer: + self._timer = reactor.callLater(0, self._turn) + + def _turn(self): + self._timer = None + # flush all the messages that are currently in the queue. If anything + # gets added to the queue while we're doing this, those events will + # be put off until the next turn. + events, self._events = self._events, [] + for cb, args, kwargs in events: + try: + cb(*args, **kwargs) + except: + log.err() + if not self._events: + observers, self._flushObservers = self._flushObservers, [] + for o in observers: + o.callback(None) + + def flush(self): + """Return a Deferred that will fire (with None) when the call queue + is completely empty.""" + if not self._events: + return defer.succeed(None) + d = defer.Deferred() + self._flushObservers.append(d) + return d + + +_theSimpleQueue = _SimpleCallQueue() + +def eventually(cb, *args, **kwargs): + """This is the eventual-send operation, used as a plan-coordination + primitive. The callable will be invoked (with args and kwargs) in a later + reactor turn. Doing 'eventually(a); eventually(b)' guarantees that a will + be called before b. + + Any exceptions that occur in the callable will be logged with log.err(). + If you really want to ignore them, be sure to provide a callable that + catches those exceptions. + + This function returns None. If you care to know when the callable was + run, be sure to provide a callable that notifies somebody. + """ + _theSimpleQueue.append(cb, args, kwargs) + + +def fireEventually(value=None): + """This returns a Deferred which will fire in a later reactor turn, after + the current call stack has been completed, and after all other deferreds + previously scheduled with callEventually(). + """ + d = defer.Deferred() + eventually(d.callback, value) + return d + +def flushEventualQueue(_ignored=None): + """This returns a Deferred which fires when the eventual-send queue is + finally empty. This is useful to wait upon as the last step of a Trial + test method. + """ + return _theSimpleQueue.flush() diff --git a/src/wormhole/observer.py b/src/wormhole/observer.py new file mode 100644 index 0000000..25ce716 --- /dev/null +++ b/src/wormhole/observer.py @@ -0,0 +1,50 @@ +# -*- test-case-name: foolscap.test_observer -*- + +# many thanks to AllMyData for contributing the initial version of this code + +from twisted.internet import defer +from foolscap import eventual + +class OneShotObserverList(object): + """A one-shot event distributor. + + Subscribers can get a Deferred that will fire with the results of the + event once it finally occurs. The caller does not need to know whether + the event has happened yet or not: they get a Deferred in either case. + + The Deferreds returned to subscribers are guaranteed to not fire in the + current reactor turn; instead, eventually() is used to fire them in a + later turn. Look at Mark Miller's 'Concurrency Among Strangers' paper on + erights.org for a description of why this property is useful. + + I can only be fired once.""" + + def __init__(self): + self._fired = False + self._result = None + self._watchers = [] + self.__repr__ = self._unfired_repr + + def _unfired_repr(self): + return "" % (self._watchers, ) + + def _fired_repr(self): + return " %s>" % (self._result, ) + + def whenFired(self): + if self._fired: + return eventual.fireEventually(self._result) + d = defer.Deferred() + self._watchers.append(d) + return d + + def fire(self, result): + assert not self._fired + self._fired = True + self._result = result + + for w in self._watchers: + eventual.eventually(w.callback, result) + del self._watchers + self.__repr__ = self._fired_repr +