add tests for Send and Terminator

This commit is contained in:
Brian Warner 2017-03-19 18:38:35 +01:00
parent e66d2df9f1
commit 3a289f8912
2 changed files with 118 additions and 5 deletions

View File

@ -9,7 +9,7 @@ class Terminator(object):
@m.setTrace()
def set_trace(): pass # pragma: no cover
def __attrs_post_init__(self):
def __init__(self):
self._mood = None
def wire(self, boss, rendezvous_connector, nameplate, mailbox):

View File

@ -1,14 +1,17 @@
from __future__ import print_function, unicode_literals
import json
import mock
from zope.interface import directlyProvides, implementer
from twisted.trial import unittest
from .. import errors, timing, _order, _receive, _key, _code, _lister, _input, _allocator
from .. import (errors, timing, _order, _receive, _key, _code, _lister,
_input, _allocator, _send, _terminator)
from .._interfaces import (IKey, IReceive, IBoss, ISend, IMailbox,
IRendezvousConnector, ILister, IInput, IAllocator,
INameplate, ICode, IWordlist)
from .._key import derive_key, derive_phase_key, encrypt_data
from ..util import dict_to_bytes, hexstr_to_bytes, bytes_to_hexstr, to_bytes
from spake2 import SPAKE2_Symmetric
from nacl.secret import SecretBox
@implementer(IWordlist)
class FakeWordList(object):
@ -30,6 +33,58 @@ class Dummy:
self.events.append(("%s.%s" % (self.name, meth),) + args)
setattr(self, meth, log)
class Send(unittest.TestCase):
def build(self):
events = []
s = _send.Send(u"side", timing.DebugTiming())
m = Dummy("m", events, IMailbox, "add_message")
s.wire(m)
return s, m, events
def test_send_first(self):
s, m, events = self.build()
s.send("phase1", b"msg")
self.assertEqual(events, [])
key = b"\x00" * 32
nonce1 = b"\x00" * SecretBox.NONCE_SIZE
with mock.patch("nacl.utils.random", side_effect=[nonce1]) as r:
s.got_verified_key(key)
self.assertEqual(r.mock_calls, [mock.call(SecretBox.NONCE_SIZE)])
#print(bytes_to_hexstr(events[0][2]))
enc1 = hexstr_to_bytes("00000000000000000000000000000000000000000000000022f1a46c3c3496423c394621a2a5a8cf275b08")
self.assertEqual(events, [("m.add_message", "phase1", enc1)])
events[:] = []
nonce2 = b"\x02" * SecretBox.NONCE_SIZE
with mock.patch("nacl.utils.random", side_effect=[nonce2]) as r:
s.send("phase2", b"msg")
self.assertEqual(r.mock_calls, [mock.call(SecretBox.NONCE_SIZE)])
enc2 = hexstr_to_bytes("0202020202020202020202020202020202020202020202026660337c3eac6513c0dac9818b62ef16d9cd7e")
self.assertEqual(events, [("m.add_message", "phase2", enc2)])
def test_key_first(self):
s, m, events = self.build()
key = b"\x00" * 32
s.got_verified_key(key)
self.assertEqual(events, [])
nonce1 = b"\x00" * SecretBox.NONCE_SIZE
with mock.patch("nacl.utils.random", side_effect=[nonce1]) as r:
s.send("phase1", b"msg")
self.assertEqual(r.mock_calls, [mock.call(SecretBox.NONCE_SIZE)])
enc1 = hexstr_to_bytes("00000000000000000000000000000000000000000000000022f1a46c3c3496423c394621a2a5a8cf275b08")
self.assertEqual(events, [("m.add_message", "phase1", enc1)])
events[:] = []
nonce2 = b"\x02" * SecretBox.NONCE_SIZE
with mock.patch("nacl.utils.random", side_effect=[nonce2]) as r:
s.send("phase2", b"msg")
self.assertEqual(r.mock_calls, [mock.call(SecretBox.NONCE_SIZE)])
enc2 = hexstr_to_bytes("0202020202020202020202020202020202020202020202026660337c3eac6513c0dac9818b62ef16d9cd7e")
self.assertEqual(events, [("m.add_message", "phase2", enc2)])
class Order(unittest.TestCase):
def build(self):
events = []
@ -451,12 +506,70 @@ class Allocator(unittest.TestCase):
self.assertEqual(events, [("c.allocated", "1", "1-word-word"),
])
class Terminator(unittest.TestCase):
def build(self):
events = []
t = _terminator.Terminator()
b = Dummy("b", events, IBoss, "closed")
rc = Dummy("rc", events, IRendezvousConnector, "stop")
n = Dummy("n", events, INameplate, "close")
m = Dummy("m", events, IMailbox, "close")
t.wire(b, rc, n, m)
return t, b, rc, n, m, events
# there are three events, and we need to test all orderings of them
def _do_test(self, ev1, ev2, ev3):
t, b, rc, n, m, events = self.build()
input_events = {"mailbox": lambda: t.mailbox_done(),
"nameplate": lambda: t.nameplate_done(),
"close": lambda: t.close("happy"),
}
close_events = [("n.close",),
("m.close", "happy"),
]
input_events[ev1]()
expected = []
if ev1 == "close":
expected.extend(close_events)
self.assertEqual(events, expected)
events[:] = []
input_events[ev2]()
expected = []
if ev2 == "close":
expected.extend(close_events)
self.assertEqual(events, expected)
events[:] = []
input_events[ev3]()
expected = []
if ev3 == "close":
expected.extend(close_events)
expected.append(("rc.stop",))
self.assertEqual(events, expected)
events[:] = []
t.stopped()
self.assertEqual(events, [("b.closed",)])
def test_terminate(self):
self._do_test("mailbox", "nameplate", "close")
self._do_test("mailbox", "close", "nameplate")
self._do_test("nameplate", "mailbox", "close")
self._do_test("nameplate", "close", "mailbox")
self._do_test("close", "nameplate", "mailbox")
self._do_test("close", "mailbox", "nameplate")
# TODO
# Send
# #Send
# Mailbox
# Nameplate
# Terminator
# #Terminator
# Boss
# RendezvousConnector (not a state machine)
# #Input: exercise helper methods
# wordlist
# #wordlist