INCOMPATIBILITY: send "transit" message before offer/answer
In the future, both sides should expect to receive "transit" messages at any time, and they will add to the list of hints that they should try. For now, each side only sends a single transit message, before they send the offer (sender) or answer (receiver).
This commit is contained in:
parent
1a9e565fc3
commit
812fd0b4da
|
@ -31,6 +31,7 @@ class TwistedReceiver:
|
||||||
self.args = args
|
self.args = args
|
||||||
self._reactor = reactor
|
self._reactor = reactor
|
||||||
self._tor_manager = None
|
self._tor_manager = None
|
||||||
|
self._transit_receiver = None
|
||||||
|
|
||||||
def msg(self, *args, **kwargs):
|
def msg(self, *args, **kwargs):
|
||||||
print(*args, file=self.args.stdout, **kwargs)
|
print(*args, file=self.args.stdout, **kwargs)
|
||||||
|
@ -75,45 +76,39 @@ class TwistedReceiver:
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
them_d = yield self.get_data(w)
|
them_d = yield self._get_data(w)
|
||||||
except WormholeClosedError:
|
except WormholeClosedError:
|
||||||
if done:
|
if done:
|
||||||
returnValue(None)
|
returnValue(None)
|
||||||
raise TransferError("unexpected close")
|
raise TransferError("unexpected close")
|
||||||
|
#print("GOT", them_d)
|
||||||
|
if u"transit" in them_d:
|
||||||
|
yield self._parse_transit(them_d[u"transit"], w)
|
||||||
|
continue
|
||||||
if u"offer" in them_d:
|
if u"offer" in them_d:
|
||||||
if not want_offer:
|
if not want_offer:
|
||||||
raise TransferError("duplicate offer")
|
raise TransferError("duplicate offer")
|
||||||
try:
|
try:
|
||||||
self.parse_offer(them_d[u"offer"], w)
|
yield self.parse_offer(them_d[u"offer"], w)
|
||||||
except RespondError as r:
|
except RespondError as r:
|
||||||
data = json.dumps({"error": r.response}).encode("utf-8")
|
self._send_data({"error": r.response}, w)
|
||||||
w.send(data)
|
|
||||||
raise TransferError(r.response)
|
raise TransferError(r.response)
|
||||||
returnValue(None)
|
returnValue(None)
|
||||||
log.msg("unrecognized message %r" % (them_d,))
|
log.msg("unrecognized message %r" % (them_d,))
|
||||||
raise TransferError("expected offer, got none")
|
raise TransferError("expected offer, got none")
|
||||||
|
|
||||||
|
def _send_data(self, data, w):
|
||||||
|
data_bytes = json.dumps(data).encode("utf-8")
|
||||||
|
w.send(data_bytes)
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def parse_offer(self, them_d, w):
|
def _get_data(self, w):
|
||||||
if "message" in them_d:
|
# this may raise WrongPasswordError
|
||||||
self.handle_text(them_d, w)
|
them_bytes = yield w.get()
|
||||||
returnValue(None)
|
them_d = json.loads(them_bytes.decode("utf-8"))
|
||||||
if "file" in them_d:
|
if "error" in them_d:
|
||||||
f = self.handle_file(them_d)
|
raise TransferError(them_d["error"])
|
||||||
rp = yield self.establish_transit(w, them_d)
|
returnValue(them_d)
|
||||||
yield self.transfer_data(rp, f)
|
|
||||||
self.write_file(f)
|
|
||||||
yield self.close_transit(rp)
|
|
||||||
elif "directory" in them_d:
|
|
||||||
f = self.handle_directory(them_d)
|
|
||||||
rp = yield self.establish_transit(w, them_d)
|
|
||||||
yield self.transfer_data(rp, f)
|
|
||||||
self.write_directory(f)
|
|
||||||
yield self.close_transit(rp)
|
|
||||||
else:
|
|
||||||
self.msg(u"I don't know what they're offering\n")
|
|
||||||
self.msg(u"Offer details: %r" % (them_d,))
|
|
||||||
raise RespondError("unknown offer type")
|
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def handle_code(self, w):
|
def handle_code(self, w):
|
||||||
|
@ -133,19 +128,65 @@ class TwistedReceiver:
|
||||||
self.msg(u"Verifier %s." % verifier_hex)
|
self.msg(u"Verifier %s." % verifier_hex)
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def get_data(self, w):
|
def _parse_transit(self, sender_hints, w):
|
||||||
# this may raise WrongPasswordError
|
if self._transit_receiver:
|
||||||
them_bytes = yield w.get()
|
# TODO: accept multiple messages, add the additional hints to the
|
||||||
them_d = json.loads(them_bytes.decode("utf-8"))
|
# existing TransitReceiver
|
||||||
if "error" in them_d:
|
return
|
||||||
raise TransferError(them_d["error"])
|
yield self._build_transit(w, sender_hints)
|
||||||
returnValue(them_d)
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def _build_transit(self, w, sender_hints):
|
||||||
|
tr = TransitReceiver(self.args.transit_helper,
|
||||||
|
no_listen=self.args.no_listen,
|
||||||
|
tor_manager=self._tor_manager,
|
||||||
|
reactor=self._reactor,
|
||||||
|
timing=self.args.timing)
|
||||||
|
self._transit_receiver = tr
|
||||||
|
transit_key = w.derive_key(APPID+u"/transit-key", tr.TRANSIT_KEY_LENGTH)
|
||||||
|
tr.set_transit_key(transit_key)
|
||||||
|
|
||||||
|
tr.add_their_direct_hints(sender_hints["direct_connection_hints"])
|
||||||
|
tr.add_their_relay_hints(sender_hints["relay_connection_hints"])
|
||||||
|
|
||||||
|
direct_hints = yield tr.get_direct_hints()
|
||||||
|
relay_hints = yield tr.get_relay_hints()
|
||||||
|
receiver_hints = {
|
||||||
|
"direct_connection_hints": direct_hints,
|
||||||
|
"relay_connection_hints": relay_hints,
|
||||||
|
}
|
||||||
|
self._send_data({u"transit": receiver_hints}, w)
|
||||||
|
# TODO: send more hints as the TransitReceiver produces them
|
||||||
|
|
||||||
|
@inlineCallbacks
|
||||||
|
def parse_offer(self, them_d, w):
|
||||||
|
if "message" in them_d:
|
||||||
|
self.handle_text(them_d, w)
|
||||||
|
returnValue(None)
|
||||||
|
# transit will be created by this point, but not connected
|
||||||
|
if "file" in them_d:
|
||||||
|
f = self.handle_file(them_d)
|
||||||
|
self._send_permission(w)
|
||||||
|
rp = yield self._establish_transit()
|
||||||
|
yield self._transfer_data(rp, f)
|
||||||
|
self.write_file(f)
|
||||||
|
yield self.close_transit(rp)
|
||||||
|
elif "directory" in them_d:
|
||||||
|
f = self.handle_directory(them_d)
|
||||||
|
self._send_permission(w)
|
||||||
|
rp = yield self._establish_transit()
|
||||||
|
yield self._transfer_data(rp, f)
|
||||||
|
self.write_directory(f)
|
||||||
|
yield self.close_transit(rp)
|
||||||
|
else:
|
||||||
|
self.msg(u"I don't know what they're offering\n")
|
||||||
|
self.msg(u"Offer details: %r" % (them_d,))
|
||||||
|
raise RespondError("unknown offer type")
|
||||||
|
|
||||||
def handle_text(self, them_d, w):
|
def handle_text(self, them_d, w):
|
||||||
# we're receiving a text message
|
# we're receiving a text message
|
||||||
self.msg(them_d["message"])
|
self.msg(them_d["message"])
|
||||||
data = json.dumps({"answer": {"message_ack": "ok"}}).encode("utf-8")
|
self._send_data({"answer": {"message_ack": "ok"}}, w)
|
||||||
w.send(data)
|
|
||||||
|
|
||||||
def handle_file(self, them_d):
|
def handle_file(self, them_d):
|
||||||
file_data = them_d["file"]
|
file_data = them_d["file"]
|
||||||
|
@ -202,39 +243,18 @@ class TwistedReceiver:
|
||||||
raise RespondError("transfer rejected")
|
raise RespondError("transfer rejected")
|
||||||
t.detail(answer="yes")
|
t.detail(answer="yes")
|
||||||
|
|
||||||
@inlineCallbacks
|
def _send_permission(self, w):
|
||||||
def establish_transit(self, w, them_d):
|
self._send_data({"answer": { "file_ack": "ok" }}, w)
|
||||||
transit_receiver = TransitReceiver(self.args.transit_helper,
|
|
||||||
no_listen=self.args.no_listen,
|
|
||||||
tor_manager=self._tor_manager,
|
|
||||||
reactor=self._reactor,
|
|
||||||
timing=self.args.timing)
|
|
||||||
transit_key = w.derive_key(APPID+u"/transit-key",
|
|
||||||
transit_receiver.TRANSIT_KEY_LENGTH)
|
|
||||||
transit_receiver.set_transit_key(transit_key)
|
|
||||||
direct_hints = yield transit_receiver.get_direct_hints()
|
|
||||||
relay_hints = yield transit_receiver.get_relay_hints()
|
|
||||||
data = json.dumps({
|
|
||||||
"answer": {
|
|
||||||
"file_ack": "ok",
|
|
||||||
"transit": {
|
|
||||||
"direct_connection_hints": direct_hints,
|
|
||||||
"relay_connection_hints": relay_hints,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}).encode("utf-8")
|
|
||||||
w.send(data)
|
|
||||||
|
|
||||||
# now receive the rest of the owl
|
@inlineCallbacks
|
||||||
tdata = them_d["transit"]
|
def _establish_transit(self):
|
||||||
transit_receiver.add_their_direct_hints(tdata["direct_connection_hints"])
|
record_pipe = yield self._transit_receiver.connect()
|
||||||
transit_receiver.add_their_relay_hints(tdata["relay_connection_hints"])
|
|
||||||
record_pipe = yield transit_receiver.connect()
|
|
||||||
self.args.timing.add("transit connected")
|
self.args.timing.add("transit connected")
|
||||||
returnValue(record_pipe)
|
returnValue(record_pipe)
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def transfer_data(self, record_pipe, f):
|
def _transfer_data(self, record_pipe, f):
|
||||||
|
# now receive the rest of the owl
|
||||||
self.msg(u"Receiving (%s).." % record_pipe.describe())
|
self.msg(u"Receiving (%s).." % record_pipe.describe())
|
||||||
|
|
||||||
with self.args.timing.add("rx file"):
|
with self.args.timing.add("rx file"):
|
||||||
|
|
|
@ -51,6 +51,10 @@ class Sender:
|
||||||
d.addBoth(w.close)
|
d.addBoth(w.close)
|
||||||
yield d
|
yield d
|
||||||
|
|
||||||
|
def _send_data(self, data, w):
|
||||||
|
data_bytes = json.dumps(data).encode("utf-8")
|
||||||
|
w.send(data_bytes)
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def _go(self, w):
|
def _go(self, w):
|
||||||
# TODO: run the blocking zip-the-directory IO in a thread, let the
|
# TODO: run the blocking zip-the-directory IO in a thread, let the
|
||||||
|
@ -100,18 +104,20 @@ class Sender:
|
||||||
reactor=self._reactor,
|
reactor=self._reactor,
|
||||||
timing=self._timing)
|
timing=self._timing)
|
||||||
self._transit_sender = ts
|
self._transit_sender = ts
|
||||||
offer["transit"] = transit_data = {}
|
|
||||||
transit_data["relay_connection_hints"] = ts.get_relay_hints()
|
# for now, send this before the main offer
|
||||||
direct_hints = yield ts.get_direct_hints()
|
direct_hints = yield ts.get_direct_hints()
|
||||||
transit_data["direct_connection_hints"] = direct_hints
|
sender_hints = {"relay_connection_hints": ts.get_relay_hints(),
|
||||||
|
"direct_connection_hints": direct_hints,
|
||||||
|
}
|
||||||
|
self._send_data({u"transit": sender_hints}, w)
|
||||||
|
|
||||||
# TODO: move this down below w.get()
|
# TODO: move this down below w.get()
|
||||||
transit_key = w.derive_key(APPID+"/transit-key",
|
transit_key = w.derive_key(APPID+"/transit-key",
|
||||||
ts.TRANSIT_KEY_LENGTH)
|
ts.TRANSIT_KEY_LENGTH)
|
||||||
ts.set_transit_key(transit_key)
|
ts.set_transit_key(transit_key)
|
||||||
|
|
||||||
my_offer_bytes = json.dumps({"offer": offer}).encode("utf-8")
|
self._send_data({"offer": offer}, w)
|
||||||
w.send(my_offer_bytes)
|
|
||||||
|
|
||||||
want_answer = True
|
want_answer = True
|
||||||
done = False
|
done = False
|
||||||
|
@ -125,15 +131,23 @@ class Sender:
|
||||||
raise TransferError("unexpected close")
|
raise TransferError("unexpected close")
|
||||||
# TODO: get() fired, so now it's safe to use w.derive_key()
|
# TODO: get() fired, so now it's safe to use w.derive_key()
|
||||||
them_d = json.loads(them_d_bytes.decode("utf-8"))
|
them_d = json.loads(them_d_bytes.decode("utf-8"))
|
||||||
|
#print("GOT", them_d)
|
||||||
|
if u"transit" in them_d:
|
||||||
|
yield self._handle_transit(them_d[u"transit"])
|
||||||
|
continue
|
||||||
if u"answer" in them_d:
|
if u"answer" in them_d:
|
||||||
if not want_answer:
|
if not want_answer:
|
||||||
raise TransferError("duplicate answer")
|
raise TransferError("duplicate answer")
|
||||||
them_answer = them_d[u"answer"]
|
yield self._handle_answer(them_d[u"answer"])
|
||||||
yield self._handle_answer(them_answer)
|
|
||||||
done = True
|
done = True
|
||||||
returnValue(None)
|
returnValue(None)
|
||||||
log.msg("unrecognized message %r" % (them_d,))
|
log.msg("unrecognized message %r" % (them_d,))
|
||||||
|
|
||||||
|
def _handle_transit(self, receiver_hints):
|
||||||
|
ts = self._transit_sender
|
||||||
|
ts.add_their_direct_hints(receiver_hints["direct_connection_hints"])
|
||||||
|
ts.add_their_relay_hints(receiver_hints["relay_connection_hints"])
|
||||||
|
|
||||||
def _build_offer(self):
|
def _build_offer(self):
|
||||||
offer = {}
|
offer = {}
|
||||||
|
|
||||||
|
@ -223,15 +237,12 @@ class Sender:
|
||||||
raise TransferError("ambiguous response from remote, "
|
raise TransferError("ambiguous response from remote, "
|
||||||
"transfer abandoned: %s" % (them_answer,))
|
"transfer abandoned: %s" % (them_answer,))
|
||||||
|
|
||||||
tdata = them_answer["transit"]
|
yield self._send_file_twisted()
|
||||||
yield self._send_file_twisted(tdata)
|
|
||||||
|
|
||||||
|
|
||||||
@inlineCallbacks
|
@inlineCallbacks
|
||||||
def _send_file_twisted(self, tdata):
|
def _send_file_twisted(self):
|
||||||
ts = self._transit_sender
|
ts = self._transit_sender
|
||||||
ts.add_their_direct_hints(tdata["direct_connection_hints"])
|
|
||||||
ts.add_their_relay_hints(tdata["relay_connection_hints"])
|
|
||||||
|
|
||||||
self._fd_to_send.seek(0,2)
|
self._fd_to_send.seek(0,2)
|
||||||
filesize = self._fd_to_send.tell()
|
filesize = self._fd_to_send.tell()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user