get mostly-full coverage for rlcompleter, rename, export

This commit is contained in:
Brian Warner 2017-04-04 19:51:59 -07:00
parent 8882e6f64e
commit bdef446ad4
6 changed files with 391 additions and 47 deletions

View File

@ -226,18 +226,18 @@ The code-entry Helper object has the following API:
`MustChooseNameplateFirstError` will be raised. May only be called once, `MustChooseNameplateFirstError` will be raised. May only be called once,
after which `AlreadyChoseWordsError` is raised. after which `AlreadyChoseWordsError` is raised.
The `rlcompleter` wrapper is a function that knows how to use the code-entry The `input_with_completion` wrapper is a function that knows how to use the
helper to do tab completion of wormhole codes: code-entry helper to do tab completion of wormhole codes:
```python ```python
from wormhole import create, rlcompleter_helper from wormhole import create, input_with_completion
w = create(appid, relay_url, reactor) w = create(appid, relay_url, reactor)
rlcompleter_helper("Wormhole code:", w.input_code(), reactor) input_with_completion("Wormhole code:", w.input_code(), reactor)
d = w.when_code() d = w.when_code()
``` ```
This helper runs python's `rawinput()` function inside a thread, since This helper runs python's (raw) `input()` function inside a thread, since
`rawinput()` normally blocks. `input()` normally blocks.
The two machines participating in the wormhole setup are not distinguished: The two machines participating in the wormhole setup are not distinguished:
it doesn't matter which one goes first, and both use the same Wormhole it doesn't matter which one goes first, and both use the same Wormhole

View File

@ -2,3 +2,8 @@
from ._version import get_versions from ._version import get_versions
__version__ = get_versions()['version'] __version__ = get_versions()['version']
del get_versions del get_versions
from .wormhole import create
from ._rlcompleter import input_with_completion
__all__ = ["create", "input_with_completion", "__version__"]

View File

