2017-02-24 02:11:07 +00:00
|
|
|
from __future__ import print_function, absolute_import, unicode_literals
|
2017-02-22 19:26:11 +00:00
|
|
|
from zope.interface import implementer
|
2017-02-23 00:56:39 +00:00
|
|
|
from attr import attrs, attrib
|
|
|
|
from attr.validators import provides, instance_of
|
2017-02-22 19:26:11 +00:00
|
|
|
from automat import MethodicalMachine
|
|
|
|
from . import _interfaces
|
|
|
|
from ._key import derive_phase_key, decrypt_data, CryptoError
|
|
|
|
|
2017-02-23 00:56:39 +00:00
|
|
|
@attrs
@implementer(_interfaces.IReceive)
class Receive(object):
    """State machine that decrypts inbound messages and reports the outcome.

    Once a key has been recorded via ``got_key``, each ``got_message`` call
    derives a per-(side, phase) key, attempts to decrypt the body, and feeds
    either ``got_message_good`` or ``got_message_bad`` into the machine.

    States:
      * S0_unknown_key    -- initial; no key recorded yet.
      * S1_unverified_key -- a key is recorded, nothing decrypted with it yet.
      * S2_verified_key   -- at least one message decrypted successfully.
      * S3_scared         -- terminal; a message failed to decrypt.

    Collaborators are attached with ``wire()`` before any input is delivered.
    """
    # Must be a text string (validated by attrs at construction time).
    _side = attrib(validator=instance_of(type(u"")))
    # Must provide _interfaces.ITiming (validated by attrs).
    _timing = attrib(validator=provides(_interfaces.ITiming))

    m = MethodicalMachine()
    # Use the machine's "setTrace" hook when the installed Automat has one;
    # otherwise fall back to a no-op with the same (self, f) call shape.
    set_trace = getattr(m, "setTrace", lambda self, f: None)

    def __attrs_post_init__(self):
        """Start with no key; ``record_key`` stores one later."""
        self._key = None

    def wire(self, boss, send):
        """Attach the collaborators, adapting each to its interface.

        :param boss: object adapted via ``_interfaces.IBoss``; receives
            ``happy()``, ``got_message()`` and ``scared()`` calls.
        :param send: object adapted via ``_interfaces.ISend``; receives
            ``got_verified_key()``.
        """
        self._B = _interfaces.IBoss(boss)
        self._S = _interfaces.ISend(send)

    # --- states (bodies are never executed; they only name the state) ---

    @m.state(initial=True)
    def S0_unknown_key(self): pass  # pragma: no cover

    @m.state()
    def S1_unverified_key(self): pass  # pragma: no cover

    @m.state()
    def S2_verified_key(self): pass  # pragma: no cover

    @m.state(terminal=True)
    def S3_scared(self): pass  # pragma: no cover

    # from Ordering
    def got_message(self, side, phase, body):
        """Decrypt one inbound message and dispatch the result to the machine.

        :param side: text string, used with ``phase`` to derive the data key.
        :param phase: text string identifying the message phase.
        :param body: bytes holding the encrypted message.

        A key must already have been recorded (``self._key`` truthy). On
        ``CryptoError`` the ``got_message_bad`` input is fired; otherwise
        ``got_message_good(phase, plaintext)`` is fired.
        """
        # Each assertion reports the type of the argument it actually checks.
        assert isinstance(side, type("")), type(side)
        assert isinstance(phase, type("")), type(phase)
        assert isinstance(body, type(b"")), type(body)
        assert self._key
        data_key = derive_phase_key(self._key, side, phase)
        try:
            plaintext = decrypt_data(data_key, body)
        except CryptoError:
            self.got_message_bad()
            return
        self.got_message_good(phase, plaintext)

    # --- inputs (bodies must stay empty; Automat routes the call) ---

    # Fired by got_message() after a successful decryption.
    @m.input()
    def got_message_good(self, phase, plaintext): pass

    # Fired by got_message() when decryption raises CryptoError.
    @m.input()
    def got_message_bad(self): pass

    # from Key
    @m.input()
    def got_key(self, key): pass

    # --- outputs ---

    @m.output()
    def record_key(self, key):
        """Remember the key so later messages can be decrypted with it."""
        self._key = key

    @m.output()
    def S_got_verified_key(self, phase, plaintext):
        """Hand the recorded key to Send; the arguments are unused here."""
        assert self._key
        self._S.got_verified_key(self._key)

    @m.output()
    def W_happy(self, phase, plaintext):
        """Notify the Boss via ``happy()``; the arguments are unused here."""
        self._B.happy()

    @m.output()
    def W_got_message(self, phase, plaintext):
        """Deliver a decrypted message (text phase, bytes plaintext) to Boss."""
        assert isinstance(phase, type("")), type(phase)
        assert isinstance(plaintext, type(b"")), type(plaintext)
        self._B.got_message(phase, plaintext)

    @m.output()
    def W_scared(self):
        """Notify the Boss via ``scared()`` after a decryption failure."""
        self._B.scared()

    # --- transition table ---

    S0_unknown_key.upon(got_key, enter=S1_unverified_key, outputs=[record_key])
    # The first good message verifies the key: tell Send, tell Boss, deliver.
    S1_unverified_key.upon(got_message_good, enter=S2_verified_key,
                           outputs=[S_got_verified_key, W_happy, W_got_message])
    S1_unverified_key.upon(got_message_bad, enter=S3_scared,
                           outputs=[W_scared])
    S2_verified_key.upon(got_message_bad, enter=S3_scared,
                         outputs=[W_scared])
    S2_verified_key.upon(got_message_good, enter=S2_verified_key,
                         outputs=[W_got_message])
    # Once scared, every further message (good or bad) is silently ignored.
    S3_scared.upon(got_message_good, enter=S3_scared, outputs=[])
    S3_scared.upon(got_message_bad, enter=S3_scared, outputs=[])
|
|
|
|
|