more demo work

This commit is contained in:
Brian Warner 2017-02-13 23:12:57 -08:00
parent 693e215d8b
commit 16c477424c
5 changed files with 183 additions and 80 deletions

View File

@ -32,17 +32,19 @@ digraph {
M_P1A_queue -> M_S1A [style="dotted"]
{rank=same; M_S2B M_S2A M_P2_claim}
M_S1A -> M_S2A [style="invis"]
M_S2A -> M_S3A [style="invis"]
M_S2A [label="S2A:\nmaybe claimed"]
M_S2B [label="S2B:\nmaybe claimed\n(bound)" color="orange"]
M_S2B -> M_SrB [label="M_close()" style="dashed"]
M_SrB [label="SrB" style="dashed"]
M_S2A -> M_SrA [label="M_close()" style="dashed"]
M_SrA [label="SrA" style="dashed"]
#M_S2B -> M_SrB [label="M_close()" style="dashed"]
#M_SrB [label="SrB" style="dashed"]
#M_S2A -> M_SrA [label="M_close()" style="dashed"]
#M_SrA [label="SrA" style="dashed"]
M_S2A -> M_P2_claim [label="M_connected()"]
M_S2B -> M_S2A [label="M_lost()"]
#M_S2B -> M_S2A [label="M_lost()"] # causes bad layout
M_S2B -> foo [label="M_lost()"]
foo [label="" style="dashed"]
foo -> M_S2A
M_P2_claim [shape="box" label="tx claim" color="orange"]
M_P2_claim -> M_S2B [color="orange"]
M_S2A -> M_P2C_queue [label="M_send(msg)" style="dotted"]

View File

@ -41,18 +41,17 @@ digraph {
MC_Ss [label="Ss: closed" color="green"]
MC_S0A [label="S0A" style="dashed"]
MC_S0A -> MC_P_stop [style="dashed"]
MC_S0B [label="S0B" style="dashed" color="orange"]
MC_S0B -> MC_P_stop [style="dashed" color="orange"]
{rank=same; MC_S2A MC_S2B MC_S2C MC_S1A MC_S1B MC_S3A MC_S3B MC_S4A MC_S4B MC_S5A MC_S5B}
{rank=same; MC_S2A MC_S2B MC_S3A MC_S3B MC_S4A MC_S4B MC_S5A MC_S5B}
MC_S1A [label="S1A" style="dashed"]
MC_S1A -> MC_P_stop [style="dashed"]
MC_S1B [label="S1B" color="orange" style="dashed"]
MC_S1B -> MC_P_stop [style="dashed" color="orange"]
MC_S2C -> MC_S2A [style="invis"]
MC_S2A [label="S2A" style="dashed"]
MC_S2A -> MC_P_stop [style="dashed"]
MC_S2C [label="S2C" style="dashed"]
MC_S2C -> MC_SrA [style="dashed"]
MC_S2A -> MC_SrA [style="dashed"]
MC_S2B [label="S2B" color="orange" style="dashed"]
MC_S2B -> MC_Pr [color="orange" style="dashed"]

View File

@ -2,7 +2,15 @@ import os, sys, json
from twisted.internet import task, defer, endpoints
from twisted.application import service, internet
from twisted.web import server, static, resource
from wormhole import journal
from wormhole import journal, wormhole
# considerations for state management:
# * be somewhat principled about the data (e.g. have a schema)
# * discourage accidental schema changes
# * avoid surprise mutations by app code (don't hand out mutables)
# * discourage app from keeping state itself: make state object easy enough
# to use for everything. App should only hold objects that are active
# (Services, subscribers, etc). App must wire up these objects each time.
class State(object):
@classmethod
@ -118,7 +126,8 @@ class Agent(service.MultiService):
w = wormhole.journaled_from_data(invitation_state["wormhole"],
reactor=self._reactor,
journal=self._jm,
event_handler=_dispatch)
event_handler=self,
event_handler_args=(iid,))
self._wormholes[iid] = w
w.setServiceParent(self)
@ -134,13 +143,18 @@ class Agent(service.MultiService):
def _invite(self, args):
print "invite", args
petname = args["petname"]
# it'd be better to use a unique object for the event_handler
# correlation, but we can't store them into the state database. I'm
# not 100% sure we need one for the database: maybe it should hold a
# list instead, and assign lookup keys at runtime. If they really
# need to be serializable, they should be allocated rather than
# random.
iid = random.randint(1,1000)
my_pubkey = random.randint(1,1000)
with self._jm.process():
def _dispatch(event, *args, **kwargs):
self._dispatch_wormhole_event(iid, event, *args, **kwargs)
w = wormhole.journaled(reactor=self._reactor,
journal=self._jm, event_handler=_dispatch)
w = wormhole.journaled(reactor=self._reactor, journal=self._jm,
event_handler=self,
event_handler_args=(iid,))
self._wormholes[iid] = w
w.setServiceParent(self)
w.get_code() # event_handler means code returns via callback
@ -158,10 +172,9 @@ class Agent(service.MultiService):
iid = random.randint(1,1000)
my_pubkey = random.randint(2,2000)
with self._jm.process():
def _dispatch(event, *args, **kwargs):
self._dispatch_wormhole_event(iid, event, *args, **kwargs)
w = wormhole.wormhole(reactor=self._reactor,
event_dispatcher=_dispatch)
w = wormhole.journaled(reactor=self._reactor, journal=self._jm,
event_dispatcher=self,
event_dispatcher_args=(iid,))
w.set_code(code)
md = {"my_pubkey": my_pubkey}
w.send(json.dumps(md).encode("utf-8"))
@ -172,29 +185,61 @@ class Agent(service.MultiService):
self._state.add_invitation(iid, invitation_state)
return b"ok"
def _dispatch_wormhole_event(self, iid, event, *args, **kwargs):
# dispatch options:
# * register one function, which takes (eventname, *args)
# * to handle multiple wormholes, app must give is a closure
# * register multiple functions (one per event type)
# * register an object, with well-known method names
# * extra: register args and/or kwargs with the callback
#
# events to dispatch:
# generated_code(code)
# got_verifier(verifier_bytes)
# verified()
# got_data(data_bytes)
# closed()
def wormhole_dispatch_got_code(self, code, iid):
# we're already in a jm.process() context
invitation_state = self._state.get_all_invitations()[iid]
if event == "got-code":
(code,) = args
invitation_state["code"] = code
self._state.update_invitation(iid, invitation_state)
self._wormholes[iid].set_code(code)
# notify UI subscribers to update the display
elif event == "got-data":
(data,) = args
md = json.loads(data.decode("utf-8"))
contact = {"petname": invitation_state["petname"],
"my_pubkey": invitation_state["my_pubkey"],
"their_pubkey": md["my_pubkey"],
}
self._state.add_contact(contact)
self._wormholes[iid].close()
elif event == "closed":
self._wormholes[iid].disownServiceParent()
del self._wormholes[iid]
self._state.remove_invitation(iid)
invitation_state["code"] = code
self._state.update_invitation(iid, invitation_state)
self._wormholes[iid].set_code(code)
# notify UI subscribers to update the display
def wormhole_dispatch_got_verifier(self, verifier, iid):
pass
def wormhole_dispatch_verified(self, _, iid):
pass
def wormhole_dispatch_got_data(self, data, iid):
invitation_state = self._state.get_all_invitations()[iid]
md = json.loads(data.decode("utf-8"))
contact = {"petname": invitation_state["petname"],
"my_pubkey": invitation_state["my_pubkey"],
"their_pubkey": md["my_pubkey"],
}
self._state.add_contact(contact)
self._wormholes[iid].close() # now waiting for "closed"
def wormhole_dispatch_closed(self, _, iid):
self._wormholes[iid].disownServiceParent()
del self._wormholes[iid]
self._state.remove_invitation(iid)
def handle_app_event(self, args, ack_f): # sample function
# Imagine here that the app has received a message (not
# wormhole-related) from some other server, and needs to act on it.
# Also imagine that ack_f() is how we tell the sender that they can
# stop sending the message, or how we ask our poller/subscriber
# client to send a DELETE message. If the process dies before ack_f()
# delivers whatever it needs to deliver, then in the next launch,
# handle_app_event() will be called again.
stuff = parse(args)
with self._jm.process():
update_my_state()
self._jm.queue_outbound(ack_f)
def create(reactor, basedir):
os.mkdir(basedir)

View File

@ -9,49 +9,66 @@ class _Mailbox_Machine(object):
@m.state(initial=True)
def initial(self): pass
@m.state()
def S1A(self): pass # know nothing, not connected
@m.state()
def S1B(self): pass # know nothing, yes connected
# all -A states: not connected
# all -B states: yes connected
# B states serialize as A, so they deserialize as unconnected
# S0: know nothing
@m.state()
def S2A(self): pass # not claimed, not connected
def S0A(self): pass
@m.state()
def S2B(self): pass # maybe claimed, yes connected
@m.state()
def S2C(self): pass # maybe claimed, not connected
def S0B(self): pass
# S1: nameplate known, not claimed
@m.state()
def S3A(self): pass # claimed, maybe opened, not connected
@m.state()
def S3B(self): pass # claimed, maybe opened, yes connected
def S1A(self): pass
# S2: nameplate known, maybe claimed
@m.state()
def S4A(self): pass # maybe released, maybe opened, not connected
def S2A(self): pass
@m.state()
def S4B(self): pass # maybe released, maybe opened, yes connected
def S2B(self): pass
# S3: nameplate claimed, mailbox known, maybe open
@m.state()
def S5A(self): pass # released, maybe open, not connected
def S3A(self): pass
@m.state()
def S5B(self): pass # released, maybe open, yes connected
def S3B(self): pass
# S4: mailbox maybe open, nameplate maybe released
# We've definitely opened the mailbox at least once, but it must be
# re-opened with each connection, because open() is also subscribe()
@m.state()
def SrcA(self): pass # waiting for release+close, not connected
def S4A(self): pass
@m.state()
def SrcB(self): pass # waiting for release+close, yes connected
def S4B(self): pass
# S5: mailbox maybe open, nameplate released
@m.state()
def SrA(self): pass # waiting for release, not connected
def S5A(self): pass
@m.state()
def SrB(self): pass # waiting for release, yes connected
def S5B(self): pass
# Src: waiting for release+close
@m.state()
def ScA(self): pass # waiting for close, not connected
def SrcA(self): pass
@m.state()
def ScB(self): pass # waiting for close, yes connected
def SrcB(self): pass
# Sr: closed (or never opened), waiting for release
@m.state()
def SsB(self): pass # closed, stopping
def SrA(self): pass
@m.state()
def Ss(self): pass # stopped
def SrB(self): pass
# Sc: released (or never claimed), waiting for close
@m.state()
def ScA(self): pass
@m.state()
def ScB(self): pass
# Ss: closed and released, waiting for stop
@m.state()
def SsB(self): pass
@m.state()
def Ss(self): pass # terminal
def connected(self, ws):
@ -110,26 +127,27 @@ class _Mailbox_Machine(object):
def C_stop(self): pass
initial.upon(M_start_connected, enter=S1A, outputs=[])
initial.upon(M_start_unconnected, enter=S1B, outputs=[])
S1A.upon(M_connected, enter=S1B, outputs=[])
S1A.upon(M_set_nameplate, enter=S2A, outputs=[])
S1A.upon(M_stop, enter=SsB, outputs=[C_stop])
S1B.upon(M_lost, enter=S1A, outputs=[])
S1B.upon(M_set_nameplate, enter=S2B, outputs=[tx_claim])
S1B.upon(M_stop, enter=SsB, outputs=[C_stop])
initial.upon(M_start_unconnected, enter=S0A, outputs=[])
initial.upon(M_start_connected, enter=S0B, outputs=[])
S0A.upon(M_connected, enter=S0B, outputs=[])
S0A.upon(M_set_nameplate, enter=S1A, outputs=[])
S0A.upon(M_stop, enter=SsB, outputs=[C_stop])
S0B.upon(M_lost, enter=S0A, outputs=[])
S0B.upon(M_set_nameplate, enter=S2B, outputs=[tx_claim])
S0B.upon(M_stop, enter=SsB, outputs=[C_stop])
S1A.upon(M_connected, enter=S2B, outputs=[tx_claim])
S1A.upon(M_send, enter=S1A, outputs=[queue])
S1A.upon(M_stop, enter=SrA, outputs=[])
S2A.upon(M_connected, enter=S2B, outputs=[tx_claim])
S2A.upon(M_stop, enter=SsB, outputs=[C_stop])
S2A.upon(M_send, enter=S2A, outputs=[queue])
S2B.upon(M_lost, enter=S2C, outputs=[])
S2B.upon(M_lost, enter=S2A, outputs=[])
S2B.upon(M_send, enter=S2B, outputs=[queue])
S2B.upon(M_stop, enter=SrB, outputs=[tx_release])
S2B.upon(M_rx_claimed, enter=S3B, outputs=[store_mailbox, tx_open,
tx_add_queued])
S2C.upon(M_connected, enter=S2B, outputs=[tx_claim])
S2C.upon(M_send, enter=S2C, outputs=[queue])
S2C.upon(M_stop, enter=SrA, outputs=[])
S3A.upon(M_connected, enter=S3B, outputs=[tx_open, tx_add_queued])
S3A.upon(M_send, enter=S3A, outputs=[queue])

View File

@ -919,3 +919,42 @@ def wormhole(appid, relay_url, reactor, tor_manager=None, timing=None,
# timing = timing or DebugTiming()
# w = _Wormhole.from_serialized(data, reactor, timing)
# return w
# considerations for activity management:
# * websocket to server wants to be a t.a.i.ClientService
# * if Wormhole is a MultiService:
# * makes it easier to chain the ClientService to it
# * implies that nothing will happen before w.startService()
# * implies everything stops upon d=w.stopService()
# * if not:
# *
class _JournaledWormhole(service.MultiService):
def __init__(self, reactor, journal_manager, event_dispatcher,
event_dispatcher_args=()):
pass
class ImmediateJM(object):
def queue_outbound(self, fn, *args, **kwargs):
fn(*args, **kwargs)
@contextlib.contextmanager
def process(self):
yield
class _Wormhole(_JournaledWormhole):
# send events to self, deliver them via Deferreds
def __init__(self, reactor):
_JournaledWormhole.__init__(self, reactor, ImmediateJM(), self)
def wormhole(reactor):
w = _Wormhole(reactor)
w.startService()
return w
def journaled_from_data(state, reactor, journal,
event_handler, event_handler_args=()):
pass
def journaled(reactor, journal, event_handler, event_handler_args()):
pass