@ -1,19 +1,25 @@
from __future__ import print_function, unicode_literals from __future__ import print_function, unicode_literals
import sys import traceback
import six from sys import stderr
try:
import readline
except ImportError:
readline = None
from six.moves import input
from attr import attrs, attrib from attr import attrs, attrib
from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.threads import deferToThread, blockingCallFromThread from twisted.internet.threads import deferToThread, blockingCallFromThread
import os #import os
errf = None #errf = None
if os.path.exists("err"): #if os.path.exists("err"):
errf = open("err", "w") # errf = open("err", "w")
def debug(*args, **kwargs): def debug(*args, **kwargs):
if errf: # if errf:
kwargs["file"] = errf # kwargs["file"] = errf
print(*args, **kwargs) # print(*args, **kwargs)
errf.flush() # errf.flush()
pass
@attrs @attrs
class CodeInputter(object): class CodeInputter(object):
@ -28,20 +34,19 @@ class CodeInputter(object):
def bcft(self, f, *a, **kw): def bcft(self, f, *a, **kw):
return blockingCallFromThread(self._reactor, f, *a, **kw) return blockingCallFromThread(self._reactor, f, *a, **kw)
def wrap_completer(self, text, state): def completer(self, text, state):
try: try:
return self.completer(text, state) return self._wrapped_completer(text, state)
except Exception as e: except Exception as e:
# completer exceptions are normally silently discarded, which # completer exceptions are normally silently discarded, which
# makes debugging challenging # makes debugging challenging
print("completer exception: %s" % e) print("completer exception: %s" % e)
import traceback
traceback.print_exc() traceback.print_exc()
raise e raise e
def completer(self, text, state): def _wrapped_completer(self, text, state):
self.used_completion = True self.used_completion = True
import readline # if we get here, then readline must be active
ct = readline.get_completion_type() ct = readline.get_completion_type()
if state == 0: if state == 0:
debug("completer starting (%s) (state=0) (ct=%d)" % (text, ct)) debug("completer starting (%s) (state=0) (ct=%d)" % (text, ct))
@ -109,21 +114,20 @@ class CodeInputter(object):
self._input_helper.choose_nameplate(nameplate) self._input_helper.choose_nameplate(nameplate)
self._input_helper.choose_words(words) self._input_helper.choose_words(words)
def input_code_with_completion(prompt, input_helper, reactor): def _input_code_with_completion(prompt, input_helper, reactor):
c = CodeInputter(input_helper, reactor) c = CodeInputter(input_helper, reactor)
try: if readline is not None:
import readline
if readline.__doc__ and "libedit" in readline.__doc__: if readline.__doc__ and "libedit" in readline.__doc__:
readline.parse_and_bind("bind ^I rl_complete") readline.parse_and_bind("bind ^I rl_complete")
else: else:
readline.parse_and_bind("tab: complete") readline.parse_and_bind("tab: complete")
readline.set_completer(c.wrap_completer) readline.set_completer(c.completer)
readline.set_completer_delims("") readline.set_completer_delims("")
debug("==== readline-based completion is prepared") debug("==== readline-based completion is prepared")
except ImportError: else:
debug("==== unable to import readline, disabling completion") debug("==== unable to import readline, disabling completion")
pass pass
code = six.moves.input(prompt) code = input(prompt)
# Code is str(bytes) on py2, and str(unicode) on py3. We want unicode. # Code is str(bytes) on py2, and str(unicode) on py3. We want unicode.
if isinstance(code, bytes): if isinstance(code, bytes):
code = code.decode("utf-8") code = code.decode("utf-8")
@ -137,8 +141,7 @@ def warn_readline():
# input_code_with_completion() when SIGINT happened, the readline # input_code_with_completion() when SIGINT happened, the readline
# thread will be blocked waiting for something on stdin. Trick the # thread will be blocked waiting for something on stdin. Trick the
# user into satisfying the blocking read so we can exit. # user into satisfying the blocking read so we can exit.
print("\nCommand interrupted: please press Return to quit", print("\nCommand interrupted: please press Return to quit", file=stderr)
file=sys.stderr)
# Other potential approaches to this problem: # Other potential approaches to this problem:
# * hard-terminate our process with os._exit(1), but make sure the # * hard-terminate our process with os._exit(1), but make sure the
@ -165,10 +168,10 @@ def warn_readline():
# readline finish. # readline finish.
@inlineCallbacks @inlineCallbacks
def rlcompleter_helper(prompt, input_helper, reactor): def input_with_completion(prompt, input_helper, reactor):
t = reactor.addSystemEventTrigger("before", "shutdown", warn_readline) t = reactor.addSystemEventTrigger("before", "shutdown", warn_readline)
#input_helper.refresh_nameplates() #input_helper.refresh_nameplates()
used_completion = yield deferToThread(input_code_with_completion, used_completion = yield deferToThread(_input_code_with_completion,
prompt, input_helper, reactor) prompt, input_helper, reactor)
reactor.removeSystemEventTrigger(t) reactor.removeSystemEventTrigger(t)
returnValue(used_completion) returnValue(used_completion)

View File

