From 3a289f891272f644c08fd023b31055f25b5f31be Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 19 Mar 2017 18:38:35 +0100 Subject: [PATCH] add tests for Send and Terminator --- src/wormhole/_terminator.py | 2 +- src/wormhole/test/test_machines.py | 121 ++++++++++++++++++++++++++++- 2 files changed, 118 insertions(+), 5 deletions(-) diff --git a/src/wormhole/_terminator.py b/src/wormhole/_terminator.py index f7da3cc..1468c07 100644 --- a/src/wormhole/_terminator.py +++ b/src/wormhole/_terminator.py @@ -9,7 +9,7 @@ class Terminator(object): @m.setTrace() def set_trace(): pass # pragma: no cover - def __attrs_post_init__(self): + def __init__(self): self._mood = None def wire(self, boss, rendezvous_connector, nameplate, mailbox): diff --git a/src/wormhole/test/test_machines.py b/src/wormhole/test/test_machines.py index 3380b86..55461c4 100644 --- a/src/wormhole/test/test_machines.py +++ b/src/wormhole/test/test_machines.py @@ -1,14 +1,17 @@ from __future__ import print_function, unicode_literals import json +import mock from zope.interface import directlyProvides, implementer from twisted.trial import unittest -from .. import errors, timing, _order, _receive, _key, _code, _lister, _input, _allocator +from .. import (errors, timing, _order, _receive, _key, _code, _lister, + _input, _allocator, _send, _terminator) from .._interfaces import (IKey, IReceive, IBoss, ISend, IMailbox, IRendezvousConnector, ILister, IInput, IAllocator, INameplate, ICode, IWordlist) from .._key import derive_key, derive_phase_key, encrypt_data from ..util import dict_to_bytes, hexstr_to_bytes, bytes_to_hexstr, to_bytes from spake2 import SPAKE2_Symmetric +from nacl.secret import SecretBox @implementer(IWordlist) class FakeWordList(object): @@ -30,6 +33,58 @@ class Dummy: self.events.append(("%s.%s" % (self.name, meth),) + args) setattr(self, meth, log) +class Send(unittest.TestCase): + def build(self): + events = [] + s = _send.Send(u"side", timing.DebugTiming()) + m = Dummy("m", events, IMailbox, "add_message") + s.wire(m) + return s, m, events + + def test_send_first(self): + s, m, events = self.build() + s.send("phase1", b"msg") + self.assertEqual(events, []) + key = b"\x00" * 32 + nonce1 = b"\x00" * SecretBox.NONCE_SIZE + with mock.patch("nacl.utils.random", side_effect=[nonce1]) as r: + s.got_verified_key(key) + self.assertEqual(r.mock_calls, [mock.call(SecretBox.NONCE_SIZE)]) + #print(bytes_to_hexstr(events[0][2])) + enc1 = hexstr_to_bytes("00000000000000000000000000000000000000000000000022f1a46c3c3496423c394621a2a5a8cf275b08") + self.assertEqual(events, [("m.add_message", "phase1", enc1)]) + events[:] = [] + + nonce2 = b"\x02" * SecretBox.NONCE_SIZE + with mock.patch("nacl.utils.random", side_effect=[nonce2]) as r: + s.send("phase2", b"msg") + self.assertEqual(r.mock_calls, [mock.call(SecretBox.NONCE_SIZE)]) + enc2 = hexstr_to_bytes("0202020202020202020202020202020202020202020202026660337c3eac6513c0dac9818b62ef16d9cd7e") + self.assertEqual(events, [("m.add_message", "phase2", enc2)]) + + def test_key_first(self): + s, m, events = self.build() + key = b"\x00" * 32 + s.got_verified_key(key) + self.assertEqual(events, []) + + nonce1 = b"\x00" * SecretBox.NONCE_SIZE + with mock.patch("nacl.utils.random", side_effect=[nonce1]) as r: + s.send("phase1", b"msg") + self.assertEqual(r.mock_calls, [mock.call(SecretBox.NONCE_SIZE)]) + enc1 = hexstr_to_bytes("00000000000000000000000000000000000000000000000022f1a46c3c3496423c394621a2a5a8cf275b08") + self.assertEqual(events, [("m.add_message", "phase1", enc1)]) + events[:] = [] + + nonce2 = b"\x02" * SecretBox.NONCE_SIZE + with mock.patch("nacl.utils.random", side_effect=[nonce2]) as r: + s.send("phase2", b"msg") + self.assertEqual(r.mock_calls, [mock.call(SecretBox.NONCE_SIZE)]) + enc2 = hexstr_to_bytes("0202020202020202020202020202020202020202020202026660337c3eac6513c0dac9818b62ef16d9cd7e") + self.assertEqual(events, [("m.add_message", "phase2", enc2)]) + + + class Order(unittest.TestCase): def build(self): events = [] @@ -451,12 +506,70 @@ class Allocator(unittest.TestCase): self.assertEqual(events, [("c.allocated", "1", "1-word-word"), ]) + +class Terminator(unittest.TestCase): + def build(self): + events = [] + t = _terminator.Terminator() + b = Dummy("b", events, IBoss, "closed") + rc = Dummy("rc", events, IRendezvousConnector, "stop") + n = Dummy("n", events, INameplate, "close") + m = Dummy("m", events, IMailbox, "close") + t.wire(b, rc, n, m) + return t, b, rc, n, m, events + + # there are three events, and we need to test all orderings of them + def _do_test(self, ev1, ev2, ev3): + t, b, rc, n, m, events = self.build() + input_events = {"mailbox": lambda: t.mailbox_done(), + "nameplate": lambda: t.nameplate_done(), + "close": lambda: t.close("happy"), + } + close_events = [("n.close",), + ("m.close", "happy"), + ] + + input_events[ev1]() + expected = [] + if ev1 == "close": + expected.extend(close_events) + self.assertEqual(events, expected) + events[:] = [] + + input_events[ev2]() + expected = [] + if ev2 == "close": + expected.extend(close_events) + self.assertEqual(events, expected) + events[:] = [] + + input_events[ev3]() + expected = [] + if ev3 == "close": + expected.extend(close_events) + expected.append(("rc.stop",)) + self.assertEqual(events, expected) + events[:] = [] + + t.stopped() + self.assertEqual(events, [("b.closed",)]) + + def test_terminate(self): + self._do_test("mailbox", "nameplate", "close") + self._do_test("mailbox", "close", "nameplate") + self._do_test("nameplate", "mailbox", "close") + self._do_test("nameplate", "close", "mailbox") + self._do_test("close", "nameplate", "mailbox") + self._do_test("close", "mailbox", "nameplate") + + + # TODO -# Send +# #Send # Mailbox # Nameplate -# Terminator +# #Terminator # Boss # RendezvousConnector (not a state machine) # #Input: exercise helper methods -# wordlist +# #wordlist