From 11f806a3168683e0f8b44d028be67333b701b5f8 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Thu, 3 Dec 2015 15:07:47 -0600 Subject: [PATCH] transit_relay: add Producer/Consumer flow control This limits the buffering to about 10MB (per connection*direction). Previously, if the sender had more bandwidth than the receiver, the transit relay would buffer the entire file. With this change, the sender will be throttled to match the receiver's downstream speed. --- src/wormhole/servers/transit_server.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/wormhole/servers/transit_server.py b/src/wormhole/servers/transit_server.py index 122ac5e..469ad3f 100644 --- a/src/wormhole/servers/transit_server.py +++ b/src/wormhole/servers/transit_server.py @@ -20,7 +20,12 @@ class TransitConnection(protocol.Protocol): def dataReceived(self, data): if self.sent_ok: - # TODO: connect as producer/consumer + # We are an IPushProducer to our buddy's IConsumer, so they'll + # throttle us (by calling pauseProducing()) when their outbound + # buffer is full (e.g. when their downstream pipe is full). In + # practice, this buffers about 10MB per connection, after which + # point the sender will only transmit data as fast as the + # receiver can handle it. self.total_sent += len(data) self.buddy.transport.write(data) return @@ -56,7 +61,12 @@ class TransitConnection(protocol.Protocol): self.buddy = them self.transport.write(b"ok\n") self.sent_ok = True - # TODO: connect as producer/consumer + # Connect the two as a producer/consumer pair. We use streaming=True, + # so this expects the IPushProducer interface, and uses + # pauseProducing() to throttle, and resumeProducing() to unthrottle. + self.buddy.transport.registerProducer(self.transport, True) + # The Transit object calls buddy_connected() on both protocols, so + # there will be two producer/consumer pairs. def buddy_disconnected(self): log.msg("buddy_disconnected %r" % self)