diff --git a/docs/machines.dot b/docs/machines.dot new file mode 100644 index 0000000..b54a4e7 --- /dev/null +++ b/docs/machines.dot @@ -0,0 +1,36 @@ +digraph { + Wormhole [shape="box" label="Wormhole\n(manager)"] + Wormhole -> Mailbox [style="dashed" + label="M_set_nameplate()\nM_send()\nM_close()" + ] + Wormhole -> Mailbox + Mailbox -> Wormhole [style="dashed" + label="W_rx_pake()\nW_rx_msg()\nW_closed()" + ] + Mailbox [shape="box"] + Mailbox -> Connection [style="dashed" + label="C_tx_add()\nC_stop()" + ] + Mailbox -> Connection + Connection -> Mailbox [style="dashed" + label="M_connected()\nM_lost()\nM_rx_claimed()\nM_rx_message()\nM_rx_released()\nM_stopped()"] + + Connection -> websocket + + Nameplates [shape="box" label="Nameplate\nLister"] + Wormhole -> Nameplates [style="dashed" + label="NL_refresh_nameplates()" + ] + Nameplates -> Wormhole [style="dashed" + label="W_got_nameplates()" + ] + Connection -> Nameplates [style="dashed" + label="NL_connected()\nNL_lost()\nNL_rx_nameplates()" + ] + Nameplates -> Connection [style="dashed" + label="C_tx_list()" + ] + + + +} diff --git a/docs/nameplates.dot b/docs/nameplates.dot new file mode 100644 index 0000000..83bc86e --- /dev/null +++ b/docs/nameplates.dot @@ -0,0 +1,47 @@ +digraph { + /* + "connected": NL_connected + "rx": NL_rx_nameplates + "refresh": NL_refresh_nameplates + */ + {rank=same; NL_title NL_S0A NL_S0B} + NL_title [label="Nameplate\nLister" style="dotted"] + + NL_S0A [label="S0A:\nnot wanting\nunconnected"] + NL_S0B [label="S0B:\nnot wanting\nconnected" color="orange"] + + NL_S0A -> NL_S0B [label="connected"] + NL_S0B -> NL_S0A [label="lost"] + + NL_S0A -> NL_S1A [label="refresh"] + NL_S0B -> NL_P_tx [label="refresh" color="orange"] + + NL_S0A -> NL_P_tx [style="invis"] + + {rank=same; NL_S1A NL_P_tx NL_S1B NL_C_notify} + + NL_S1A [label="S1A:\nwant list\nunconnected"] + NL_S1B [label="S1B:\nwant list\nconnected" color="orange"] + + NL_S1A -> NL_P_tx [label="connected"] + NL_P_tx [shape="box" label="C.tx_list()" color="orange"] + NL_P_tx -> NL_S1B [color="orange"] + NL_S1B -> NL_S1A [label="lost"] + + NL_S1A -> foo [label="refresh"] + foo [label="" style="dashed"] + foo -> NL_S1A + + NL_S1B -> foo2 [label="refresh"] + foo2 [label="" style="dashed"] + foo2 -> NL_P_tx + + NL_S0B -> NL_C_notify [label="rx"] + NL_S1B -> NL_C_notify [label="rx"] + NL_C_notify [shape="box" label="W.got_nameplates()"] + NL_C_notify -> NL_S0B + + {rank=same; foo foo2 legend} + legend [shape="box" style="dotted" + label="connected: NL_connected()\nlost: NL_lost()\nrefresh: NL_refresh_nameplates()\nrx: NL_rx_nameplates()"] +} diff --git a/docs/w2.dot b/docs/w2.dot index f56f3a8..6b9d82c 100644 --- a/docs/w2.dot +++ b/docs/w2.dot @@ -1,11 +1,11 @@ digraph { /* new idea */ - {rank=same; M_entry_whole_code M_title M_entry_allocation M_entry_interactive} + {rank=same; M_title M_entry_whole_code M_entry_allocation M_entry_interactive} M_entry_whole_code [label="whole\ncode"] M_entry_whole_code -> M_S0A M_title [label="Message\nMachine" style="dotted"] - M_entry_whole_code -> M_title [style="invis"] + M_entry_allocation [label="allocation" color="orange"] M_entry_allocation -> M_S0B [label="already\nconnected" color="orange" fontcolor="orange"] M_entry_interactive [label="interactive" color="orange"] @@ -20,13 +20,11 @@ digraph { M_S0A -> M_S1A [label="M_set_nameplate()"] M_S0B -> M_P2_claim [label="M_set_nameplate()" color="orange" fontcolor="orange"] - {rank=same; M_S1A M_C_stop M_P1A_queue} + {rank=same; M_S1A M_P1A_queue} M_S0B -> M_S2B [style="invis"] M_S1A -> M_S2A [style="invis"] M_S1A [label="S1A:\nnot claimed"] M_S1A -> M_P2_claim [label="M_connected()"] - M_S1A -> M_C_stop [label="M_close()" style="dashed"] - M_C_stop [shape="box" label="C_stop()" style="dashed"] M_S1A -> M_P1A_queue [label="M_send(msg)" style="dotted"] M_P1A_queue [shape="box" label="queue" style="dotted"] M_P1A_queue -> M_S1A [style="dotted"] @@ -60,8 +58,8 @@ digraph { M_P_open -> M_S3B [color="orange"] {rank=same; M_S3A M_S3B M_P3_open M_P3_send} - M_S3A [label="S3A:\nclaimed\nmaybe open"] - M_S3B [label="S3B:\nclaimed\nmaybe open\n(bound)" color="orange"] + M_S3A [label="S3A:\nclaimed\nopened >=once"] + M_S3B [label="S3B:\nclaimed\nmaybe open now\n(bound)" color="orange"] M_S3A -> M_P3_open [label="M_connected()"] M_S3B -> M_S3A [label="M_lost()"] M_P3_open [shape="box" label="tx open\ntx add(queued)"] @@ -92,9 +90,9 @@ digraph { M_P3_process_theirs2 -> M_S4B [color="orange"] {rank=same; M_S4A M_P4_release M_S4B M_P4_process M_P4_send M_P4_queue} - M_S4A [label="S4A:\nmaybe released\nmaybe open\n"] + M_S4A [label="S4A:\nmaybe released\nopened >=once\n"] - M_S4B [label="S4B:\nmaybe released\nmaybe open\n(bound)" color="orange"] + M_S4B [label="S4B:\nmaybe released\nmaybe open now\n(bound)" color="orange"] M_S4A -> M_P4_release [label="M_connected()"] M_P4_release [shape="box" label="tx open\ntx add(queued)\ntx release"] M_S4B -> M_P4_send [label="M_send(msg)"] @@ -118,8 +116,8 @@ digraph { M_S4A -> seed [style="invis"] seed -> M_S5A {rank=same; seed M_S5A M_S5B M_P5_open M_process} - M_S5A [label="S5A:\nreleased\nmaybe open"] - M_S5B [label="S5B:\nreleased\nmaybe open\n(bound)" color="green"] + M_S5A [label="S5A:\nreleased\nopened >=once"] + M_S5B [label="S5B:\nreleased\nmaybe open now\n(bound)" color="green"] M_S5A -> M_P5_open [label="M_connected()"] M_P5_open [shape="box" label="tx open\ntx add(queued)"] M_P5_open -> M_S5B diff --git a/src/wormhole/_connection.py b/src/wormhole/_connection.py index 808b70f..735e8d3 100644 --- a/src/wormhole/_connection.py +++ b/src/wormhole/_connection.py @@ -2,6 +2,7 @@ from six.moves.urllib_parse import urlparse from attr import attrs, attrib from twisted.internet import defer, endpoints #, error +from twisted.application import internet from autobahn.twisted import websocket from automat import MethodicalMachine @@ -16,6 +17,7 @@ class WSClient(websocket.WebSocketClientProtocol): # this fires when the WebSocket is ready to go. No arguments print("onOpen", args) #self.wormhole_open = True + # send BIND, since the MailboxMachine does not self.connection_machine.protocol_onOpen(self) #self.factory.d.callback(self) @@ -46,121 +48,49 @@ class WSFactory(websocket.WebSocketClientFactory): # pip install (path to automat checkout)[visualize] # automat-visualize wormhole._connection -@attrs -class _WSRelayClient_Machine(object): - _c = attrib() - m = MethodicalMachine() - - @m.state(initial=True) - def initial(self): pass - @m.state() - def connecting(self): pass - @m.state() - def negotiating(self): pass - @m.state(terminal=True) - def failed(self): pass - @m.state() - def open(self): pass - @m.state() - def waiting(self): pass - @m.state() - def reconnecting(self): pass - @m.state() - def disconnecting(self): pass - @m.state() - def cancelling(self): pass - @m.state(terminal=True) - def closed(self): pass - - @m.input() - def start(self): pass ; print("input:start") - @m.input() - def d_callback(self): pass ; print("input:d_callback") - @m.input() - def d_errback(self): pass ; print("input:d_errback") - @m.input() - def d_cancel(self): pass ; print("input:d_cancel") - @m.input() - def onOpen(self): pass ; print("input:onOpen") - @m.input() - def onClose(self): pass ; print("input:onClose") - @m.input() - def expire(self): pass - @m.input() - def stop(self): pass - - # outputs - @m.output() - def ep_connect(self): - "ep.connect()" - self._c.ep_connect() - @m.output() - def reset_timer(self): - self._c.reset_timer() - @m.output() - def connection_established(self): - print("connection_established") - self._c.connection_established() - @m.output() - def M_lost(self): - self._c.M_lost() - @m.output() - def start_timer(self): - self._c.start_timer() - @m.output() - def cancel_timer(self): - self._c.cancel_timer() - @m.output() - def dropConnection(self): - self._c.dropConnection() - @m.output() - def notify_fail(self): - self._c.notify_fail() - @m.output() - def MC_stopped(self): - self._c.MC_stopped() - - initial.upon(start, enter=connecting, outputs=[ep_connect]) - connecting.upon(d_callback, enter=negotiating, outputs=[reset_timer]) - connecting.upon(d_errback, enter=failed, outputs=[notify_fail]) - connecting.upon(onClose, enter=failed, outputs=[notify_fail]) - connecting.upon(stop, enter=cancelling, outputs=[d_cancel]) - cancelling.upon(d_errback, enter=closed, outputs=[]) - - negotiating.upon(onOpen, enter=open, outputs=[connection_established]) - negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection]) - negotiating.upon(onClose, enter=failed, outputs=[notify_fail]) - - open.upon(onClose, enter=waiting, outputs=[M_lost, start_timer]) - open.upon(stop, enter=disconnecting, outputs=[dropConnection, M_lost]) - reconnecting.upon(d_callback, enter=negotiating, outputs=[reset_timer]) - reconnecting.upon(d_errback, enter=waiting, outputs=[start_timer]) - reconnecting.upon(onClose, enter=waiting, outputs=[start_timer]) - reconnecting.upon(stop, enter=cancelling, outputs=[d_cancel]) - - waiting.upon(expire, enter=reconnecting, outputs=[ep_connect]) - waiting.upon(stop, enter=closed, outputs=[cancel_timer]) - disconnecting.upon(onClose, enter=closed, outputs=[MC_stopped]) +class IRendezvousClient(Interface): + # must be an IService too + def set_dispatch(dispatcher): + """Assign a dispatcher object to this client. The following methods + will be called on this object when things happen: + * rx_welcome(welcome -> dict) + * rx_nameplates(nameplates -> list) # [{id: str,..}, ..] + * rx_allocated(nameplate -> str) + * rx_claimed(mailbox -> str) + * rx_released() + * rx_message(side -> str, phase -> str, body -> str, msg_id -> str) + * rx_closed() + * rx_pong(pong -> int) + """ + pass + def tx_list(): pass + def tx_allocate(): pass + def tx_claim(nameplate): pass + def tx_release(): pass + def tx_open(mailbox): pass + def tx_add(phase, body): pass + def tx_close(mood): pass + def tx_ping(ping): pass # We have one WSRelayClient for each wsurl we know about, and it lasts # as long as its parent Wormhole does. @attrs -class WSRelayClient(object): +class WSRelayClient(service.MultiService, object): + _journal = attrib() _wormhole = attrib() _mailbox = attrib() _ws_url = attrib() _reactor = attrib() - INITIAL_DELAY = 1.0 - def __init__(self): - self._m = _WSRelayClient_Machine(self) - self._f = f = WSFactory(self._ws_url) + f = WSFactory(self._ws_url) f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600) f.connection_machine = self # calls onOpen and onClose p = urlparse(self._ws_url) - self._ep = self._make_endpoint(p.hostname, p.port or 80) + ep = self._make_endpoint(p.hostname, p.port or 80) + # default policy: 1s initial, random exponential backoff, max 60s + self._client_service = internet.ClientService(ep, f) self._connector = None self._done_d = defer.Deferred() self._current_delay = self.INITIAL_DELAY @@ -248,242 +178,3 @@ if __name__ == "__main__": # ??? a new WSConnection is created each time the WSRelayClient gets through # negotiation - -class NameplateListingMachine(object): - m = MethodicalMachine() - def __init__(self): - self._list_nameplate_waiters = [] - - # Ideally, each API request would spawn a new "list_nameplates" message - # to the server, so the response would be maximally fresh, but that would - # require correlating server request+response messages, and the protocol - # is intended to be less stateful than that. So we offer a weaker - # freshness property: if no server requests are in flight, then a new API - # request will provoke a new server request, and the result will be - # fresh. But if a server request is already in flight when a second API - # request arrives, both requests will be satisfied by the same response. - - @m.state(initial=True) - def idle(self): pass - @m.state() - def requesting(self): pass - - @m.input() - def list_nameplates(self): pass # returns Deferred - @m.input() - def response(self, message): pass - - @m.output() - def add_deferred(self): - d = defer.Deferred() - self._list_nameplate_waiters.append(d) - return d - @m.output() - def send_request(self): - self._connection.send_command("list") - @m.output() - def distribute_response(self, message): - nameplates = parse(message) - waiters = self._list_nameplate_waiters - self._list_nameplate_waiters = [] - for d in waiters: - d.callback(nameplates) - - idle.upon(list_nameplates, enter=requesting, - outputs=[add_deferred, send_request], - collector=lambda outs: outs[0]) - idle.upon(response, enter=idle, outputs=[]) - requesting.upon(list_nameplates, enter=requesting, - outputs=[add_deferred], - collector=lambda outs: outs[0]) - requesting.upon(response, enter=idle, outputs=[distribute_response]) - - # nlm._connection = c = Connection(ws) - # nlm.list_nameplates().addCallback(display_completions) - # c.register_dispatch("nameplates", nlm.response) - -class Wormhole: - m = MethodicalMachine() - - def __init__(self, ws_url, reactor): - self._relay_client = WSRelayClient(self, ws_url, reactor) - # This records all the messages we want the relay to have. Each time - # we establish a connection, we'll send them all (and the relay - # server will filter out duplicates). If we add any while a - # connection is established, we'll send the new ones. - self._outbound_messages = [] - - # these methods are called from outside - def start(self): - self._relay_client.start() - - # and these are the state-machine transition functions, which don't take - # args - @m.state() - def closed(initial=True): pass - @m.state() - def know_code_not_mailbox(): pass - @m.state() - def know_code_and_mailbox(): pass # no longer need nameplate - @m.state() - def waiting_first_msg(): pass # key is established, want any message - @m.state() - def processing_version(): pass - @m.state() - def processing_phase(): pass - @m.state() - def open(): pass # key is verified, can post app messages - @m.state(terminal=True) - def failed(): pass - - @m.input() - def deliver_message(self, message): pass - - def w_set_seed(self, code, mailbox): - """Call w_set_seed when we sprout a Wormhole Seed, which - contains both the code and the mailbox""" - self.w_set_code(code) - self.w_set_mailbox(mailbox) - - @m.input() - def w_set_code(self, code): - """Call w_set_code when you learn the code, probably because the user - typed it in.""" - @m.input() - def w_set_mailbox(self, mailbox): - """Call w_set_mailbox() when you learn the mailbox id, from the - response to claim_nameplate""" - pass - - - @m.input() - def rx_pake(self, pake): pass # reponse["message"][phase=pake] - - @m.input() - def rx_version(self, version): # response["message"][phase=version] - pass - @m.input() - def verify_good(self, verifier): pass - @m.input() - def verify_bad(self, f): pass - - @m.input() - def rx_phase(self, message): pass - @m.input() - def phase_good(self, message): pass - @m.input() - def phase_bad(self, f): pass - - @m.output() - def compute_and_post_pake(self, code): - self._code = code - self._pake = compute(code) - self._post(pake=self._pake) - self._ws_send_command("add", phase="pake", body=XXX(pake)) - @m.output() - def set_mailbox(self, mailbox): - self._mailbox = mailbox - @m.output() - def set_seed(self, code, mailbox): - self._code = code - self._mailbox = mailbox - - @m.output() - def process_version(self, version): # response["message"][phase=version] - their_verifier = com - if OK: - self.verify_good(verifier) - else: - self.verify_bad(f) - pass - - @m.output() - def notify_verified(self, verifier): - for d in self._verify_waiters: - d.callback(verifier) - @m.output() - def notify_failed(self, f): - for d in self._verify_waiters: - d.errback(f) - - @m.output() - def process_phase(self, message): # response["message"][phase=version] - their_verifier = com - if OK: - self.verify_good(verifier) - else: - self.verify_bad(f) - pass - - @m.output() - def post_inbound(self, message): - pass - - @m.output() - def deliver_message(self, message): - self._qc.deliver_message(message) - - @m.output() - def compute_key_and_post_version(self, pake): - self._key = x - self._verifier = x - plaintext = dict_to_bytes(self._my_versions) - phase = "version" - data_key = self._derive_phase_key(self._side, phase) - encrypted = self._encrypt_data(data_key, plaintext) - self._msg_send(phase, encrypted) - - closed.upon(w_set_code, enter=know_code_not_mailbox, - outputs=[compute_and_post_pake]) - know_code_not_mailbox.upon(w_set_mailbox, enter=know_code_and_mailbox, - outputs=[set_mailbox]) - know_code_and_mailbox.upon(rx_pake, enter=waiting_first_msg, - outputs=[compute_key_and_post_version]) - waiting_first_msg.upon(rx_version, enter=processing_version, - outputs=[process_version]) - processing_version.upon(verify_good, enter=open, outputs=[notify_verified]) - processing_version.upon(verify_bad, enter=failed, outputs=[notify_failed]) - open.upon(rx_phase, enter=processing_phase, outputs=[process_phase]) - processing_phase.upon(phase_good, enter=open, outputs=[post_inbound]) - processing_phase.upon(phase_bad, enter=failed, outputs=[notify_failed]) - -class QueueConnect: - m = MethodicalMachine() - def __init__(self): - self._outbound_messages = [] - self._connection = None - @m.state() - def disconnected(): pass - @m.state() - def connected(): pass - - @m.input() - def deliver_message(self, message): pass - @m.input() - def connect(self, connection): pass - @m.input() - def disconnect(self): pass - - @m.output() - def remember_connection(self, connection): - self._connection = connection - @m.output() - def forget_connection(self): - self._connection = None - @m.output() - def queue_message(self, message): - self._outbound_messages.append(message) - @m.output() - def send_message(self, message): - self._connection.send(message) - @m.output() - def send_queued_messages(self, connection): - for m in self._outbound_messages: - connection.send(m) - - disconnected.upon(deliver_message, enter=disconnected, outputs=[queue_message]) - disconnected.upon(connect, enter=connected, outputs=[remember_connection, - send_queued_messages]) - connected.upon(deliver_message, enter=connected, - outputs=[queue_message, send_message]) - connected.upon(disconnect, enter=disconnected, outputs=[forget_connection]) diff --git a/src/wormhole/_mailbox.py b/src/wormhole/_mailbox.py index 2c298b7..f67bb5a 100644 --- a/src/wormhole/_mailbox.py +++ b/src/wormhole/_mailbox.py @@ -3,6 +3,7 @@ from automat import MethodicalMachine @attrs class _Mailbox_Machine(object): + _connection_machine = attrib() _m = attrib() m = MethodicalMachine() @@ -120,12 +121,17 @@ class _Mailbox_Machine(object): @m.output() def tx_close(self): pass @m.output() + def process_first_msg_from_them(self, msg): + self.tx_release() + self.process_msg_from_them(msg) + @m.output() def process_msg_from_them(self, msg): pass @m.output() def dequeue(self, msg): pass @m.output() def C_stop(self): pass - + @m.output() + def WM_stopped(self): pass initial.upon(M_start_unconnected, enter=S0A, outputs=[]) initial.upon(M_start_connected, enter=S0B, outputs=[]) @@ -138,10 +144,10 @@ class _Mailbox_Machine(object): S1A.upon(M_connected, enter=S2B, outputs=[tx_claim]) S1A.upon(M_send, enter=S1A, outputs=[queue]) - S1A.upon(M_stop, enter=SrA, outputs=[]) + S1A.upon(M_stop, enter=SsB, outputs=[C_stop]) S2A.upon(M_connected, enter=S2B, outputs=[tx_claim]) - S2A.upon(M_stop, enter=SsB, outputs=[C_stop]) + S2A.upon(M_stop, enter=SrA, outputs=[]) S2A.upon(M_send, enter=S2A, outputs=[queue]) S2B.upon(M_lost, enter=S2A, outputs=[]) S2B.upon(M_send, enter=S2B, outputs=[queue]) @@ -153,14 +159,15 @@ class _Mailbox_Machine(object): S3A.upon(M_send, enter=S3A, outputs=[queue]) S3A.upon(M_stop, enter=SrcA, outputs=[]) S3B.upon(M_lost, enter=S3A, outputs=[]) - S3B.upon(M_rx_msg_from_them, enter=S4B, outputs=[#tx_release, # trouble - process_msg_from_them]) + S3B.upon(M_rx_msg_from_them, enter=S4B, + outputs=[process_first_msg_from_them]) S3B.upon(M_rx_msg_from_me, enter=S3B, outputs=[dequeue]) S3B.upon(M_rx_claimed, enter=S3B, outputs=[]) S3B.upon(M_send, enter=S3B, outputs=[queue, tx_add]) S3B.upon(M_stop, enter=SrcB, outputs=[tx_release, tx_close]) - S4A.upon(M_connected, enter=S4B, outputs=[tx_open, tx_add_queued, tx_release]) + S4A.upon(M_connected, enter=S4B, + outputs=[tx_open, tx_add_queued, tx_release]) S4A.upon(M_send, enter=S4A, outputs=[queue]) S4A.upon(M_stop, enter=SrcA, outputs=[]) S4B.upon(M_lost, enter=S4A, outputs=[]) @@ -192,5 +199,5 @@ class _Mailbox_Machine(object): ScB.upon(M_rx_closed, enter=SsB, outputs=[C_stop]) ScA.upon(M_connected, enter=ScB, outputs=[tx_close]) - SsB.upon(M_stopped, enter=Ss, outputs=[]) + SsB.upon(M_stopped, enter=Ss, outputs=[WM_stopped]) diff --git a/src/wormhole/_nameplate.py b/src/wormhole/_nameplate.py new file mode 100644 index 0000000..8014cfd --- /dev/null +++ b/src/wormhole/_nameplate.py @@ -0,0 +1,53 @@ + +class NameplateListingMachine(object): + m = MethodicalMachine() + def __init__(self): + self._list_nameplate_waiters = [] + + # Ideally, each API request would spawn a new "list_nameplates" message + # to the server, so the response would be maximally fresh, but that would + # require correlating server request+response messages, and the protocol + # is intended to be less stateful than that. So we offer a weaker + # freshness property: if no server requests are in flight, then a new API + # request will provoke a new server request, and the result will be + # fresh. But if a server request is already in flight when a second API + # request arrives, both requests will be satisfied by the same response. + + @m.state(initial=True) + def idle(self): pass + @m.state() + def requesting(self): pass + + @m.input() + def list_nameplates(self): pass # returns Deferred + @m.input() + def response(self, message): pass + + @m.output() + def add_deferred(self): + d = defer.Deferred() + self._list_nameplate_waiters.append(d) + return d + @m.output() + def send_request(self): + self._connection.send_command("list") + @m.output() + def distribute_response(self, message): + nameplates = parse(message) + waiters = self._list_nameplate_waiters + self._list_nameplate_waiters = [] + for d in waiters: + d.callback(nameplates) + + idle.upon(list_nameplates, enter=requesting, + outputs=[add_deferred, send_request], + collector=lambda outs: outs[0]) + idle.upon(response, enter=idle, outputs=[]) + requesting.upon(list_nameplates, enter=requesting, + outputs=[add_deferred], + collector=lambda outs: outs[0]) + requesting.upon(response, enter=idle, outputs=[distribute_response]) + + # nlm._connection = c = Connection(ws) + # nlm.list_nameplates().addCallback(display_completions) + # c.register_dispatch("nameplates", nlm.response) diff --git a/src/wormhole/_wormhole.py b/src/wormhole/_wormhole.py new file mode 100644 index 0000000..697fab3 --- /dev/null +++ b/src/wormhole/_wormhole.py @@ -0,0 +1,145 @@ + +class Wormhole: + m = MethodicalMachine() + + def __init__(self, ws_url, reactor): + self._relay_client = WSRelayClient(self, ws_url, reactor) + # This records all the messages we want the relay to have. Each time + # we establish a connection, we'll send them all (and the relay + # server will filter out duplicates). If we add any while a + # connection is established, we'll send the new ones. + self._outbound_messages = [] + + # these methods are called from outside + def start(self): + self._relay_client.start() + + # and these are the state-machine transition functions, which don't take + # args + @m.state() + def closed(initial=True): pass + @m.state() + def know_code_not_mailbox(): pass + @m.state() + def know_code_and_mailbox(): pass # no longer need nameplate + @m.state() + def waiting_first_msg(): pass # key is established, want any message + @m.state() + def processing_version(): pass + @m.state() + def processing_phase(): pass + @m.state() + def open(): pass # key is verified, can post app messages + @m.state(terminal=True) + def failed(): pass + + @m.input() + def deliver_message(self, message): pass + + def w_set_seed(self, code, mailbox): + """Call w_set_seed when we sprout a Wormhole Seed, which + contains both the code and the mailbox""" + self.w_set_code(code) + self.w_set_mailbox(mailbox) + + @m.input() + def w_set_code(self, code): + """Call w_set_code when you learn the code, probably because the user + typed it in.""" + @m.input() + def w_set_mailbox(self, mailbox): + """Call w_set_mailbox() when you learn the mailbox id, from the + response to claim_nameplate""" + pass + + + @m.input() + def rx_pake(self, pake): pass # reponse["message"][phase=pake] + + @m.input() + def rx_version(self, version): # response["message"][phase=version] + pass + @m.input() + def verify_good(self, verifier): pass + @m.input() + def verify_bad(self, f): pass + + @m.input() + def rx_phase(self, message): pass + @m.input() + def phase_good(self, message): pass + @m.input() + def phase_bad(self, f): pass + + @m.output() + def compute_and_post_pake(self, code): + self._code = code + self._pake = compute(code) + self._post(pake=self._pake) + self._ws_send_command("add", phase="pake", body=XXX(pake)) + @m.output() + def set_mailbox(self, mailbox): + self._mailbox = mailbox + @m.output() + def set_seed(self, code, mailbox): + self._code = code + self._mailbox = mailbox + + @m.output() + def process_version(self, version): # response["message"][phase=version] + their_verifier = com + if OK: + self.verify_good(verifier) + else: + self.verify_bad(f) + pass + + @m.output() + def notify_verified(self, verifier): + for d in self._verify_waiters: + d.callback(verifier) + @m.output() + def notify_failed(self, f): + for d in self._verify_waiters: + d.errback(f) + + @m.output() + def process_phase(self, message): # response["message"][phase=version] + their_verifier = com + if OK: + self.verify_good(verifier) + else: + self.verify_bad(f) + pass + + @m.output() + def post_inbound(self, message): + pass + + @m.output() + def deliver_message(self, message): + self._qc.deliver_message(message) + + @m.output() + def compute_key_and_post_version(self, pake): + self._key = x + self._verifier = x + plaintext = dict_to_bytes(self._my_versions) + phase = "version" + data_key = self._derive_phase_key(self._side, phase) + encrypted = self._encrypt_data(data_key, plaintext) + self._msg_send(phase, encrypted) + + closed.upon(w_set_code, enter=know_code_not_mailbox, + outputs=[compute_and_post_pake]) + know_code_not_mailbox.upon(w_set_mailbox, enter=know_code_and_mailbox, + outputs=[set_mailbox]) + know_code_and_mailbox.upon(rx_pake, enter=waiting_first_msg, + outputs=[compute_key_and_post_version]) + waiting_first_msg.upon(rx_version, enter=processing_version, + outputs=[process_version]) + processing_version.upon(verify_good, enter=open, outputs=[notify_verified]) + processing_version.upon(verify_bad, enter=failed, outputs=[notify_failed]) + open.upon(rx_phase, enter=processing_phase, outputs=[process_phase]) + processing_phase.upon(phase_good, enter=open, outputs=[post_inbound]) + processing_phase.upon(phase_bad, enter=failed, outputs=[notify_failed]) diff --git a/src/wormhole/misc.py b/src/wormhole/misc.py new file mode 100644 index 0000000..808b70f --- /dev/null +++ b/src/wormhole/misc.py @@ -0,0 +1,489 @@ + +from six.moves.urllib_parse import urlparse +from attr import attrs, attrib +from twisted.internet import defer, endpoints #, error +from autobahn.twisted import websocket +from automat import MethodicalMachine + +class WSClient(websocket.WebSocketClientProtocol): + def onConnect(self, response): + # this fires during WebSocket negotiation, and isn't very useful + # unless you want to modify the protocol settings + print("onConnect", response) + #self.connection_machine.onConnect(self) + + def onOpen(self, *args): + # this fires when the WebSocket is ready to go. No arguments + print("onOpen", args) + #self.wormhole_open = True + self.connection_machine.protocol_onOpen(self) + #self.factory.d.callback(self) + + def onMessage(self, payload, isBinary): + print("onMessage") + return + assert not isBinary + self.wormhole._ws_dispatch_response(payload) + + def onClose(self, wasClean, code, reason): + print("onClose") + self.connection_machine.protocol_onClose(wasClean, code, reason) + #if self.wormhole_open: + # self.wormhole._ws_closed(wasClean, code, reason) + #else: + # # we closed before establishing a connection (onConnect) or + # # finishing WebSocket negotiation (onOpen): errback + # self.factory.d.errback(error.ConnectError(reason)) + +class WSFactory(websocket.WebSocketClientFactory): + protocol = WSClient + def buildProtocol(self, addr): + proto = websocket.WebSocketClientFactory.buildProtocol(self, addr) + proto.connection_machine = self.connection_machine + #proto.wormhole_open = False + return proto + +# pip install (path to automat checkout)[visualize] +# automat-visualize wormhole._connection + +@attrs +class _WSRelayClient_Machine(object): + _c = attrib() + m = MethodicalMachine() + + @m.state(initial=True) + def initial(self): pass + @m.state() + def connecting(self): pass + @m.state() + def negotiating(self): pass + @m.state(terminal=True) + def failed(self): pass + @m.state() + def open(self): pass + @m.state() + def waiting(self): pass + @m.state() + def reconnecting(self): pass + @m.state() + def disconnecting(self): pass + @m.state() + def cancelling(self): pass + @m.state(terminal=True) + def closed(self): pass + + @m.input() + def start(self): pass ; print("input:start") + @m.input() + def d_callback(self): pass ; print("input:d_callback") + @m.input() + def d_errback(self): pass ; print("input:d_errback") + @m.input() + def d_cancel(self): pass ; print("input:d_cancel") + @m.input() + def onOpen(self): pass ; print("input:onOpen") + @m.input() + def onClose(self): pass ; print("input:onClose") + @m.input() + def expire(self): pass + @m.input() + def stop(self): pass + + # outputs + @m.output() + def ep_connect(self): + "ep.connect()" + self._c.ep_connect() + @m.output() + def reset_timer(self): + self._c.reset_timer() + @m.output() + def connection_established(self): + print("connection_established") + self._c.connection_established() + @m.output() + def M_lost(self): + self._c.M_lost() + @m.output() + def start_timer(self): + self._c.start_timer() + @m.output() + def cancel_timer(self): + self._c.cancel_timer() + @m.output() + def dropConnection(self): + self._c.dropConnection() + @m.output() + def notify_fail(self): + self._c.notify_fail() + @m.output() + def MC_stopped(self): + self._c.MC_stopped() + + initial.upon(start, enter=connecting, outputs=[ep_connect]) + connecting.upon(d_callback, enter=negotiating, outputs=[reset_timer]) + connecting.upon(d_errback, enter=failed, outputs=[notify_fail]) + connecting.upon(onClose, enter=failed, outputs=[notify_fail]) + connecting.upon(stop, enter=cancelling, outputs=[d_cancel]) + cancelling.upon(d_errback, enter=closed, outputs=[]) + + negotiating.upon(onOpen, enter=open, outputs=[connection_established]) + negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection]) + negotiating.upon(onClose, enter=failed, outputs=[notify_fail]) + + open.upon(onClose, enter=waiting, outputs=[M_lost, start_timer]) + open.upon(stop, enter=disconnecting, outputs=[dropConnection, M_lost]) + reconnecting.upon(d_callback, enter=negotiating, outputs=[reset_timer]) + reconnecting.upon(d_errback, enter=waiting, outputs=[start_timer]) + reconnecting.upon(onClose, enter=waiting, outputs=[start_timer]) + reconnecting.upon(stop, enter=cancelling, outputs=[d_cancel]) + + waiting.upon(expire, enter=reconnecting, outputs=[ep_connect]) + waiting.upon(stop, enter=closed, outputs=[cancel_timer]) + disconnecting.upon(onClose, enter=closed, outputs=[MC_stopped]) + +# We have one WSRelayClient for each wsurl we know about, and it lasts +# as long as its parent Wormhole does. + +@attrs +class WSRelayClient(object): + _wormhole = attrib() + _mailbox = attrib() + _ws_url = attrib() + _reactor = attrib() + INITIAL_DELAY = 1.0 + + + def __init__(self): + self._m = _WSRelayClient_Machine(self) + self._f = f = WSFactory(self._ws_url) + f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600) + f.connection_machine = self # calls onOpen and onClose + p = urlparse(self._ws_url) + self._ep = self._make_endpoint(p.hostname, p.port or 80) + self._connector = None + self._done_d = defer.Deferred() + self._current_delay = self.INITIAL_DELAY + + def _make_endpoint(self, hostname, port): + return endpoints.HostnameEndpoint(self._reactor, hostname, port) + + # inputs from elsewhere + def d_callback(self, p): + self._p = p + self._m.d_callback() + def d_errback(self, f): + self._f = f + self._m.d_errback() + def protocol_onOpen(self, p): + self._m.onOpen() + def protocol_onClose(self, wasClean, code, reason): + self._m.onClose() + def C_stop(self): + self._m.stop() + def timer_expired(self): + self._m.expire() + + # outputs driven by the state machine + def ep_connect(self): + print("ep_connect()") + self._d = self._ep.connect(self._f) + self._d.addCallbacks(self.d_callback, self.d_errback) + def connection_established(self): + self._connection = WSConnection(ws, self._wormhole.appid, + self._wormhole.side, self) + self._mailbox.connected(ws) + self._wormhole.add_connection(self._connection) + self._ws_send_command("bind", appid=self._appid, side=self._side) + def M_lost(self): + self._wormhole.M_lost(self._connection) + self._connection = None + def start_timer(self): + print("start_timer") + self._t = self._reactor.callLater(3.0, self.expire) + def cancel_timer(self): + print("cancel_timer") + self._t.cancel() + self._t = None + def dropConnection(self): + print("dropConnection") + self._ws.dropConnection() + def notify_fail(self): + print("notify_fail", self._f.value if self._f else None) + self._done_d.errback(self._f) + def MC_stopped(self): + pass + + +def tryit(reactor): + cm = WSRelayClient(None, "ws://127.0.0.1:4000/v1", reactor) + print("_ConnectionMachine created") + print("start:", cm.start()) + print("waiting on _done_d to finish") + return cm._done_d + +# http://autobahn-python.readthedocs.io/en/latest/websocket/programming.html +# observed sequence of events: +# success: d_callback, onConnect(response), onOpen(), onMessage() +# negotifail (non-websocket): d_callback, onClose() +# noconnect: d_errback + +def tryws(reactor): + ws_url = "ws://127.0.0.1:40001/v1" + f = WSFactory(ws_url) + p = urlparse(ws_url) + ep = endpoints.HostnameEndpoint(reactor, p.hostname, p.port or 80) + d = ep.connect(f) + def _good(p): print("_good", p) + def _bad(f): print("_bad", f) + d.addCallbacks(_good, _bad) + return defer.Deferred() + +if __name__ == "__main__": + import sys + from twisted.python import log + log.startLogging(sys.stdout) + from twisted.internet.task import react + react(tryit) + +# ??? a new WSConnection is created each time the WSRelayClient gets through +# negotiation + +class NameplateListingMachine(object): + m = MethodicalMachine() + def __init__(self): + self._list_nameplate_waiters = [] + + # Ideally, each API request would spawn a new "list_nameplates" message + # to the server, so the response would be maximally fresh, but that would + # require correlating server request+response messages, and the protocol + # is intended to be less stateful than that. So we offer a weaker + # freshness property: if no server requests are in flight, then a new API + # request will provoke a new server request, and the result will be + # fresh. But if a server request is already in flight when a second API + # request arrives, both requests will be satisfied by the same response. + + @m.state(initial=True) + def idle(self): pass + @m.state() + def requesting(self): pass + + @m.input() + def list_nameplates(self): pass # returns Deferred + @m.input() + def response(self, message): pass + + @m.output() + def add_deferred(self): + d = defer.Deferred() + self._list_nameplate_waiters.append(d) + return d + @m.output() + def send_request(self): + self._connection.send_command("list") + @m.output() + def distribute_response(self, message): + nameplates = parse(message) + waiters = self._list_nameplate_waiters + self._list_nameplate_waiters = [] + for d in waiters: + d.callback(nameplates) + + idle.upon(list_nameplates, enter=requesting, + outputs=[add_deferred, send_request], + collector=lambda outs: outs[0]) + idle.upon(response, enter=idle, outputs=[]) + requesting.upon(list_nameplates, enter=requesting, + outputs=[add_deferred], + collector=lambda outs: outs[0]) + requesting.upon(response, enter=idle, outputs=[distribute_response]) + + # nlm._connection = c = Connection(ws) + # nlm.list_nameplates().addCallback(display_completions) + # c.register_dispatch("nameplates", nlm.response) + +class Wormhole: + m = MethodicalMachine() + + def __init__(self, ws_url, reactor): + self._relay_client = WSRelayClient(self, ws_url, reactor) + # This records all the messages we want the relay to have. Each time + # we establish a connection, we'll send them all (and the relay + # server will filter out duplicates). If we add any while a + # connection is established, we'll send the new ones. + self._outbound_messages = [] + + # these methods are called from outside + def start(self): + self._relay_client.start() + + # and these are the state-machine transition functions, which don't take + # args + @m.state() + def closed(initial=True): pass + @m.state() + def know_code_not_mailbox(): pass + @m.state() + def know_code_and_mailbox(): pass # no longer need nameplate + @m.state() + def waiting_first_msg(): pass # key is established, want any message + @m.state() + def processing_version(): pass + @m.state() + def processing_phase(): pass + @m.state() + def open(): pass # key is verified, can post app messages + @m.state(terminal=True) + def failed(): pass + + @m.input() + def deliver_message(self, message): pass + + def w_set_seed(self, code, mailbox): + """Call w_set_seed when we sprout a Wormhole Seed, which + contains both the code and the mailbox""" + self.w_set_code(code) + self.w_set_mailbox(mailbox) + + @m.input() + def w_set_code(self, code): + """Call w_set_code when you learn the code, probably because the user + typed it in.""" + @m.input() + def w_set_mailbox(self, mailbox): + """Call w_set_mailbox() when you learn the mailbox id, from the + response to claim_nameplate""" + pass + + + @m.input() + def rx_pake(self, pake): pass # reponse["message"][phase=pake] + + @m.input() + def rx_version(self, version): # response["message"][phase=version] + pass + @m.input() + def verify_good(self, verifier): pass + @m.input() + def verify_bad(self, f): pass + + @m.input() + def rx_phase(self, message): pass + @m.input() + def phase_good(self, message): pass + @m.input() + def phase_bad(self, f): pass + + @m.output() + def compute_and_post_pake(self, code): + self._code = code + self._pake = compute(code) + self._post(pake=self._pake) + self._ws_send_command("add", phase="pake", body=XXX(pake)) + @m.output() + def set_mailbox(self, mailbox): + self._mailbox = mailbox + @m.output() + def set_seed(self, code, mailbox): + self._code = code + self._mailbox = mailbox + + @m.output() + def process_version(self, version): # response["message"][phase=version] + their_verifier = com + if OK: + self.verify_good(verifier) + else: + self.verify_bad(f) + pass + + @m.output() + def notify_verified(self, verifier): + for d in self._verify_waiters: + d.callback(verifier) + @m.output() + def notify_failed(self, f): + for d in self._verify_waiters: + d.errback(f) + + @m.output() + def process_phase(self, message): # response["message"][phase=version] + their_verifier = com + if OK: + self.verify_good(verifier) + else: + self.verify_bad(f) + pass + + @m.output() + def post_inbound(self, message): + pass + + @m.output() + def deliver_message(self, message): + self._qc.deliver_message(message) + + @m.output() + def compute_key_and_post_version(self, pake): + self._key = x + self._verifier = x + plaintext = dict_to_bytes(self._my_versions) + phase = "version" + data_key = self._derive_phase_key(self._side, phase) + encrypted = self._encrypt_data(data_key, plaintext) + self._msg_send(phase, encrypted) + + closed.upon(w_set_code, enter=know_code_not_mailbox, + outputs=[compute_and_post_pake]) + know_code_not_mailbox.upon(w_set_mailbox, enter=know_code_and_mailbox, + outputs=[set_mailbox]) + know_code_and_mailbox.upon(rx_pake, enter=waiting_first_msg, + outputs=[compute_key_and_post_version]) + waiting_first_msg.upon(rx_version, enter=processing_version, + outputs=[process_version]) + processing_version.upon(verify_good, enter=open, outputs=[notify_verified]) + processing_version.upon(verify_bad, enter=failed, outputs=[notify_failed]) + open.upon(rx_phase, enter=processing_phase, outputs=[process_phase]) + processing_phase.upon(phase_good, enter=open, outputs=[post_inbound]) + processing_phase.upon(phase_bad, enter=failed, outputs=[notify_failed]) + +class QueueConnect: + m = MethodicalMachine() + def __init__(self): + self._outbound_messages = [] + self._connection = None + @m.state() + def disconnected(): pass + @m.state() + def connected(): pass + + @m.input() + def deliver_message(self, message): pass + @m.input() + def connect(self, connection): pass + @m.input() + def disconnect(self): pass + + @m.output() + def remember_connection(self, connection): + self._connection = connection + @m.output() + def forget_connection(self): + self._connection = None + @m.output() + def queue_message(self, message): + self._outbound_messages.append(message) + @m.output() + def send_message(self, message): + self._connection.send(message) + @m.output() + def send_queued_messages(self, connection): + for m in self._outbound_messages: + connection.send(m) + + disconnected.upon(deliver_message, enter=disconnected, outputs=[queue_message]) + disconnected.upon(connect, enter=connected, outputs=[remember_connection, + send_queued_messages]) + connected.upon(deliver_message, enter=connected, + outputs=[queue_message, send_message]) + connected.upon(disconnect, enter=disconnected, outputs=[forget_connection])