@ -5,7 +5,7 @@ from humanize import naturalsize
from twisted.internet import reactor from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.python import log from twisted.python import log
from .. import wormhole, __version__ from wormhole import create, input_with_completion, __version__
from ..transit import TransitReceiver from ..transit import TransitReceiver
from ..errors import TransferError, WormholeClosedError, NoTorError from ..errors import TransferError, WormholeClosedError, NoTorError
from ..util import (dict_to_bytes, bytes_to_dict, bytes_to_hexstr, from ..util import (dict_to_bytes, bytes_to_dict, bytes_to_hexstr,
@ -64,11 +64,11 @@ class TwistedReceiver:
wh = CLIWelcomeHandler(self.args.relay_url, __version__, wh = CLIWelcomeHandler(self.args.relay_url, __version__,
self.args.stderr) self.args.stderr)
w = wormhole.create(self.args.appid or APPID, self.args.relay_url, w = create(self.args.appid or APPID, self.args.relay_url,
self._reactor, self._reactor,
tor_manager=self._tor_manager, tor_manager=self._tor_manager,
timing=self.args.timing, timing=self.args.timing,
welcome_handler=wh.handle_welcome) welcome_handler=wh.handle_welcome)
# I wanted to do this instead: # I wanted to do this instead:
# #
# try: # try:
@ -168,10 +168,10 @@ class TwistedReceiver:
if code: if code:
w.set_code(code) w.set_code(code)
else: else:
from .._rlcompleter import rlcompleter_helper prompt = "Enter receive wormhole code: "
used_completion = yield rlcompleter_helper("Enter receive wormhole code: ", used_completion = yield input_with_completion(prompt,
w.input_code(), w.input_code(),
self._reactor) self._reactor)
if not used_completion: if not used_completion:
print(" (note: you can use <Tab> to complete words)", print(" (note: you can use <Tab> to complete words)",
file=self.args.stderr) file=self.args.stderr)

View File

@ -7,7 +7,7 @@ from twisted.protocols import basic
from twisted.internet import reactor from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.defer import inlineCallbacks, returnValue
from ..errors import TransferError, WormholeClosedError, NoTorError from ..errors import TransferError, WormholeClosedError, NoTorError
from .. import wormhole, __version__ from wormhole import create, __version__
from ..transit import TransitSender from ..transit import TransitSender
from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr from ..util import dict_to_bytes, bytes_to_dict, bytes_to_hexstr
from .welcome import CLIWelcomeHandler from .welcome import CLIWelcomeHandler
@ -55,11 +55,11 @@ class Sender:
wh = CLIWelcomeHandler(self._args.relay_url, __version__, wh = CLIWelcomeHandler(self._args.relay_url, __version__,
self._args.stderr) self._args.stderr)
w = wormhole.create(self._args.appid or APPID, self._args.relay_url, w = create(self._args.appid or APPID, self._args.relay_url,
self._reactor, self._reactor,
tor_manager=self._tor_manager, tor_manager=self._tor_manager,
timing=self._timing, timing=self._timing,
welcome_handler=wh.handle_welcome) welcome_handler=wh.handle_welcome)
d = self._go(w) d = self._go(w)
# if we succeed, we should close and return the w.close results # if we succeed, we should close and return the w.close results

View File

@ -0,0 +1,336 @@
from __future__ import print_function, absolute_import, unicode_literals
import mock
from itertools import count
from twisted.trial import unittest
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.threads import deferToThread
from .._rlcompleter import (input_with_completion,
_input_code_with_completion,
CodeInputter, warn_readline)
APPID = "appid"
class Input(unittest.TestCase):
@inlineCallbacks
def test_wrapper(self):
helper = object()
trueish = object()
with mock.patch("wormhole._rlcompleter._input_code_with_completion",
return_value=trueish) as m:
used_completion = yield input_with_completion("prompt:", helper,
reactor)
self.assertIs(used_completion, trueish)
self.assertEqual(m.mock_calls,
[mock.call("prompt:", helper, reactor)])
# note: if this test fails, the warn_readline() message will probably
# get written to stderr
class Sync(unittest.TestCase):
# exercise _input_code_with_completion, which uses the blocking builtin
# "input()" function, hence _input_code_with_completion is usually in a
# thread with deferToThread
@mock.patch("wormhole._rlcompleter.CodeInputter")
@mock.patch("wormhole._rlcompleter.readline",
__doc__="I am GNU readline")
@mock.patch("wormhole._rlcompleter.input", return_value="code")
def test_readline(self, input, readline, ci):
c = mock.Mock(name="inhibit parenting")
c.completer = object()
trueish = object()
c.used_completion = trueish
ci.configure_mock(return_value=c)
prompt = object()
input_helper = object()
reactor = object()
used = _input_code_with_completion(prompt, input_helper, reactor)
self.assertIs(used, trueish)
self.assertEqual(ci.mock_calls, [mock.call(input_helper, reactor)])
self.assertEqual(c.mock_calls, [mock.call.finish("code")])
self.assertEqual(input.mock_calls, [mock.call(prompt)])
self.assertEqual(readline.mock_calls,
[mock.call.parse_and_bind("tab: complete"),
mock.call.set_completer(c.completer),
mock.call.set_completer_delims(""),
])
@mock.patch("wormhole._rlcompleter.CodeInputter")
@mock.patch("wormhole._rlcompleter.readline")
@mock.patch("wormhole._rlcompleter.input", return_value="code")
def test_readline_no_docstring(self, input, readline, ci):
del readline.__doc__ # when in doubt, it assumes GNU readline
c = mock.Mock(name="inhibit parenting")
c.completer = object()
trueish = object()
c.used_completion = trueish
ci.configure_mock(return_value=c)
prompt = object()
input_helper = object()
reactor = object()
used = _input_code_with_completion(prompt, input_helper, reactor)
self.assertIs(used, trueish)
self.assertEqual(ci.mock_calls, [mock.call(input_helper, reactor)])
self.assertEqual(c.mock_calls, [mock.call.finish("code")])
self.assertEqual(input.mock_calls, [mock.call(prompt)])
self.assertEqual(readline.mock_calls,
[mock.call.parse_and_bind("tab: complete"),
mock.call.set_completer(c.completer),
mock.call.set_completer_delims(""),
])
@mock.patch("wormhole._rlcompleter.CodeInputter")
@mock.patch("wormhole._rlcompleter.readline",
__doc__="I am libedit")
@mock.patch("wormhole._rlcompleter.input", return_value="code")
def test_libedit(self, input, readline, ci):
c = mock.Mock(name="inhibit parenting")
c.completer = object()
trueish = object()
c.used_completion = trueish
ci.configure_mock(return_value=c)
prompt = object()
input_helper = object()
reactor = object()
used = _input_code_with_completion(prompt, input_helper, reactor)
self.assertIs(used, trueish)
self.assertEqual(ci.mock_calls, [mock.call(input_helper, reactor)])
self.assertEqual(c.mock_calls, [mock.call.finish("code")])
self.assertEqual(input.mock_calls, [mock.call(prompt)])
self.assertEqual(readline.mock_calls,
[mock.call.parse_and_bind("bind ^I rl_complete"),
mock.call.set_completer(c.completer),
mock.call.set_completer_delims(""),
])
@mock.patch("wormhole._rlcompleter.CodeInputter")
@mock.patch("wormhole._rlcompleter.readline", None)
@mock.patch("wormhole._rlcompleter.input", return_value="code")
def test_no_readline(self, input, ci):
c = mock.Mock(name="inhibit parenting")
c.completer = object()
trueish = object()
c.used_completion = trueish
ci.configure_mock(return_value=c)
prompt = object()
input_helper = object()
reactor = object()
used = _input_code_with_completion(prompt, input_helper, reactor)
self.assertIs(used, trueish)
self.assertEqual(ci.mock_calls, [mock.call(input_helper, reactor)])
self.assertEqual(c.mock_calls, [mock.call.finish("code")])
self.assertEqual(input.mock_calls, [mock.call(prompt)])
@mock.patch("wormhole._rlcompleter.CodeInputter")
@mock.patch("wormhole._rlcompleter.readline", None)
@mock.patch("wormhole._rlcompleter.input", return_value=b"code")
def test_bytes(self, input, ci):
c = mock.Mock(name="inhibit parenting")
c.completer = object()
trueish = object()
c.used_completion = trueish
ci.configure_mock(return_value=c)
prompt = object()
input_helper = object()
reactor = object()
used = _input_code_with_completion(prompt, input_helper, reactor)
self.assertIs(used, trueish)
self.assertEqual(ci.mock_calls, [mock.call(input_helper, reactor)])
self.assertEqual(c.mock_calls, [mock.call.finish(u"code")])
self.assertEqual(input.mock_calls, [mock.call(prompt)])
def get_completions(c, prefix):
completions = []
for state in count(0):
text = c.completer(prefix, state)
if text is None:
return completions
completions.append(text)
class Completion(unittest.TestCase):
def test_simple(self):
# no actual completion
helper = mock.Mock()
c = CodeInputter(helper, "reactor")
c.finish("1-code-ghost")
self.assertFalse(c.used_completion)
self.assertEqual(helper.mock_calls,
[mock.call.choose_nameplate("1"),
mock.call.choose_words("code-ghost")])
@mock.patch("wormhole._rlcompleter.readline",
get_completion_type=mock.Mock(return_value=0))
def test_call(self, readline):
# check that it calls _commit_and_build_completions correctly
helper = mock.Mock()
c = CodeInputter(helper, "reactor")
# pretend nameplates: 1, 12, 34
# first call will be with "1"
cabc = mock.Mock(return_value=["1", "12"])
c._commit_and_build_completions = cabc
self.assertEqual(get_completions(c, "1"), ["1", "12"])
self.assertEqual(cabc.mock_calls, [mock.call("1")])
# then "12"
cabc.reset_mock()
cabc.configure_mock(return_value=["12"])
self.assertEqual(get_completions(c, "12"), ["12"])
self.assertEqual(cabc.mock_calls, [mock.call("12")])
# now we have three "a" words: "and", "ark", "aaah!zombies!!"
cabc.reset_mock()
cabc.configure_mock(return_value=["aargh", "ark", "aaah!zombies!!"])
self.assertEqual(get_completions(c, "12-a"),
["aargh", "ark", "aaah!zombies!!"])
self.assertEqual(cabc.mock_calls, [mock.call("12-a")])
cabc.reset_mock()
cabc.configure_mock(return_value=["aargh", "aaah!zombies!!"])
self.assertEqual(get_completions(c, "12-aa"),
["aargh", "aaah!zombies!!"])
self.assertEqual(cabc.mock_calls, [mock.call("12-aa")])
cabc.reset_mock()
cabc.configure_mock(return_value=["aaah!zombies!!"])
self.assertEqual(get_completions(c, "12-aaa"), ["aaah!zombies!!"])
self.assertEqual(cabc.mock_calls, [mock.call("12-aaa")])
c.finish("1-code")
self.assert_(c.used_completion)
def test_wrap_error(self):
helper = mock.Mock()
c = CodeInputter(helper, "reactor")
c._wrapped_completer = mock.Mock(side_effect=ValueError("oops"))
with mock.patch("wormhole._rlcompleter.traceback") as traceback:
with mock.patch("wormhole._rlcompleter.print") as mock_print:
with self.assertRaises(ValueError) as e:
c.completer("text", 0)
self.assertEqual(traceback.mock_calls, [mock.call.print_exc()])
self.assertEqual(mock_print.mock_calls,
[mock.call("completer exception: oops")])
self.assertEqual(str(e.exception), "oops")
@inlineCallbacks
def test_build_completions(self):
rn = mock.Mock()
# InputHelper.get_nameplate_completions returns just the suffixes
gnc = mock.Mock() # get_nameplate_completions
cn = mock.Mock() # choose_nameplate
gwc = mock.Mock() # get_word_completions
cw = mock.Mock() # choose_words
helper = mock.Mock(refresh_nameplates=rn,
get_nameplate_completions=gnc,
choose_nameplate=cn,
get_word_completions=gwc,
choose_words=cw,
)
# this needs a real reactor, for blockingCallFromThread
c = CodeInputter(helper, reactor)
cabc = c._commit_and_build_completions
# 1 TAB -> 1, 12 (and refresh_nameplates)
gnc.configure_mock(return_value=["", "2"])
matches = yield deferToThread(cabc, "1")
self.assertEqual(matches, ["1", "12"])
self.assertEqual(rn.mock_calls, [mock.call()])
self.assertEqual(gnc.mock_calls, [mock.call("1")])
self.assertEqual(cn.mock_calls, [])
rn.reset_mock()
gnc.reset_mock()
# current: 12 TAB -> (and refresh_nameplates)
# want: 12 TAB -> 12- (and choose_nameplate)
gnc.configure_mock(return_value=[""])
matches = yield deferToThread(cabc, "12")
self.assertEqual(matches, ["12"]) # 12-
self.assertEqual(rn.mock_calls, [mock.call()])
self.assertEqual(gnc.mock_calls, [mock.call("12")])
self.assertEqual(cn.mock_calls, []) # 12
rn.reset_mock()
gnc.reset_mock()
# current: 12-a TAB -> and ark aaah!zombies!! (and choose nameplate)
gnc.configure_mock(side_effect=ValueError)
gwc.configure_mock(return_value=["nd", "rk", "aah!zombies!!"])
matches = yield deferToThread(cabc, "12-a")
# matches are always sorted
self.assertEqual(matches, ["12-aaah!zombies!!", "12-and", "12-ark"])
self.assertEqual(rn.mock_calls, [])
self.assertEqual(gnc.mock_calls, [])
self.assertEqual(cn.mock_calls, [mock.call("12")])
self.assertEqual(gwc.mock_calls, [mock.call("a")])
cn.reset_mock()
gwc.reset_mock()
# current: 12-and-b TAB -> bat bet but
gnc.configure_mock(side_effect=ValueError)
gwc.configure_mock(return_value=["at", "et", "ut"])
matches = yield deferToThread(cabc, "12-and-b")
self.assertEqual(matches, ["12-and-bat", "12-and-bet", "12-and-but"])
self.assertEqual(rn.mock_calls, [])
self.assertEqual(gnc.mock_calls, [])
self.assertEqual(cn.mock_calls, [])
self.assertEqual(gwc.mock_calls, [mock.call("and-b")])
cn.reset_mock()
gwc.reset_mock()
c.finish("12-and-bat")
self.assertEqual(cw.mock_calls, [mock.call("and-bat")])
def test_incomplete_code(self):
helper = mock.Mock()
c = CodeInputter(helper, "reactor")
with self.assertRaises(ValueError) as e:
c.finish("1")
self.assertEqual(str(e.exception), "incomplete wormhole code")
@inlineCallbacks
def test_rollback_nameplate_during_completion(self):
helper = mock.Mock()
gwc = helper.get_word_completions = mock.Mock()
gwc.configure_mock(return_value=["de", "urt"])
c = CodeInputter(helper, reactor)
cabc = c._commit_and_build_completions
matches = yield deferToThread(cabc, "1-co") # this commits us to 1-
self.assertEqual(helper.mock_calls,
[mock.call.choose_nameplate("1"),
mock.call.get_word_completions("co")])
self.assertEqual(matches, ["1-code", "1-court"])
helper.reset_mock()
with self.assertRaises(ValueError) as e:
yield deferToThread(cabc, "2-co")
self.assertEqual(str(e.exception),
"nameplate (NN-) already entered, cannot go back")
self.assertEqual(helper.mock_calls, [])
@inlineCallbacks
def test_rollback_nameplate_during_finish(self):
helper = mock.Mock()
gwc = helper.get_word_completions = mock.Mock()
gwc.configure_mock(return_value=["de", "urt"])
c = CodeInputter(helper, reactor)
cabc = c._commit_and_build_completions
matches = yield deferToThread(cabc, "1-co") # this commits us to 1-
self.assertEqual(helper.mock_calls,
[mock.call.choose_nameplate("1"),
mock.call.get_word_completions("co")])
self.assertEqual(matches, ["1-code", "1-court"])
helper.reset_mock()
with self.assertRaises(ValueError) as e:
c.finish("2-code")
self.assertEqual(str(e.exception),
"nameplate (NN-) already entered, cannot go back")
self.assertEqual(helper.mock_calls, [])
@mock.patch("wormhole._rlcompleter.stderr")
def test_warn_readline(self, stderr):
# there is no good way to test that this function gets used at the
# right time, since it involves a reactor and a "system event
# trigger", but let's at least make sure it's invocable
warn_readline()
expected ="\nCommand interrupted: please press Return to quit"
self.assertEqual(stderr.mock_calls, [mock.call.write(expected),
mock.call.write("\n")])