diff --git a/docs/w2.dot b/docs/w2.dot index cede4be..f56f3a8 100644 --- a/docs/w2.dot +++ b/docs/w2.dot @@ -32,17 +32,19 @@ digraph { M_P1A_queue -> M_S1A [style="dotted"] {rank=same; M_S2B M_S2A M_P2_claim} - M_S1A -> M_S2A [style="invis"] - M_S2A -> M_S3A [style="invis"] M_S2A [label="S2A:\nmaybe claimed"] M_S2B [label="S2B:\nmaybe claimed\n(bound)" color="orange"] - M_S2B -> M_SrB [label="M_close()" style="dashed"] - M_SrB [label="SrB" style="dashed"] - M_S2A -> M_SrA [label="M_close()" style="dashed"] - M_SrA [label="SrA" style="dashed"] + #M_S2B -> M_SrB [label="M_close()" style="dashed"] + #M_SrB [label="SrB" style="dashed"] + #M_S2A -> M_SrA [label="M_close()" style="dashed"] + #M_SrA [label="SrA" style="dashed"] M_S2A -> M_P2_claim [label="M_connected()"] - M_S2B -> M_S2A [label="M_lost()"] + #M_S2B -> M_S2A [label="M_lost()"] # causes bad layout + M_S2B -> foo [label="M_lost()"] + foo [label="" style="dashed"] + foo -> M_S2A + M_P2_claim [shape="box" label="tx claim" color="orange"] M_P2_claim -> M_S2B [color="orange"] M_S2A -> M_P2C_queue [label="M_send(msg)" style="dotted"] diff --git a/docs/w3.dot b/docs/w3.dot index 7cb6d61..9da7688 100644 --- a/docs/w3.dot +++ b/docs/w3.dot @@ -41,18 +41,17 @@ digraph { MC_Ss [label="Ss: closed" color="green"] + MC_S0A [label="S0A" style="dashed"] + MC_S0A -> MC_P_stop [style="dashed"] + MC_S0B [label="S0B" style="dashed" color="orange"] + MC_S0B -> MC_P_stop [style="dashed" color="orange"] - {rank=same; MC_S2A MC_S2B MC_S2C MC_S1A MC_S1B MC_S3A MC_S3B MC_S4A MC_S4B MC_S5A MC_S5B} + {rank=same; MC_S2A MC_S2B MC_S3A MC_S3B MC_S4A MC_S4B MC_S5A MC_S5B} MC_S1A [label="S1A" style="dashed"] MC_S1A -> MC_P_stop [style="dashed"] - MC_S1B [label="S1B" color="orange" style="dashed"] - MC_S1B -> MC_P_stop [style="dashed" color="orange"] - MC_S2C -> MC_S2A [style="invis"] MC_S2A [label="S2A" style="dashed"] - MC_S2A -> MC_P_stop [style="dashed"] - MC_S2C [label="S2C" style="dashed"] - MC_S2C -> MC_SrA [style="dashed"] + MC_S2A -> MC_SrA [style="dashed"] MC_S2B [label="S2B" color="orange" style="dashed"] MC_S2B -> MC_Pr [color="orange" style="dashed"] diff --git a/misc/demo-journal.py b/misc/demo-journal.py index 8211256..ff76636 100644 --- a/misc/demo-journal.py +++ b/misc/demo-journal.py @@ -2,7 +2,15 @@ import os, sys, json from twisted.internet import task, defer, endpoints from twisted.application import service, internet from twisted.web import server, static, resource -from wormhole import journal +from wormhole import journal, wormhole + +# considerations for state management: +# * be somewhat principled about the data (e.g. have a schema) +# * discourage accidental schema changes +# * avoid surprise mutations by app code (don't hand out mutables) +# * discourage app from keeping state itself: make state object easy enough +# to use for everything. App should only hold objects that are active +# (Services, subscribers, etc). App must wire up these objects each time. class State(object): @classmethod @@ -118,7 +126,8 @@ class Agent(service.MultiService): w = wormhole.journaled_from_data(invitation_state["wormhole"], reactor=self._reactor, journal=self._jm, - event_handler=_dispatch) + event_handler=self, + event_handler_args=(iid,)) self._wormholes[iid] = w w.setServiceParent(self) @@ -134,13 +143,18 @@ class Agent(service.MultiService): def _invite(self, args): print "invite", args petname = args["petname"] + # it'd be better to use a unique object for the event_handler + # correlation, but we can't store them into the state database. I'm + # not 100% sure we need one for the database: maybe it should hold a + # list instead, and assign lookup keys at runtime. If they really + # need to be serializable, they should be allocated rather than + # random. iid = random.randint(1,1000) my_pubkey = random.randint(1,1000) with self._jm.process(): - def _dispatch(event, *args, **kwargs): - self._dispatch_wormhole_event(iid, event, *args, **kwargs) - w = wormhole.journaled(reactor=self._reactor, - journal=self._jm, event_handler=_dispatch) + w = wormhole.journaled(reactor=self._reactor, journal=self._jm, + event_handler=self, + event_handler_args=(iid,)) self._wormholes[iid] = w w.setServiceParent(self) w.get_code() # event_handler means code returns via callback @@ -158,10 +172,9 @@ class Agent(service.MultiService): iid = random.randint(1,1000) my_pubkey = random.randint(2,2000) with self._jm.process(): - def _dispatch(event, *args, **kwargs): - self._dispatch_wormhole_event(iid, event, *args, **kwargs) - w = wormhole.wormhole(reactor=self._reactor, - event_dispatcher=_dispatch) + w = wormhole.journaled(reactor=self._reactor, journal=self._jm, + event_dispatcher=self, + event_dispatcher_args=(iid,)) w.set_code(code) md = {"my_pubkey": my_pubkey} w.send(json.dumps(md).encode("utf-8")) @@ -172,29 +185,61 @@ class Agent(service.MultiService): self._state.add_invitation(iid, invitation_state) return b"ok" - def _dispatch_wormhole_event(self, iid, event, *args, **kwargs): + # dispatch options: + # * register one function, which takes (eventname, *args) + # * to handle multiple wormholes, app must give is a closure + # * register multiple functions (one per event type) + # * register an object, with well-known method names + # * extra: register args and/or kwargs with the callback + # + # events to dispatch: + # generated_code(code) + # got_verifier(verifier_bytes) + # verified() + # got_data(data_bytes) + # closed() + + def wormhole_dispatch_got_code(self, code, iid): # we're already in a jm.process() context invitation_state = self._state.get_all_invitations()[iid] - if event == "got-code": - (code,) = args - invitation_state["code"] = code - self._state.update_invitation(iid, invitation_state) - self._wormholes[iid].set_code(code) - # notify UI subscribers to update the display - elif event == "got-data": - (data,) = args - md = json.loads(data.decode("utf-8")) - contact = {"petname": invitation_state["petname"], - "my_pubkey": invitation_state["my_pubkey"], - "their_pubkey": md["my_pubkey"], - } - self._state.add_contact(contact) - self._wormholes[iid].close() - elif event == "closed": - self._wormholes[iid].disownServiceParent() - del self._wormholes[iid] - self._state.remove_invitation(iid) - + invitation_state["code"] = code + self._state.update_invitation(iid, invitation_state) + self._wormholes[iid].set_code(code) + # notify UI subscribers to update the display + + def wormhole_dispatch_got_verifier(self, verifier, iid): + pass + def wormhole_dispatch_verified(self, _, iid): + pass + + def wormhole_dispatch_got_data(self, data, iid): + invitation_state = self._state.get_all_invitations()[iid] + md = json.loads(data.decode("utf-8")) + contact = {"petname": invitation_state["petname"], + "my_pubkey": invitation_state["my_pubkey"], + "their_pubkey": md["my_pubkey"], + } + self._state.add_contact(contact) + self._wormholes[iid].close() # now waiting for "closed" + + def wormhole_dispatch_closed(self, _, iid): + self._wormholes[iid].disownServiceParent() + del self._wormholes[iid] + self._state.remove_invitation(iid) + + + def handle_app_event(self, args, ack_f): # sample function + # Imagine here that the app has received a message (not + # wormhole-related) from some other server, and needs to act on it. + # Also imagine that ack_f() is how we tell the sender that they can + # stop sending the message, or how we ask our poller/subscriber + # client to send a DELETE message. If the process dies before ack_f() + # delivers whatever it needs to deliver, then in the next launch, + # handle_app_event() will be called again. + stuff = parse(args) + with self._jm.process(): + update_my_state() + self._jm.queue_outbound(ack_f) def create(reactor, basedir): os.mkdir(basedir) diff --git a/src/wormhole/_mailbox.py b/src/wormhole/_mailbox.py index c03e252..2c298b7 100644 --- a/src/wormhole/_mailbox.py +++ b/src/wormhole/_mailbox.py @@ -9,49 +9,66 @@ class _Mailbox_Machine(object): @m.state(initial=True) def initial(self): pass - @m.state() - def S1A(self): pass # know nothing, not connected - @m.state() - def S1B(self): pass # know nothing, yes connected + # all -A states: not connected + # all -B states: yes connected + # B states serialize as A, so they deserialize as unconnected + # S0: know nothing @m.state() - def S2A(self): pass # not claimed, not connected + def S0A(self): pass @m.state() - def S2B(self): pass # maybe claimed, yes connected - @m.state() - def S2C(self): pass # maybe claimed, not connected + def S0B(self): pass + # S1: nameplate known, not claimed @m.state() - def S3A(self): pass # claimed, maybe opened, not connected - @m.state() - def S3B(self): pass # claimed, maybe opened, yes connected + def S1A(self): pass + # S2: nameplate known, maybe claimed @m.state() - def S4A(self): pass # maybe released, maybe opened, not connected + def S2A(self): pass @m.state() - def S4B(self): pass # maybe released, maybe opened, yes connected + def S2B(self): pass + # S3: nameplate claimed, mailbox known, maybe open @m.state() - def S5A(self): pass # released, maybe open, not connected + def S3A(self): pass @m.state() - def S5B(self): pass # released, maybe open, yes connected + def S3B(self): pass + # S4: mailbox maybe open, nameplate maybe released + # We've definitely opened the mailbox at least once, but it must be + # re-opened with each connection, because open() is also subscribe() @m.state() - def SrcA(self): pass # waiting for release+close, not connected + def S4A(self): pass @m.state() - def SrcB(self): pass # waiting for release+close, yes connected + def S4B(self): pass + + # S5: mailbox maybe open, nameplate released @m.state() - def SrA(self): pass # waiting for release, not connected + def S5A(self): pass @m.state() - def SrB(self): pass # waiting for release, yes connected + def S5B(self): pass + + # Src: waiting for release+close @m.state() - def ScA(self): pass # waiting for close, not connected + def SrcA(self): pass @m.state() - def ScB(self): pass # waiting for close, yes connected + def SrcB(self): pass + # Sr: closed (or never opened), waiting for release @m.state() - def SsB(self): pass # closed, stopping + def SrA(self): pass @m.state() - def Ss(self): pass # stopped + def SrB(self): pass + # Sc: released (or never claimed), waiting for close + @m.state() + def ScA(self): pass + @m.state() + def ScB(self): pass + # Ss: closed and released, waiting for stop + @m.state() + def SsB(self): pass + @m.state() + def Ss(self): pass # terminal def connected(self, ws): @@ -110,26 +127,27 @@ class _Mailbox_Machine(object): def C_stop(self): pass - initial.upon(M_start_connected, enter=S1A, outputs=[]) - initial.upon(M_start_unconnected, enter=S1B, outputs=[]) - S1A.upon(M_connected, enter=S1B, outputs=[]) - S1A.upon(M_set_nameplate, enter=S2A, outputs=[]) - S1A.upon(M_stop, enter=SsB, outputs=[C_stop]) - S1B.upon(M_lost, enter=S1A, outputs=[]) - S1B.upon(M_set_nameplate, enter=S2B, outputs=[tx_claim]) - S1B.upon(M_stop, enter=SsB, outputs=[C_stop]) + initial.upon(M_start_unconnected, enter=S0A, outputs=[]) + initial.upon(M_start_connected, enter=S0B, outputs=[]) + S0A.upon(M_connected, enter=S0B, outputs=[]) + S0A.upon(M_set_nameplate, enter=S1A, outputs=[]) + S0A.upon(M_stop, enter=SsB, outputs=[C_stop]) + S0B.upon(M_lost, enter=S0A, outputs=[]) + S0B.upon(M_set_nameplate, enter=S2B, outputs=[tx_claim]) + S0B.upon(M_stop, enter=SsB, outputs=[C_stop]) + + S1A.upon(M_connected, enter=S2B, outputs=[tx_claim]) + S1A.upon(M_send, enter=S1A, outputs=[queue]) + S1A.upon(M_stop, enter=SrA, outputs=[]) S2A.upon(M_connected, enter=S2B, outputs=[tx_claim]) S2A.upon(M_stop, enter=SsB, outputs=[C_stop]) S2A.upon(M_send, enter=S2A, outputs=[queue]) - S2B.upon(M_lost, enter=S2C, outputs=[]) + S2B.upon(M_lost, enter=S2A, outputs=[]) S2B.upon(M_send, enter=S2B, outputs=[queue]) S2B.upon(M_stop, enter=SrB, outputs=[tx_release]) S2B.upon(M_rx_claimed, enter=S3B, outputs=[store_mailbox, tx_open, tx_add_queued]) - S2C.upon(M_connected, enter=S2B, outputs=[tx_claim]) - S2C.upon(M_send, enter=S2C, outputs=[queue]) - S2C.upon(M_stop, enter=SrA, outputs=[]) S3A.upon(M_connected, enter=S3B, outputs=[tx_open, tx_add_queued]) S3A.upon(M_send, enter=S3A, outputs=[queue]) diff --git a/src/wormhole/wormhole.py b/src/wormhole/wormhole.py index ddcf290..4b9c5d9 100644 --- a/src/wormhole/wormhole.py +++ b/src/wormhole/wormhole.py @@ -919,3 +919,42 @@ def wormhole(appid, relay_url, reactor, tor_manager=None, timing=None, # timing = timing or DebugTiming() # w = _Wormhole.from_serialized(data, reactor, timing) # return w + + +# considerations for activity management: +# * websocket to server wants to be a t.a.i.ClientService +# * if Wormhole is a MultiService: +# * makes it easier to chain the ClientService to it +# * implies that nothing will happen before w.startService() +# * implies everything stops upon d=w.stopService() +# * if not: +# * + +class _JournaledWormhole(service.MultiService): + def __init__(self, reactor, journal_manager, event_dispatcher, + event_dispatcher_args=()): + pass + +class ImmediateJM(object): + def queue_outbound(self, fn, *args, **kwargs): + fn(*args, **kwargs) + @contextlib.contextmanager + def process(self): + yield + +class _Wormhole(_JournaledWormhole): + # send events to self, deliver them via Deferreds + def __init__(self, reactor): + _JournaledWormhole.__init__(self, reactor, ImmediateJM(), self) + +def wormhole(reactor): + w = _Wormhole(reactor) + w.startService() + return w + +def journaled_from_data(state, reactor, journal, + event_handler, event_handler_args=()): + pass + +def journaled(reactor, journal, event_handler, event_handler_args()): + pass