dilation.outbound: registerProducer goes on transport, not Protocol
This commit is contained in:
parent
53ffbe1632
commit
b538dd6758
|
@ -154,7 +154,7 @@ from .connection import KCM, Ping, Pong, Ack
|
||||||
|
|
||||||
|
|
||||||
@attrs
|
@attrs
|
||||||
@implementer(IOutbound)
|
@implementer(IOutbound, IPushProducer)
|
||||||
class Outbound(object):
|
class Outbound(object):
|
||||||
# Manage outbound data: subchannel writes to us, we write to transport
|
# Manage outbound data: subchannel writes to us, we write to transport
|
||||||
_manager = attrib(validator=provides(IDilationManager))
|
_manager = attrib(validator=provides(IDilationManager))
|
||||||
|
@ -265,12 +265,12 @@ class Outbound(object):
|
||||||
assert not self._queued_unsent
|
assert not self._queued_unsent
|
||||||
self._queued_unsent.extend(self._outbound_queue)
|
self._queued_unsent.extend(self._outbound_queue)
|
||||||
# the connection can tell us to pause when we send too much data
|
# the connection can tell us to pause when we send too much data
|
||||||
c.registerProducer(self, True) # IPushProducer: pause+resume
|
c.transport.registerProducer(self, True) # IPushProducer: pause+resume
|
||||||
# send our queued messages
|
# send our queued messages
|
||||||
self.resumeProducing()
|
self.resumeProducing()
|
||||||
|
|
||||||
def stop_using_connection(self):
|
def stop_using_connection(self):
|
||||||
self._connection.unregisterProducer()
|
self._connection.transport.unregisterProducer()
|
||||||
self._connection = None
|
self._connection = None
|
||||||
self._queued_unsent.clear()
|
self._queued_unsent.clear()
|
||||||
self.pauseProducing()
|
self.pauseProducing()
|
||||||
|
@ -290,8 +290,8 @@ class Outbound(object):
|
||||||
# Inbound is responsible for tracking the high watermark and deciding
|
# Inbound is responsible for tracking the high watermark and deciding
|
||||||
# whether to ignore inbound messages or not
|
# whether to ignore inbound messages or not
|
||||||
|
|
||||||
# IProducer: the active connection calls these because we used
|
# IPushProducer: the active connection calls these because we used
|
||||||
# c.registerProducer to ask for them
|
# c.transport.registerProducer to ask for them
|
||||||
|
|
||||||
def pauseProducing(self):
|
def pauseProducing(self):
|
||||||
if self._paused:
|
if self._paused:
|
||||||
|
|
|
@ -105,7 +105,7 @@ class OutboundTest(unittest.TestCase):
|
||||||
|
|
||||||
# as soon as the connection is established, everything is sent
|
# as soon as the connection is established, everything is sent
|
||||||
o.use_connection(c)
|
o.use_connection(c)
|
||||||
self.assertEqual(c.mock_calls, [mock.call.registerProducer(o, True),
|
self.assertEqual(c.mock_calls, [mock.call.transport.registerProducer(o, True),
|
||||||
mock.call.send_record(r1),
|
mock.call.send_record(r1),
|
||||||
mock.call.send_record(r2)])
|
mock.call.send_record(r2)])
|
||||||
self.assertEqual(list(o._outbound_queue), [r1, r2])
|
self.assertEqual(list(o._outbound_queue), [r1, r2])
|
||||||
|
@ -131,7 +131,7 @@ class OutboundTest(unittest.TestCase):
|
||||||
# after each write. So only r1 should have been sent before getting
|
# after each write. So only r1 should have been sent before getting
|
||||||
# paused
|
# paused
|
||||||
o.use_connection(c)
|
o.use_connection(c)
|
||||||
self.assertEqual(c.mock_calls, [mock.call.registerProducer(o, True),
|
self.assertEqual(c.mock_calls, [mock.call.transport.registerProducer(o, True),
|
||||||
mock.call.send_record(r1)])
|
mock.call.send_record(r1)])
|
||||||
self.assertEqual(list(o._outbound_queue), [r1, r2])
|
self.assertEqual(list(o._outbound_queue), [r1, r2])
|
||||||
self.assertEqual(list(o._queued_unsent), [r2])
|
self.assertEqual(list(o._queued_unsent), [r2])
|
||||||
|
@ -172,7 +172,7 @@ class OutboundTest(unittest.TestCase):
|
||||||
self.assertEqual(list(o._queued_unsent), [])
|
self.assertEqual(list(o._queued_unsent), [])
|
||||||
|
|
||||||
o.use_connection(c)
|
o.use_connection(c)
|
||||||
self.assertEqual(c.mock_calls, [mock.call.registerProducer(o, True),
|
self.assertEqual(c.mock_calls, [mock.call.transport.registerProducer(o, True),
|
||||||
mock.call.send_record(r1)])
|
mock.call.send_record(r1)])
|
||||||
self.assertEqual(list(o._outbound_queue), [r1, r2])
|
self.assertEqual(list(o._outbound_queue), [r1, r2])
|
||||||
self.assertEqual(list(o._queued_unsent), [r2])
|
self.assertEqual(list(o._queued_unsent), [r2])
|
||||||
|
@ -191,7 +191,7 @@ class OutboundTest(unittest.TestCase):
|
||||||
def test_pause(self):
|
def test_pause(self):
|
||||||
o, m, c = make_outbound()
|
o, m, c = make_outbound()
|
||||||
o.use_connection(c)
|
o.use_connection(c)
|
||||||
self.assertEqual(c.mock_calls, [mock.call.registerProducer(o, True)])
|
self.assertEqual(c.mock_calls, [mock.call.transport.registerProducer(o, True)])
|
||||||
self.assertEqual(list(o._outbound_queue), [])
|
self.assertEqual(list(o._outbound_queue), [])
|
||||||
self.assertEqual(list(o._queued_unsent), [])
|
self.assertEqual(list(o._queued_unsent), [])
|
||||||
clear_mock_calls(c)
|
clear_mock_calls(c)
|
||||||
|
@ -519,7 +519,7 @@ class OutboundTest(unittest.TestCase):
|
||||||
|
|
||||||
o.use_connection(c)
|
o.use_connection(c)
|
||||||
o.send_if_connected(KCM())
|
o.send_if_connected(KCM())
|
||||||
self.assertEqual(c.mock_calls, [mock.call.registerProducer(o, True),
|
self.assertEqual(c.mock_calls, [mock.call.transport.registerProducer(o, True),
|
||||||
mock.call.send_record(KCM())])
|
mock.call.send_record(KCM())])
|
||||||
|
|
||||||
def test_tolerate_duplicate_pause_resume(self):
|
def test_tolerate_duplicate_pause_resume(self):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user