diff --git a/avocado/core/test.py b/avocado/core/test.py index 3fb0e744b84269ea0fb25ac4676a6902dd5cf823..fe802db984afe668018864a9891d2eb2ba86e01d 100644 --- a/avocado/core/test.py +++ b/avocado/core/test.py @@ -1120,6 +1120,10 @@ class SimpleTest(Test): self._command = None if self.filename is not None: self._command = pipes.quote(self.filename) + # process.run expects unicode as the command, but pipes.quote + # turns it into a "bytes" array in Python 2 + if not astring.is_text(self._command): + self._command = astring.to_text(self._command, defaults.ENCODING) @property def filename(self): @@ -1145,7 +1149,6 @@ class SimpleTest(Test): test_params = dict([(str(key), str(val)) for _, key, val in self.params.iteritems()]) - # process.run uses shlex.split(), the self.path needs to be escaped result = process.run(self._command, verbose=True, env=test_params, encoding=defaults.ENCODING) diff --git a/avocado/utils/astring.py b/avocado/utils/astring.py index e4b1e75f8352970d026ffb294a983445da135004..17f0de567b5b9b3ed88e64bc592cd48377a33ac1 100644 --- a/avocado/utils/astring.py +++ b/avocado/utils/astring.py @@ -27,6 +27,7 @@ And not notice until their code starts failing. import itertools import re +import sys import string from six import string_types, PY3 @@ -278,3 +279,48 @@ def string_to_safe_path(input_str): for bad_chr in FS_UNSAFE_CHARS: input_str = input_str.replace(bad_chr, "_") return input_str + + +def is_bytes(data): + """ + Checks if the data given is a sequence of bytes + + And not a "text" type, that can be of multi-byte characters. + Also, this does NOT mean a bytearray type. + + :param data: the instance to be checked if it falls under the definition + of an array of bytes. + """ + return isinstance(data, bytes) + + +def is_text(data): + """ + Checks if the data given is a suitable for holding text + + That is, if it can hold text that requires more than one byte for + each character. + """ + if sys.version_info[0] < 3: + return isinstance(data, unicode) + return isinstance(data, str) + + +def to_text(data, encoding=None): + """ + Convert data to text + + Action is only taken if data is "bytes", in which case it's + decoded into the given encoding and should produce a type that + passes the is_text() check. + + :param data: data to be transformed into text + :type data: either bytes or other data that will be returned + unchanged + """ + if is_bytes(data): + if encoding is None: + return data.decode() + else: + return data.decode(encoding) + return data diff --git a/avocado/utils/process.py b/avocado/utils/process.py index 23a8ac2656c87aee3656c7a9b9a1a4f8d302caaa..06e7343d5e9a89407b20feb3de72c62f51d76e8f 100644 --- a/avocado/utils/process.py +++ b/avocado/utils/process.py @@ -32,8 +32,9 @@ import threading import time from io import BytesIO, UnsupportedOperation -from six import PY2, string_types +from six import string_types +from . import astring from . import gdb from . import runtime from . import path @@ -81,52 +82,13 @@ class CmdError(Exception): self.result = result self.additional_text = additional_text - def __str__(self): - if self.result is not None: - if self.result.interrupted: - msg = "Command '%s' interrupted by %s" - msg %= (self.command, self.result.interrupted) - elif self.result.exit_status is None: - msg = "Command '%s' failed and is not responding to signals" - msg %= self.command - else: - msg = "Command '%s' failed (rc=%d)" - msg %= (self.command, self.result.exit_status) - if self.additional_text: - msg += ", " + self.additional_text - return msg - else: - return "CmdError" - - -def normalize_cmd(cmd, encoding=None): - """ - Normalize a command to be safe for :func:`shlex.split` - - :param cmd: the command line to be passed to :func:`shlex.split` - :type cmd: str or bytes - :param encoding: the encoding to use for encode/decode operations - :type encoding: str - """ - if encoding is None: - encoding = sys.getdefaultencoding() - if PY2: - if not isinstance(cmd, str): - cmd = cmd.encode(encoding) - else: - if isinstance(cmd, bytes): - cmd = cmd.decode(encoding) - return cmd - -def can_sudo(cmd=None, encoding=None): +def can_sudo(cmd=None): """ Check whether sudo is available (or running as root) + + :param cmd: unicode string with the commands """ - if cmd is not None: - if encoding is None: - encoding = sys.getdefaultencoding() - cmd = normalize_cmd(cmd, encoding) if os.getuid() == 0: # Root return True @@ -257,30 +219,43 @@ def get_children_pids(ppid, recursive=False): return children -def binary_from_shell_cmd(cmd, encoding=None): +def binary_from_shell_cmd(cmd): """ Tries to find the first binary path from a simple shell-like command. :note: It's a naive implementation, but for commands like: `VAR=VAL binary -args || true` gives the right result (binary) :param cmd: simple shell-like binary + :type cmd: unicode string :return: first found binary from the cmd """ - if encoding is None: - encoding = sys.getdefaultencoding() - cmd = normalize_cmd(cmd, encoding) - try: - cmds = shlex.split(cmd) - except ValueError: - log.warning("binary_from_shell_cmd: Shlex split of %s failed, using " - "using simple split.", cmd) - cmds = cmd.split(" ") + cmds = cmd_split(cmd) for item in cmds: if not _RE_BASH_SET_VARIABLE.match(item): return item raise ValueError("Unable to parse first binary from '%s'" % cmd) +def cmd_split(cmd): + """ + Splits a command line into individual components + + This is a simple wrapper around :func:`shlex.split`, which has the + requirement of having text (not bytes) as its argument on Python 3, + but bytes on Python 2. + + :param cmd: text (a multi byte string) encoded as 'utf-8' + """ + if sys.version_info[0] < 3: + data = cmd.encode('utf-8') + result = shlex.split(data) + result = [i.decode('utf-8') for i in result] + else: + data = astring.to_text(cmd, 'utf-8') + result = shlex.split(data) + return result + + class CmdResult(object): """ @@ -336,19 +311,6 @@ class CmdResult(object): return self.stderr raise TypeError("Unable to decode stderr into a string-like type") - def __repr__(self): - cmd_rep = ("Command: %s\n" - "Exit status: %s\n" - "Duration: %s\n" - "Stdout:\n%s\n" - "Stderr:\n%s\n" - "PID:\n%s\n" % (self.command, self.exit_status, - self.duration, self.stdout, self.stderr, - self.pid)) - if self.interrupted: - cmd_rep += "Command interrupted by %s\n" % self.interrupted - return cmd_rep - class FDDrainer(object): @@ -418,18 +380,20 @@ class FDDrainer(object): bfr += tmp if tmp.endswith(b'\n'): for line in bfr.splitlines(): + line = astring.to_text(line, self._result.encoding) if self._logger is not None: self._logger.debug(self._logger_prefix, line) if self._stream_logger is not None: - self._stream_logger.debug('%s\n', line) + self._stream_logger.debug(line) bfr = b'' # Write the rest of the bfr unfinished by \n if self._verbose and bfr: for line in bfr.splitlines(): + line = astring.to_text(line, self._result.encoding) if self._logger is not None: self._logger.debug(self._logger_prefix, line) if self._stream_logger is not None: - self._stream_logger.debug(line) + self._stream_logger.debug(astring.to_text(line)) def start(self): self._thread = threading.Thread(target=self._drainer, name=self.name) @@ -517,7 +481,6 @@ class SubProcess(object): """ if encoding is None: encoding = sys.getdefaultencoding() - cmd = normalize_cmd(cmd, encoding) if sudo: self.cmd = self._prepend_sudo(cmd, shell) else: @@ -590,7 +553,7 @@ class SubProcess(object): if self.verbose: log.info("Running '%s'", self.cmd) if self.shell is False: - cmd = shlex.split(self.cmd) + cmd = cmd_split(self.cmd) else: cmd = self.cmd try: @@ -609,12 +572,18 @@ class SubProcess(object): self.start_time = time.time() + # The Thread to be started by the FDDrainer cannot have a name + # from a non-ascii string (this is a Python 2 internal limitation). + # To keep some relation between the command name and the Thread + # this resorts to attempting the conversion to ascii, replacing + # characters it can not convert + cmd_name = self.cmd.encode('ascii', 'replace') # prepare fd drainers if self.allow_output_check == 'combined': self._combined_drainer = FDDrainer( self._popen.stdout.fileno(), self.result, - name="%s-combined" % self.cmd, + name="%s-combined" % cmd_name, logger=log, logger_prefix="[output] %s", # FIXME, in fact, a new log has to be used here @@ -633,7 +602,7 @@ class SubProcess(object): self._stdout_drainer = FDDrainer( self._popen.stdout.fileno(), self.result, - name="%s-stdout" % self.cmd, + name="%s-stdout" % cmd_name, logger=log, logger_prefix="[stdout] %s", stream_logger=stdout_stream_logger, @@ -642,7 +611,7 @@ class SubProcess(object): self._stderr_drainer = FDDrainer( self._popen.stderr.fileno(), self.result, - name="%s-stderr" % self.cmd, + name="%s-stderr" % cmd_name, logger=log, logger_prefix="[stderr] %s", stream_logger=stderr_stream_logger, @@ -896,10 +865,9 @@ class GDBSubProcess(object): """ if encoding is None: encoding = sys.getdefaultencoding() - cmd = normalize_cmd(cmd, encoding) self.cmd = cmd - self.args = shlex.split(cmd) + self.args = cmd_split(cmd) self.binary = self.args[0] self.binary_path = os.path.abspath(self.cmd) self.result = CmdResult(cmd, encoding=encoding) @@ -1184,7 +1152,7 @@ def should_run_inside_gdb(cmd): return False try: - args = shlex.split(cmd) + args = cmd_split(cmd) except ValueError: log.warning("Unable to check whether command '%s' should run inside " "GDB, fallback to simplified method...", cmd) @@ -1206,7 +1174,7 @@ def should_run_inside_wrapper(cmd): """ global CURRENT_WRAPPER CURRENT_WRAPPER = None - args = shlex.split(cmd) + args = cmd_split(cmd) cmd_binary_name = args[0] for script, cmd_expr in WRAP_PROCESS_NAMES_EXPR: @@ -1222,7 +1190,7 @@ def should_run_inside_wrapper(cmd): return True -def get_sub_process_klass(cmd, encoding=None): +def get_sub_process_klass(cmd): """ Which sub process implementation should be used @@ -1230,9 +1198,6 @@ def get_sub_process_klass(cmd, encoding=None): :param cmd: the command arguments, from where we extract the binary name """ - if encoding is None: - encoding = sys.getdefaultencoding() - cmd = normalize_cmd(cmd, encoding) if should_run_inside_gdb(cmd): return GDBSubProcess elif should_run_inside_wrapper(cmd): @@ -1299,7 +1264,7 @@ def run(cmd, timeout=None, verbose=True, ignore_status=False, """ if encoding is None: encoding = sys.getdefaultencoding() - klass = get_sub_process_klass(cmd, encoding) + klass = get_sub_process_klass(cmd) sp = klass(cmd=cmd, verbose=verbose, allow_output_check=allow_output_check, shell=shell, env=env, sudo=sudo, ignore_bg_processes=ignore_bg_processes, diff --git a/selftests/functional/test_basic.py b/selftests/functional/test_basic.py index 6118bb5ec597ee629876c7fedf9b1565eaec05c7..50b06fae40d402521db0a5c84b651800bf193b25 100644 --- a/selftests/functional/test_basic.py +++ b/selftests/functional/test_basic.py @@ -1,4 +1,3 @@ -# This Python file uses the following encoding: utf-8 import aexpect import glob import json @@ -139,7 +138,9 @@ CC_BINARY = probe_binary('cc') GNU_ECHO_BINARY = probe_binary('echo') if GNU_ECHO_BINARY is not None: if probe_binary('man') is not None: - echo_manpage = process.run('man %s' % os.path.basename(GNU_ECHO_BINARY)).stdout + echo_cmd = 'man %s' % os.path.basename(GNU_ECHO_BINARY) + echo_manpage = process.run(echo_cmd, env={'LANG': 'C'}, + encoding='ascii').stdout if b'-e' not in echo_manpage: GNU_ECHO_BINARY = probe_binary('gecho') READ_BINARY = probe_binary('read') @@ -606,8 +607,6 @@ class RunnerHumanOutputTest(unittest.TestCase): b'INTERRUPT 0 | CANCEL 1', result.stdout) - @unittest.skipIf(sys.version_info[0] == 3, - "Test currently broken on Python 3") @unittest.skipIf(not GNU_ECHO_BINARY, 'GNU style echo binary not available') def test_ugly_echo_cmd(self): @@ -653,7 +652,7 @@ class RunnerSimpleTest(unittest.TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__) self.pass_script = script.TemporaryScript( - 'ʊʋʉʈɑ ʅʛʌ', + u'\u00e1 \u00e9 \u00ed \u00f3 \u00fa', "#!/bin/sh\ntrue", 'avocado_simpletest_functional') self.pass_script.save() diff --git a/selftests/functional/test_loader.py b/selftests/functional/test_loader.py index 182cb9b3d8f97cc96020966d66d88e1c84101323..770d08e51c9e2c6fa7ad242a49017e229be4342c 100644 --- a/selftests/functional/test_loader.py +++ b/selftests/functional/test_loader.py @@ -329,8 +329,6 @@ class LoaderTestFunctional(unittest.TestCase): self.assertEqual(test, 11, "Number of tests is not 12 (%s):\n%s" % (test, result)) - @unittest.skipIf(sys.version_info[0] == 3, - "Test currently broken on Python 3") def test_python_unittest(self): test_path = os.path.join(basedir, "selftests", ".data", "unittests.py") cmd = ("%s run --sysinfo=off --job-results-dir %s --json - -- %s" diff --git a/selftests/functional/test_output.py b/selftests/functional/test_output.py index 554da3d7cf8ed56248a5d17f275679cdcd902154..016703a0ea6f157ce64ad96e996f02ce7a26dfe9 100644 --- a/selftests/functional/test_output.py +++ b/selftests/functional/test_output.py @@ -2,7 +2,6 @@ import json import os import re import shutil -import sys import tempfile import unittest from xml.dom import minidom @@ -163,8 +162,6 @@ class OutputTest(unittest.TestCase): "Libc double free can be seen in avocado " "doublefree output:\n%s" % output) - @unittest.skipIf(sys.version_info[0] == 3, - "Test currently broken on Python 3") def test_print_to_std(self): def _check_output(path, exps, name): i = 0 @@ -249,8 +246,6 @@ class OutputTest(unittest.TestCase): with open(output_file_path, 'r') as output: self.assertEqual(output.read(), '') - @unittest.skipIf(sys.version_info[0] == 3, - "Test currently broken on Python 3") def test_check_on_off(self): """ Checks that output will always be kept, but it will only make into diff --git a/selftests/functional/test_output_check.py b/selftests/functional/test_output_check.py index 987cd9f43e917a78f476e5402f19a80126777891..fda9cfff04b1becd06db6c9edd26bfd049b472a6 100644 --- a/selftests/functional/test_output_check.py +++ b/selftests/functional/test_output_check.py @@ -1,7 +1,6 @@ import json import os import shutil -import sys import tempfile import unittest @@ -139,8 +138,6 @@ class RunnerSimpleTest(unittest.TestCase): (expected_rc, result)) self.assertIn(tampered_msg, result.stdout) - @unittest.skipIf(sys.version_info[0] == 3, - "Test currently broken on Python 3") def test_output_diff(self): self._check_output_record_all() tampered_msg_stdout = b"I PITY THE FOOL THAT STANDS ON STDOUT!" diff --git a/selftests/unit/test_astring.py b/selftests/unit/test_astring.py index 7c225a21accfe0ba556a26f1a97b361a3700ca51..1de818e9c3a60a6003c8d3b962950c7c81ee3559 100644 --- a/selftests/unit/test_astring.py +++ b/selftests/unit/test_astring.py @@ -77,6 +77,52 @@ class AstringTest(unittest.TestCase): self.assertEqual(astring.string_to_safe_path(avocado), "%s__" % avocado[:-2]) + def test_is_bytes(self): + """ + Verifies what bytes means, basically that they are the same + thing accross Python 2 and 3 and can be decoded into "text" + """ + binary = b'' + text = u'' + self.assertTrue(astring.is_bytes(binary)) + self.assertFalse(astring.is_bytes(text)) + self.assertTrue(hasattr(binary, 'decode')) + self.assertTrue(astring.is_text(binary.decode())) + # on Python 2, each str member is also a single byte char + if sys.version_info[0] < 3: + self.assertTrue(astring.is_bytes(str(''))) + else: + self.assertFalse(astring.is_bytes(str(''))) + + def test_is_text(self): + """ + Verifies what text means, basically that they can represent + extended set of characters and can be encoded into "bytes" + """ + binary = b'' + text = u'' + self.assertTrue(astring.is_text(text)) + self.assertFalse(astring.is_text(binary)) + self.assertTrue(hasattr(text, 'encode')) + self.assertTrue(astring.is_bytes(text.encode())) + + def test_to_text_is_text(self): + self.assertTrue(astring.is_text(astring.to_text(b''))) + self.assertTrue(astring.is_text(astring.to_text(''))) + self.assertTrue(astring.is_text(astring.to_text(u''))) + + def test_to_text_decode_is_text(self): + self.assertTrue(astring.is_text(astring.to_text(b'', 'ascii'))) + self.assertTrue(astring.is_text(astring.to_text('', 'ascii'))) + self.assertTrue(astring.is_text(astring.to_text(u'', 'ascii'))) + + def test_to_text_decode_utf_8(self): + text_1 = astring.to_text(b'\xc3\xa1', 'utf-8') + text_2 = astring.to_text(u'\u00e1', 'utf-8') + self.assertTrue(astring.is_text(text_1)) + self.assertTrue(astring.is_text(text_1)) + self.assertEqual(text_1, text_2) + if __name__ == '__main__': unittest.main() diff --git a/selftests/unit/test_utils_process.py b/selftests/unit/test_utils_process.py index 7059529a499955f29e93bc9cb830b3ef38fd387c..c79ab7fc9f8e044918e24335570989bf00b77224 100644 --- a/selftests/unit/test_utils_process.py +++ b/selftests/unit/test_utils_process.py @@ -1,6 +1,7 @@ import io import logging import os +import shlex import unittest try: @@ -9,6 +10,7 @@ except ImportError: import mock +from avocado.utils import astring from avocado.utils import gdb from avocado.utils import process from avocado.utils import path @@ -23,15 +25,15 @@ def probe_binary(binary): return None -TRUE_CMD = probe_binary('true') ECHO_CMD = probe_binary('echo') +FICTIONAL_CMD = '/usr/bin/fictional_cmd' class TestSubProcess(unittest.TestCase): def test_allow_output_check_parameter(self): self.assertRaises(ValueError, process.SubProcess, - TRUE_CMD, False, "invalid") + FICTIONAL_CMD, False, "invalid") class TestGDBProcess(unittest.TestCase): @@ -65,11 +67,9 @@ class TestGDBProcess(unittest.TestCase): self.assertFalse(process.should_run_inside_gdb("foo bar baz")) self.assertFalse(process.should_run_inside_gdb("foo ' ")) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') def test_get_sub_process_klass(self): gdb.GDB_RUN_BINARY_NAMES_EXPR = [] - self.assertIs(process.get_sub_process_klass(TRUE_CMD), + self.assertIs(process.get_sub_process_klass(FICTIONAL_CMD), process.SubProcess) gdb.GDB_RUN_BINARY_NAMES_EXPR.append('/bin/false') @@ -77,7 +77,7 @@ class TestGDBProcess(unittest.TestCase): process.GDBSubProcess) self.assertIs(process.get_sub_process_klass('false'), process.GDBSubProcess) - self.assertIs(process.get_sub_process_klass('true'), + self.assertIs(process.get_sub_process_klass(FICTIONAL_CMD), process.SubProcess) def test_split_gdb_expr(self): @@ -102,10 +102,6 @@ def mock_fail_find_cmd(cmd, default=None): class TestProcessRun(unittest.TestCase): - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') - @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000)) def test_subprocess_nosudo(self): @@ -113,25 +109,20 @@ class TestProcessRun(unittest.TestCase): p = process.SubProcess(cmd='ls -l') self.assertEqual(p.cmd, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') - @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) @mock.patch.object(os, 'getuid', mock.Mock(return_value=0)) def test_subprocess_nosudo_uid_0(self): expected_command = 'ls -l' p = process.SubProcess(cmd='ls -l') self.assertEqual(p.cmd, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) + mock.Mock(return_value='/bin/sudo')) @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000)) def test_subprocess_sudo(self): - expected_command = '%s -n ls -l' % TRUE_CMD + expected_command = '/bin/sudo -n ls -l' p = process.SubProcess(cmd='ls -l', sudo=True) + path.find_command.assert_called_once_with('sudo') self.assertEqual(p.cmd, expected_command) @mock.patch.object(path, 'find_command', mock_fail_find_cmd) @@ -141,24 +132,19 @@ class TestProcessRun(unittest.TestCase): p = process.SubProcess(cmd='ls -l', sudo=True) self.assertEqual(p.cmd, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') - @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) @mock.patch.object(os, 'getuid', mock.Mock(return_value=0)) def test_subprocess_sudo_uid_0(self): expected_command = 'ls -l' p = process.SubProcess(cmd='ls -l', sudo=True) self.assertEqual(p.cmd, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) + mock.Mock(return_value='/bin/sudo')) @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000)) def test_subprocess_sudo_shell(self): - expected_command = '%s -n -s ls -l' % TRUE_CMD + expected_command = '/bin/sudo -n -s ls -l' p = process.SubProcess(cmd='ls -l', sudo=True, shell=True) + path.find_command.assert_called_once_with('sudo') self.assertEqual(p.cmd, expected_command) @mock.patch.object(path, 'find_command', mock_fail_find_cmd) @@ -168,44 +154,31 @@ class TestProcessRun(unittest.TestCase): p = process.SubProcess(cmd='ls -l', sudo=True, shell=True) self.assertEqual(p.cmd, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') - @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) @mock.patch.object(os, 'getuid', mock.Mock(return_value=0)) def test_subprocess_sudo_shell_uid_0(self): expected_command = 'ls -l' p = process.SubProcess(cmd='ls -l', sudo=True, shell=True) self.assertEqual(p.cmd, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') - @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000)) def test_run_nosudo(self): expected_command = 'ls -l' p = process.run(cmd='ls -l', ignore_status=True) self.assertEqual(p.command, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') - @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) @mock.patch.object(os, 'getuid', mock.Mock(return_value=0)) def test_run_nosudo_uid_0(self): expected_command = 'ls -l' p = process.run(cmd='ls -l', ignore_status=True) self.assertEqual(p.command, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) + mock.Mock(return_value='/bin/sudo')) @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000)) def test_run_sudo(self): - expected_command = '%s -n ls -l' % TRUE_CMD + expected_command = '/bin/sudo -n ls -l' p = process.run(cmd='ls -l', sudo=True, ignore_status=True) + path.find_command.assert_called_once_with('sudo') self.assertEqual(p.command, expected_command) @mock.patch.object(path, 'find_command', mock_fail_find_cmd) @@ -215,24 +188,19 @@ class TestProcessRun(unittest.TestCase): p = process.run(cmd='ls -l', sudo=True, ignore_status=True) self.assertEqual(p.command, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') - @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) @mock.patch.object(os, 'getuid', mock.Mock(return_value=0)) def test_run_sudo_uid_0(self): expected_command = 'ls -l' p = process.run(cmd='ls -l', sudo=True, ignore_status=True) self.assertEqual(p.command, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) + mock.Mock(return_value='/bin/sudo')) @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000)) def test_run_sudo_shell(self): - expected_command = '%s -n -s ls -l' % TRUE_CMD + expected_command = '/bin/sudo -n -s ls -l' p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True) + path.find_command.assert_called_once_with('sudo') self.assertEqual(p.command, expected_command) @mock.patch.object(path, 'find_command', mock_fail_find_cmd) @@ -242,10 +210,6 @@ class TestProcessRun(unittest.TestCase): p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True) self.assertEqual(p.command, expected_command) - @unittest.skipUnless(TRUE_CMD, - '"true" binary not available') - @mock.patch.object(path, 'find_command', - mock.Mock(return_value=TRUE_CMD)) @mock.patch.object(os, 'getuid', mock.Mock(return_value=0)) def test_run_sudo_shell_uid_0(self): expected_command = 'ls -l' @@ -265,7 +229,8 @@ class TestProcessRun(unittest.TestCase): encoded_text = b'Avok\xc3\xa1do' self.assertEqual(text.encode('utf-8'), encoded_text) self.assertEqual(encoded_text.decode('utf-8'), text) - result = process.run("%s -n %s" % (ECHO_CMD, text), encoding='utf-8') + cmd = u"%s -n %s" % (ECHO_CMD, text) + result = process.run(cmd, encoding='utf-8') self.assertEqual(result.stdout, encoded_text) self.assertEqual(result.stdout_text, text) @@ -285,6 +250,32 @@ class MiscProcessTests(unittest.TestCase): res = process.binary_from_shell_cmd("FOO=bar ./bin var=value") self.assertEqual("./bin", res) + def test_cmd_split(self): + plain_str = '' + unicode_str = u'' + empty_bytes = b'' + # shlex.split() can work with "plain_str" and "unicode_str" on both + # Python 2 and Python 3. While we're not testing Python itself, + # this will help us catch possible differences in the Python + # standard library should they arise. + self.assertEqual(shlex.split(plain_str), []) + self.assertEqual(shlex.split(astring.to_text(plain_str)), []) + self.assertEqual(shlex.split(unicode_str), []) + self.assertEqual(shlex.split(astring.to_text(unicode_str)), []) + # on Python 3, shlex.split() won't work with bytes, raising: + # AttributeError: 'bytes' object has no attribute 'read'. + # To turn bytes into text (when necessary), that is, on + # Python 3 only, use astring.to_text() + self.assertEqual(shlex.split(astring.to_text(empty_bytes)), []) + # Now let's test our specific implementation to split commands + self.assertEqual(process.cmd_split(plain_str), []) + self.assertEqual(process.cmd_split(unicode_str), []) + self.assertEqual(process.cmd_split(empty_bytes), []) + unicode_command = u"avok\xe1do_test_runner arguments" + self.assertEqual(process.cmd_split(unicode_command), + [u"avok\xe1do_test_runner", + u"arguments"]) + class CmdResultTests(unittest.TestCase): diff --git a/selftests/unit/test_utils_script.py b/selftests/unit/test_utils_script.py new file mode 100644 index 0000000000000000000000000000000000000000..450e2e461f345cc44ef4a04fe8c4fe9b542251a3 --- /dev/null +++ b/selftests/unit/test_utils_script.py @@ -0,0 +1,20 @@ +import os +import unittest + + +from avocado.utils import script + + +class TestTemporary(unittest.TestCase): + + def test_unicode_name(self): + path = u'\u00e1 \u00e9 \u00ed \u00f3 \u00fa' + content = "a e i o u" + with script.TemporaryScript(path, content) as temp_script: + self.assertTrue(os.path.exists(temp_script.path)) + with open(temp_script.path) as temp_script_file: + self.assertEqual(content, temp_script_file.read()) + + +if __name__ == "__main__": + unittest.main()