transit_relay: add Producer/Consumer flow control

This limits the buffering to about 10MB (per connection*direction).
Previously, if the sender had more bandwidth than the receiver, the
transit relay would buffer the entire file. With this change, the sender
will be throttled to match the receiver's downstream speed.
This commit is contained in:
Brian Warner 2015-12-03 15:07:47 -06:00
parent c103441648
commit 11f806a316

View File

@ -20,7 +20,12 @@ class TransitConnection(protocol.Protocol):
def dataReceived(self, data): def dataReceived(self, data):
if self.sent_ok: if self.sent_ok:
# TODO: connect as producer/consumer # We are an IPushProducer to our buddy's IConsumer, so they'll
# throttle us (by calling pauseProducing()) when their outbound
# buffer is full (e.g. when their downstream pipe is full). In
# practice, this buffers about 10MB per connection, after which
# point the sender will only transmit data as fast as the
# receiver can handle it.
self.total_sent += len(data) self.total_sent += len(data)
self.buddy.transport.write(data) self.buddy.transport.write(data)
return return
@ -56,7 +61,12 @@ class TransitConnection(protocol.Protocol):
self.buddy = them self.buddy = them
self.transport.write(b"ok\n") self.transport.write(b"ok\n")
self.sent_ok = True self.sent_ok = True
# TODO: connect as producer/consumer # Connect the two as a producer/consumer pair. We use streaming=True,
# so this expects the IPushProducer interface, and uses
# pauseProducing() to throttle, and resumeProducing() to unthrottle.
self.buddy.transport.registerProducer(self.transport, True)
# The Transit object calls buddy_connected() on both protocols, so
# there will be two producer/consumer pairs.
def buddy_disconnected(self): def buddy_disconnected(self):
log.msg("buddy_disconnected %r" % self) log.msg("buddy_disconnected %r" % self)