diff --git a/src/wormhole/misc.py b/src/wormhole/misc.py deleted file mode 100644 index 808b70f..0000000 --- a/src/wormhole/misc.py +++ /dev/null @@ -1,489 +0,0 @@ - -from six.moves.urllib_parse import urlparse -from attr import attrs, attrib -from twisted.internet import defer, endpoints #, error -from autobahn.twisted import websocket -from automat import MethodicalMachine - -class WSClient(websocket.WebSocketClientProtocol): - def onConnect(self, response): - # this fires during WebSocket negotiation, and isn't very useful - # unless you want to modify the protocol settings - print("onConnect", response) - #self.connection_machine.onConnect(self) - - def onOpen(self, *args): - # this fires when the WebSocket is ready to go. No arguments - print("onOpen", args) - #self.wormhole_open = True - self.connection_machine.protocol_onOpen(self) - #self.factory.d.callback(self) - - def onMessage(self, payload, isBinary): - print("onMessage") - return - assert not isBinary - self.wormhole._ws_dispatch_response(payload) - - def onClose(self, wasClean, code, reason): - print("onClose") - self.connection_machine.protocol_onClose(wasClean, code, reason) - #if self.wormhole_open: - # self.wormhole._ws_closed(wasClean, code, reason) - #else: - # # we closed before establishing a connection (onConnect) or - # # finishing WebSocket negotiation (onOpen): errback - # self.factory.d.errback(error.ConnectError(reason)) - -class WSFactory(websocket.WebSocketClientFactory): - protocol = WSClient - def buildProtocol(self, addr): - proto = websocket.WebSocketClientFactory.buildProtocol(self, addr) - proto.connection_machine = self.connection_machine - #proto.wormhole_open = False - return proto - -# pip install (path to automat checkout)[visualize] -# automat-visualize wormhole._connection - -@attrs -class _WSRelayClient_Machine(object): - _c = attrib() - m = MethodicalMachine() - - @m.state(initial=True) - def initial(self): pass - @m.state() - def connecting(self): pass - @m.state() - def negotiating(self): pass - @m.state(terminal=True) - def failed(self): pass - @m.state() - def open(self): pass - @m.state() - def waiting(self): pass - @m.state() - def reconnecting(self): pass - @m.state() - def disconnecting(self): pass - @m.state() - def cancelling(self): pass - @m.state(terminal=True) - def closed(self): pass - - @m.input() - def start(self): pass ; print("input:start") - @m.input() - def d_callback(self): pass ; print("input:d_callback") - @m.input() - def d_errback(self): pass ; print("input:d_errback") - @m.input() - def d_cancel(self): pass ; print("input:d_cancel") - @m.input() - def onOpen(self): pass ; print("input:onOpen") - @m.input() - def onClose(self): pass ; print("input:onClose") - @m.input() - def expire(self): pass - @m.input() - def stop(self): pass - - # outputs - @m.output() - def ep_connect(self): - "ep.connect()" - self._c.ep_connect() - @m.output() - def reset_timer(self): - self._c.reset_timer() - @m.output() - def connection_established(self): - print("connection_established") - self._c.connection_established() - @m.output() - def M_lost(self): - self._c.M_lost() - @m.output() - def start_timer(self): - self._c.start_timer() - @m.output() - def cancel_timer(self): - self._c.cancel_timer() - @m.output() - def dropConnection(self): - self._c.dropConnection() - @m.output() - def notify_fail(self): - self._c.notify_fail() - @m.output() - def MC_stopped(self): - self._c.MC_stopped() - - initial.upon(start, enter=connecting, outputs=[ep_connect]) - connecting.upon(d_callback, enter=negotiating, outputs=[reset_timer]) - connecting.upon(d_errback, enter=failed, outputs=[notify_fail]) - connecting.upon(onClose, enter=failed, outputs=[notify_fail]) - connecting.upon(stop, enter=cancelling, outputs=[d_cancel]) - cancelling.upon(d_errback, enter=closed, outputs=[]) - - negotiating.upon(onOpen, enter=open, outputs=[connection_established]) - negotiating.upon(stop, enter=disconnecting, outputs=[dropConnection]) - negotiating.upon(onClose, enter=failed, outputs=[notify_fail]) - - open.upon(onClose, enter=waiting, outputs=[M_lost, start_timer]) - open.upon(stop, enter=disconnecting, outputs=[dropConnection, M_lost]) - reconnecting.upon(d_callback, enter=negotiating, outputs=[reset_timer]) - reconnecting.upon(d_errback, enter=waiting, outputs=[start_timer]) - reconnecting.upon(onClose, enter=waiting, outputs=[start_timer]) - reconnecting.upon(stop, enter=cancelling, outputs=[d_cancel]) - - waiting.upon(expire, enter=reconnecting, outputs=[ep_connect]) - waiting.upon(stop, enter=closed, outputs=[cancel_timer]) - disconnecting.upon(onClose, enter=closed, outputs=[MC_stopped]) - -# We have one WSRelayClient for each wsurl we know about, and it lasts -# as long as its parent Wormhole does. - -@attrs -class WSRelayClient(object): - _wormhole = attrib() - _mailbox = attrib() - _ws_url = attrib() - _reactor = attrib() - INITIAL_DELAY = 1.0 - - - def __init__(self): - self._m = _WSRelayClient_Machine(self) - self._f = f = WSFactory(self._ws_url) - f.setProtocolOptions(autoPingInterval=60, autoPingTimeout=600) - f.connection_machine = self # calls onOpen and onClose - p = urlparse(self._ws_url) - self._ep = self._make_endpoint(p.hostname, p.port or 80) - self._connector = None - self._done_d = defer.Deferred() - self._current_delay = self.INITIAL_DELAY - - def _make_endpoint(self, hostname, port): - return endpoints.HostnameEndpoint(self._reactor, hostname, port) - - # inputs from elsewhere - def d_callback(self, p): - self._p = p - self._m.d_callback() - def d_errback(self, f): - self._f = f - self._m.d_errback() - def protocol_onOpen(self, p): - self._m.onOpen() - def protocol_onClose(self, wasClean, code, reason): - self._m.onClose() - def C_stop(self): - self._m.stop() - def timer_expired(self): - self._m.expire() - - # outputs driven by the state machine - def ep_connect(self): - print("ep_connect()") - self._d = self._ep.connect(self._f) - self._d.addCallbacks(self.d_callback, self.d_errback) - def connection_established(self): - self._connection = WSConnection(ws, self._wormhole.appid, - self._wormhole.side, self) - self._mailbox.connected(ws) - self._wormhole.add_connection(self._connection) - self._ws_send_command("bind", appid=self._appid, side=self._side) - def M_lost(self): - self._wormhole.M_lost(self._connection) - self._connection = None - def start_timer(self): - print("start_timer") - self._t = self._reactor.callLater(3.0, self.expire) - def cancel_timer(self): - print("cancel_timer") - self._t.cancel() - self._t = None - def dropConnection(self): - print("dropConnection") - self._ws.dropConnection() - def notify_fail(self): - print("notify_fail", self._f.value if self._f else None) - self._done_d.errback(self._f) - def MC_stopped(self): - pass - - -def tryit(reactor): - cm = WSRelayClient(None, "ws://127.0.0.1:4000/v1", reactor) - print("_ConnectionMachine created") - print("start:", cm.start()) - print("waiting on _done_d to finish") - return cm._done_d - -# http://autobahn-python.readthedocs.io/en/latest/websocket/programming.html -# observed sequence of events: -# success: d_callback, onConnect(response), onOpen(), onMessage() -# negotifail (non-websocket): d_callback, onClose() -# noconnect: d_errback - -def tryws(reactor): - ws_url = "ws://127.0.0.1:40001/v1" - f = WSFactory(ws_url) - p = urlparse(ws_url) - ep = endpoints.HostnameEndpoint(reactor, p.hostname, p.port or 80) - d = ep.connect(f) - def _good(p): print("_good", p) - def _bad(f): print("_bad", f) - d.addCallbacks(_good, _bad) - return defer.Deferred() - -if __name__ == "__main__": - import sys - from twisted.python import log - log.startLogging(sys.stdout) - from twisted.internet.task import react - react(tryit) - -# ??? a new WSConnection is created each time the WSRelayClient gets through -# negotiation - -class NameplateListingMachine(object): - m = MethodicalMachine() - def __init__(self): - self._list_nameplate_waiters = [] - - # Ideally, each API request would spawn a new "list_nameplates" message - # to the server, so the response would be maximally fresh, but that would - # require correlating server request+response messages, and the protocol - # is intended to be less stateful than that. So we offer a weaker - # freshness property: if no server requests are in flight, then a new API - # request will provoke a new server request, and the result will be - # fresh. But if a server request is already in flight when a second API - # request arrives, both requests will be satisfied by the same response. - - @m.state(initial=True) - def idle(self): pass - @m.state() - def requesting(self): pass - - @m.input() - def list_nameplates(self): pass # returns Deferred - @m.input() - def response(self, message): pass - - @m.output() - def add_deferred(self): - d = defer.Deferred() - self._list_nameplate_waiters.append(d) - return d - @m.output() - def send_request(self): - self._connection.send_command("list") - @m.output() - def distribute_response(self, message): - nameplates = parse(message) - waiters = self._list_nameplate_waiters - self._list_nameplate_waiters = [] - for d in waiters: - d.callback(nameplates) - - idle.upon(list_nameplates, enter=requesting, - outputs=[add_deferred, send_request], - collector=lambda outs: outs[0]) - idle.upon(response, enter=idle, outputs=[]) - requesting.upon(list_nameplates, enter=requesting, - outputs=[add_deferred], - collector=lambda outs: outs[0]) - requesting.upon(response, enter=idle, outputs=[distribute_response]) - - # nlm._connection = c = Connection(ws) - # nlm.list_nameplates().addCallback(display_completions) - # c.register_dispatch("nameplates", nlm.response) - -class Wormhole: - m = MethodicalMachine() - - def __init__(self, ws_url, reactor): - self._relay_client = WSRelayClient(self, ws_url, reactor) - # This records all the messages we want the relay to have. Each time - # we establish a connection, we'll send them all (and the relay - # server will filter out duplicates). If we add any while a - # connection is established, we'll send the new ones. - self._outbound_messages = [] - - # these methods are called from outside - def start(self): - self._relay_client.start() - - # and these are the state-machine transition functions, which don't take - # args - @m.state() - def closed(initial=True): pass - @m.state() - def know_code_not_mailbox(): pass - @m.state() - def know_code_and_mailbox(): pass # no longer need nameplate - @m.state() - def waiting_first_msg(): pass # key is established, want any message - @m.state() - def processing_version(): pass - @m.state() - def processing_phase(): pass - @m.state() - def open(): pass # key is verified, can post app messages - @m.state(terminal=True) - def failed(): pass - - @m.input() - def deliver_message(self, message): pass - - def w_set_seed(self, code, mailbox): - """Call w_set_seed when we sprout a Wormhole Seed, which - contains both the code and the mailbox""" - self.w_set_code(code) - self.w_set_mailbox(mailbox) - - @m.input() - def w_set_code(self, code): - """Call w_set_code when you learn the code, probably because the user - typed it in.""" - @m.input() - def w_set_mailbox(self, mailbox): - """Call w_set_mailbox() when you learn the mailbox id, from the - response to claim_nameplate""" - pass - - - @m.input() - def rx_pake(self, pake): pass # reponse["message"][phase=pake] - - @m.input() - def rx_version(self, version): # response["message"][phase=version] - pass - @m.input() - def verify_good(self, verifier): pass - @m.input() - def verify_bad(self, f): pass - - @m.input() - def rx_phase(self, message): pass - @m.input() - def phase_good(self, message): pass - @m.input() - def phase_bad(self, f): pass - - @m.output() - def compute_and_post_pake(self, code): - self._code = code - self._pake = compute(code) - self._post(pake=self._pake) - self._ws_send_command("add", phase="pake", body=XXX(pake)) - @m.output() - def set_mailbox(self, mailbox): - self._mailbox = mailbox - @m.output() - def set_seed(self, code, mailbox): - self._code = code - self._mailbox = mailbox - - @m.output() - def process_version(self, version): # response["message"][phase=version] - their_verifier = com - if OK: - self.verify_good(verifier) - else: - self.verify_bad(f) - pass - - @m.output() - def notify_verified(self, verifier): - for d in self._verify_waiters: - d.callback(verifier) - @m.output() - def notify_failed(self, f): - for d in self._verify_waiters: - d.errback(f) - - @m.output() - def process_phase(self, message): # response["message"][phase=version] - their_verifier = com - if OK: - self.verify_good(verifier) - else: - self.verify_bad(f) - pass - - @m.output() - def post_inbound(self, message): - pass - - @m.output() - def deliver_message(self, message): - self._qc.deliver_message(message) - - @m.output() - def compute_key_and_post_version(self, pake): - self._key = x - self._verifier = x - plaintext = dict_to_bytes(self._my_versions) - phase = "version" - data_key = self._derive_phase_key(self._side, phase) - encrypted = self._encrypt_data(data_key, plaintext) - self._msg_send(phase, encrypted) - - closed.upon(w_set_code, enter=know_code_not_mailbox, - outputs=[compute_and_post_pake]) - know_code_not_mailbox.upon(w_set_mailbox, enter=know_code_and_mailbox, - outputs=[set_mailbox]) - know_code_and_mailbox.upon(rx_pake, enter=waiting_first_msg, - outputs=[compute_key_and_post_version]) - waiting_first_msg.upon(rx_version, enter=processing_version, - outputs=[process_version]) - processing_version.upon(verify_good, enter=open, outputs=[notify_verified]) - processing_version.upon(verify_bad, enter=failed, outputs=[notify_failed]) - open.upon(rx_phase, enter=processing_phase, outputs=[process_phase]) - processing_phase.upon(phase_good, enter=open, outputs=[post_inbound]) - processing_phase.upon(phase_bad, enter=failed, outputs=[notify_failed]) - -class QueueConnect: - m = MethodicalMachine() - def __init__(self): - self._outbound_messages = [] - self._connection = None - @m.state() - def disconnected(): pass - @m.state() - def connected(): pass - - @m.input() - def deliver_message(self, message): pass - @m.input() - def connect(self, connection): pass - @m.input() - def disconnect(self): pass - - @m.output() - def remember_connection(self, connection): - self._connection = connection - @m.output() - def forget_connection(self): - self._connection = None - @m.output() - def queue_message(self, message): - self._outbound_messages.append(message) - @m.output() - def send_message(self, message): - self._connection.send(message) - @m.output() - def send_queued_messages(self, connection): - for m in self._outbound_messages: - connection.send(m) - - disconnected.upon(deliver_message, enter=disconnected, outputs=[queue_message]) - disconnected.upon(connect, enter=connected, outputs=[remember_connection, - send_queued_messages]) - connected.upon(deliver_message, enter=connected, - outputs=[queue_message, send_message]) - connected.upon(disconnect, enter=disconnected, outputs=[forget_connection]) diff --git a/src/wormhole/wormhole.py b/src/wormhole/wormhole.py index ec6b8e1..15023ba 100644 --- a/src/wormhole/wormhole.py +++ b/src/wormhole/wormhole.py @@ -1,4 +1,6 @@ from __future__ import print_function, absolute_import, unicode_literals +import sys +from .timing import DebugTiming from .journal import ImmediateJournal def wormhole(appid, relay_url, reactor, tor_manager=None, timing=None, @@ -33,7 +35,7 @@ class _Wormhole(_JournaledWormhole): def __init__(self, reactor): _JournaledWormhole.__init__(self, reactor, ImmediateJournal(), self) -def wormhole(reactor): +def wormhole2(reactor): w = _Wormhole(reactor) w.startService() return w