From b934192f204d24b329e4006243e6439257e50ebd Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Fri, 30 Dec 2016 00:30:59 -0500 Subject: [PATCH] work on Mailbox machine --- docs/w4.dot | 6 +- src/wormhole/_connection.py | 19 ++--- src/wormhole/_mailbox.py | 148 ++++++++++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 12 deletions(-) create mode 100644 src/wormhole/_mailbox.py diff --git a/docs/w4.dot b/docs/w4.dot index 00ea37d..5fd6c52 100644 --- a/docs/w4.dot +++ b/docs/w4.dot @@ -7,7 +7,9 @@ digraph { C_Pc1 [shape="box" label="ep.connect()" color="orange"] C_Pc1 -> C_Sc1 [color="orange"] C_Sc1 [label="connecting\n(1st time)" color="orange"] - C_Sc1 -> C_S_negotiating [label="d.callback" color="orange" fontcolor="orange"] + C_Sc1 -> C_P_reset [label="d.callback" color="orange" fontcolor="orange"] + C_P_reset [shape="box" label="reset\ntimer" color="orange"] + C_P_reset -> C_S_negotiating [color="orange"] C_Sc1 -> C_P_failed [label="d.errback" color="red"] C_Sc1 -> C_P_failed [label="p.onClose" color="red"] C_Sc1 -> C_P_cancel [label="C_stop()"] @@ -46,8 +48,6 @@ digraph { C_Pc2 -> C_Sc2 [color="blue"] C_Sc2 [label="reconnecting" color="blue"] C_Sc2 -> C_P_reset [label="d.callback" color="blue" fontcolor="blue"] - C_P_reset [shape="box" label="reset\ntimer" color="blue"] - C_P_reset -> C_S_negotiating [color="blue"] C_Sc2 -> C_P_wait [label="d.errback"] C_Sc2 -> C_P_cancel [label="C_stop()"] diff --git a/src/wormhole/_connection.py b/src/wormhole/_connection.py index 5e7f034..ebc7209 100644 --- a/src/wormhole/_connection.py +++ b/src/wormhole/_connection.py @@ -43,13 +43,9 @@ class WSFactory(websocket.WebSocketClientFactory): #proto.wormhole_open = False return proto - # pip install (path to automat checkout)[visualize] # automat-visualize wormhole._connection -# We have one WSRelayClient for each wsurl we know about, and it lasts -# as long as its parent Wormhole does. - @attrs class _WSRelayClient_Machine(object): _c = attrib() @@ -102,9 +98,9 @@ class _WSRelayClient_Machine(object): def reset_timer(self): self._c.reset_timer() @m.output() - def add_connection(self): - print("add_connection") - self._c.add_connection() + def connection_established(self): + print("connection_established") + self._c.connection_established() @m.output() def M_lost(self): self._c.M_lost() @@ -131,7 +127,7 @@ class _WSRelayClient_Machine(object): connecting.upon(stop, enter=cancelling, outputs=[d_cancel]) cancelling.upon(d_errback, enter=closed, outputs=[]) - negotiating.upon(onOpen, enter=open, outputs=[add_connection]) + negotiating.upon(onOpen, enter=open, outputs=[connection_established]) negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection]) negotiating.upon(onClose, enter=failed, outputs=[notify_fail]) @@ -146,9 +142,13 @@ class _WSRelayClient_Machine(object): waiting.upon(stop, enter=closed, outputs=[cancel_timer]) disconnecting.upon(onClose, enter=closed, outputs=[MC_stopped]) +# We have one WSRelayClient for each wsurl we know about, and it lasts +# as long as its parent Wormhole does. + @attrs class WSRelayClient(object): _wormhole = attrib() + _mailbox = attrib() _ws_url = attrib() _reactor = attrib() INITIAL_DELAY = 1.0 @@ -189,9 +189,10 @@ class WSRelayClient(object): print("ep_connect()") self._d = self._ep.connect(self._f) self._d.addCallbacks(self.d_callback, self.d_errback) - def add_connection(self): + def connection_established(self): self._connection = WSConnection(ws, self._wormhole.appid, self._wormhole.side, self) + self._mailbox.connected(ws) self._wormhole.add_connection(self._connection) def M_lost(self): self._wormhole.M_lost(self._connection) diff --git a/src/wormhole/_mailbox.py b/src/wormhole/_mailbox.py new file mode 100644 index 0000000..494d913 --- /dev/null +++ b/src/wormhole/_mailbox.py @@ -0,0 +1,148 @@ +from attr import attrs, attrib +from automat import MethodicalMachine + +@attrs +class _Mailbox_Machine(object): + _m = attrib() + m = MethodicalMachine() + + @m.state(initial=True) + def initial(self): pass + + @m.state() + def S1A(self): pass # know nothing, not connected + @m.state() + def S1B(self): pass # know nothing, yes connected + + @m.state() + def S2A(self): pass # not claimed, not connected + @m.state() + def S2B(self): pass # maybe claimed, yes connected + @m.state() + def S2C(self): pass # maybe claimed, not connected + + @m.state() + def S3A(self): pass # claimed, maybe opened, not connected + @m.state() + def S3B(self): pass # claimed, maybe opened, yes connected + + @m.state() + def S4A(self): pass # maybe released, maybe opened, not connected + @m.state() + def S4B(self): pass # maybe released, maybe opened, yes connected + + @m.state() + def S5A(self): pass # released, maybe open, not connected + @m.state() + def S5B(self): pass # released, maybe open, yes connected + + @m.state() + def SrcA(self): pass # waiting for release+close, not connected + @m.state() + def SrcB(self): pass # waiting for release+close, yes connected + @m.state() + def SrA(self): pass # waiting for release, not connected + @m.state() + def SrB(self): pass # waiting for release, yes connected + @m.state() + def ScA(self): pass # waiting for close, not connected + @m.state() + def ScB(self): pass # waiting for close, yes connected + @m.state() + def SsB(self): pass # closed, stopping + @m.state() + def Ss(self): pass # stopped + + + def connected(self, ws): + self._ws = ws + self.M_connected() + + @m.input() + def M_start_unconnected(self): pass + @m.input() + def M_start_connected(self): pass + @m.input() + def M_set_nameplate(self): pass + @m.input() + def M_connected(self): pass + @m.input() + def M_lost(self): pass + @m.input() + def M_send(self, msg): pass + @m.input() + def M_rx_claimed(self): pass + @m.input() + def M_rx_msg_from_me(self, msg): pass + @m.input() + def M_rx_msg_from_them(self, msg): pass + @m.input() + def M_rx_released(self): pass + @m.input() + def M_rx_closed(self): pass + @m.input() + def M_stopped(self): pass + + @m.output() + def tx_claim(self): pass + @m.output() + def tx_open(self): pass + @m.output() + def queue(self, msg): pass + @m.output() + def store_mailbox(self): pass # trouble(mb) + @m.output() + def tx_add(self, msg): pass + @m.output() + def tx_add_queued(self): pass + @m.output() + def tx_release(self): pass + @m.output() + def process_msg_from_them(self, msg): pass + @m.output() + def dequeue(self, msg): pass + + + initial.upon(M_start_connected, enter=S1A, outputs=[]) + initial.upon(M_start_unconnected, enter=S1B, outputs=[]) + S1A.upon(M_connected, enter=S1B, outputs=[]) + S1A.upon(M_set_nameplate, enter=S2A, outputs=[]) + S1B.upon(M_lost, enter=S1A, outputs=[]) + S1B.upon(M_set_nameplate, enter=S2B, outputs=[tx_claim]) + + S2A.upon(M_connected, enter=S2B, outputs=[tx_claim]) + #S2A.upon(M_close + S2A.upon(M_send, enter=S2A, outputs=[queue]) + S2B.upon(M_lost, enter=S2C, outputs=[]) + S2B.upon(M_send, enter=S2B, outputs=[queue]) + #S2B.upon(M_close + S2B.upon(M_rx_claimed, enter=S3B, outputs=[store_mailbox, tx_open, + tx_add_queued]) + S2C.upon(M_connected, enter=S2B, outputs=[tx_claim]) + S2C.upon(M_send, enter=S2C, outputs=[queue]) + + S3A.upon(M_connected, enter=S3B, outputs=[tx_open, tx_add_queued]) + S3A.upon(M_send, enter=S3A, outputs=[queue]) + S3B.upon(M_lost, enter=S3A, outputs=[]) + S3B.upon(M_rx_msg_from_them, enter=S4B, outputs=[#tx_release, # trouble + process_msg_from_them]) + S3B.upon(M_rx_msg_from_me, enter=S3B, outputs=[dequeue]) + S3B.upon(M_rx_claimed, enter=S3B, outputs=[]) + S3B.upon(M_send, enter=S3B, outputs=[queue, tx_add]) + + S4A.upon(M_connected, enter=S4B, outputs=[tx_open, tx_add_queued, tx_release]) + S4A.upon(M_send, enter=S4A, outputs=[queue]) + S4B.upon(M_lost, enter=S4A, outputs=[]) + S4B.upon(M_send, enter=S4B, outputs=[queue, tx_add]) + S4B.upon(M_rx_msg_from_them, enter=S4B, outputs=[process_msg_from_them]) + S4B.upon(M_rx_msg_from_me, enter=S4B, outputs=[dequeue]) + S4B.upon(M_rx_released, enter=S5B, outputs=[]) + + S5A.upon(M_connected, enter=S5B, outputs=[tx_open, tx_add_queued]) + S5A.upon(M_send, enter=S5A, outputs=[queue]) + S5B.upon(M_lost, enter=S5A, outputs=[]) + S5B.upon(M_send, enter=S5B, outputs=[queue, tx_add]) + S5B.upon(M_rx_msg_from_them, enter=S5B, outputs=[process_msg_from_them]) + S5B.upon(M_rx_msg_from_me, enter=S5B, outputs=[dequeue]) + +