diff --git a/docs/w4.dot b/docs/w4.dot index 7b4e75d..00ea37d 100644 --- a/docs/w4.dot +++ b/docs/w4.dot @@ -44,7 +44,7 @@ digraph { C_P_stop_timer -> C_P_stopped C_Pc2 [shape="box" label="ep.connect()" color="blue"] C_Pc2 -> C_Sc2 [color="blue"] - C_Sc2 [label="connecting" color="blue"] + C_Sc2 [label="reconnecting" color="blue"] C_Sc2 -> C_P_reset [label="d.callback" color="blue" fontcolor="blue"] C_P_reset [shape="box" label="reset\ntimer" color="blue"] C_P_reset -> C_S_negotiating [color="blue"] diff --git a/src/wormhole/_connection.py b/src/wormhole/_connection.py index a58238d..5e7f034 100644 --- a/src/wormhole/_connection.py +++ b/src/wormhole/_connection.py @@ -16,7 +16,7 @@ class WSClient(websocket.WebSocketClientProtocol): # this fires when the WebSocket is ready to go. No arguments print("onOpen", args) #self.wormhole_open = True - self.connection_machine.onOpen(self) + self.connection_machine.protocol_onOpen(self) #self.factory.d.callback(self) def onMessage(self, payload, isBinary): @@ -27,7 +27,7 @@ class WSClient(websocket.WebSocketClientProtocol): def onClose(self, wasClean, code, reason): print("onClose") - self.connection_machine.onClose(f=None) + self.connection_machine.protocol_onClose(wasClean, code, reason) #if self.wormhole_open: # self.wormhole._ws_closed(wasClean, code, reason) #else: @@ -51,29 +51,14 @@ class WSFactory(websocket.WebSocketClientFactory): # as long as its parent Wormhole does. @attrs -class WSRelayClient(object): - _wormhole = attrib() - _ws_url = attrib() - _reactor = attrib() - +class _WSRelayClient_Machine(object): + _c = attrib() m = MethodicalMachine() - ALLOW_CLOSE = True - - def __init__(self): - self._f = f = WSFactory(self._ws_url) - f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600) - f.connection_machine = self # calls onOpen and onClose - p = urlparse(self._ws_url) - self._ep = self._make_endpoint(p.hostname, p.port or 80) - self._connector = None - self._done_d = defer.Deferred() - def _make_endpoint(self, hostname, port): - return endpoints.HostnameEndpoint(self._reactor, hostname, port) @m.state(initial=True) def initial(self): pass @m.state() - def first_time_connecting(self): pass + def connecting(self): pass @m.state() def negotiating(self): pass @m.state(terminal=True) @@ -83,100 +68,150 @@ class WSRelayClient(object): @m.state() def waiting(self): pass @m.state() - def connecting(self): pass - if ALLOW_CLOSE: - @m.state() - def disconnecting(self): pass - @m.state() - def cancelling(self): pass - @m.state(terminal=True) - def closed(self): pass - + def reconnecting(self): pass + @m.state() + def disconnecting(self): pass + @m.state() + def cancelling(self): pass + @m.state(terminal=True) + def closed(self): pass @m.input() - def start(self): pass ; print("in start") + def start(self): pass ; print("input:start") @m.input() - def d_callback(self, p): pass ; print("in d_callback", p) + def d_callback(self): pass ; print("input:d_callback") @m.input() - def d_errback(self, f): pass ; print("in d_errback", f) + def d_errback(self): pass ; print("input:d_errback") @m.input() - - def d_cancel(self, f): pass # XXX remove f + def d_cancel(self): pass ; print("input:d_cancel") @m.input() - def onOpen(self, ws): pass ; print("in onOpen") + def onOpen(self): pass ; print("input:onOpen") @m.input() - def onClose(self, f): pass # XXX p.onClose does cm.onClose("made up failure") + def onClose(self): pass ; print("input:onClose") @m.input() def expire(self): pass - if ALLOW_CLOSE: - @m.input() - def stop(self, f): pass + @m.input() + def stop(self): pass + # outputs @m.output() def ep_connect(self): "ep.connect()" - print("ep_connect()") - self._d = self._ep.connect(self._f) - self._d.addCallbacks(self.d_callback, self.d_errback) + self._c.ep_connect() @m.output() - def add_connection(self, ws): - print("add_connection", ws) - self._connection = WSConnection(ws, self._wormhole.appid, - self._wormhole.side, self) - self._wormhole.add_connection(self._connection) + def reset_timer(self): + self._c.reset_timer() @m.output() - def M_lost(self, f): # XXX remove f - self._wormhole.M_lost(self._connection) - self._connection = None + def add_connection(self): + print("add_connection") + self._c.add_connection() @m.output() - def start_timer(self, f): # XXX remove f - print("start_timer") - self._t = self._reactor.callLater(3.0, self.expire) + def M_lost(self): + self._c.M_lost() @m.output() - def cancel_timer(self, f): # XXX remove f - print("cancel_timer") - self._t.cancel() - self._t = None + def start_timer(self): + self._c.start_timer() @m.output() - def dropConnection(self, f): # XXX remove f - print("dropConnection") - self._ws.dropConnection() + def cancel_timer(self): + self._c.cancel_timer() @m.output() - def notify_fail(self, f): - print("notify_fail", f.value) - self._done_d.errback(f) + def dropConnection(self): + self._c.dropConnection() + @m.output() + def notify_fail(self): + self._c.notify_fail() @m.output() def MC_stopped(self): - pass + self._c.MC_stopped() - initial.upon(start, enter=first_time_connecting, outputs=[ep_connect]) - first_time_connecting.upon(d_callback, enter=negotiating, outputs=[]) - first_time_connecting.upon(d_errback, enter=failed, outputs=[notify_fail]) - first_time_connecting.upon(onClose, enter=failed, outputs=[notify_fail]) - if ALLOW_CLOSE: - first_time_connecting.upon(stop, enter=cancelling, - outputs=[d_cancel]) - cancelling.upon(d_errback, enter=closed, outputs=[]) + initial.upon(start, enter=connecting, outputs=[ep_connect]) + connecting.upon(d_callback, enter=negotiating, outputs=[reset_timer]) + connecting.upon(d_errback, enter=failed, outputs=[notify_fail]) + connecting.upon(onClose, enter=failed, outputs=[notify_fail]) + connecting.upon(stop, enter=cancelling, outputs=[d_cancel]) + cancelling.upon(d_errback, enter=closed, outputs=[]) negotiating.upon(onOpen, enter=open, outputs=[add_connection]) - if ALLOW_CLOSE: - negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection]) + negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection]) negotiating.upon(onClose, enter=failed, outputs=[notify_fail]) open.upon(onClose, enter=waiting, outputs=[M_lost, start_timer]) - if ALLOW_CLOSE: - open.upon(stop, enter=disconnecting, - outputs=[dropConnection, M_lost]) - connecting.upon(d_callback, enter=negotiating, outputs=[]) - connecting.upon(d_errback, enter=waiting, outputs=[start_timer]) - connecting.upon(onClose, enter=waiting, outputs=[start_timer]) - if ALLOW_CLOSE: - connecting.upon(stop, enter=cancelling, outputs=[d_cancel]) + open.upon(stop, enter=disconnecting, outputs=[dropConnection, M_lost]) + reconnecting.upon(d_callback, enter=negotiating, outputs=[reset_timer]) + reconnecting.upon(d_errback, enter=waiting, outputs=[start_timer]) + reconnecting.upon(onClose, enter=waiting, outputs=[start_timer]) + reconnecting.upon(stop, enter=cancelling, outputs=[d_cancel]) + + waiting.upon(expire, enter=reconnecting, outputs=[ep_connect]) + waiting.upon(stop, enter=closed, outputs=[cancel_timer]) + disconnecting.upon(onClose, enter=closed, outputs=[MC_stopped]) + +@attrs +class WSRelayClient(object): + _wormhole = attrib() + _ws_url = attrib() + _reactor = attrib() + INITIAL_DELAY = 1.0 + + + def __init__(self): + self._m = _WSRelayClient_Machine(self) + self._f = f = WSFactory(self._ws_url) + f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600) + f.connection_machine = self # calls onOpen and onClose + p = urlparse(self._ws_url) + self._ep = self._make_endpoint(p.hostname, p.port or 80) + self._connector = None + self._done_d = defer.Deferred() + self._current_delay = self.INITIAL_DELAY + + def _make_endpoint(self, hostname, port): + return endpoints.HostnameEndpoint(self._reactor, hostname, port) + + # inputs from elsewhere + def d_callback(self, p): + self._p = p + self._m.d_callback() + def d_errback(self, f): + self._f = f + self._m.d_errback() + def protocol_onOpen(self, p): + self._m.onOpen() + def protocol_onClose(self, wasClean, code, reason): + self._m.onClose() + def C_stop(self): + self._m.stop() + def timer_expired(self): + self._m.expire() + + # outputs driven by the state machine + def ep_connect(self): + print("ep_connect()") + self._d = self._ep.connect(self._f) + self._d.addCallbacks(self.d_callback, self.d_errback) + def add_connection(self): + self._connection = WSConnection(ws, self._wormhole.appid, + self._wormhole.side, self) + self._wormhole.add_connection(self._connection) + def M_lost(self): + self._wormhole.M_lost(self._connection) + self._connection = None + def start_timer(self): + print("start_timer") + self._t = self._reactor.callLater(3.0, self.expire) + def cancel_timer(self): + print("cancel_timer") + self._t.cancel() + self._t = None + def dropConnection(self): + print("dropConnection") + self._ws.dropConnection() + def notify_fail(self): + print("notify_fail", self._f.value if self._f else None) + self._done_d.errback(self._f) + def MC_stopped(self): + pass - waiting.upon(expire, enter=connecting, outputs=[ep_connect]) - if ALLOW_CLOSE: - waiting.upon(stop, enter=closed, outputs=[cancel_timer]) - disconnecting.upon(onClose, enter=closed, outputs=[]) #MC_stopped def tryit(reactor): cm = WSRelayClient(None, "ws://127.0.0.1:4000/v1", reactor) @@ -447,9 +482,12 @@ class Wormhole: # connection is established, we'll send the new ones. self._outbound_messages = [] + # these methods are called from outside def start(self): self._relay_client.start() + # and these are the state-machine transition functions, which don't take + # args @m.state() def closed(initial=True): pass @m.state()