observer.py: add EmptyableSet

This commit is contained in:
Brian Warner 2018-03-13 14:20:48 -07:00
parent e6b4ccb565
commit eb7c04e366
2 changed files with 47 additions and 1 deletions

View File

@ -70,3 +70,24 @@ class SequenceObserver(object):
if self._observers:
d = self._observers.pop(0)
self._eq.eventually(d.callback, self._results.pop(0))
class EmptyableSet(set):
# manage a set which grows and shrinks over time. Fire a Deferred the first
# time it becomes empty after you start watching for it.
def __init__(self, *args, **kwargs):
self._eq = kwargs.pop("_eventual_queue") # required
super(EmptyableSet, self).__init__(*args, **kwargs)
self._observer = None
def when_next_empty(self):
if not self._observer:
self._observer = OneShotObserver(self._eq)
return self._observer.when_fired()
def discard(self, o):
super(EmptyableSet, self).discard(o)
if self._observer and not self:
self._observer.fire(None)
self._observer = None

View File

@ -3,7 +3,7 @@ from twisted.python.failure import Failure
from twisted.trial import unittest
from ..eventual import EventualQueue
from ..observer import OneShotObserver, SequenceObserver
from ..observer import OneShotObserver, SequenceObserver, EmptyableSet
class OneShot(unittest.TestCase):
@ -121,3 +121,28 @@ class Sequence(unittest.TestCase):
d2 = o.when_next_event()
eq.flush_sync()
self.assertIdentical(self.failureResultOf(d2), f)
class Empty(unittest.TestCase):
def test_set(self):
eq = EventualQueue(Clock())
s = EmptyableSet(_eventual_queue=eq)
d1 = s.when_next_empty()
eq.flush_sync()
self.assertNoResult(d1)
s.add(1)
eq.flush_sync()
self.assertNoResult(d1)
s.add(2)
s.discard(1)
d2 = s.when_next_empty()
eq.flush_sync()
self.assertNoResult(d1)
self.assertNoResult(d2)
s.discard(2)
eq.flush_sync()
self.assertEqual(self.successResultOf(d1), None)
self.assertEqual(self.successResultOf(d2), None)
s.add(3)
s.discard(3)