more work, feels better now
This commit is contained in:
parent
16c477424c
commit
40e0d6b663
36
docs/machines.dot
Normal file
36
docs/machines.dot
Normal file
|
@ -0,0 +1,36 @@
|
|||
digraph {
|
||||
Wormhole [shape="box" label="Wormhole\n(manager)"]
|
||||
Wormhole -> Mailbox [style="dashed"
|
||||
label="M_set_nameplate()\nM_send()\nM_close()"
|
||||
]
|
||||
Wormhole -> Mailbox
|
||||
Mailbox -> Wormhole [style="dashed"
|
||||
label="W_rx_pake()\nW_rx_msg()\nW_closed()"
|
||||
]
|
||||
Mailbox [shape="box"]
|
||||
Mailbox -> Connection [style="dashed"
|
||||
label="C_tx_add()\nC_stop()"
|
||||
]
|
||||
Mailbox -> Connection
|
||||
Connection -> Mailbox [style="dashed"
|
||||
label="M_connected()\nM_lost()\nM_rx_claimed()\nM_rx_message()\nM_rx_released()\nM_stopped()"]
|
||||
|
||||
Connection -> websocket
|
||||
|
||||
Nameplates [shape="box" label="Nameplate\nLister"]
|
||||
Wormhole -> Nameplates [style="dashed"
|
||||
label="NL_refresh_nameplates()"
|
||||
]
|
||||
Nameplates -> Wormhole [style="dashed"
|
||||
label="W_got_nameplates()"
|
||||
]
|
||||
Connection -> Nameplates [style="dashed"
|
||||
label="NL_connected()\nNL_lost()\nNL_rx_nameplates()"
|
||||
]
|
||||
Nameplates -> Connection [style="dashed"
|
||||
label="C_tx_list()"
|
||||
]
|
||||
|
||||
|
||||
|
||||
}
|
47
docs/nameplates.dot
Normal file
47
docs/nameplates.dot
Normal file
|
@ -0,0 +1,47 @@
|
|||
digraph {
|
||||
/*
|
||||
"connected": NL_connected
|
||||
"rx": NL_rx_nameplates
|
||||
"refresh": NL_refresh_nameplates
|
||||
*/
|
||||
{rank=same; NL_title NL_S0A NL_S0B}
|
||||
NL_title [label="Nameplate\nLister" style="dotted"]
|
||||
|
||||
NL_S0A [label="S0A:\nnot wanting\nunconnected"]
|
||||
NL_S0B [label="S0B:\nnot wanting\nconnected" color="orange"]
|
||||
|
||||
NL_S0A -> NL_S0B [label="connected"]
|
||||
NL_S0B -> NL_S0A [label="lost"]
|
||||
|
||||
NL_S0A -> NL_S1A [label="refresh"]
|
||||
NL_S0B -> NL_P_tx [label="refresh" color="orange"]
|
||||
|
||||
NL_S0A -> NL_P_tx [style="invis"]
|
||||
|
||||
{rank=same; NL_S1A NL_P_tx NL_S1B NL_C_notify}
|
||||
|
||||
NL_S1A [label="S1A:\nwant list\nunconnected"]
|
||||
NL_S1B [label="S1B:\nwant list\nconnected" color="orange"]
|
||||
|
||||
NL_S1A -> NL_P_tx [label="connected"]
|
||||
NL_P_tx [shape="box" label="C.tx_list()" color="orange"]
|
||||
NL_P_tx -> NL_S1B [color="orange"]
|
||||
NL_S1B -> NL_S1A [label="lost"]
|
||||
|
||||
NL_S1A -> foo [label="refresh"]
|
||||
foo [label="" style="dashed"]
|
||||
foo -> NL_S1A
|
||||
|
||||
NL_S1B -> foo2 [label="refresh"]
|
||||
foo2 [label="" style="dashed"]
|
||||
foo2 -> NL_P_tx
|
||||
|
||||
NL_S0B -> NL_C_notify [label="rx"]
|
||||
NL_S1B -> NL_C_notify [label="rx"]
|
||||
NL_C_notify [shape="box" label="W.got_nameplates()"]
|
||||
NL_C_notify -> NL_S0B
|
||||
|
||||
{rank=same; foo foo2 legend}
|
||||
legend [shape="box" style="dotted"
|
||||
label="connected: NL_connected()\nlost: NL_lost()\nrefresh: NL_refresh_nameplates()\nrx: NL_rx_nameplates()"]
|
||||
}
|
20
docs/w2.dot
20
docs/w2.dot
|
@ -1,11 +1,11 @@
|
|||
digraph {
|
||||
/* new idea */
|
||||
|
||||
{rank=same; M_entry_whole_code M_title M_entry_allocation M_entry_interactive}
|
||||
{rank=same; M_title M_entry_whole_code M_entry_allocation M_entry_interactive}
|
||||
M_entry_whole_code [label="whole\ncode"]
|
||||
M_entry_whole_code -> M_S0A
|
||||
M_title [label="Message\nMachine" style="dotted"]
|
||||
M_entry_whole_code -> M_title [style="invis"]
|
||||
|
||||
M_entry_allocation [label="allocation" color="orange"]
|
||||
M_entry_allocation -> M_S0B [label="already\nconnected" color="orange" fontcolor="orange"]
|
||||
M_entry_interactive [label="interactive" color="orange"]
|
||||
|
@ -20,13 +20,11 @@ digraph {
|
|||
M_S0A -> M_S1A [label="M_set_nameplate()"]
|
||||
M_S0B -> M_P2_claim [label="M_set_nameplate()" color="orange" fontcolor="orange"]
|
||||
|
||||
{rank=same; M_S1A M_C_stop M_P1A_queue}
|
||||
{rank=same; M_S1A M_P1A_queue}
|
||||
M_S0B -> M_S2B [style="invis"]
|
||||
M_S1A -> M_S2A [style="invis"]
|
||||
M_S1A [label="S1A:\nnot claimed"]
|
||||
M_S1A -> M_P2_claim [label="M_connected()"]
|
||||
M_S1A -> M_C_stop [label="M_close()" style="dashed"]
|
||||
M_C_stop [shape="box" label="C_stop()" style="dashed"]
|
||||
M_S1A -> M_P1A_queue [label="M_send(msg)" style="dotted"]
|
||||
M_P1A_queue [shape="box" label="queue" style="dotted"]
|
||||
M_P1A_queue -> M_S1A [style="dotted"]
|
||||
|
@ -60,8 +58,8 @@ digraph {
|
|||
M_P_open -> M_S3B [color="orange"]
|
||||
|
||||
{rank=same; M_S3A M_S3B M_P3_open M_P3_send}
|
||||
M_S3A [label="S3A:\nclaimed\nmaybe open"]
|
||||
M_S3B [label="S3B:\nclaimed\nmaybe open\n(bound)" color="orange"]
|
||||
M_S3A [label="S3A:\nclaimed\nopened >=once"]
|
||||
M_S3B [label="S3B:\nclaimed\nmaybe open now\n(bound)" color="orange"]
|
||||
M_S3A -> M_P3_open [label="M_connected()"]
|
||||
M_S3B -> M_S3A [label="M_lost()"]
|
||||
M_P3_open [shape="box" label="tx open\ntx add(queued)"]
|
||||
|
@ -92,9 +90,9 @@ digraph {
|
|||
M_P3_process_theirs2 -> M_S4B [color="orange"]
|
||||
|
||||
{rank=same; M_S4A M_P4_release M_S4B M_P4_process M_P4_send M_P4_queue}
|
||||
M_S4A [label="S4A:\nmaybe released\nmaybe open\n"]
|
||||
M_S4A [label="S4A:\nmaybe released\nopened >=once\n"]
|
||||
|
||||
M_S4B [label="S4B:\nmaybe released\nmaybe open\n(bound)" color="orange"]
|
||||
M_S4B [label="S4B:\nmaybe released\nmaybe open now\n(bound)" color="orange"]
|
||||
M_S4A -> M_P4_release [label="M_connected()"]
|
||||
M_P4_release [shape="box" label="tx open\ntx add(queued)\ntx release"]
|
||||
M_S4B -> M_P4_send [label="M_send(msg)"]
|
||||
|
@ -118,8 +116,8 @@ digraph {
|
|||
M_S4A -> seed [style="invis"]
|
||||
seed -> M_S5A
|
||||
{rank=same; seed M_S5A M_S5B M_P5_open M_process}
|
||||
M_S5A [label="S5A:\nreleased\nmaybe open"]
|
||||
M_S5B [label="S5B:\nreleased\nmaybe open\n(bound)" color="green"]
|
||||
M_S5A [label="S5A:\nreleased\nopened >=once"]
|
||||
M_S5B [label="S5B:\nreleased\nmaybe open now\n(bound)" color="green"]
|
||||
M_S5A -> M_P5_open [label="M_connected()"]
|
||||
M_P5_open [shape="box" label="tx open\ntx add(queued)"]
|
||||
M_P5_open -> M_S5B
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
from six.moves.urllib_parse import urlparse
|
||||
from attr import attrs, attrib
|
||||
from twisted.internet import defer, endpoints #, error
|
||||
from twisted.application import internet
|
||||
from autobahn.twisted import websocket
|
||||
from automat import MethodicalMachine
|
||||
|
||||
|
@ -16,6 +17,7 @@ class WSClient(websocket.WebSocketClientProtocol):
|
|||
# this fires when the WebSocket is ready to go. No arguments
|
||||
print("onOpen", args)
|
||||
#self.wormhole_open = True
|
||||
# send BIND, since the MailboxMachine does not
|
||||
self.connection_machine.protocol_onOpen(self)
|
||||
#self.factory.d.callback(self)
|
||||
|
||||
|
@ -46,121 +48,49 @@ class WSFactory(websocket.WebSocketClientFactory):
|
|||
# pip install (path to automat checkout)[visualize]
|
||||
# automat-visualize wormhole._connection
|
||||
|
||||
@attrs
|
||||
class _WSRelayClient_Machine(object):
|
||||
_c = attrib()
|
||||
m = MethodicalMachine()
|
||||
|
||||
@m.state(initial=True)
|
||||
def initial(self): pass
|
||||
@m.state()
|
||||
def connecting(self): pass
|
||||
@m.state()
|
||||
def negotiating(self): pass
|
||||
@m.state(terminal=True)
|
||||
def failed(self): pass
|
||||
@m.state()
|
||||
def open(self): pass
|
||||
@m.state()
|
||||
def waiting(self): pass
|
||||
@m.state()
|
||||
def reconnecting(self): pass
|
||||
@m.state()
|
||||
def disconnecting(self): pass
|
||||
@m.state()
|
||||
def cancelling(self): pass
|
||||
@m.state(terminal=True)
|
||||
def closed(self): pass
|
||||
|
||||
@m.input()
|
||||
def start(self): pass ; print("input:start")
|
||||
@m.input()
|
||||
def d_callback(self): pass ; print("input:d_callback")
|
||||
@m.input()
|
||||
def d_errback(self): pass ; print("input:d_errback")
|
||||
@m.input()
|
||||
def d_cancel(self): pass ; print("input:d_cancel")
|
||||
@m.input()
|
||||
def onOpen(self): pass ; print("input:onOpen")
|
||||
@m.input()
|
||||
def onClose(self): pass ; print("input:onClose")
|
||||
@m.input()
|
||||
def expire(self): pass
|
||||
@m.input()
|
||||
def stop(self): pass
|
||||
|
||||
# outputs
|
||||
@m.output()
|
||||
def ep_connect(self):
|
||||
"ep.connect()"
|
||||
self._c.ep_connect()
|
||||
@m.output()
|
||||
def reset_timer(self):
|
||||
self._c.reset_timer()
|
||||
@m.output()
|
||||
def connection_established(self):
|
||||
print("connection_established")
|
||||
self._c.connection_established()
|
||||
@m.output()
|
||||
def M_lost(self):
|
||||
self._c.M_lost()
|
||||
@m.output()
|
||||
def start_timer(self):
|
||||
self._c.start_timer()
|
||||
@m.output()
|
||||
def cancel_timer(self):
|
||||
self._c.cancel_timer()
|
||||
@m.output()
|
||||
def dropConnection(self):
|
||||
self._c.dropConnection()
|
||||
@m.output()
|
||||
def notify_fail(self):
|
||||
self._c.notify_fail()
|
||||
@m.output()
|
||||
def MC_stopped(self):
|
||||
self._c.MC_stopped()
|
||||
|
||||
initial.upon(start, enter=connecting, outputs=[ep_connect])
|
||||
connecting.upon(d_callback, enter=negotiating, outputs=[reset_timer])
|
||||
connecting.upon(d_errback, enter=failed, outputs=[notify_fail])
|
||||
connecting.upon(onClose, enter=failed, outputs=[notify_fail])
|
||||
connecting.upon(stop, enter=cancelling, outputs=[d_cancel])
|
||||
cancelling.upon(d_errback, enter=closed, outputs=[])
|
||||
|
||||
negotiating.upon(onOpen, enter=open, outputs=[connection_established])
|
||||
negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection])
|
||||
negotiating.upon(onClose, enter=failed, outputs=[notify_fail])
|
||||
|
||||
open.upon(onClose, enter=waiting, outputs=[M_lost, start_timer])
|
||||
open.upon(stop, enter=disconnecting, outputs=[dropConnection, M_lost])
|
||||
reconnecting.upon(d_callback, enter=negotiating, outputs=[reset_timer])
|
||||
reconnecting.upon(d_errback, enter=waiting, outputs=[start_timer])
|
||||
reconnecting.upon(onClose, enter=waiting, outputs=[start_timer])
|
||||
reconnecting.upon(stop, enter=cancelling, outputs=[d_cancel])
|
||||
|
||||
waiting.upon(expire, enter=reconnecting, outputs=[ep_connect])
|
||||
waiting.upon(stop, enter=closed, outputs=[cancel_timer])
|
||||
disconnecting.upon(onClose, enter=closed, outputs=[MC_stopped])
|
||||
class IRendezvousClient(Interface):
|
||||
# must be an IService too
|
||||
def set_dispatch(dispatcher):
|
||||
"""Assign a dispatcher object to this client. The following methods
|
||||
will be called on this object when things happen:
|
||||
* rx_welcome(welcome -> dict)
|
||||
* rx_nameplates(nameplates -> list) # [{id: str,..}, ..]
|
||||
* rx_allocated(nameplate -> str)
|
||||
* rx_claimed(mailbox -> str)
|
||||
* rx_released()
|
||||
* rx_message(side -> str, phase -> str, body -> str, msg_id -> str)
|
||||
* rx_closed()
|
||||
* rx_pong(pong -> int)
|
||||
"""
|
||||
pass
|
||||
def tx_list(): pass
|
||||
def tx_allocate(): pass
|
||||
def tx_claim(nameplate): pass
|
||||
def tx_release(): pass
|
||||
def tx_open(mailbox): pass
|
||||
def tx_add(phase, body): pass
|
||||
def tx_close(mood): pass
|
||||
def tx_ping(ping): pass
|
||||
|
||||
# We have one WSRelayClient for each wsurl we know about, and it lasts
|
||||
# as long as its parent Wormhole does.
|
||||
|
||||
@attrs
|
||||
class WSRelayClient(object):
|
||||
class WSRelayClient(service.MultiService, object):
|
||||
_journal = attrib()
|
||||
_wormhole = attrib()
|
||||
_mailbox = attrib()
|
||||
_ws_url = attrib()
|
||||
_reactor = attrib()
|
||||
INITIAL_DELAY = 1.0
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self._m = _WSRelayClient_Machine(self)
|
||||
self._f = f = WSFactory(self._ws_url)
|
||||
f = WSFactory(self._ws_url)
|
||||
f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600)
|
||||
f.connection_machine = self # calls onOpen and onClose
|
||||
p = urlparse(self._ws_url)
|
||||
self._ep = self._make_endpoint(p.hostname, p.port or 80)
|
||||
ep = self._make_endpoint(p.hostname, p.port or 80)
|
||||
# default policy: 1s initial, random exponential backoff, max 60s
|
||||
self._client_service = internet.ClientService(ep, f)
|
||||
self._connector = None
|
||||
self._done_d = defer.Deferred()
|
||||
self._current_delay = self.INITIAL_DELAY
|
||||
|
@ -248,242 +178,3 @@ if __name__ == "__main__":
|
|||
|
||||
# ??? a new WSConnection is created each time the WSRelayClient gets through
|
||||
# negotiation
|
||||
|
||||
class NameplateListingMachine(object):
|
||||
m = MethodicalMachine()
|
||||
def __init__(self):
|
||||
self._list_nameplate_waiters = []
|
||||
|
||||
# Ideally, each API request would spawn a new "list_nameplates" message
|
||||
# to the server, so the response would be maximally fresh, but that would
|
||||
# require correlating server request+response messages, and the protocol
|
||||
# is intended to be less stateful than that. So we offer a weaker
|
||||
# freshness property: if no server requests are in flight, then a new API
|
||||
# request will provoke a new server request, and the result will be
|
||||
# fresh. But if a server request is already in flight when a second API
|
||||
# request arrives, both requests will be satisfied by the same response.
|
||||
|
||||
@m.state(initial=True)
|
||||
def idle(self): pass
|
||||
@m.state()
|
||||
def requesting(self): pass
|
||||
|
||||
@m.input()
|
||||
def list_nameplates(self): pass # returns Deferred
|
||||
@m.input()
|
||||
def response(self, message): pass
|
||||
|
||||
@m.output()
|
||||
def add_deferred(self):
|
||||
d = defer.Deferred()
|
||||
self._list_nameplate_waiters.append(d)
|
||||
return d
|
||||
@m.output()
|
||||
def send_request(self):
|
||||
self._connection.send_command("list")
|
||||
@m.output()
|
||||
def distribute_response(self, message):
|
||||
nameplates = parse(message)
|
||||
waiters = self._list_nameplate_waiters
|
||||
self._list_nameplate_waiters = []
|
||||
for d in waiters:
|
||||
d.callback(nameplates)
|
||||
|
||||
idle.upon(list_nameplates, enter=requesting,
|
||||
outputs=[add_deferred, send_request],
|
||||
collector=lambda outs: outs[0])
|
||||
idle.upon(response, enter=idle, outputs=[])
|
||||
requesting.upon(list_nameplates, enter=requesting,
|
||||
outputs=[add_deferred],
|
||||
collector=lambda outs: outs[0])
|
||||
requesting.upon(response, enter=idle, outputs=[distribute_response])
|
||||
|
||||
# nlm._connection = c = Connection(ws)
|
||||
# nlm.list_nameplates().addCallback(display_completions)
|
||||
# c.register_dispatch("nameplates", nlm.response)
|
||||
|
||||
class Wormhole:
|
||||
m = MethodicalMachine()
|
||||
|
||||
def __init__(self, ws_url, reactor):
|
||||
self._relay_client = WSRelayClient(self, ws_url, reactor)
|
||||
# This records all the messages we want the relay to have. Each time
|
||||
# we establish a connection, we'll send them all (and the relay
|
||||
# server will filter out duplicates). If we add any while a
|
||||
# connection is established, we'll send the new ones.
|
||||
self._outbound_messages = []
|
||||
|
||||
# these methods are called from outside
|
||||
def start(self):
|
||||
self._relay_client.start()
|
||||
|
||||
# and these are the state-machine transition functions, which don't take
|
||||
# args
|
||||
@m.state()
|
||||
def closed(initial=True): pass
|
||||
@m.state()
|
||||
def know_code_not_mailbox(): pass
|
||||
@m.state()
|
||||
def know_code_and_mailbox(): pass # no longer need nameplate
|
||||
@m.state()
|
||||
def waiting_first_msg(): pass # key is established, want any message
|
||||
@m.state()
|
||||
def processing_version(): pass
|
||||
@m.state()
|
||||
def processing_phase(): pass
|
||||
@m.state()
|
||||
def open(): pass # key is verified, can post app messages
|
||||
@m.state(terminal=True)
|
||||
def failed(): pass
|
||||
|
||||
@m.input()
|
||||
def deliver_message(self, message): pass
|
||||
|
||||
def w_set_seed(self, code, mailbox):
|
||||
"""Call w_set_seed when we sprout a Wormhole Seed, which
|
||||
contains both the code and the mailbox"""
|
||||
self.w_set_code(code)
|
||||
self.w_set_mailbox(mailbox)
|
||||
|
||||
@m.input()
|
||||
def w_set_code(self, code):
|
||||
"""Call w_set_code when you learn the code, probably because the user
|
||||
typed it in."""
|
||||
@m.input()
|
||||
def w_set_mailbox(self, mailbox):
|
||||
"""Call w_set_mailbox() when you learn the mailbox id, from the
|
||||
response to claim_nameplate"""
|
||||
pass
|
||||
|
||||
|
||||
@m.input()
|
||||
def rx_pake(self, pake): pass # reponse["message"][phase=pake]
|
||||
|
||||
@m.input()
|
||||
def rx_version(self, version): # response["message"][phase=version]
|
||||
pass
|
||||
@m.input()
|
||||
def verify_good(self, verifier): pass
|
||||
@m.input()
|
||||
def verify_bad(self, f): pass
|
||||
|
||||
@m.input()
|
||||
def rx_phase(self, message): pass
|
||||
@m.input()
|
||||
def phase_good(self, message): pass
|
||||
@m.input()
|
||||
def phase_bad(self, f): pass
|
||||
|
||||
@m.output()
|
||||
def compute_and_post_pake(self, code):
|
||||
self._code = code
|
||||
self._pake = compute(code)
|
||||
self._post(pake=self._pake)
|
||||
self._ws_send_command("add", phase="pake", body=XXX(pake))
|
||||
@m.output()
|
||||
def set_mailbox(self, mailbox):
|
||||
self._mailbox = mailbox
|
||||
@m.output()
|
||||
def set_seed(self, code, mailbox):
|
||||
self._code = code
|
||||
self._mailbox = mailbox
|
||||
|
||||
@m.output()
|
||||
def process_version(self, version): # response["message"][phase=version]
|
||||
their_verifier = com
|
||||
if OK:
|
||||
self.verify_good(verifier)
|
||||
else:
|
||||
self.verify_bad(f)
|
||||
pass
|
||||
|
||||
@m.output()
|
||||
def notify_verified(self, verifier):
|
||||
for d in self._verify_waiters:
|
||||
d.callback(verifier)
|
||||
@m.output()
|
||||
def notify_failed(self, f):
|
||||
for d in self._verify_waiters:
|
||||
d.errback(f)
|
||||
|
||||
@m.output()
|
||||
def process_phase(self, message): # response["message"][phase=version]
|
||||
their_verifier = com
|
||||
if OK:
|
||||
self.verify_good(verifier)
|
||||
else:
|
||||
self.verify_bad(f)
|
||||
pass
|
||||
|
||||
@m.output()
|
||||
def post_inbound(self, message):
|
||||
pass
|
||||
|
||||
@m.output()
|
||||
def deliver_message(self, message):
|
||||
self._qc.deliver_message(message)
|
||||
|
||||
@m.output()
|
||||
def compute_key_and_post_version(self, pake):
|
||||
self._key = x
|
||||
self._verifier = x
|
||||
plaintext = dict_to_bytes(self._my_versions)
|
||||
phase = "version"
|
||||
data_key = self._derive_phase_key(self._side, phase)
|
||||
encrypted = self._encrypt_data(data_key, plaintext)
|
||||
self._msg_send(phase, encrypted)
|
||||
|
||||
closed.upon(w_set_code, enter=know_code_not_mailbox,
|
||||
outputs=[compute_and_post_pake])
|
||||
know_code_not_mailbox.upon(w_set_mailbox, enter=know_code_and_mailbox,
|
||||
outputs=[set_mailbox])
|
||||
know_code_and_mailbox.upon(rx_pake, enter=waiting_first_msg,
|
||||
outputs=[compute_key_and_post_version])
|
||||
waiting_first_msg.upon(rx_version, enter=processing_version,
|
||||
outputs=[process_version])
|
||||
processing_version.upon(verify_good, enter=open, outputs=[notify_verified])
|
||||
processing_version.upon(verify_bad, enter=failed, outputs=[notify_failed])
|
||||
open.upon(rx_phase, enter=processing_phase, outputs=[process_phase])
|
||||
processing_phase.upon(phase_good, enter=open, outputs=[post_inbound])
|
||||
processing_phase.upon(phase_bad, enter=failed, outputs=[notify_failed])
|
||||
|
||||
class QueueConnect:
|
||||
m = MethodicalMachine()
|
||||
def __init__(self):
|
||||
self._outbound_messages = []
|
||||
self._connection = None
|
||||
@m.state()
|
||||
def disconnected(): pass
|
||||
@m.state()
|
||||
def connected(): pass
|
||||
|
||||
@m.input()
|
||||
def deliver_message(self, message): pass
|
||||
@m.input()
|
||||
def connect(self, connection): pass
|
||||
@m.input()
|
||||
def disconnect(self): pass
|
||||
|
||||
@m.output()
|
||||
def remember_connection(self, connection):
|
||||
self._connection = connection
|
||||
@m.output()
|
||||
def forget_connection(self):
|
||||
self._connection = None
|
||||
@m.output()
|
||||
def queue_message(self, message):
|
||||
self._outbound_messages.append(message)
|
||||
@m.output()
|
||||
def send_message(self, message):
|
||||
self._connection.send(message)
|
||||
@m.output()
|
||||
def send_queued_messages(self, connection):
|
||||
for m in self._outbound_messages:
|
||||
connection.send(m)
|
||||
|
||||
disconnected.upon(deliver_message, enter=disconnected, outputs=[queue_message])
|
||||
disconnected.upon(connect, enter=connected, outputs=[remember_connection,
|
||||
send_queued_messages])
|
||||
connected.upon(deliver_message, enter=connected,
|
||||
outputs=[queue_message, send_message])
|
||||
connected.upon(disconnect, enter=disconnected, outputs=[forget_connection])
|
||||
|
|
|
@ -3,6 +3,7 @@ from automat import MethodicalMachine
|
|||
|
||||
@attrs
|
||||
class _Mailbox_Machine(object):
|
||||
_connection_machine = attrib()
|
||||
_m = attrib()
|
||||
m = MethodicalMachine()
|
||||
|
||||
|
@ -120,12 +121,17 @@ class _Mailbox_Machine(object):
|
|||
@m.output()
|
||||
def tx_close(self): pass
|
||||
@m.output()
|
||||
def process_first_msg_from_them(self, msg):
|
||||
self.tx_release()
|
||||
self.process_msg_from_them(msg)
|
||||
@m.output()
|
||||
def process_msg_from_them(self, msg): pass
|
||||
@m.output()
|
||||
def dequeue(self, msg): pass
|
||||
@m.output()
|
||||
def C_stop(self): pass
|
||||
|
||||
@m.output()
|
||||
def WM_stopped(self): pass
|
||||
|
||||
initial.upon(M_start_unconnected, enter=S0A, outputs=[])
|
||||
initial.upon(M_start_connected, enter=S0B, outputs=[])
|
||||
|
@ -138,10 +144,10 @@ class _Mailbox_Machine(object):
|
|||
|
||||
S1A.upon(M_connected, enter=S2B, outputs=[tx_claim])
|
||||
S1A.upon(M_send, enter=S1A, outputs=[queue])
|
||||
S1A.upon(M_stop, enter=SrA, outputs=[])
|
||||
S1A.upon(M_stop, enter=SsB, outputs=[C_stop])
|
||||
|
||||
S2A.upon(M_connected, enter=S2B, outputs=[tx_claim])
|
||||
S2A.upon(M_stop, enter=SsB, outputs=[C_stop])
|
||||
S2A.upon(M_stop, enter=SrA, outputs=[])
|
||||
S2A.upon(M_send, enter=S2A, outputs=[queue])
|
||||
S2B.upon(M_lost, enter=S2A, outputs=[])
|
||||
S2B.upon(M_send, enter=S2B, outputs=[queue])
|
||||
|
@ -153,14 +159,15 @@ class _Mailbox_Machine(object):
|
|||
S3A.upon(M_send, enter=S3A, outputs=[queue])
|
||||
S3A.upon(M_stop, enter=SrcA, outputs=[])
|
||||
S3B.upon(M_lost, enter=S3A, outputs=[])
|
||||
S3B.upon(M_rx_msg_from_them, enter=S4B, outputs=[#tx_release, # trouble
|
||||
process_msg_from_them])
|
||||
S3B.upon(M_rx_msg_from_them, enter=S4B,
|
||||
outputs=[process_first_msg_from_them])
|
||||
S3B.upon(M_rx_msg_from_me, enter=S3B, outputs=[dequeue])
|
||||
S3B.upon(M_rx_claimed, enter=S3B, outputs=[])
|
||||
S3B.upon(M_send, enter=S3B, outputs=[queue, tx_add])
|
||||
S3B.upon(M_stop, enter=SrcB, outputs=[tx_release, tx_close])
|
||||
|
||||
S4A.upon(M_connected, enter=S4B, outputs=[tx_open, tx_add_queued, tx_release])
|
||||
S4A.upon(M_connected, enter=S4B,
|
||||
outputs=[tx_open, tx_add_queued, tx_release])
|
||||
S4A.upon(M_send, enter=S4A, outputs=[queue])
|
||||
S4A.upon(M_stop, enter=SrcA, outputs=[])
|
||||
S4B.upon(M_lost, enter=S4A, outputs=[])
|
||||
|
@ -192,5 +199,5 @@ class _Mailbox_Machine(object):
|
|||
ScB.upon(M_rx_closed, enter=SsB, outputs=[C_stop])
|
||||
ScA.upon(M_connected, enter=ScB, outputs=[tx_close])
|
||||
|
||||
SsB.upon(M_stopped, enter=Ss, outputs=[])
|
||||
SsB.upon(M_stopped, enter=Ss, outputs=[WM_stopped])
|
||||
|
||||
|
|
53
src/wormhole/_nameplate.py
Normal file
53
src/wormhole/_nameplate.py
Normal file
|
@ -0,0 +1,53 @@
|
|||
|
||||
class NameplateListingMachine(object):
|
||||
m = MethodicalMachine()
|
||||
def __init__(self):
|
||||
self._list_nameplate_waiters = []
|
||||
|
||||
# Ideally, each API request would spawn a new "list_nameplates" message
|
||||
# to the server, so the response would be maximally fresh, but that would
|
||||
# require correlating server request+response messages, and the protocol
|
||||
# is intended to be less stateful than that. So we offer a weaker
|
||||
# freshness property: if no server requests are in flight, then a new API
|
||||
# request will provoke a new server request, and the result will be
|
||||
# fresh. But if a server request is already in flight when a second API
|
||||
# request arrives, both requests will be satisfied by the same response.
|
||||
|
||||
@m.state(initial=True)
|
||||
def idle(self): pass
|
||||
@m.state()
|
||||
def requesting(self): pass
|
||||
|
||||
@m.input()
|
||||
def list_nameplates(self): pass # returns Deferred
|
||||
@m.input()
|
||||
def response(self, message): pass
|
||||
|
||||
@m.output()
|
||||
def add_deferred(self):
|
||||
d = defer.Deferred()
|
||||
self._list_nameplate_waiters.append(d)
|
||||
return d
|
||||
@m.output()
|
||||
def send_request(self):
|
||||
self._connection.send_command("list")
|
||||
@m.output()
|
||||
def distribute_response(self, message):
|
||||
nameplates = parse(message)
|
||||
waiters = self._list_nameplate_waiters
|
||||
self._list_nameplate_waiters = []
|
||||
for d in waiters:
|
||||
d.callback(nameplates)
|
||||
|
||||
idle.upon(list_nameplates, enter=requesting,
|
||||
outputs=[add_deferred, send_request],
|
||||
collector=lambda outs: outs[0])
|
||||
idle.upon(response, enter=idle, outputs=[])
|
||||
requesting.upon(list_nameplates, enter=requesting,
|
||||
outputs=[add_deferred],
|
||||
collector=lambda outs: outs[0])
|
||||
requesting.upon(response, enter=idle, outputs=[distribute_response])
|
||||
|
||||
# nlm._connection = c = Connection(ws)
|
||||
# nlm.list_nameplates().addCallback(display_completions)
|
||||
# c.register_dispatch("nameplates", nlm.response)
|
145
src/wormhole/_wormhole.py
Normal file
145
src/wormhole/_wormhole.py
Normal file
|
@ -0,0 +1,145 @@
|
|||
|
||||
class Wormhole:
|
||||
m = MethodicalMachine()
|
||||
|
||||
def __init__(self, ws_url, reactor):
|
||||
self._relay_client = WSRelayClient(self, ws_url, reactor)
|
||||
# This records all the messages we want the relay to have. Each time
|
||||
# we establish a connection, we'll send them all (and the relay
|
||||
# server will filter out duplicates). If we add any while a
|
||||
# connection is established, we'll send the new ones.
|
||||
self._outbound_messages = []
|
||||
|
||||
# these methods are called from outside
|
||||
def start(self):
|
||||
self._relay_client.start()
|
||||
|
||||
# and these are the state-machine transition functions, which don't take
|
||||
# args
|
||||
@m.state()
|
||||
def closed(initial=True): pass
|
||||
@m.state()
|
||||
def know_code_not_mailbox(): pass
|
||||
@m.state()
|
||||
def know_code_and_mailbox(): pass # no longer need nameplate
|
||||
@m.state()
|
||||
def waiting_first_msg(): pass # key is established, want any message
|
||||
@m.state()
|
||||
def processing_version(): pass
|
||||
@m.state()
|
||||
def processing_phase(): pass
|
||||
@m.state()
|
||||
def open(): pass # key is verified, can post app messages
|
||||
@m.state(terminal=True)
|
||||
def failed(): pass
|
||||
|
||||
@m.input()
|
||||
def deliver_message(self, message): pass
|
||||
|
||||
def w_set_seed(self, code, mailbox):
|
||||
"""Call w_set_seed when we sprout a Wormhole Seed, which
|
||||
contains both the code and the mailbox"""
|
||||
self.w_set_code(code)
|
||||
self.w_set_mailbox(mailbox)
|
||||
|
||||
@m.input()
|
||||
def w_set_code(self, code):
|
||||
"""Call w_set_code when you learn the code, probably because the user
|
||||
typed it in."""
|
||||
@m.input()
|
||||
def w_set_mailbox(self, mailbox):
|
||||
"""Call w_set_mailbox() when you learn the mailbox id, from the
|
||||
response to claim_nameplate"""
|
||||
pass
|
||||
|
||||
|
||||
@m.input()
|
||||
def rx_pake(self, pake): pass # reponse["message"][phase=pake]
|
||||
|
||||
@m.input()
|
||||
def rx_version(self, version): # response["message"][phase=version]
|
||||
pass
|
||||
@m.input()
|
||||
def verify_good(self, verifier): pass
|
||||
@m.input()
|
||||
def verify_bad(self, f): pass
|
||||
|
||||
@m.input()
|
||||
def rx_phase(self, message): pass
|
||||
@m.input()
|
||||
def phase_good(self, message): pass
|
||||
@m.input()
|
||||
def phase_bad(self, f): pass
|
||||
|
||||
@m.output()
|
||||
def compute_and_post_pake(self, code):
|
||||
self._code = code
|
||||
self._pake = compute(code)
|
||||
self._post(pake=self._pake)
|
||||
self._ws_send_command("add", phase="pake", body=XXX(pake))
|
||||
@m.output()
|
||||
def set_mailbox(self, mailbox):
|
||||
self._mailbox = mailbox
|
||||
@m.output()
|
||||
def set_seed(self, code, mailbox):
|
||||
self._code = code
|
||||
self._mailbox = mailbox
|
||||
|
||||
@m.output()
|
||||
def process_version(self, version): # response["message"][phase=version]
|
||||
their_verifier = com
|
||||
if OK:
|
||||
self.verify_good(verifier)
|
||||
else:
|
||||
self.verify_bad(f)
|
||||
pass
|
||||
|
||||
@m.output()
|
||||
def notify_verified(self, verifier):
|
||||
for d in self._verify_waiters:
|
||||
d.callback(verifier)
|
||||
@m.output()
|
||||
def notify_failed(self, f):
|
||||
for d in self._verify_waiters:
|
||||
d.errback(f)
|
||||
|
||||
@m.output()
|
||||
def process_phase(self, message): # response["message"][phase=version]
|
||||
their_verifier = com
|
||||
if OK:
|
||||
self.verify_good(verifier)
|
||||
else:
|
||||
self.verify_bad(f)
|
||||
pass
|
||||
|
||||
@m.output()
|
||||
def post_inbound(self, message):
|
||||
pass
|
||||
|
||||
@m.output()
|
||||
def deliver_message(self, message):
|
||||
self._qc.deliver_message(message)
|
||||
|
||||
@m.output()
|
||||
def compute_key_and_post_version(self, pake):
|
||||
self._key = x
|
||||
self._verifier = x
|
||||
plaintext = dict_to_bytes(self._my_versions)
|
||||
phase = "version"
|
||||
data_key = self._derive_phase_key(self._side, phase)
|
||||
encrypted = self._encrypt_data(data_key, plaintext)
|
||||
self._msg_send(phase, encrypted)
|
||||
|
||||
closed.upon(w_set_code, enter=know_code_not_mailbox,
|
||||
outputs=[compute_and_post_pake])
|
||||
know_code_not_mailbox.upon(w_set_mailbox, enter=know_code_and_mailbox,
|
||||
outputs=[set_mailbox])
|
||||
know_code_and_mailbox.upon(rx_pake, enter=waiting_first_msg,
|
||||
outputs=[compute_key_and_post_version])
|
||||
waiting_first_msg.upon(rx_version, enter=processing_version,
|
||||
outputs=[process_version])
|
||||
processing_version.upon(verify_good, enter=open, outputs=[notify_verified])
|
||||
processing_version.upon(verify_bad, enter=failed, outputs=[notify_failed])
|
||||
open.upon(rx_phase, enter=processing_phase, outputs=[process_phase])
|
||||
processing_phase.upon(phase_good, enter=open, outputs=[post_inbound])
|
||||
processing_phase.upon(phase_bad, enter=failed, outputs=[notify_failed])
|
489
src/wormhole/misc.py
Normal file
489
src/wormhole/misc.py
Normal file
|
@ -0,0 +1,489 @@
|
|||
|
||||
from six.moves.urllib_parse import urlparse
|
||||
from attr import attrs, attrib
|
||||
from twisted.internet import defer, endpoints #, error
|
||||
from autobahn.twisted import websocket
|
||||
from automat import MethodicalMachine
|
||||
|
||||
class WSClient(websocket.WebSocketClientProtocol):
|
||||
def onConnect(self, response):
|
||||
# this fires during WebSocket negotiation, and isn't very useful
|
||||
# unless you want to modify the protocol settings
|
||||
print("onConnect", response)
|
||||
#self.connection_machine.onConnect(self)
|
||||
|
||||
def onOpen(self, *args):
|
||||
# this fires when the WebSocket is ready to go. No arguments
|
||||
print("onOpen", args)
|
||||
#self.wormhole_open = True
|
||||
self.connection_machine.protocol_onOpen(self)
|
||||
#self.factory.d.callback(self)
|
||||
|
||||
def onMessage(self, payload, isBinary):
|
||||
print("onMessage")
|
||||
return
|
||||
assert not isBinary
|
||||
self.wormhole._ws_dispatch_response(payload)
|
||||
|
||||
def onClose(self, wasClean, code, reason):
|
||||
print("onClose")
|
||||
self.connection_machine.protocol_onClose(wasClean, code, reason)
|
||||
#if self.wormhole_open:
|
||||
# self.wormhole._ws_closed(wasClean, code, reason)
|
||||
#else:
|
||||
# # we closed before establishing a connection (onConnect) or
|
||||
# # finishing WebSocket negotiation (onOpen): errback
|
||||
# self.factory.d.errback(error.ConnectError(reason))
|
||||
|
||||
class WSFactory(websocket.WebSocketClientFactory):
|
||||
protocol = WSClient
|
||||
def buildProtocol(self, addr):
|
||||
proto = websocket.WebSocketClientFactory.buildProtocol(self, addr)
|
||||
proto.connection_machine = self.connection_machine
|
||||
#proto.wormhole_open = False
|
||||
return proto
|
||||
|
||||
# pip install (path to automat checkout)[visualize]
|
||||
# automat-visualize wormhole._connection
|
||||
|
||||
@attrs
|
||||
class _WSRelayClient_Machine(object):
|
||||
_c = attrib()
|
||||
m = MethodicalMachine()
|
||||
|
||||
@m.state(initial=True)
|
||||
def initial(self): pass
|
||||
@m.state()
|
||||
def connecting(self): pass
|
||||
@m.state()
|
||||
def negotiating(self): pass
|
||||
@m.state(terminal=True)
|
||||
def failed(self): pass
|
||||
@m.state()
|
||||
def open(self): pass
|
||||
@m.state()
|
||||
def waiting(self): pass
|
||||
@m.state()
|
||||
def reconnecting(self): pass
|
||||
@m.state()
|
||||
def disconnecting(self): pass
|
||||
@m.state()
|
||||
def cancelling(self): pass
|
||||
@m.state(terminal=True)
|
||||
def closed(self): pass
|
||||
|
||||
@m.input()
|
||||
def start(self): pass ; print("input:start")
|
||||
@m.input()
|
||||
def d_callback(self): pass ; print("input:d_callback")
|
||||
@m.input()
|
||||
def d_errback(self): pass ; print("input:d_errback")
|
||||
@m.input()
|
||||
def d_cancel(self): pass ; print("input:d_cancel")
|
||||
@m.input()
|
||||
def onOpen(self): pass ; print("input:onOpen")
|
||||
@m.input()
|
||||
def onClose(self): pass ; print("input:onClose")
|
||||
@m.input()
|
||||
def expire(self): pass
|
||||
@m.input()
|
||||
def stop(self): pass
|
||||
|
||||
# outputs
|
||||
@m.output()
|
||||
def ep_connect(self):
|
||||
"ep.connect()"
|
||||
self._c.ep_connect()
|
||||
@m.output()
|
||||
def reset_timer(self):
|
||||
self._c.reset_timer()
|
||||
@m.output()
|
||||
def connection_established(self):
|
||||
print("connection_established")
|
||||
self._c.connection_established()
|
||||
@m.output()
|
||||
def M_lost(self):
|
||||
self._c.M_lost()
|
||||
@m.output()
|
||||
def start_timer(self):
|
||||
self._c.start_timer()
|
||||
@m.output()
|
||||
def cancel_timer(self):
|
||||
self._c.cancel_timer()
|
||||
@m.output()
|
||||
def dropConnection(self):
|
||||
self._c.dropConnection()
|
||||
@m.output()
|
||||
def notify_fail(self):
|
||||
self._c.notify_fail()
|
||||
@m.output()
|
||||
def MC_stopped(self):
|
||||
self._c.MC_stopped()
|
||||
|
||||
initial.upon(start, enter=connecting, outputs=[ep_connect])
|
||||
connecting.upon(d_callback, enter=negotiating, outputs=[reset_timer])
|
||||
connecting.upon(d_errback, enter=failed, outputs=[notify_fail])
|
||||
connecting.upon(onClose, enter=failed, outputs=[notify_fail])
|
||||
connecting.upon(stop, enter=cancelling, outputs=[d_cancel])
|
||||
cancelling.upon(d_errback, enter=closed, outputs=[])
|
||||
|
||||
negotiating.upon(onOpen, enter=open, outputs=[connection_established])
|
||||
negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection])
|
||||
negotiating.upon(onClose, enter=failed, outputs=[notify_fail])
|
||||
|
||||
open.upon(onClose, enter=waiting, outputs=[M_lost, start_timer])
|
||||
open.upon(stop, enter=disconnecting, outputs=[dropConnection, M_lost])
|
||||
reconnecting.upon(d_callback, enter=negotiating, outputs=[reset_timer])
|
||||
reconnecting.upon(d_errback, enter=waiting, outputs=[start_timer])
|
||||
reconnecting.upon(onClose, enter=waiting, outputs=[start_timer])
|
||||
reconnecting.upon(stop, enter=cancelling, outputs=[d_cancel])
|
||||
|
||||
waiting.upon(expire, enter=reconnecting, outputs=[ep_connect])
|
||||
waiting.upon(stop, enter=closed, outputs=[cancel_timer])
|
||||
disconnecting.upon(onClose, enter=closed, outputs=[MC_stopped])
|
||||
|
||||
# We have one WSRelayClient for each wsurl we know about, and it lasts
|
||||
# as long as its parent Wormhole does.
|
||||
|
||||
@attrs
|
||||
class WSRelayClient(object):
|
||||
_wormhole = attrib()
|
||||
_mailbox = attrib()
|
||||
_ws_url = attrib()
|
||||
_reactor = attrib()
|
||||
INITIAL_DELAY = 1.0
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self._m = _WSRelayClient_Machine(self)
|
||||
self._f = f = WSFactory(self._ws_url)
|
||||
f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600)
|
||||
f.connection_machine = self # calls onOpen and onClose
|
||||
p = urlparse(self._ws_url)
|
||||
self._ep = self._make_endpoint(p.hostname, p.port or 80)
|
||||
self._connector = None
|
||||
self._done_d = defer.Deferred()
|
||||
self._current_delay = self.INITIAL_DELAY
|
||||
|
||||
def _make_endpoint(self, hostname, port):
|
||||
return endpoints.HostnameEndpoint(self._reactor, hostname, port)
|
||||
|
||||
# inputs from elsewhere
|
||||
def d_callback(self, p):
|
||||
self._p = p
|
||||
self._m.d_callback()
|
||||
def d_errback(self, f):
|
||||
self._f = f
|
||||
self._m.d_errback()
|
||||
def protocol_onOpen(self, p):
|
||||
self._m.onOpen()
|
||||
def protocol_onClose(self, wasClean, code, reason):
|
||||
self._m.onClose()
|
||||
def C_stop(self):
|
||||
self._m.stop()
|
||||
def timer_expired(self):
|
||||
self._m.expire()
|
||||
|
||||
# outputs driven by the state machine
|
||||
def ep_connect(self):
|
||||
print("ep_connect()")
|
||||
self._d = self._ep.connect(self._f)
|
||||
self._d.addCallbacks(self.d_callback, self.d_errback)
|
||||
def connection_established(self):
|
||||
self._connection = WSConnection(ws, self._wormhole.appid,
|
||||
self._wormhole.side, self)
|
||||
self._mailbox.connected(ws)
|
||||
self._wormhole.add_connection(self._connection)
|
||||
self._ws_send_command("bind", appid=self._appid, side=self._side)
|
||||
def M_lost(self):
|
||||
self._wormhole.M_lost(self._connection)
|
||||
self._connection = None
|
||||
def start_timer(self):
|
||||
print("start_timer")
|
||||
self._t = self._reactor.callLater(3.0, self.expire)
|
||||
def cancel_timer(self):
|
||||
print("cancel_timer")
|
||||
self._t.cancel()
|
||||
self._t = None
|
||||
def dropConnection(self):
|
||||
print("dropConnection")
|
||||
self._ws.dropConnection()
|
||||
def notify_fail(self):
|
||||
print("notify_fail", self._f.value if self._f else None)
|
||||
self._done_d.errback(self._f)
|
||||
def MC_stopped(self):
|
||||
pass
|
||||
|
||||
|
||||
def tryit(reactor):
|
||||
cm = WSRelayClient(None, "ws://127.0.0.1:4000/v1", reactor)
|
||||
print("_ConnectionMachine created")
|
||||
print("start:", cm.start())
|
||||
print("waiting on _done_d to finish")
|
||||
return cm._done_d
|
||||
|
||||
# http://autobahn-python.readthedocs.io/en/latest/websocket/programming.html
|
||||
# observed sequence of events:
|
||||
# success: d_callback, onConnect(response), onOpen(), onMessage()
|
||||
# negotifail (non-websocket): d_callback, onClose()
|
||||
# noconnect: d_errback
|
||||
|
||||
def tryws(reactor):
|
||||
ws_url = "ws://127.0.0.1:40001/v1"
|
||||
f = WSFactory(ws_url)
|
||||
p = urlparse(ws_url)
|
||||
ep = endpoints.HostnameEndpoint(reactor, p.hostname, p.port or 80)
|
||||
d = ep.connect(f)
|
||||
def _good(p): print("_good", p)
|
||||
def _bad(f): print("_bad", f)
|
||||
d.addCallbacks(_good, _bad)
|
||||
return defer.Deferred()
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
from twisted.python import log
|
||||
log.startLogging(sys.stdout)
|
||||
from twisted.internet.task import react
|
||||
react(tryit)
|
||||
|
||||
# ??? a new WSConnection is created each time the WSRelayClient gets through
|
||||
# negotiation
|
||||
|
||||
class NameplateListingMachine(object):
|
||||
m = MethodicalMachine()
|
||||
def __init__(self):
|
||||
self._list_nameplate_waiters = []
|
||||
|
||||
# Ideally, each API request would spawn a new "list_nameplates" message
|
||||
# to the server, so the response would be maximally fresh, but that would
|
||||
# require correlating server request+response messages, and the protocol
|
||||
# is intended to be less stateful than that. So we offer a weaker
|
||||
# freshness property: if no server requests are in flight, then a new API
|
||||
# request will provoke a new server request, and the result will be
|
||||
# fresh. But if a server request is already in flight when a second API
|
||||
# request arrives, both requests will be satisfied by the same response.
|
||||
|
||||
@m.state(initial=True)
|
||||
def idle(self): pass
|
||||
@m.state()
|
||||
def requesting(self): pass
|
||||
|
||||
@m.input()
|
||||
def list_nameplates(self): pass # returns Deferred
|
||||
@m.input()
|
||||
def response(self, message): pass
|
||||
|
||||
@m.output()
|
||||
def add_deferred(self):
|
||||
d = defer.Deferred()
|
||||
self._list_nameplate_waiters.append(d)
|
||||
return d
|
||||
@m.output()
|
||||
def send_request(self):
|
||||
self._connection.send_command("list")
|
||||
@m.output()
|
||||
def distribute_response(self, message):
|
||||
nameplates = parse(message)
|
||||
waiters = self._list_nameplate_waiters
|
||||
self._list_nameplate_waiters = []
|
||||
for d in waiters:
|
||||
d.callback(nameplates)
|
||||
|
||||
idle.upon(list_nameplates, enter=requesting,
|
||||
outputs=[add_deferred, send_request],
|
||||
collector=lambda outs: outs[0])
|
||||
idle.upon(response, enter=idle, outputs=[])
|
||||
requesting.upon(list_nameplates, enter=requesting,
|
||||
outputs=[add_deferred],
|
||||
collector=lambda outs: outs[0])
|
||||
requesting.upon(response, enter=idle, outputs=[distribute_response])
|
||||
|
||||
# nlm._connection = c = Connection(ws)
|
||||
# nlm.list_nameplates().addCallback(display_completions)
|
||||
# c.register_dispatch("nameplates", nlm.response)
|
||||
|
||||
class Wormhole:
|
||||
m = MethodicalMachine()
|
||||
|
||||
def __init__(self, ws_url, reactor):
|
||||
self._relay_client = WSRelayClient(self, ws_url, reactor)
|
||||
# This records all the messages we want the relay to have. Each time
|
||||
# we establish a connection, we'll send them all (and the relay
|
||||
# server will filter out duplicates). If we add any while a
|
||||
# connection is established, we'll send the new ones.
|
||||
self._outbound_messages = []
|
||||
|
||||
# these methods are called from outside
|
||||
def start(self):
|
||||
self._relay_client.start()
|
||||
|
||||
# and these are the state-machine transition functions, which don't take
|
||||
# args
|
||||
@m.state()
|
||||
def closed(initial=True): pass
|
||||
@m.state()
|
||||
def know_code_not_mailbox(): pass
|
||||
@m.state()
|
||||
def know_code_and_mailbox(): pass # no longer need nameplate
|
||||
@m.state()
|
||||
def waiting_first_msg(): pass # key is established, want any message
|
||||
@m.state()
|
||||
def processing_version(): pass
|
||||
@m.state()
|
||||
def processing_phase(): pass
|
||||
@m.state()
|
||||
def open(): pass # key is verified, can post app messages
|
||||
@m.state(terminal=True)
|
||||
def failed(): pass
|
||||
|
||||
@m.input()
|
||||
def deliver_message(self, message): pass
|
||||
|
||||
def w_set_seed(self, code, mailbox):
|
||||
"""Call w_set_seed when we sprout a Wormhole Seed, which
|
||||
contains both the code and the mailbox"""
|
||||
self.w_set_code(code)
|
||||
self.w_set_mailbox(mailbox)
|
||||
|
||||
@m.input()
|
||||
def w_set_code(self, code):
|
||||
"""Call w_set_code when you learn the code, probably because the user
|
||||
typed it in."""
|
||||
@m.input()
|
||||
def w_set_mailbox(self, mailbox):
|
||||
"""Call w_set_mailbox() when you learn the mailbox id, from the
|
||||
response to claim_nameplate"""
|
||||
pass
|
||||
|
||||
|
||||
@m.input()
|
||||
def rx_pake(self, pake): pass # reponse["message"][phase=pake]
|
||||
|
||||
@m.input()
|
||||
def rx_version(self, version): # response["message"][phase=version]
|
||||
pass
|
||||
@m.input()
|
||||
def verify_good(self, verifier): pass
|
||||
@m.input()
|
||||
def verify_bad(self, f): pass
|
||||
|
||||
@m.input()
|
||||
def rx_phase(self, message): pass
|
||||
@m.input()
|
||||
def phase_good(self, message): pass
|
||||
@m.input()
|
||||
def phase_bad(self, f): pass
|
||||
|
||||
@m.output()
|
||||
def compute_and_post_pake(self, code):
|
||||
self._code = code
|
||||
self._pake = compute(code)
|
||||
self._post(pake=self._pake)
|
||||
self._ws_send_command("add", phase="pake", body=XXX(pake))
|
||||
@m.output()
|
||||
def set_mailbox(self, mailbox):
|
||||
self._mailbox = mailbox
|
||||
@m.output()
|
||||
def set_seed(self, code, mailbox):
|
||||
self._code = code
|
||||
self._mailbox = mailbox
|
||||
|
||||
@m.output()
|
||||
def process_version(self, version): # response["message"][phase=version]
|
||||
their_verifier = com
|
||||
if OK:
|
||||
self.verify_good(verifier)
|
||||
else:
|
||||
self.verify_bad(f)
|
||||
pass
|
||||
|
||||
@m.output()
|
||||
def notify_verified(self, verifier):
|
||||
for d in self._verify_waiters:
|
||||
d.callback(verifier)
|
||||
@m.output()
|
||||
def notify_failed(self, f):
|
||||
for d in self._verify_waiters:
|
||||
d.errback(f)
|
||||
|
||||
@m.output()
|
||||
def process_phase(self, message): # response["message"][phase=version]
|
||||
their_verifier = com
|
||||
if OK:
|
||||
self.verify_good(verifier)
|
||||
else:
|
||||
self.verify_bad(f)
|
||||
pass
|
||||
|
||||
@m.output()
|
||||
def post_inbound(self, message):
|
||||
pass
|
||||
|
||||
@m.output()
|
||||
def deliver_message(self, message):
|
||||
self._qc.deliver_message(message)
|
||||
|
||||
@m.output()
|
||||
def compute_key_and_post_version(self, pake):
|
||||
self._key = x
|
||||
self._verifier = x
|
||||
plaintext = dict_to_bytes(self._my_versions)
|
||||
phase = "version"
|
||||
data_key = self._derive_phase_key(self._side, phase)
|
||||
encrypted = self._encrypt_data(data_key, plaintext)
|
||||
self._msg_send(phase, encrypted)
|
||||
|
||||
closed.upon(w_set_code, enter=know_code_not_mailbox,
|
||||
outputs=[compute_and_post_pake])
|
||||
know_code_not_mailbox.upon(w_set_mailbox, enter=know_code_and_mailbox,
|
||||
outputs=[set_mailbox])
|
||||
know_code_and_mailbox.upon(rx_pake, enter=waiting_first_msg,
|
||||
outputs=[compute_key_and_post_version])
|
||||
waiting_first_msg.upon(rx_version, enter=processing_version,
|
||||
outputs=[process_version])
|
||||
processing_version.upon(verify_good, enter=open, outputs=[notify_verified])
|
||||
processing_version.upon(verify_bad, enter=failed, outputs=[notify_failed])
|
||||
open.upon(rx_phase, enter=processing_phase, outputs=[process_phase])
|
||||
processing_phase.upon(phase_good, enter=open, outputs=[post_inbound])
|
||||
processing_phase.upon(phase_bad, enter=failed, outputs=[notify_failed])
|
||||
|
||||
class QueueConnect:
|
||||
m = MethodicalMachine()
|
||||
def __init__(self):
|
||||
self._outbound_messages = []
|
||||
self._connection = None
|
||||
@m.state()
|
||||
def disconnected(): pass
|
||||
@m.state()
|
||||
def connected(): pass
|
||||
|
||||
@m.input()
|
||||
def deliver_message(self, message): pass
|
||||
@m.input()
|
||||
def connect(self, connection): pass
|
||||
@m.input()
|
||||
def disconnect(self): pass
|
||||
|
||||
@m.output()
|
||||
def remember_connection(self, connection):
|
||||
self._connection = connection
|
||||
@m.output()
|
||||
def forget_connection(self):
|
||||
self._connection = None
|
||||
@m.output()
|
||||
def queue_message(self, message):
|
||||
self._outbound_messages.append(message)
|
||||
@m.output()
|
||||
def send_message(self, message):
|
||||
self._connection.send(message)
|
||||
@m.output()
|
||||
def send_queued_messages(self, connection):
|
||||
for m in self._outbound_messages:
|
||||
connection.send(m)
|
||||
|
||||
disconnected.upon(deliver_message, enter=disconnected, outputs=[queue_message])
|
||||
disconnected.upon(connect, enter=connected, outputs=[remember_connection,
|
||||
send_queued_messages])
|
||||
connected.upon(deliver_message, enter=connected,
|
||||
outputs=[queue_message, send_message])
|
||||
connected.upon(disconnect, enter=disconnected, outputs=[forget_connection])
|
Loading…
Reference in New Issue
Block a user