moving to separate machine class

This commit is contained in:
Brian Warner 2016-12-29 21:29:55 -05:00
parent 3bf762b4f7
commit 11a80f0018
2 changed files with 125 additions and 87 deletions

View File

@ -44,7 +44,7 @@ digraph {
C_P_stop_timer -> C_P_stopped C_P_stop_timer -> C_P_stopped
C_Pc2 [shape="box" label="ep.connect()" color="blue"] C_Pc2 [shape="box" label="ep.connect()" color="blue"]
C_Pc2 -> C_Sc2 [color="blue"] C_Pc2 -> C_Sc2 [color="blue"]
C_Sc2 [label="connecting" color="blue"] C_Sc2 [label="reconnecting" color="blue"]
C_Sc2 -> C_P_reset [label="d.callback" color="blue" fontcolor="blue"] C_Sc2 -> C_P_reset [label="d.callback" color="blue" fontcolor="blue"]
C_P_reset [shape="box" label="reset\ntimer" color="blue"] C_P_reset [shape="box" label="reset\ntimer" color="blue"]
C_P_reset -> C_S_negotiating [color="blue"] C_P_reset -> C_S_negotiating [color="blue"]

View File

@ -16,7 +16,7 @@ class WSClient(websocket.WebSocketClientProtocol):
# this fires when the WebSocket is ready to go. No arguments # this fires when the WebSocket is ready to go. No arguments
print("onOpen", args) print("onOpen", args)
#self.wormhole_open = True #self.wormhole_open = True
self.connection_machine.onOpen(self) self.connection_machine.protocol_onOpen(self)
#self.factory.d.callback(self) #self.factory.d.callback(self)
def onMessage(self, payload, isBinary): def onMessage(self, payload, isBinary):
@ -27,7 +27,7 @@ class WSClient(websocket.WebSocketClientProtocol):
def onClose(self, wasClean, code, reason): def onClose(self, wasClean, code, reason):
print("onClose") print("onClose")
self.connection_machine.onClose(f=None) self.connection_machine.protocol_onClose(wasClean, code, reason)
#if self.wormhole_open: #if self.wormhole_open:
# self.wormhole._ws_closed(wasClean, code, reason) # self.wormhole._ws_closed(wasClean, code, reason)
#else: #else:
@ -51,29 +51,14 @@ class WSFactory(websocket.WebSocketClientFactory):
# as long as its parent Wormhole does. # as long as its parent Wormhole does.
@attrs @attrs
class WSRelayClient(object): class _WSRelayClient_Machine(object):
_wormhole = attrib() _c = attrib()
_ws_url = attrib()
_reactor = attrib()
m = MethodicalMachine() m = MethodicalMachine()
ALLOW_CLOSE = True
def __init__(self):
self._f = f = WSFactory(self._ws_url)
f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600)
f.connection_machine = self # calls onOpen and onClose
p = urlparse(self._ws_url)
self._ep = self._make_endpoint(p.hostname, p.port or 80)
self._connector = None
self._done_d = defer.Deferred()
def _make_endpoint(self, hostname, port):
return endpoints.HostnameEndpoint(self._reactor, hostname, port)
@m.state(initial=True) @m.state(initial=True)
def initial(self): pass def initial(self): pass
@m.state() @m.state()
def first_time_connecting(self): pass def connecting(self): pass
@m.state() @m.state()
def negotiating(self): pass def negotiating(self): pass
@m.state(terminal=True) @m.state(terminal=True)
@ -83,100 +68,150 @@ class WSRelayClient(object):
@m.state() @m.state()
def waiting(self): pass def waiting(self): pass
@m.state() @m.state()
def connecting(self): pass def reconnecting(self): pass
if ALLOW_CLOSE: @m.state()
@m.state() def disconnecting(self): pass
def disconnecting(self): pass @m.state()
@m.state() def cancelling(self): pass
def cancelling(self): pass @m.state(terminal=True)
@m.state(terminal=True) def closed(self): pass
def closed(self): pass
@m.input() @m.input()
def start(self): pass ; print("in start") def start(self): pass ; print("input:start")
@m.input() @m.input()
def d_callback(self, p): pass ; print("in d_callback", p) def d_callback(self): pass ; print("input:d_callback")
@m.input() @m.input()
def d_errback(self, f): pass ; print("in d_errback", f) def d_errback(self): pass ; print("input:d_errback")
@m.input() @m.input()
def d_cancel(self): pass ; print("input:d_cancel")
def d_cancel(self, f): pass # XXX remove f
@m.input() @m.input()
def onOpen(self, ws): pass ; print("in onOpen") def onOpen(self): pass ; print("input:onOpen")
@m.input() @m.input()
def onClose(self, f): pass # XXX p.onClose does cm.onClose("made up failure") def onClose(self): pass ; print("input:onClose")
@m.input() @m.input()
def expire(self): pass def expire(self): pass
if ALLOW_CLOSE: @m.input()
@m.input() def stop(self): pass
def stop(self, f): pass
# outputs
@m.output() @m.output()
def ep_connect(self): def ep_connect(self):
"ep.connect()" "ep.connect()"
print("ep_connect()") self._c.ep_connect()
self._d = self._ep.connect(self._f)
self._d.addCallbacks(self.d_callback, self.d_errback)
@m.output() @m.output()
def add_connection(self, ws): def reset_timer(self):
print("add_connection", ws) self._c.reset_timer()
self._connection = WSConnection(ws, self._wormhole.appid,
self._wormhole.side, self)
self._wormhole.add_connection(self._connection)
@m.output() @m.output()
def M_lost(self, f): # XXX remove f def add_connection(self):
self._wormhole.M_lost(self._connection) print("add_connection")
self._connection = None self._c.add_connection()
@m.output() @m.output()
def start_timer(self, f): # XXX remove f def M_lost(self):
print("start_timer") self._c.M_lost()
self._t = self._reactor.callLater(3.0, self.expire)
@m.output() @m.output()
def cancel_timer(self, f): # XXX remove f def start_timer(self):
print("cancel_timer") self._c.start_timer()
self._t.cancel()
self._t = None
@m.output() @m.output()
def dropConnection(self, f): # XXX remove f def cancel_timer(self):
print("dropConnection") self._c.cancel_timer()
self._ws.dropConnection()
@m.output() @m.output()
def notify_fail(self, f): def dropConnection(self):
print("notify_fail", f.value) self._c.dropConnection()
self._done_d.errback(f) @m.output()
def notify_fail(self):
self._c.notify_fail()
@m.output() @m.output()
def MC_stopped(self): def MC_stopped(self):
pass self._c.MC_stopped()
initial.upon(start, enter=first_time_connecting, outputs=[ep_connect]) initial.upon(start, enter=connecting, outputs=[ep_connect])
first_time_connecting.upon(d_callback, enter=negotiating, outputs=[]) connecting.upon(d_callback, enter=negotiating, outputs=[reset_timer])
first_time_connecting.upon(d_errback, enter=failed, outputs=[notify_fail]) connecting.upon(d_errback, enter=failed, outputs=[notify_fail])
first_time_connecting.upon(onClose, enter=failed, outputs=[notify_fail]) connecting.upon(onClose, enter=failed, outputs=[notify_fail])
if ALLOW_CLOSE: connecting.upon(stop, enter=cancelling, outputs=[d_cancel])
first_time_connecting.upon(stop, enter=cancelling, cancelling.upon(d_errback, enter=closed, outputs=[])
outputs=[d_cancel])
cancelling.upon(d_errback, enter=closed, outputs=[])
negotiating.upon(onOpen, enter=open, outputs=[add_connection]) negotiating.upon(onOpen, enter=open, outputs=[add_connection])
if ALLOW_CLOSE: negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection])
negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection])
negotiating.upon(onClose, enter=failed, outputs=[notify_fail]) negotiating.upon(onClose, enter=failed, outputs=[notify_fail])
open.upon(onClose, enter=waiting, outputs=[M_lost, start_timer]) open.upon(onClose, enter=waiting, outputs=[M_lost, start_timer])
if ALLOW_CLOSE: open.upon(stop, enter=disconnecting, outputs=[dropConnection, M_lost])
open.upon(stop, enter=disconnecting, reconnecting.upon(d_callback, enter=negotiating, outputs=[reset_timer])
outputs=[dropConnection, M_lost]) reconnecting.upon(d_errback, enter=waiting, outputs=[start_timer])
connecting.upon(d_callback, enter=negotiating, outputs=[]) reconnecting.upon(onClose, enter=waiting, outputs=[start_timer])
connecting.upon(d_errback, enter=waiting, outputs=[start_timer]) reconnecting.upon(stop, enter=cancelling, outputs=[d_cancel])
connecting.upon(onClose, enter=waiting, outputs=[start_timer])
if ALLOW_CLOSE: waiting.upon(expire, enter=reconnecting, outputs=[ep_connect])
connecting.upon(stop, enter=cancelling, outputs=[d_cancel]) waiting.upon(stop, enter=closed, outputs=[cancel_timer])
disconnecting.upon(onClose, enter=closed, outputs=[MC_stopped])
@attrs
class WSRelayClient(object):
_wormhole = attrib()
_ws_url = attrib()
_reactor = attrib()
INITIAL_DELAY = 1.0
def __init__(self):
self._m = _WSRelayClient_Machine(self)
self._f = f = WSFactory(self._ws_url)
f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600)
f.connection_machine = self # calls onOpen and onClose
p = urlparse(self._ws_url)
self._ep = self._make_endpoint(p.hostname, p.port or 80)
self._connector = None
self._done_d = defer.Deferred()
self._current_delay = self.INITIAL_DELAY
def _make_endpoint(self, hostname, port):
return endpoints.HostnameEndpoint(self._reactor, hostname, port)
# inputs from elsewhere
def d_callback(self, p):
self._p = p
self._m.d_callback()
def d_errback(self, f):
self._f = f
self._m.d_errback()
def protocol_onOpen(self, p):
self._m.onOpen()
def protocol_onClose(self, wasClean, code, reason):
self._m.onClose()
def C_stop(self):
self._m.stop()
def timer_expired(self):
self._m.expire()
# outputs driven by the state machine
def ep_connect(self):
print("ep_connect()")
self._d = self._ep.connect(self._f)
self._d.addCallbacks(self.d_callback, self.d_errback)
def add_connection(self):
self._connection = WSConnection(ws, self._wormhole.appid,
self._wormhole.side, self)
self._wormhole.add_connection(self._connection)
def M_lost(self):
self._wormhole.M_lost(self._connection)
self._connection = None
def start_timer(self):
print("start_timer")
self._t = self._reactor.callLater(3.0, self.expire)
def cancel_timer(self):
print("cancel_timer")
self._t.cancel()
self._t = None
def dropConnection(self):
print("dropConnection")
self._ws.dropConnection()
def notify_fail(self):
print("notify_fail", self._f.value if self._f else None)
self._done_d.errback(self._f)
def MC_stopped(self):
pass
waiting.upon(expire, enter=connecting, outputs=[ep_connect])
if ALLOW_CLOSE:
waiting.upon(stop, enter=closed, outputs=[cancel_timer])
disconnecting.upon(onClose, enter=closed, outputs=[]) #MC_stopped
def tryit(reactor): def tryit(reactor):
cm = WSRelayClient(None, "ws://127.0.0.1:4000/v1", reactor) cm = WSRelayClient(None, "ws://127.0.0.1:4000/v1", reactor)
@ -447,9 +482,12 @@ class Wormhole:
# connection is established, we'll send the new ones. # connection is established, we'll send the new ones.
self._outbound_messages = [] self._outbound_messages = []
# these methods are called from outside
def start(self): def start(self):
self._relay_client.start() self._relay_client.start()
# and these are the state-machine transition functions, which don't take
# args
@m.state() @m.state()
def closed(initial=True): pass def closed(initial=True): pass
@m.state() @m.state()