magic-wormhole/src/wormhole/_receive.py

73 lines
2.4 KiB
Python
Raw Normal View History

from zope.interface import implementer
from automat import MethodicalMachine
from . import _interfaces
from ._key import derive_phase_key, decrypt_data, CryptoError
@implementer(_interfaces.IReceive)
class Receive(object):
m = MethodicalMachine()
def __init__(self, side, timing):
self._side = side
self._timing = timing
self._key = None
def wire(self, wormhole, key, send):
self._W = _interfaces.IWormhole(wormhole)
self._K = _interfaces.IKey(key)
self._S = _interfaces.ISend(send)
@m.state(initial=True)
def S0_unknown_key(self): pass
@m.state()
def S1_unverified_key(self): pass
@m.state()
def S2_verified_key(self): pass
@m.state(terminal=True)
def S3_scared(self): pass
def got_message(self, phase, payload):
assert self._key
data_key = derive_phase_key(self._side, phase)
try:
plaintext = decrypt_data(data_key, body)
except CryptoError:
self.got_message_bad()
return
self.got_message_good(phase, plaintext)
@m.input()
def got_key(self, key): pass
@m.input()
def got_message_good(self, phase, plaintext): pass
@m.input()
def got_message_bad(self): pass
@m.output()
def record_key(self, key):
self._key = key
@m.output()
def S_got_verified_key(self, phase, plaintext):
assert self._key
self._S.got_verified_key(self._key)
@m.output()
def W_happy(self, phase, plaintext):
self._W.happy()
@m.output()
def W_got_message(self, phase, plaintext):
self._W.got_message(phase, plaintext)
@m.output()
def W_scared(self):
self._W.scared()
S0_unknown_key.upon(got_key, enter=S1_unverified_key, outputs=[record_key])
S1_unverified_key.upon(got_message_good, enter=S2_verified_key,
outputs=[S_got_verified_key, W_happy, W_got_message])
S1_unverified_key.upon(got_message_bad, enter=S3_scared,
outputs=[W_scared])
S2_verified_key.upon(got_message_bad, enter=S3_scared,
outputs=[W_scared])
S2_verified_key.upon(got_message_good, enter=S2_verified_key,
outputs=[W_got_message])
S3_scared.upon(got_message_good, enter=S3_scared, outputs=[])
S3_scared.upon(got_message_bad, enter=S3_scared, outputs=[])