test_scripts: prep for windows compatibility

newlines, os.mkfifo errors, JSON type differences
This commit is contained in:
Brian Warner 2016-02-27 14:46:38 -08:00
parent cc9b2c574b
commit 1ff0792b32

View File

@ -1,5 +1,5 @@
from __future__ import print_function from __future__ import print_function
import os, sys, re, io, zipfile import os, sys, re, io, zipfile, six
from twisted.trial import unittest from twisted.trial import unittest
from twisted.python import procutils, log from twisted.python import procutils, log
from twisted.internet.utils import getProcessOutputAndValue from twisted.internet.utils import getProcessOutputAndValue
@ -95,7 +95,7 @@ class Phase1Data(unittest.TestCase):
self.assertEqual(d["directory"]["mode"], "zipfile/deflated") self.assertEqual(d["directory"]["mode"], "zipfile/deflated")
self.assertEqual(d["directory"]["numfiles"], 5) self.assertEqual(d["directory"]["numfiles"], 5)
self.assertIn("numbytes", d["directory"]) self.assertIn("numbytes", d["directory"])
self.assertIsInstance(d["directory"]["numbytes"], type(123)) self.assertIsInstance(d["directory"]["numbytes"], six.integer_types)
self.assertEqual(fd_to_send.tell(), 0) self.assertEqual(fd_to_send.tell(), 0)
zdata = fd_to_send.read() zdata = fd_to_send.read()
@ -117,7 +117,7 @@ class Phase1Data(unittest.TestCase):
try: try:
os.mkfifo(abs_filename) os.mkfifo(abs_filename)
except OSError: except AttributeError:
raise unittest.SkipTest("is mkfifo supported on this platform?") raise unittest.SkipTest("is mkfifo supported on this platform?")
self.assertFalse(os.path.isfile(abs_filename)) self.assertFalse(os.path.isfile(abs_filename))
self.assertFalse(os.path.isdir(abs_filename)) self.assertFalse(os.path.isdir(abs_filename))
@ -193,7 +193,7 @@ class ScriptVersion(ServerBase, ScriptsBase, unittest.TestCase):
last = err.strip().split("\n")[-1] last = err.strip().split("\n")[-1]
self.fail("wormhole not runnable: %s" % last) self.fail("wormhole not runnable: %s" % last)
ver = out.decode("utf-8") or err ver = out.decode("utf-8") or err
self.failUnlessEqual(ver, "magic-wormhole %s\n" % __version__) self.failUnlessEqual(ver, "magic-wormhole "+__version__+os.linesep)
self.failUnlessEqual(rc, 0) self.failUnlessEqual(rc, 0)
d.addCallback(_check) d.addCallback(_check)
return d return d
@ -289,6 +289,7 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase):
receive_stdout = receive_res[0].decode("utf-8") receive_stdout = receive_res[0].decode("utf-8")
receive_stderr = receive_res[1].decode("utf-8") receive_stderr = receive_res[1].decode("utf-8")
receive_rc = receive_res[2] receive_rc = receive_res[2]
NL = os.linesep
else: else:
sargs = runner.parser.parse_args(send_args) sargs = runner.parser.parse_args(send_args)
sargs.cwd = send_dir sargs.cwd = send_dir
@ -313,6 +314,10 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase):
receive_stdout = rargs.stdout.getvalue() receive_stdout = rargs.stdout.getvalue()
receive_stderr = rargs.stderr.getvalue() receive_stderr = rargs.stderr.getvalue()
# all output here comes from a StringIO, which uses \n for
# newlines, even if we're on windows
NL = "\n"
self.maxDiff = None # show full output for assertion failures self.maxDiff = None # show full output for assertion failures
self.failUnlessEqual(send_stderr, "") self.failUnlessEqual(send_stderr, "")
@ -320,49 +325,55 @@ class PregeneratedCode(ServerBase, ScriptsBase, unittest.TestCase):
# check sender # check sender
if mode == "text": if mode == "text":
expected = ("Sending text message (%d bytes)\n" expected = ("Sending text message ({bytes:d} bytes){NL}"
"On the other computer, please run: " "On the other computer, please run: "
"wormhole receive\n" "wormhole receive{NL}"
"Wormhole code is: %s\n\n" "Wormhole code is: {code}{NL}{NL}"
"text message sent\n" % (len(message), code)) "text message sent{NL}").format(bytes=len(message),
code=code,
NL=NL)
self.failUnlessEqual(send_stdout, expected) self.failUnlessEqual(send_stdout, expected)
elif mode == "file": elif mode == "file":
self.failUnlessIn("Sending %d byte file named '%s'\n" % self.failUnlessIn("Sending {bytes:d} byte file named '{name}'{NL}"
(len(message), send_filename), send_stdout) .format(bytes=len(message), name=send_filename,
NL=NL), send_stdout)
self.failUnlessIn("On the other computer, please run: " self.failUnlessIn("On the other computer, please run: "
"wormhole receive\n" "wormhole receive{NL}"
"Wormhole code is: %s\n\n" % code, "Wormhole code is: {code}{NL}{NL}"
send_stdout) .format(code=code, NL=NL),
self.failUnlessIn("File sent.. waiting for confirmation\n"
"Confirmation received. Transfer complete.\n",
send_stdout) send_stdout)
self.failUnlessIn("File sent.. waiting for confirmation{NL}"
"Confirmation received. Transfer complete.{NL}"
.format(NL=NL), send_stdout)
elif mode == "directory": elif mode == "directory":
self.failUnlessIn("Sending directory", send_stdout) self.failUnlessIn("Sending directory", send_stdout)
self.failUnlessIn("named 'testdir'", send_stdout) self.failUnlessIn("named 'testdir'", send_stdout)
self.failUnlessIn("On the other computer, please run: " self.failUnlessIn("On the other computer, please run: "
"wormhole receive\n" "wormhole receive{NL}"
"Wormhole code is: %s\n\n" % code, "Wormhole code is: {code}{NL}{NL}"
send_stdout) .format(code=code, NL=NL), send_stdout)
self.failUnlessIn("File sent.. waiting for confirmation\n" self.failUnlessIn("File sent.. waiting for confirmation{NL}"
"Confirmation received. Transfer complete.\n", "Confirmation received. Transfer complete.{NL}"
send_stdout) .format(NL=NL), send_stdout)
# check receiver # check receiver
if mode == "text": if mode == "text":
self.failUnlessEqual(receive_stdout, message+"\n") self.failUnlessEqual(receive_stdout, message+NL)
elif mode == "file": elif mode == "file":
self.failUnlessIn("Receiving %d bytes for '%s'" % self.failUnlessIn("Receiving {bytes:d} bytes for '{name}'"
(len(message), receive_filename), receive_stdout) .format(bytes=len(message),
name=receive_filename), receive_stdout)
self.failUnlessIn("Received file written to ", receive_stdout) self.failUnlessIn("Received file written to ", receive_stdout)
fn = os.path.join(receive_dir, receive_filename) fn = os.path.join(receive_dir, receive_filename)
self.failUnless(os.path.exists(fn)) self.failUnless(os.path.exists(fn))
with open(fn, "r") as f: with open(fn, "r") as f:
self.failUnlessEqual(f.read(), message) self.failUnlessEqual(f.read(), message)
elif mode == "directory": elif mode == "directory":
self.failUnless(re.search(r"Receiving \d+ bytes for '%s'" % self.failUnless(re.search(r"Receiving \d+ bytes for '{name}'"
receive_dirname, receive_stdout)) .format(name=receive_dirname),
self.failUnlessIn("Received files written to %s" % receive_stdout))
receive_dirname, receive_stdout) self.failUnlessIn("Received files written to {name}"
.format(name=receive_dirname), receive_stdout)
fn = os.path.join(receive_dir, receive_dirname) fn = os.path.join(receive_dir, receive_dirname)
self.failUnless(os.path.exists(fn), fn) self.failUnless(os.path.exists(fn), fn)
for i in range(5): for i in range(5):