From 8fb44eba5bb9862de5fcc6c785eb6f21ecbd0aea Mon Sep 17 00:00:00 2001 From: Andy Hayden Date: Thu, 30 May 2019 13:40:40 -0700 Subject: [PATCH] chore: refactor python tests to use unittest (#2414) Move every test to a method on DenoTestCase. test.py is a single TestSuite of every TestCase. Add a Spawn context manager for http_server, this is explicitly used where it's needed. Each python test file can now be run independently without needing to manually run http_server. Add --help and consistent flags using argparse for each python test, including --failfast. Use ColorTextTestRunner so that '... ok' is green. --- .appveyor.yml | 2 +- .travis.yml | 2 +- tools/benchmark_test.py | 119 +++++++---------- tools/complex_permissions_test.py | 103 ++++----------- tools/deno_dir_test.py | 64 ++++----- tools/fetch_test.py | 38 +++--- tools/fmt_test.py | 71 +++++----- tools/http_server.py | 54 ++++---- tools/integration_tests.py | 86 ++++++------ tools/is_tty_test.py | 26 ++-- tools/permission_prompt_test.py | 83 +++--------- tools/repl_test.py | 109 ++++++--------- tools/setup_test.py | 14 +- tools/test.py | 213 +++++++++++++++--------------- tools/unit_tests.py | 35 ++--- tools/util.py | 136 +++++++++++++++++-- tools/util_test.py | 16 +-- 17 files changed, 558 insertions(+), 613 deletions(-) diff --git a/.appveyor.yml b/.appveyor.yml index 348bfdad..dd86bfc6 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -196,7 +196,7 @@ build_script: test_script: - python tools\lint.py - python tools\test_format.py - - ps: Exec { & python tools\test.py $env:DENO_BUILD_PATH } + - ps: Exec { & python tools\test.py -v $env:DENO_BUILD_PATH } after_test: # Delete the the rollup cache, which is unreliable, so that it doesn't get diff --git a/.travis.yml b/.travis.yml index 0109041d..ef29fd32 100644 --- a/.travis.yml +++ b/.travis.yml @@ -73,7 +73,7 @@ script: - ./tools/lint.py - ./tools/test_format.py - ./tools/build.py -C target/release -- DENO_BUILD_MODE=release ./tools/test.py +- DENO_BUILD_MODE=release ./tools/test.py -v jobs: fast_finish: true diff --git a/tools/benchmark_test.py b/tools/benchmark_test.py index 9905d7d0..92eb6d5e 100755 --- a/tools/benchmark_test.py +++ b/tools/benchmark_test.py @@ -3,74 +3,57 @@ import sys import os import benchmark -from util import build_path, executable_suffix - - -def strace_parse_test(): - with open(os.path.join(sys.path[0], "testdata/strace_summary.out"), - "r") as f: - summary = benchmark.strace_parse(f.read()) - # first syscall line - assert summary["munmap"]["calls"] == 60 - assert summary["munmap"]["errors"] == 0 - # line with errors - assert summary["mkdir"]["errors"] == 2 - # last syscall line - assert summary["prlimit64"]["calls"] == 2 - assert summary["prlimit64"]["% time"] == 0 - # summary line - assert summary["total"]["calls"] == 704 - - -def max_mem_parse_test(): - with open(os.path.join(sys.path[0], "testdata/time.out"), "r") as f: - data = f.read() - assert benchmark.find_max_mem_in_bytes(data) == 120380 * 1024 - - -def binary_size_test(build_dir): - binary_size_dict = benchmark.get_binary_sizes(build_dir) - assert binary_size_dict["deno"] > 0 - assert binary_size_dict["main.js"] > 0 - assert binary_size_dict["main.js.map"] > 0 - assert binary_size_dict["snapshot_deno.bin"] > 0 - - -def strace_test(deno_path): - new_data = {} - benchmark.run_strace_benchmarks(deno_path, new_data) - assert "thread_count" in new_data - assert "syscall_count" in new_data - - s = new_data["thread_count"] - assert "hello" in s - assert s["hello"] > 1 - - s = new_data["syscall_count"] - assert "hello" in s - assert s["hello"] > 1 - - -def benchmark_test(build_dir, deno_path): - strace_parse_test() - binary_size_test(build_dir) - max_mem_parse_test() - if "linux" in sys.platform: - strace_test(deno_path) - - -# This test assumes tools/http_server.py is running in the background. -def main(): - if len(sys.argv) == 2: - build_dir = sys.argv[1] - elif len(sys.argv) == 1: - build_dir = build_path() - else: - print "Usage: tools/benchmark_test.py [build_dir]" - sys.exit(1) - deno_exe = os.path.join(build_dir, "deno" + executable_suffix) - benchmark_test(build_dir, deno_exe) +import unittest +from util import DenoTestCase, test_main + + +class TestBenchmark(DenoTestCase): + def test_strace_parse(self): + with open( + os.path.join(sys.path[0], "testdata/strace_summary.out"), + "r") as f: + summary = benchmark.strace_parse(f.read()) + # first syscall line + assert summary["munmap"]["calls"] == 60 + assert summary["munmap"]["errors"] == 0 + # line with errors + assert summary["mkdir"]["errors"] == 2 + # last syscall line + assert summary["prlimit64"]["calls"] == 2 + assert summary["prlimit64"]["% time"] == 0 + # summary line + assert summary["total"]["calls"] == 704 + + def test_max_mem_parse(self): + with open(os.path.join(sys.path[0], "testdata/time.out"), "r") as f: + data = f.read() + assert benchmark.find_max_mem_in_bytes(data) == 120380 * 1024 + + def test_binary_size(self): + binary_size_dict = benchmark.get_binary_sizes(self.build_dir) + assert binary_size_dict["deno"] > 0 + assert binary_size_dict["main.js"] > 0 + assert binary_size_dict["main.js.map"] > 0 + assert binary_size_dict["snapshot_deno.bin"] > 0 + + @unittest.skipIf("linux" not in sys.platform, + "strace only supported on linux") + def test_strace(self): + new_data = {} + benchmark.run_strace_benchmarks(self.deno_exe, new_data) + assert "thread_count" in new_data + assert "syscall_count" in new_data + + s = new_data["thread_count"] + assert "hello" in s + assert s["hello"] > 1 + + s = new_data["syscall_count"] + assert "hello" in s + assert s["hello"] > 1 if __name__ == '__main__': - main() + # FIME this doesn't appear to be the case. + # This test assumes tools/http_server.py is running in the background. + test_main() diff --git a/tools/complex_permissions_test.py b/tools/complex_permissions_test.py index 39ccda33..40ba6181 100755 --- a/tools/complex_permissions_test.py +++ b/tools/complex_permissions_test.py @@ -2,15 +2,13 @@ # -*- coding: utf-8 -*- # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import os -import pty -import select import subprocess import sys import time import unittest -import http_server -from util import build_path, root_path, executable_suffix, green_ok, red_failed +from http_server import spawn +from util import DenoTestCase, root_path, test_main, tty_capture PERMISSIONS_PROMPT_TEST_TS = "tools/complex_permissions_test.ts" @@ -18,47 +16,8 @@ PROMPT_PATTERN = b'⚠️' PERMISSION_DENIED_PATTERN = b'PermissionDenied: permission denied' -# This function is copied from: -# https://gist.github.com/hayd/4f46a68fc697ba8888a7b517a414583e -# https://stackoverflow.com/q/52954248/1240268 -def tty_capture(cmd, bytes_input, timeout=5): - """Capture the output of cmd with bytes_input to stdin, - with stdin, stdout and stderr as TTYs.""" - mo, so = pty.openpty() # provide tty to enable line-buffering - me, se = pty.openpty() - mi, si = pty.openpty() - fdmap = {mo: 'stdout', me: 'stderr', mi: 'stdin'} - - timeout_exact = time.time() + timeout - p = subprocess.Popen( - cmd, bufsize=1, stdin=si, stdout=so, stderr=se, close_fds=True) - os.write(mi, bytes_input) - - select_timeout = .04 #seconds - res = {'stdout': b'', 'stderr': b''} - while True: - ready, _, _ = select.select([mo, me], [], [], select_timeout) - if ready: - for fd in ready: - data = os.read(fd, 512) - if not data: - break - res[fdmap[fd]] += data - elif p.poll() is not None or time.time( - ) > timeout_exact: # select timed-out - break # p exited - for fd in [si, so, se, mi, mo, me]: - os.close(fd) # can't do it sooner: it leads to errno.EIO error - p.wait() - return p.returncode, res['stdout'], res['stderr'] - - -class ComplexPermissionTestCase(unittest.TestCase): - def __init__(self, method_name, test_type, deno_exe): - super(ComplexPermissionTestCase, self).__init__(method_name) - self.test_type = test_type - self.deno_exe = deno_exe - +@unittest.skipIf(os.name == 'nt', "Unable to test tty on Windows") +class BaseComplexPermissionTest(DenoTestCase): def _run_deno(self, flags, args): "Returns (return_code, stdout, stderr)." cmd = ([self.deno_exe, "run", "--no-prompt"] + flags + @@ -66,7 +25,9 @@ class ComplexPermissionTestCase(unittest.TestCase): return tty_capture(cmd, b'') -class TestReadWritePermissions(ComplexPermissionTestCase): +class TestReadPermissions(BaseComplexPermissionTest): + test_type = "read" + def test_inside_project_dir(self): code, _stdout, stderr = self._run_deno( ["--allow-" + self.test_type + "=" + root_path], @@ -136,7 +97,13 @@ class TestReadWritePermissions(ComplexPermissionTestCase): os.chdir(saved_curdir) -class TestNetFetchPermissions(ComplexPermissionTestCase): +class TestWritePermissions(TestReadPermissions): + test_type = "write" + + +class TestNetFetchPermissions(BaseComplexPermissionTest): + test_type = "net_fetch" + def test_allow_localhost_4545(self): code, _stdout, stderr = self._run_deno( ["--allow-net=localhost:4545"], @@ -171,7 +138,9 @@ class TestNetFetchPermissions(ComplexPermissionTestCase): assert not PERMISSION_DENIED_PATTERN in stderr -class TestNetDialPermissions(ComplexPermissionTestCase): +class TestNetDialPermissions(BaseComplexPermissionTest): + test_type = "net_dial" + def test_allow_localhost_ip_4555(self): code, _stdout, stderr = self._run_deno( ["--allow-net=127.0.0.1:4545"], [self.test_type, "127.0.0.1:4545"]) @@ -203,7 +172,9 @@ class TestNetDialPermissions(ComplexPermissionTestCase): assert not PERMISSION_DENIED_PATTERN in stderr -class TestNetListenPermissions(ComplexPermissionTestCase): +class TestNetListenPermissions(BaseComplexPermissionTest): + test_type = "net_listen" + def test_allow_localhost_4555(self): code, _stdout, stderr = self._run_deno( ["--allow-net=localhost:4555"], [self.test_type, "localhost:4555"]) @@ -235,36 +206,10 @@ class TestNetListenPermissions(ComplexPermissionTestCase): assert not PERMISSION_DENIED_PATTERN in stderr -def complex_permissions_test(deno_exe): - runner = unittest.TextTestRunner(verbosity=2) - loader = unittest.TestLoader() - - tests = ( - ("read", TestReadWritePermissions), - ("write", TestReadWritePermissions), - ("net_fetch", TestNetFetchPermissions), - ("net_dial", TestNetDialPermissions), - ("net_listen", TestNetListenPermissions), - ) - - for (test_type, test_class) in tests: - print "Complex permissions tests for \"{}\"".format(test_type) - - test_names = loader.getTestCaseNames(test_class) - suite = unittest.TestSuite() - for test_name in test_names: - suite.addTest(test_class(test_name, test_type, deno_exe)) - - result = runner.run(suite) - if not result.wasSuccessful(): - sys.exit(1) - - -def main(): - deno_exe = os.path.join(build_path(), "deno" + executable_suffix) - http_server.spawn() - complex_permissions_test(deno_exe) +def complex_permissions_tests(): + return BaseComplexPermissionTest.__subclasses__() if __name__ == "__main__": - main() + with spawn(): + test_main() diff --git a/tools/deno_dir_test.py b/tools/deno_dir_test.py index bcfa370b..7133d02d 100755 --- a/tools/deno_dir_test.py +++ b/tools/deno_dir_test.py @@ -5,50 +5,42 @@ import os import subprocess import sys -from util import rmtree, run +from util import DenoTestCase, mkdtemp, rmtree, run, test_main -def deno_dir_test(deno_exe, deno_dir): - assert os.path.isfile(deno_exe) - old_deno_dir = None - if "DENO_DIR" in os.environ: - old_deno_dir = os.environ["DENO_DIR"] - del os.environ["DENO_DIR"] +class TestDenoDir(DenoTestCase): + def setUp(self): + self.old_deno_dir = None + if "DENO_DIR" in os.environ: + self.old_deno_dir = os.environ["DENO_DIR"] + del os.environ["DENO_DIR"] - if os.path.isdir(deno_dir): - rmtree(deno_dir) - - # Run deno with no env flag - run_deno(deno_exe) - assert not os.path.isdir(deno_dir) - - # Run deno with DENO_DIR env flag - run_deno(deno_exe, deno_dir) - assert os.path.isdir(deno_dir) - assert os.path.isdir(os.path.join(deno_dir, "deps")) - assert os.path.isdir(os.path.join(deno_dir, "gen")) - rmtree(deno_dir) - - if old_deno_dir is not None: - os.environ["DENO_DIR"] = old_deno_dir + def tearDown(self): + if self.old_deno_dir is not None: + os.environ["DENO_DIR"] = self.old_deno_dir + def test_deno_dir(self): + deno_dir = mkdtemp() + if os.path.isdir(deno_dir): + rmtree(deno_dir) -def run_deno(deno_exe, deno_dir=None): - cmd = [deno_exe, "run", "tests/002_hello.ts"] - deno_dir_env = {"DENO_DIR": deno_dir} if deno_dir is not None else None - run(cmd, quiet=True, env=deno_dir_env) - - -USAGE = "./tools/deno_dir_test.py target/debug/deno target/debug/.deno_dir" + # Run deno with no env flag + self.run_deno() + assert not os.path.isdir(deno_dir) + # Run deno with DENO_DIR env flag + self.run_deno(deno_dir) + assert os.path.isdir(deno_dir) + assert os.path.isdir(os.path.join(deno_dir, "deps")) + assert os.path.isdir(os.path.join(deno_dir, "gen")) + rmtree(deno_dir) -def main(argv): - if len(sys.argv) != 3: - print "Usage: " + USAGE - sys.exit(1) - deno_dir_test(argv[1], argv[2]) + def run_deno(self, deno_dir=None): + cmd = [self.deno_exe, "run", "tests/002_hello.ts"] + deno_dir_env = {"DENO_DIR": deno_dir} if deno_dir is not None else None + run(cmd, quiet=True, env=deno_dir_env) if __name__ == '__main__': - sys.exit(main(sys.argv)) + test_main() diff --git a/tools/fetch_test.py b/tools/fetch_test.py index 9ecb6fff..35118ab9 100755 --- a/tools/fetch_test.py +++ b/tools/fetch_test.py @@ -2,29 +2,29 @@ # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import os import sys -from util import mkdtemp, tests_path, run_output, green_ok import shutil +from http_server import spawn +from util import DenoTestCase, mkdtemp, tests_path, run_output, test_main -def fetch_test(deno_exe): - sys.stdout.write("fetch_test...") - sys.stdout.flush() - deno_dir = mkdtemp() - try: - t = os.path.join(tests_path, "006_url_imports.ts") - output = run_output([deno_exe, "fetch", t], - merge_env={"DENO_DIR": deno_dir}) - assert output == "" - # Check that we actually did the prefetch. - os.path.exists( - os.path.join(deno_dir, - "deps/http/localhost_PORT4545/tests/subdir/mod2.ts")) - finally: - shutil.rmtree(deno_dir) - - print green_ok() +class FetchTest(DenoTestCase): + def test_fetch(self): + deno_dir = mkdtemp() + try: + t = os.path.join(tests_path, "006_url_imports.ts") + output = run_output([self.deno_exe, "fetch", t], + merge_env={"DENO_DIR": deno_dir}) + assert output == "" + # Check that we actually did the prefetch. + os.path.exists( + os.path.join( + deno_dir, + "deps/http/localhost_PORT4545/tests/subdir/mod2.ts")) + finally: + shutil.rmtree(deno_dir) if __name__ == "__main__": - fetch_test(sys.argv[1]) + with spawn(): + test_main() diff --git a/tools/fmt_test.py b/tools/fmt_test.py index a4fb072f..1d21fb74 100755 --- a/tools/fmt_test.py +++ b/tools/fmt_test.py @@ -1,47 +1,46 @@ #!/usr/bin/env python # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import json import os -import sys -from util import mkdtemp, root_path, tests_path, run, green_ok import shutil -import json +import sys + +from util import (DenoTestCase, mkdtemp, root_path, tests_path, run, test_main) -def fmt_test(deno_exe): - sys.stdout.write("fmt_test...") - sys.stdout.flush() - d = mkdtemp() - try: - fixed_filename = os.path.join(tests_path, "badly_formatted_fixed.js") - src = os.path.join(tests_path, "badly_formatted.js") - dst = os.path.join(d, "badly_formatted.js") - shutil.copyfile(src, dst) - # Set DENO_DIR to the temp dir so we test an initial fetch of prettier. - # TODO(ry) This make the test depend on internet access which is not - # ideal. We should have prettier in the repo already, and we could - # fetch it instead through tools/http_server.py. - deno_dir = d +class FmtTest(DenoTestCase): + def test_fmt(self): + d = mkdtemp() + try: + fixed_filename = os.path.join(tests_path, + "badly_formatted_fixed.js") + src = os.path.join(tests_path, "badly_formatted.js") + dst = os.path.join(d, "badly_formatted.js") + shutil.copyfile(src, dst) - # TODO(kt3k) The below line should be run([deno_exe, "fmt", dst], ...) - # It should be updated when the below issue is addressed - # https://github.com/denoland/deno_std/issues/330 - run([os.path.join(root_path, deno_exe), "fmt", "badly_formatted.js"], - cwd=d, - merge_env={"DENO_DIR": deno_dir}) - with open(fixed_filename) as f: - expected = f.read() - with open(dst) as f: - actual = f.read() - if expected != actual: - print "Expected didn't match actual." - print "expected: ", json.dumps(expected) - print "actual: ", json.dumps(actual) - sys.exit(1) + # Set DENO_DIR to the temp dir to test an initial fetch of prettier. + # TODO(ry) This make the test depend on internet access which is not + # ideal. We should have prettier in the repo already, and we could + # fetch it instead through tools/http_server.py. + deno_dir = d - finally: - shutil.rmtree(d) - print green_ok() + # TODO(kt3k) Below can be run([deno_exe, "fmt", dst], ...) + # once the following issue is addressed: + # https://github.com/denoland/deno_std/issues/330 + run([ + os.path.join(root_path, self.deno_exe), "fmt", + "badly_formatted.js" + ], + cwd=d, + merge_env={"DENO_DIR": deno_dir}) + with open(fixed_filename) as f: + expected = f.read() + with open(dst) as f: + actual = f.read() + self.assertEqual(expected, actual) + finally: + shutil.rmtree(d) if __name__ == "__main__": - fmt_test(sys.argv[1]) + test_main() diff --git a/tools/http_server.py b/tools/http_server.py index e44a86fc..7415ee47 100755 --- a/tools/http_server.py +++ b/tools/http_server.py @@ -2,13 +2,15 @@ # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. # Many tests expect there to be an http server on port 4545 servering the deno # root directory. +from collections import namedtuple +from contextlib import contextmanager import os -import sys -from threading import Thread import SimpleHTTPServer import SocketServer -from util import root_path +import sys from time import sleep +from threading import Thread +from util import root_path PORT = 4545 REDIRECT_PORT = 4546 @@ -87,6 +89,9 @@ class ContentTypeHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): return SimpleHTTPServer.SimpleHTTPRequestHandler.guess_type(self, path) +RunningServer = namedtuple("RunningServer", ["server", "thread"]) + + def server(): os.chdir(root_path) # Hopefully the main thread doesn't also chdir. Handler = ContentTypeHandler @@ -98,7 +103,7 @@ def server(): SocketServer.TCPServer.allow_reuse_address = True s = SocketServer.TCPServer(("", PORT), Handler) print "Deno test server http://localhost:%d/" % PORT - return s + return RunningServer(s, start(s)) def base_redirect_server(host_port, target_port, extra_path_segment=""): @@ -117,7 +122,7 @@ def base_redirect_server(host_port, target_port, extra_path_segment=""): s = SocketServer.TCPServer(("", host_port), Handler) print "redirect server http://localhost:%d/ -> http://localhost:%d/" % ( host_port, target_port) - return s + return RunningServer(s, start(s)) # redirect server @@ -137,35 +142,30 @@ def double_redirects_server(): return base_redirect_server(DOUBLE_REDIRECTS_PORT, REDIRECT_PORT) -def spawn(): - # Main http server - s = server() - thread = Thread(target=s.serve_forever) +def start(s): + thread = Thread(target=s.serve_forever, kwargs={"poll_interval": 0.05}) thread.daemon = True thread.start() - # Redirect server - rs = redirect_server() - r_thread = Thread(target=rs.serve_forever) - r_thread.daemon = True - r_thread.start() - # Another redirect server - ars = another_redirect_server() - ar_thread = Thread(target=ars.serve_forever) - ar_thread.daemon = True - ar_thread.start() - # Double redirects server - drs = double_redirects_server() - dr_thread = Thread(target=drs.serve_forever) - dr_thread.daemon = True - dr_thread.start() - sleep(1) # TODO I'm too lazy to figure out how to do this properly. return thread +@contextmanager +def spawn(): + servers = (server(), redirect_server(), another_redirect_server(), + double_redirects_server()) + sleep(1) # TODO I'm too lazy to figure out how to do this properly. + try: + yield + finally: + for s in servers: + s.server.shutdown() + + def main(): + servers = (server(), redirect_server(), another_redirect_server(), + double_redirects_server()) try: - thread = spawn() - while thread.is_alive(): + while all(s.thread.is_alive() for s in servers): sleep(10) except KeyboardInterrupt: pass diff --git a/tools/integration_tests.py b/tools/integration_tests.py index 32a53e3f..0f02ae2c 100755 --- a/tools/integration_tests.py +++ b/tools/integration_tests.py @@ -7,14 +7,16 @@ # exit code can be specified. # # Usage: integration_tests.py [path to deno executable] +import argparse import os import re import sys import subprocess -import http_server -import argparse -from util import root_path, tests_path, pattern_match, \ - green_ok, red_failed, rmtree, executable_suffix +import unittest + +from http_server import spawn +from util import (DenoTestCase, ColorTextTestRunner, root_path, tests_path, + pattern_match, rmtree, test_main) def strip_ansi_codes(s): @@ -45,38 +47,29 @@ def str2bool(v): raise ValueError("Bad boolean value") -def integration_tests(deno_exe, test_filter=None): - assert os.path.isfile(deno_exe) - tests = sorted([ - filename for filename in os.listdir(tests_path) - if filename.endswith(".test") - ]) - assert len(tests) > 0 - for test_filename in tests: - if test_filter and test_filter not in test_filename: - continue +class TestIntegrations(DenoTestCase): + @classmethod + def _test(cls, test_filename): + # Return thunk to test for js file, + # This is to 'trick' unittest so as to generate these dynamically. + return lambda self: self.generate(test_filename) + def generate(self, test_filename): test_abs = os.path.join(tests_path, test_filename) test = read_test(test_abs) exit_code = int(test.get("exit_code", 0)) args = test.get("args", "").split(" ") - check_stderr = str2bool(test.get("check_stderr", "false")) - stderr = subprocess.STDOUT if check_stderr else open(os.devnull, 'w') - stdin_input = (test.get("input", "").strip().decode("string_escape").replace( "\r\n", "\n")) - has_stdin_input = len(stdin_input) > 0 output_abs = os.path.join(root_path, test.get("output", "")) with open(output_abs, 'r') as f: expected_out = f.read() - cmd = [deno_exe] + args - sys.stdout.write("tests/%s ... " % (test_filename)) - sys.stdout.flush() + cmd = [self.deno_exe] + args actual_code = 0 try: if has_stdin_input: @@ -97,23 +90,22 @@ def integration_tests(deno_exe, test_filter=None): actual_code = e.returncode actual_out = e.output - if exit_code != actual_code: - print "... " + red_failed() - print "Expected exit code %d but got %d" % (exit_code, actual_code) - print "Output:" - print actual_out - sys.exit(1) + self.assertEqual(exit_code, actual_code) actual_out = strip_ansi_codes(actual_out) + if not pattern_match(expected_out, actual_out): + # This will always throw since pattern_match failed. + self.assertEqual(expected_out, actual_out) - if pattern_match(expected_out, actual_out) != True: - print red_failed() - print "Expected output does not match actual." - print "Expected output: \n" + expected_out - print "Actual output: \n" + actual_out - sys.exit(1) - print green_ok() +# Add a methods for each test file in tests_path. +for fn in sorted( + filename for filename in os.listdir(tests_path) + if filename.endswith(".test")): + + t = TestIntegrations._test(fn) + tn = t.__name__ = "test_" + fn.split(".")[0] + setattr(TestIntegrations, tn, t) def main(): @@ -125,26 +117,26 @@ def main(): args = parser.parse_args() target = "release" if args.release else "debug" - - build_dir = None - if "DENO_BUILD_PATH" in os.environ: - build_dir = os.environ["DENO_BUILD_PATH"] - else: - build_dir = os.path.join(root_path, "target", target) + build_dir = os.environ.get("DENO_BUILD_PATH", + os.path.join(root_path, "target", target)) deno_dir = os.path.join(build_dir, ".deno_test") if os.path.isdir(deno_dir): rmtree(deno_dir) os.environ["DENO_DIR"] = deno_dir - deno_exe = os.path.join(build_dir, "deno" + executable_suffix) - if args.executable: - deno_exe = args.executable + test_names = [ + test_name for test_name in unittest.TestLoader().getTestCaseNames( + TestIntegrations) if not args.filter or args.filter in test_name + ] + suite = unittest.TestLoader().loadTestsFromNames( + test_names, module=TestIntegrations) - http_server.spawn() - - integration_tests(deno_exe, args.filter) + with spawn(): + result = ColorTextTestRunner(verbosity=2).run(suite) + if not result.wasSuccessful(): + sys.exit(1) if __name__ == "__main__": - sys.exit(main()) + main() diff --git a/tools/is_tty_test.py b/tools/is_tty_test.py index 6f7509a8..79267fdd 100755 --- a/tools/is_tty_test.py +++ b/tools/is_tty_test.py @@ -1,27 +1,23 @@ #!/usr/bin/env python # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import os -import pty -import select import subprocess -from util import build_path, executable_suffix +import unittest from sys import stdin -from permission_prompt_test import tty_capture - -IS_TTY_TEST_TS = "tests/is_tty.ts" +from util import DenoTestCase, test_main, tty_capture -def is_tty_test(deno_exe): - cmd = [deno_exe, "run", IS_TTY_TEST_TS] - code, stdout, _ = tty_capture(cmd, b'') - assert code == 0 - assert str(stdin.isatty()).lower() in stdout +IS_TTY_TEST_TS = "tests/is_tty.ts" -def main(): - deno_exe = os.path.join(build_path(), "deno" + executable_suffix) - is_tty_test(deno_exe) +@unittest.skipIf(os.name == 'nt', "Unable to test tty on Windows") +class TestIsTty(DenoTestCase): + def test_is_tty(self): + cmd = [self.deno_exe, "run", IS_TTY_TEST_TS] + code, stdout, _ = tty_capture(cmd, b'') + assert code == 0 + assert str(stdin.isatty()).lower() in stdout if __name__ == "__main__": - main() + test_main() diff --git a/tools/permission_prompt_test.py b/tools/permission_prompt_test.py index aef2bdf3..73547881 100755 --- a/tools/permission_prompt_test.py +++ b/tools/permission_prompt_test.py @@ -2,14 +2,12 @@ # -*- coding: utf-8 -*- # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import os -import pty -import select import subprocess import sys import time import unittest -from util import build_path, executable_suffix, green_ok, red_failed +from util import DenoTestCase, test_main, tty_capture PERMISSIONS_PROMPT_TEST_TS = "tools/permission_prompt_test.ts" @@ -18,47 +16,8 @@ FIRST_CHECK_FAILED_PATTERN = b'First check failed' PERMISSION_DENIED_PATTERN = b'PermissionDenied: permission denied' -# This function is copied from: -# https://gist.github.com/hayd/4f46a68fc697ba8888a7b517a414583e -# https://stackoverflow.com/q/52954248/1240268 -def tty_capture(cmd, bytes_input, timeout=5): - """Capture the output of cmd with bytes_input to stdin, - with stdin, stdout and stderr as TTYs.""" - mo, so = pty.openpty() # provide tty to enable line-buffering - me, se = pty.openpty() - mi, si = pty.openpty() - fdmap = {mo: 'stdout', me: 'stderr', mi: 'stdin'} - - timeout_exact = time.time() + timeout - p = subprocess.Popen( - cmd, bufsize=1, stdin=si, stdout=so, stderr=se, close_fds=True) - os.write(mi, bytes_input) - - select_timeout = .04 #seconds - res = {'stdout': b'', 'stderr': b''} - while True: - ready, _, _ = select.select([mo, me], [], [], select_timeout) - if ready: - for fd in ready: - data = os.read(fd, 512) - if not data: - break - res[fdmap[fd]] += data - elif p.poll() is not None or time.time( - ) > timeout_exact: # select timed-out - break # p exited - for fd in [si, so, se, mi, mo, me]: - os.close(fd) # can't do it sooner: it leads to errno.EIO error - p.wait() - return p.returncode, res['stdout'], res['stderr'] - - -class TestPrompt(unittest.TestCase): - def __init__(self, method_name, test_type, deno_exe): - super(TestPrompt, self).__init__(method_name) - self.test_type = test_type - self.deno_exe = deno_exe - +@unittest.skipIf(os.name == 'nt', "Unable to test tty on Windows") +class BasePromptTest(object): def _run_deno(self, flags, args, bytes_input): "Returns (return_code, stdout, stderr)." cmd = [self.deno_exe, "run"] + flags + [PERMISSIONS_PROMPT_TEST_TS @@ -159,27 +118,29 @@ class TestPrompt(unittest.TestCase): assert not PERMISSION_DENIED_PATTERN in stderr -def permission_prompt_test(deno_exe): - runner = unittest.TextTestRunner(verbosity=2) - loader = unittest.TestLoader() +class ReadPromptTest(DenoTestCase, BasePromptTest): + test_type = "read" + + +class WritePromptTest(DenoTestCase, BasePromptTest): + test_type = "write" + + +class EnvPromptTest(DenoTestCase, BasePromptTest): + test_type = "env" + + +class NetPromptTest(DenoTestCase, BasePromptTest): + test_type = "net" - test_types = ["read", "write", "env", "net", "run"] - for test_type in test_types: - print "Permissions prompt tests for \"{}\"".format(test_type) - test_names = loader.getTestCaseNames(TestPrompt) - suite = unittest.TestSuite() - for test_name in test_names: - suite.addTest(TestPrompt(test_name, test_type, deno_exe)) - result = runner.run(suite) - if not result.wasSuccessful(): - sys.exit(1) +class RunPromptTest(DenoTestCase, BasePromptTest): + test_type = "run" -def main(): - deno_exe = os.path.join(build_path(), "deno" + executable_suffix) - permission_prompt_test(deno_exe) +def permission_prompt_tests(): + return BasePromptTest.__subclasses__() if __name__ == "__main__": - main() + test_main() diff --git a/tools/repl_test.py b/tools/repl_test.py index 9ea1b82a..9cc34524 100644 --- a/tools/repl_test.py +++ b/tools/repl_test.py @@ -4,12 +4,12 @@ from subprocess import CalledProcessError, PIPE, Popen import sys import time -from util import build_path, executable_suffix, green_ok +from util import DenoTestCase, test_main -class Repl(object): - def __init__(self, deno_exe): - self.deno_exe = deno_exe +class TestRepl(DenoTestCase): + def __init__(self, *args, **kwargs): + super(TestRepl, self).__init__(*args, **kwargs) self._warm_up() def _warm_up(self): @@ -40,26 +40,17 @@ class Repl(object): # Ignore Windows CRLF (\r\n). return out.replace('\r\n', '\n'), err.replace('\r\n', '\n'), retcode - def run(self): - print('repl_test.py') - test_names = [name for name in dir(self) if name.startswith("test_")] - for t in test_names: - self.__getattribute__(t)() - sys.stdout.write(".") - sys.stdout.flush() - print(' {}\n'.format(green_ok())) - def test_console_log(self): out, err, code = self.input("console.log('hello')", "'world'") - assertEqual(out, 'hello\nundefined\nworld\n') - assertEqual(err, '') - assertEqual(code, 0) + self.assertEqual(out, 'hello\nundefined\nworld\n') + self.assertEqual(err, '') + self.assertEqual(code, 0) def test_exit_command(self): out, err, code = self.input("exit", "'ignored'", exit=False) - assertEqual(out, '') - assertEqual(err, '') - assertEqual(code, 0) + self.assertEqual(out, '') + self.assertEqual(err, '') + self.assertEqual(code, 0) def test_help_command(self): out, err, code = self.input("help") @@ -68,100 +59,86 @@ class Repl(object): "help Print this help message", "", ]) - assertEqual(out, expectedOut) - assertEqual(err, '') - assertEqual(code, 0) + self.assertEqual(out, expectedOut) + self.assertEqual(err, '') + self.assertEqual(code, 0) def test_function(self): out, err, code = self.input("Deno.writeFileSync") - assertEqual(out, '[Function: writeFileSync]\n') - assertEqual(err, '') - assertEqual(code, 0) + self.assertEqual(out, '[Function: writeFileSync]\n') + self.assertEqual(err, '') + self.assertEqual(code, 0) def test_multiline(self): out, err, code = self.input("(\n1 + 2\n)") - assertEqual(out, '3\n') - assertEqual(err, '') - assertEqual(code, 0) + self.assertEqual(out, '3\n') + self.assertEqual(err, '') + self.assertEqual(code, 0) # This should print error instead of wait for input def test_eval_unterminated(self): out, err, code = self.input("eval('{')") - assertEqual(out, '') + self.assertEqual(out, '') assert "Unexpected end of input" in err - assertEqual(code, 0) + self.assertEqual(code, 0) def test_reference_error(self): out, err, code = self.input("not_a_variable") - assertEqual(out, '') + self.assertEqual(out, '') assert "not_a_variable is not defined" in err - assertEqual(code, 0) + self.assertEqual(code, 0) # def test_set_timeout(self): # out, err, code = self.input( # "setTimeout(() => { console.log('b'); Deno.exit(0); }, 1)", # "'a'", # exit=False) - # assertEqual(out, '1\na\nb\n') - # assertEqual(err, '') - # assertEqual(code, 0) + # self.assertEqual(out, '1\na\nb\n') + # self.assertEqual(err, '') + # self.assertEqual(code, 0) # def test_set_timeout_interlaced(self): # out, err, code = self.input( # "setTimeout(() => console.log('a'), 1)", # "setTimeout(() => console.log('b'), 6)", # sleep=0.8) - # assertEqual(out, '1\n2\na\nb\n') - # assertEqual(err, '') - # assertEqual(code, 0) + # self.assertEqual(out, '1\n2\na\nb\n') + # self.assertEqual(err, '') + # self.assertEqual(code, 0) # def test_async_op(self): # out, err, code = self.input( # "fetch('http://localhost:4545/tests/001_hello.js')" + # ".then(res => res.text()).then(console.log)", # sleep=1) - # assertEqual(out, 'Promise {}\nconsole.log("Hello World");\n\n') - # assertEqual(err, '') - # assertEqual(code, 0) + # self.assertEqual(out, 'Promise {}\nconsole.log("Hello World");\n\n') + # self.assertEqual(err, '') + # self.assertEqual(code, 0) def test_syntax_error(self): out, err, code = self.input("syntax error") - assertEqual(out, '') + self.assertEqual(out, '') assert "Unexpected identifier" in err - assertEqual(code, 0) + self.assertEqual(code, 0) def test_type_error(self): out, err, code = self.input("console()") - assertEqual(out, '') + self.assertEqual(out, '') assert "console is not a function" in err - assertEqual(code, 0) + self.assertEqual(code, 0) def test_variable(self): out, err, code = self.input("var a = 123;", "a") - assertEqual(out, 'undefined\n123\n') - assertEqual(err, '') - assertEqual(code, 0) + self.assertEqual(out, 'undefined\n123\n') + self.assertEqual(err, '') + self.assertEqual(code, 0) def test_lexical_scoped_variable(self): out, err, code = self.input("let a = 123;", "a") - assertEqual(out, 'undefined\n123\n') - assertEqual(err, '') - assertEqual(code, 0) - - -def assertEqual(left, right): - if left != right: - raise AssertionError("{} != {}".format(repr(left), repr(right))) - - -def repl_tests(deno_exe): - Repl(deno_exe).run() - - -def main(): - deno_exe = os.path.join(build_path(), "deno" + executable_suffix) - repl_tests(deno_exe) + self.assertEqual(out, 'undefined\n123\n') + self.assertEqual(err, '') + self.assertEqual(code, 0) if __name__ == "__main__": - main() + test_main() diff --git a/tools/setup_test.py b/tools/setup_test.py index 5434b42a..e9d09de8 100644 --- a/tools/setup_test.py +++ b/tools/setup_test.py @@ -1,14 +1,15 @@ # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import os import sys -import unittest from setup import gn_string, read_gn_args, write_gn_args from shutil import rmtree from tempfile import mktemp +from util import DenoTestCase, test_main -class TestSetup(unittest.TestCase): + +class TestSetup(DenoTestCase): def test_gn_string(self): assert '"abc"' == gn_string('abc') assert '"foo\\$bar\\"baz"' == gn_string('foo$bar"baz') @@ -61,12 +62,5 @@ class TestSetup(unittest.TestCase): rmtree(d) -def setup_test(): - suite = unittest.TestLoader().loadTestsFromTestCase(TestSetup) - result = unittest.TextTestRunner(verbosity=2).run(suite) - if not result.wasSuccessful(): - sys.exit(1) - - if __name__ == '__main__': - setup_test() + test_main() diff --git a/tools/test.py b/tools/test.py index 4769b169..3a44748a 100755 --- a/tools/test.py +++ b/tools/test.py @@ -3,127 +3,122 @@ # Runs the full test suite. # Usage: ./tools/test.py out/Debug import os +import subprocess import sys import unittest -from integration_tests import integration_tests -from deno_dir_test import deno_dir_test -from util import build_path, enable_ansi_colors, executable_suffix, run, rmtree -from util import run_output, tests_path, green_ok -from unit_tests import unit_tests -from util_test import util_test -from setup_test import setup_test -from benchmark_test import benchmark_test -from repl_test import repl_tests -from fetch_test import fetch_test -from fmt_test import fmt_test -import subprocess -import http_server - - -def check_exists(filename): - if not os.path.exists(filename): - print "Required target doesn't exist:", filename - print "Run ./tools/build.py" - sys.exit(1) - - -def test_no_color(deno_exe): - sys.stdout.write("no_color test...") - sys.stdout.flush() - t = os.path.join(tests_path, "no_color.js") - output = run_output([deno_exe, "run", t], merge_env={"NO_COLOR": "1"}) - assert output.strip() == "noColor true" - t = os.path.join(tests_path, "no_color.js") - output = run_output([deno_exe, "run", t]) - assert output.strip() == "noColor false" - print green_ok() - - -def exec_path_test(deno_exe): - cmd = [deno_exe, "run", "tests/exec_path.ts"] - output = run_output(cmd) - assert deno_exe in output.strip() +from benchmark_test import TestBenchmark +from deno_dir_test import TestDenoDir +from fetch_test import FetchTest +from fmt_test import FmtTest +from integration_tests import TestIntegrations +from repl_test import TestRepl +from setup_test import TestSetup +from unit_tests import JsUnitTests +from util_test import TestUtil + +from is_tty_test import TestIsTty +# NOTE: These tests are skipped on Windows +from permission_prompt_test import permission_prompt_tests +from complex_permissions_test import complex_permissions_tests + +from http_server import spawn +from util import (DenoTestCase, ColorTextTestRunner, enable_ansi_colors, + executable_suffix, run, run_output, rmtree, tests_path, + test_args) + + +class TestTarget(DenoTestCase): + @staticmethod + def check_exists(filename): + if not os.path.exists(filename): + print "Required target doesn't exist:", filename + print "Run ./tools/build.py" + sys.exit(1) + + def test_executable_exists(self): + self.check_exists(self.deno_exe) + + def _test(self, executable): + "Test executable runs and exits with code 0." + bin_file = os.path.join(self.build_dir, executable + executable_suffix) + self.check_exists(bin_file) + run([bin_file]) + + def test_libdeno(self): + self._test("libdeno_test") + + def test_cli(self): + self._test("cli_test") + + def test_core(self): + self._test("deno_core_test") + + def test_core_http_benchmark(self): + self._test("deno_core_http_bench_test") + + def test_ts_library_builder(self): + run([ + "node", "./node_modules/.bin/ts-node", "--project", + "tools/ts_library_builder/tsconfig.json", + "tools/ts_library_builder/test.ts" + ]) + + def test_no_color(self): + t = os.path.join(tests_path, "no_color.js") + output = run_output([self.deno_exe, "run", t], + merge_env={"NO_COLOR": "1"}) + assert output.strip() == "noColor true" + t = os.path.join(tests_path, "no_color.js") + output = run_output([self.deno_exe, "run", t]) + assert output.strip() == "noColor false" + + def test_exec_path(self): + cmd = [self.deno_exe, "run", "tests/exec_path.ts"] + output = run_output(cmd) + assert self.deno_exe in output.strip() def main(argv): - if len(argv) == 2: - build_dir = sys.argv[1] - elif len(argv) == 1: - build_dir = build_path() - else: - print "Usage: tools/test.py [build_dir]" - sys.exit(1) - - deno_dir = os.path.join(build_dir, ".deno_test") + args = test_args(argv) + + deno_dir = os.path.join(args.build_dir, ".deno_test") if os.path.isdir(deno_dir): rmtree(deno_dir) os.environ["DENO_DIR"] = deno_dir enable_ansi_colors() - http_server.spawn() - - deno_exe = os.path.join(build_dir, "deno" + executable_suffix) - check_exists(deno_exe) - - # Python/build tools testing - setup_test() - util_test() - - run([ - "node", "./node_modules/.bin/ts-node", "--project", - "tools/ts_library_builder/tsconfig.json", - "tools/ts_library_builder/test.ts" - ]) - - libdeno_test = os.path.join(build_dir, "libdeno_test" + executable_suffix) - check_exists(libdeno_test) - run([libdeno_test]) - - cli_test = os.path.join(build_dir, "cli_test" + executable_suffix) - check_exists(cli_test) - run([cli_test]) - - deno_core_test = os.path.join(build_dir, - "deno_core_test" + executable_suffix) - check_exists(deno_core_test) - run([deno_core_test]) - - deno_core_http_bench_test = os.path.join( - build_dir, "deno_core_http_bench_test" + executable_suffix) - check_exists(deno_core_http_bench_test) - run([deno_core_http_bench_test]) - - unit_tests(deno_exe) - - fetch_test(deno_exe) - fmt_test(deno_exe) - - integration_tests(deno_exe) - - # TODO We currently skip testing the prompt and IsTTY in Windows completely. - # Windows does not support the pty module used for testing the permission - # prompt. - if os.name != 'nt': - from is_tty_test import is_tty_test - from permission_prompt_test import permission_prompt_test - from complex_permissions_test import complex_permissions_test - permission_prompt_test(deno_exe) - complex_permissions_test(deno_exe) - is_tty_test(deno_exe) - - repl_tests(deno_exe) - - rmtree(deno_dir) - - deno_dir_test(deno_exe, deno_dir) - - test_no_color(deno_exe) - - benchmark_test(build_dir, deno_exe) - exec_path_test(deno_exe) + with spawn(): + test_cases = [ + TestSetup, + TestUtil, + TestTarget, + JsUnitTests, + FetchTest, + FmtTest, + TestIntegrations, + TestRepl, + TestDenoDir, + TestBenchmark, + ] + # These tests are skipped, but to make the test output less noisy + # we'll avoid triggering them. + if os.name != 'nt': + test_cases.append(TestIsTty) + test_cases += permission_prompt_tests() + test_cases += complex_permissions_tests() + + suite = unittest.TestSuite([ + unittest.TestLoader().loadTestsFromTestCase(tc) + for tc in test_cases + ]) + + result = ColorTextTestRunner( + verbosity=args.verbosity + 1, failfast=args.failfast).run(suite) + if not result.wasSuccessful(): + sys.exit(1) if __name__ == '__main__': - sys.exit(main(sys.argv)) + main(sys.argv[1:]) diff --git a/tools/unit_tests.py b/tools/unit_tests.py index 10f6a4a4..439ea325 100755 --- a/tools/unit_tests.py +++ b/tools/unit_tests.py @@ -2,26 +2,27 @@ # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import sys import subprocess -import http_server +from http_server import spawn +from util import DenoTestCase, test_main -def unit_tests(deno_exe): - cmd = [ - deno_exe, "run", "--reload", "--allow-run", "js/unit_test_runner.ts" - ] - process = subprocess.Popen( - cmd, bufsize=1, universal_newlines=True, stderr=subprocess.STDOUT) - process.wait() - errcode = process.returncode - if errcode != 0: - sys.exit(errcode) +class JsUnitTests(DenoTestCase): + def test_unit_test_runner(self): + cmd = [ + self.deno_exe, "run", "--reload", "--allow-run", + "js/unit_test_runner.ts" + ] + process = subprocess.Popen( + cmd, bufsize=1, universal_newlines=True, stderr=subprocess.STDOUT) + process.wait() + errcode = process.returncode + if errcode != 0: + raise AssertionError( + "js/unit_test_runner.ts exited with exit code %s" % errcode) -if __name__ == '__main__': - if len(sys.argv) < 2: - print "Usage ./tools/unit_tests.py target/debug/deno" - sys.exit(1) - http_server.spawn() - unit_tests(sys.argv[1]) +if __name__ == '__main__': + with spawn(): + test_main() diff --git a/tools/util.py b/tools/util.py index 007e21ba..c6f8a4c8 100644 --- a/tools/util.py +++ b/tools/util.py @@ -1,12 +1,17 @@ # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import argparse import os import re import shutil +import select import stat import sys import subprocess import tempfile +import time +import unittest +# FIXME support nocolor (use "" if passed?) RESET = "\x1b[0m" FG_RED = "\x1b[31m" FG_GREEN = "\x1b[32m" @@ -85,14 +90,6 @@ def shell_quote(arg): return quote(arg) -def red_failed(): - return "%sFAILED%s" % (FG_RED, RESET) - - -def green_ok(): - return "%sok%s" % (FG_GREEN, RESET) - - def symlink(target, name, target_is_dir=False): if os.name == "nt": from ctypes import WinDLL, WinError, GetLastError @@ -176,6 +173,8 @@ def rmtree(directory): def build_mode(default="debug"): if "DENO_BUILD_MODE" in os.environ: return os.environ["DENO_BUILD_MODE"] + elif "--release" in sys.argv: + return "release" else: return default @@ -191,8 +190,6 @@ def build_path(): # Returns True if the expected matches the actual output, allowing variation # from actual where expected has the wildcard (e.g. matches /.*/) def pattern_match(pattern, string, wildcard="[WILDCARD]"): - if len(pattern) == 0: - return string == 0 if pattern == wildcard: return True @@ -374,3 +371,122 @@ def mkdtemp(): # 'TS5009: Cannot find the common subdirectory path for the input files.' temp_dir = os.environ["TEMP"] if os.name == 'nt' else None return tempfile.mkdtemp(dir=temp_dir) + + +class DenoTestCase(unittest.TestCase): + @property + def build_dir(self): + args = test_args() + return args.build_dir + + @property + def deno_exe(self): + return os.path.join(self.build_dir, "deno" + executable_suffix) + + +# overload the test result class +class ColorTextTestResult(unittest.TextTestResult): + def getDescription(self, test): + name = str(test) + if name.startswith("test_"): + name = name[5:] + return name + + def addSuccess(self, test): + if self.showAll: + self.stream.write(FG_GREEN) + super(ColorTextTestResult, self).addSuccess(test) + if self.showAll: + self.stream.write(RESET) + + def addError(self, test, err): + if self.showAll: + self.stream.write(FG_RED) + super(ColorTextTestResult, self).addError(test, err) + if self.showAll: + self.stream.write(RESET) + + def addFailure(self, test, err): + if self.showAll: + self.stream.write(FG_RED) + super(ColorTextTestResult, self).addFailure(test, err) + if self.showAll: + self.stream.write(RESET) + + +class ColorTextTestRunner(unittest.TextTestRunner): + resultclass = ColorTextTestResult + + +def test_main(): + args = test_args() + # FIXME(hayd) support more of the unittest.main API. + return unittest.main( + verbosity=args.verbosity + 1, + testRunner=ColorTextTestRunner, + failfast=args.failfast, + argv=['']) + + +def test_args(argv=None): + if argv is None: + argv = sys.argv[1:] + parser = argparse.ArgumentParser() + parser.add_argument( + '--failfast', '-f', action='store_true', help='Stop on first failure') + parser.add_argument( + '--verbosity', '-v', action='store_true', help='Verbose output') + parser.add_argument( + '--release', + action='store_true', + help='Test against release deno_executable') + parser.add_argument('build_dir', nargs='?', help='Deno build directory') + args = parser.parse_args(argv) + if args.build_dir and args.release: + raise argparse.ArgumentError( + None, "build_dir is inferred from --release, cannot provide both") + if not args.build_dir: + args.build_dir = build_path() + + if not os.path.isfile( + os.path.join(args.build_dir, "deno" + executable_suffix)): + raise argparse.ArgumentError(None, + "deno executable not found in build_dir") + return args + + +# This function is copied from: +# https://gist.github.com/hayd/4f46a68fc697ba8888a7b517a414583e +# https://stackoverflow.com/q/52954248/1240268 +def tty_capture(cmd, bytes_input, timeout=5): + """Capture the output of cmd with bytes_input to stdin, + with stdin, stdout and stderr as TTYs.""" + # pty is not available on windows, so we import it within this function. + import pty + mo, so = pty.openpty() # provide tty to enable line-buffering + me, se = pty.openpty() + mi, si = pty.openpty() + fdmap = {mo: 'stdout', me: 'stderr', mi: 'stdin'} + + timeout_exact = time.time() + timeout + p = subprocess.Popen( + cmd, bufsize=1, stdin=si, stdout=so, stderr=se, close_fds=True) + os.write(mi, bytes_input) + + select_timeout = .04 #seconds + res = {'stdout': b'', 'stderr': b''} + while True: + ready, _, _ = select.select([mo, me], [], [], select_timeout) + if ready: + for fd in ready: + data = os.read(fd, 512) + if not data: + break + res[fdmap[fd]] += data + elif p.poll() is not None or time.time( + ) > timeout_exact: # select timed-out + break # p exited + for fd in [si, so, se, mi, mo, me]: + os.close(fd) # can't do it sooner: it leads to errno.EIO error + p.wait() + return p.returncode, res['stdout'], res['stderr'] diff --git a/tools/util_test.py b/tools/util_test.py index 7d189797..5db50e42 100755 --- a/tools/util_test.py +++ b/tools/util_test.py @@ -1,18 +1,19 @@ # Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import os import sys -import unittest from util import ( + DenoTestCase, pattern_match, parse_exit_code, shell_quote_win, parse_wrk_output, root_path, + test_main, ) -import os -class TestUtil(unittest.TestCase): +class TestUtil(DenoTestCase): def test_pattern_match(self): # yapf: disable fixtures = [("foobarbaz", "foobarbaz", True), @@ -68,12 +69,5 @@ class TestUtil(unittest.TestCase): assert stats3['max_latency'] == 1630.0 -def util_test(): - suite = unittest.TestLoader().loadTestsFromTestCase(TestUtil) - result = unittest.TextTestRunner(verbosity=2).run(suite) - if not result.wasSuccessful(): - sys.exit(1) - - if __name__ == '__main__': - util_test() + test_main() -- GitLab