未验证 提交 80ebaf23 编写于 作者: A Amador Pahim

Merge branch 'clebergnu-process_unicode_v2'

Signed-off-by: NAmador Pahim <apahim@redhat.com>
......@@ -1120,6 +1120,10 @@ class SimpleTest(Test):
self._command = None
if self.filename is not None:
self._command = pipes.quote(self.filename)
# process.run expects unicode as the command, but pipes.quote
# turns it into a "bytes" array in Python 2
if not astring.is_text(self._command):
self._command = astring.to_text(self._command, defaults.ENCODING)
@property
def filename(self):
......@@ -1145,7 +1149,6 @@ class SimpleTest(Test):
test_params = dict([(str(key), str(val)) for _, key, val in
self.params.iteritems()])
# process.run uses shlex.split(), the self.path needs to be escaped
result = process.run(self._command, verbose=True,
env=test_params, encoding=defaults.ENCODING)
......
......@@ -27,6 +27,7 @@ And not notice until their code starts failing.
import itertools
import re
import sys
import string
from six import string_types, PY3
......@@ -278,3 +279,48 @@ def string_to_safe_path(input_str):
for bad_chr in FS_UNSAFE_CHARS:
input_str = input_str.replace(bad_chr, "_")
return input_str
def is_bytes(data):
"""
Checks if the data given is a sequence of bytes
And not a "text" type, that can be of multi-byte characters.
Also, this does NOT mean a bytearray type.
:param data: the instance to be checked if it falls under the definition
of an array of bytes.
"""
return isinstance(data, bytes)
def is_text(data):
"""
Checks if the data given is a suitable for holding text
That is, if it can hold text that requires more than one byte for
each character.
"""
if sys.version_info[0] < 3:
return isinstance(data, unicode)
return isinstance(data, str)
def to_text(data, encoding=None):
"""
Convert data to text
Action is only taken if data is "bytes", in which case it's
decoded into the given encoding and should produce a type that
passes the is_text() check.
:param data: data to be transformed into text
:type data: either bytes or other data that will be returned
unchanged
"""
if is_bytes(data):
if encoding is None:
return data.decode()
else:
return data.decode(encoding)
return data
......@@ -32,8 +32,9 @@ import threading
import time
from io import BytesIO, UnsupportedOperation
from six import PY2, string_types
from six import string_types
from . import astring
from . import gdb
from . import runtime
from . import path
......@@ -81,52 +82,13 @@ class CmdError(Exception):
self.result = result
self.additional_text = additional_text
def __str__(self):
if self.result is not None:
if self.result.interrupted:
msg = "Command '%s' interrupted by %s"
msg %= (self.command, self.result.interrupted)
elif self.result.exit_status is None:
msg = "Command '%s' failed and is not responding to signals"
msg %= self.command
else:
msg = "Command '%s' failed (rc=%d)"
msg %= (self.command, self.result.exit_status)
if self.additional_text:
msg += ", " + self.additional_text
return msg
else:
return "CmdError"
def normalize_cmd(cmd, encoding=None):
"""
Normalize a command to be safe for :func:`shlex.split`
:param cmd: the command line to be passed to :func:`shlex.split`
:type cmd: str or bytes
:param encoding: the encoding to use for encode/decode operations
:type encoding: str
"""
if encoding is None:
encoding = sys.getdefaultencoding()
if PY2:
if not isinstance(cmd, str):
cmd = cmd.encode(encoding)
else:
if isinstance(cmd, bytes):
cmd = cmd.decode(encoding)
return cmd
def can_sudo(cmd=None, encoding=None):
def can_sudo(cmd=None):
"""
Check whether sudo is available (or running as root)
:param cmd: unicode string with the commands
"""
if cmd is not None:
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
if os.getuid() == 0: # Root
return True
......@@ -257,30 +219,43 @@ def get_children_pids(ppid, recursive=False):
return children
def binary_from_shell_cmd(cmd, encoding=None):
def binary_from_shell_cmd(cmd):
"""
Tries to find the first binary path from a simple shell-like command.
:note: It's a naive implementation, but for commands like:
`VAR=VAL binary -args || true` gives the right result (binary)
:param cmd: simple shell-like binary
:type cmd: unicode string
:return: first found binary from the cmd
"""
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
try:
cmds = shlex.split(cmd)
except ValueError:
log.warning("binary_from_shell_cmd: Shlex split of %s failed, using "
"using simple split.", cmd)
cmds = cmd.split(" ")
cmds = cmd_split(cmd)
for item in cmds:
if not _RE_BASH_SET_VARIABLE.match(item):
return item
raise ValueError("Unable to parse first binary from '%s'" % cmd)
def cmd_split(cmd):
"""
Splits a command line into individual components
This is a simple wrapper around :func:`shlex.split`, which has the
requirement of having text (not bytes) as its argument on Python 3,
but bytes on Python 2.
:param cmd: text (a multi byte string) encoded as 'utf-8'
"""
if sys.version_info[0] < 3:
data = cmd.encode('utf-8')
result = shlex.split(data)
result = [i.decode('utf-8') for i in result]
else:
data = astring.to_text(cmd, 'utf-8')
result = shlex.split(data)
return result
class CmdResult(object):
"""
......@@ -336,19 +311,6 @@ class CmdResult(object):
return self.stderr
raise TypeError("Unable to decode stderr into a string-like type")
def __repr__(self):
cmd_rep = ("Command: %s\n"
"Exit status: %s\n"
"Duration: %s\n"
"Stdout:\n%s\n"
"Stderr:\n%s\n"
"PID:\n%s\n" % (self.command, self.exit_status,
self.duration, self.stdout, self.stderr,
self.pid))
if self.interrupted:
cmd_rep += "Command interrupted by %s\n" % self.interrupted
return cmd_rep
class FDDrainer(object):
......@@ -418,18 +380,20 @@ class FDDrainer(object):
bfr += tmp
if tmp.endswith(b'\n'):
for line in bfr.splitlines():
line = astring.to_text(line, self._result.encoding)
if self._logger is not None:
self._logger.debug(self._logger_prefix, line)
if self._stream_logger is not None:
self._stream_logger.debug('%s\n', line)
self._stream_logger.debug(line)
bfr = b''
# Write the rest of the bfr unfinished by \n
if self._verbose and bfr:
for line in bfr.splitlines():
line = astring.to_text(line, self._result.encoding)
if self._logger is not None:
self._logger.debug(self._logger_prefix, line)
if self._stream_logger is not None:
self._stream_logger.debug(line)
self._stream_logger.debug(astring.to_text(line))
def start(self):
self._thread = threading.Thread(target=self._drainer, name=self.name)
......@@ -517,7 +481,6 @@ class SubProcess(object):
"""
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
if sudo:
self.cmd = self._prepend_sudo(cmd, shell)
else:
......@@ -590,7 +553,7 @@ class SubProcess(object):
if self.verbose:
log.info("Running '%s'", self.cmd)
if self.shell is False:
cmd = shlex.split(self.cmd)
cmd = cmd_split(self.cmd)
else:
cmd = self.cmd
try:
......@@ -609,12 +572,18 @@ class SubProcess(object):
self.start_time = time.time()
# The Thread to be started by the FDDrainer cannot have a name
# from a non-ascii string (this is a Python 2 internal limitation).
# To keep some relation between the command name and the Thread
# this resorts to attempting the conversion to ascii, replacing
# characters it can not convert
cmd_name = self.cmd.encode('ascii', 'replace')
# prepare fd drainers
if self.allow_output_check == 'combined':
self._combined_drainer = FDDrainer(
self._popen.stdout.fileno(),
self.result,
name="%s-combined" % self.cmd,
name="%s-combined" % cmd_name,
logger=log,
logger_prefix="[output] %s",
# FIXME, in fact, a new log has to be used here
......@@ -633,7 +602,7 @@ class SubProcess(object):
self._stdout_drainer = FDDrainer(
self._popen.stdout.fileno(),
self.result,
name="%s-stdout" % self.cmd,
name="%s-stdout" % cmd_name,
logger=log,
logger_prefix="[stdout] %s",
stream_logger=stdout_stream_logger,
......@@ -642,7 +611,7 @@ class SubProcess(object):
self._stderr_drainer = FDDrainer(
self._popen.stderr.fileno(),
self.result,
name="%s-stderr" % self.cmd,
name="%s-stderr" % cmd_name,
logger=log,
logger_prefix="[stderr] %s",
stream_logger=stderr_stream_logger,
......@@ -896,10 +865,9 @@ class GDBSubProcess(object):
"""
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
self.cmd = cmd
self.args = shlex.split(cmd)
self.args = cmd_split(cmd)
self.binary = self.args[0]
self.binary_path = os.path.abspath(self.cmd)
self.result = CmdResult(cmd, encoding=encoding)
......@@ -1184,7 +1152,7 @@ def should_run_inside_gdb(cmd):
return False
try:
args = shlex.split(cmd)
args = cmd_split(cmd)
except ValueError:
log.warning("Unable to check whether command '%s' should run inside "
"GDB, fallback to simplified method...", cmd)
......@@ -1206,7 +1174,7 @@ def should_run_inside_wrapper(cmd):
"""
global CURRENT_WRAPPER
CURRENT_WRAPPER = None
args = shlex.split(cmd)
args = cmd_split(cmd)
cmd_binary_name = args[0]
for script, cmd_expr in WRAP_PROCESS_NAMES_EXPR:
......@@ -1222,7 +1190,7 @@ def should_run_inside_wrapper(cmd):
return True
def get_sub_process_klass(cmd, encoding=None):
def get_sub_process_klass(cmd):
"""
Which sub process implementation should be used
......@@ -1230,9 +1198,6 @@ def get_sub_process_klass(cmd, encoding=None):
:param cmd: the command arguments, from where we extract the binary name
"""
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
if should_run_inside_gdb(cmd):
return GDBSubProcess
elif should_run_inside_wrapper(cmd):
......@@ -1299,7 +1264,7 @@ def run(cmd, timeout=None, verbose=True, ignore_status=False,
"""
if encoding is None:
encoding = sys.getdefaultencoding()
klass = get_sub_process_klass(cmd, encoding)
klass = get_sub_process_klass(cmd)
sp = klass(cmd=cmd, verbose=verbose,
allow_output_check=allow_output_check, shell=shell, env=env,
sudo=sudo, ignore_bg_processes=ignore_bg_processes,
......
# This Python file uses the following encoding: utf-8
import aexpect
import glob
import json
......@@ -139,7 +138,9 @@ CC_BINARY = probe_binary('cc')
GNU_ECHO_BINARY = probe_binary('echo')
if GNU_ECHO_BINARY is not None:
if probe_binary('man') is not None:
echo_manpage = process.run('man %s' % os.path.basename(GNU_ECHO_BINARY)).stdout
echo_cmd = 'man %s' % os.path.basename(GNU_ECHO_BINARY)
echo_manpage = process.run(echo_cmd, env={'LANG': 'C'},
encoding='ascii').stdout
if b'-e' not in echo_manpage:
GNU_ECHO_BINARY = probe_binary('gecho')
READ_BINARY = probe_binary('read')
......@@ -606,8 +607,6 @@ class RunnerHumanOutputTest(unittest.TestCase):
b'INTERRUPT 0 | CANCEL 1',
result.stdout)
@unittest.skipIf(sys.version_info[0] == 3,
"Test currently broken on Python 3")
@unittest.skipIf(not GNU_ECHO_BINARY,
'GNU style echo binary not available')
def test_ugly_echo_cmd(self):
......@@ -653,7 +652,7 @@ class RunnerSimpleTest(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
self.pass_script = script.TemporaryScript(
'ʊʋʉʈɑ ʅʛʌ',
u'\u00e1 \u00e9 \u00ed \u00f3 \u00fa',
"#!/bin/sh\ntrue",
'avocado_simpletest_functional')
self.pass_script.save()
......
......@@ -329,8 +329,6 @@ class LoaderTestFunctional(unittest.TestCase):
self.assertEqual(test, 11, "Number of tests is not 12 (%s):\n%s"
% (test, result))
@unittest.skipIf(sys.version_info[0] == 3,
"Test currently broken on Python 3")
def test_python_unittest(self):
test_path = os.path.join(basedir, "selftests", ".data", "unittests.py")
cmd = ("%s run --sysinfo=off --job-results-dir %s --json - -- %s"
......
......@@ -2,7 +2,6 @@ import json
import os
import re
import shutil
import sys
import tempfile
import unittest
from xml.dom import minidom
......@@ -163,8 +162,6 @@ class OutputTest(unittest.TestCase):
"Libc double free can be seen in avocado "
"doublefree output:\n%s" % output)
@unittest.skipIf(sys.version_info[0] == 3,
"Test currently broken on Python 3")
def test_print_to_std(self):
def _check_output(path, exps, name):
i = 0
......@@ -249,8 +246,6 @@ class OutputTest(unittest.TestCase):
with open(output_file_path, 'r') as output:
self.assertEqual(output.read(), '')
@unittest.skipIf(sys.version_info[0] == 3,
"Test currently broken on Python 3")
def test_check_on_off(self):
"""
Checks that output will always be kept, but it will only make into
......
import json
import os
import shutil
import sys
import tempfile
import unittest
......@@ -139,8 +138,6 @@ class RunnerSimpleTest(unittest.TestCase):
(expected_rc, result))
self.assertIn(tampered_msg, result.stdout)
@unittest.skipIf(sys.version_info[0] == 3,
"Test currently broken on Python 3")
def test_output_diff(self):
self._check_output_record_all()
tampered_msg_stdout = b"I PITY THE FOOL THAT STANDS ON STDOUT!"
......
......@@ -77,6 +77,52 @@ class AstringTest(unittest.TestCase):
self.assertEqual(astring.string_to_safe_path(avocado),
"%s__" % avocado[:-2])
def test_is_bytes(self):
"""
Verifies what bytes means, basically that they are the same
thing accross Python 2 and 3 and can be decoded into "text"
"""
binary = b''
text = u''
self.assertTrue(astring.is_bytes(binary))
self.assertFalse(astring.is_bytes(text))
self.assertTrue(hasattr(binary, 'decode'))
self.assertTrue(astring.is_text(binary.decode()))
# on Python 2, each str member is also a single byte char
if sys.version_info[0] < 3:
self.assertTrue(astring.is_bytes(str('')))
else:
self.assertFalse(astring.is_bytes(str('')))
def test_is_text(self):
"""
Verifies what text means, basically that they can represent
extended set of characters and can be encoded into "bytes"
"""
binary = b''
text = u''
self.assertTrue(astring.is_text(text))
self.assertFalse(astring.is_text(binary))
self.assertTrue(hasattr(text, 'encode'))
self.assertTrue(astring.is_bytes(text.encode()))
def test_to_text_is_text(self):
self.assertTrue(astring.is_text(astring.to_text(b'')))
self.assertTrue(astring.is_text(astring.to_text('')))
self.assertTrue(astring.is_text(astring.to_text(u'')))
def test_to_text_decode_is_text(self):
self.assertTrue(astring.is_text(astring.to_text(b'', 'ascii')))
self.assertTrue(astring.is_text(astring.to_text('', 'ascii')))
self.assertTrue(astring.is_text(astring.to_text(u'', 'ascii')))
def test_to_text_decode_utf_8(self):
text_1 = astring.to_text(b'\xc3\xa1', 'utf-8')
text_2 = astring.to_text(u'\u00e1', 'utf-8')
self.assertTrue(astring.is_text(text_1))
self.assertTrue(astring.is_text(text_1))
self.assertEqual(text_1, text_2)
if __name__ == '__main__':
unittest.main()
import io
import logging
import os
import shlex
import unittest
try:
......@@ -9,6 +10,7 @@ except ImportError:
import mock
from avocado.utils import astring
from avocado.utils import gdb
from avocado.utils import process
from avocado.utils import path
......@@ -23,15 +25,15 @@ def probe_binary(binary):
return None
TRUE_CMD = probe_binary('true')
ECHO_CMD = probe_binary('echo')
FICTIONAL_CMD = '/usr/bin/fictional_cmd'
class TestSubProcess(unittest.TestCase):
def test_allow_output_check_parameter(self):
self.assertRaises(ValueError, process.SubProcess,
TRUE_CMD, False, "invalid")
FICTIONAL_CMD, False, "invalid")
class TestGDBProcess(unittest.TestCase):
......@@ -65,11 +67,9 @@ class TestGDBProcess(unittest.TestCase):
self.assertFalse(process.should_run_inside_gdb("foo bar baz"))
self.assertFalse(process.should_run_inside_gdb("foo ' "))
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
def test_get_sub_process_klass(self):
gdb.GDB_RUN_BINARY_NAMES_EXPR = []
self.assertIs(process.get_sub_process_klass(TRUE_CMD),
self.assertIs(process.get_sub_process_klass(FICTIONAL_CMD),
process.SubProcess)
gdb.GDB_RUN_BINARY_NAMES_EXPR.append('/bin/false')
......@@ -77,7 +77,7 @@ class TestGDBProcess(unittest.TestCase):
process.GDBSubProcess)
self.assertIs(process.get_sub_process_klass('false'),
process.GDBSubProcess)
self.assertIs(process.get_sub_process_klass('true'),
self.assertIs(process.get_sub_process_klass(FICTIONAL_CMD),
process.SubProcess)
def test_split_gdb_expr(self):
......@@ -102,10 +102,6 @@ def mock_fail_find_cmd(cmd, default=None):
class TestProcessRun(unittest.TestCase):
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
@mock.patch.object(os, 'getuid',
mock.Mock(return_value=1000))
def test_subprocess_nosudo(self):
......@@ -113,25 +109,20 @@ class TestProcessRun(unittest.TestCase):
p = process.SubProcess(cmd='ls -l')
self.assertEqual(p.cmd, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
def test_subprocess_nosudo_uid_0(self):
expected_command = 'ls -l'
p = process.SubProcess(cmd='ls -l')
self.assertEqual(p.cmd, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
mock.Mock(return_value='/bin/sudo'))
@mock.patch.object(os, 'getuid',
mock.Mock(return_value=1000))
def test_subprocess_sudo(self):
expected_command = '%s -n ls -l' % TRUE_CMD
expected_command = '/bin/sudo -n ls -l'
p = process.SubProcess(cmd='ls -l', sudo=True)
path.find_command.assert_called_once_with('sudo')
self.assertEqual(p.cmd, expected_command)
@mock.patch.object(path, 'find_command', mock_fail_find_cmd)
......@@ -141,24 +132,19 @@ class TestProcessRun(unittest.TestCase):
p = process.SubProcess(cmd='ls -l', sudo=True)
self.assertEqual(p.cmd, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
def test_subprocess_sudo_uid_0(self):
expected_command = 'ls -l'
p = process.SubProcess(cmd='ls -l', sudo=True)
self.assertEqual(p.cmd, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
mock.Mock(return_value='/bin/sudo'))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
def test_subprocess_sudo_shell(self):
expected_command = '%s -n -s ls -l' % TRUE_CMD
expected_command = '/bin/sudo -n -s ls -l'
p = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
path.find_command.assert_called_once_with('sudo')
self.assertEqual(p.cmd, expected_command)
@mock.patch.object(path, 'find_command', mock_fail_find_cmd)
......@@ -168,44 +154,31 @@ class TestProcessRun(unittest.TestCase):
p = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
self.assertEqual(p.cmd, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
def test_subprocess_sudo_shell_uid_0(self):
expected_command = 'ls -l'
p = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
self.assertEqual(p.cmd, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
def test_run_nosudo(self):
expected_command = 'ls -l'
p = process.run(cmd='ls -l', ignore_status=True)
self.assertEqual(p.command, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
def test_run_nosudo_uid_0(self):
expected_command = 'ls -l'
p = process.run(cmd='ls -l', ignore_status=True)
self.assertEqual(p.command, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
mock.Mock(return_value='/bin/sudo'))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
def test_run_sudo(self):
expected_command = '%s -n ls -l' % TRUE_CMD
expected_command = '/bin/sudo -n ls -l'
p = process.run(cmd='ls -l', sudo=True, ignore_status=True)
path.find_command.assert_called_once_with('sudo')
self.assertEqual(p.command, expected_command)
@mock.patch.object(path, 'find_command', mock_fail_find_cmd)
......@@ -215,24 +188,19 @@ class TestProcessRun(unittest.TestCase):
p = process.run(cmd='ls -l', sudo=True, ignore_status=True)
self.assertEqual(p.command, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
def test_run_sudo_uid_0(self):
expected_command = 'ls -l'
p = process.run(cmd='ls -l', sudo=True, ignore_status=True)
self.assertEqual(p.command, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
mock.Mock(return_value='/bin/sudo'))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
def test_run_sudo_shell(self):
expected_command = '%s -n -s ls -l' % TRUE_CMD
expected_command = '/bin/sudo -n -s ls -l'
p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True)
path.find_command.assert_called_once_with('sudo')
self.assertEqual(p.command, expected_command)
@mock.patch.object(path, 'find_command', mock_fail_find_cmd)
......@@ -242,10 +210,6 @@ class TestProcessRun(unittest.TestCase):
p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True)
self.assertEqual(p.command, expected_command)
@unittest.skipUnless(TRUE_CMD,
'"true" binary not available')
@mock.patch.object(path, 'find_command',
mock.Mock(return_value=TRUE_CMD))
@mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
def test_run_sudo_shell_uid_0(self):
expected_command = 'ls -l'
......@@ -265,7 +229,8 @@ class TestProcessRun(unittest.TestCase):
encoded_text = b'Avok\xc3\xa1do'
self.assertEqual(text.encode('utf-8'), encoded_text)
self.assertEqual(encoded_text.decode('utf-8'), text)
result = process.run("%s -n %s" % (ECHO_CMD, text), encoding='utf-8')
cmd = u"%s -n %s" % (ECHO_CMD, text)
result = process.run(cmd, encoding='utf-8')
self.assertEqual(result.stdout, encoded_text)
self.assertEqual(result.stdout_text, text)
......@@ -285,6 +250,32 @@ class MiscProcessTests(unittest.TestCase):
res = process.binary_from_shell_cmd("FOO=bar ./bin var=value")
self.assertEqual("./bin", res)
def test_cmd_split(self):
plain_str = ''
unicode_str = u''
empty_bytes = b''
# shlex.split() can work with "plain_str" and "unicode_str" on both
# Python 2 and Python 3. While we're not testing Python itself,
# this will help us catch possible differences in the Python
# standard library should they arise.
self.assertEqual(shlex.split(plain_str), [])
self.assertEqual(shlex.split(astring.to_text(plain_str)), [])
self.assertEqual(shlex.split(unicode_str), [])
self.assertEqual(shlex.split(astring.to_text(unicode_str)), [])
# on Python 3, shlex.split() won't work with bytes, raising:
# AttributeError: 'bytes' object has no attribute 'read'.
# To turn bytes into text (when necessary), that is, on
# Python 3 only, use astring.to_text()
self.assertEqual(shlex.split(astring.to_text(empty_bytes)), [])
# Now let's test our specific implementation to split commands
self.assertEqual(process.cmd_split(plain_str), [])
self.assertEqual(process.cmd_split(unicode_str), [])
self.assertEqual(process.cmd_split(empty_bytes), [])
unicode_command = u"avok\xe1do_test_runner arguments"
self.assertEqual(process.cmd_split(unicode_command),
[u"avok\xe1do_test_runner",
u"arguments"])
class CmdResultTests(unittest.TestCase):
......
import os
import unittest
from avocado.utils import script
class TestTemporary(unittest.TestCase):
def test_unicode_name(self):
path = u'\u00e1 \u00e9 \u00ed \u00f3 \u00fa'
content = "a e i o u"
with script.TemporaryScript(path, content) as temp_script:
self.assertTrue(os.path.exists(temp_script.path))
with open(temp_script.path) as temp_script_file:
self.assertEqual(content, temp_script_file.read())
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册