提交 1401f3cf 编写于 作者: C Cleber Rosa

avocado.utils.process: require commands to be given as unicode strings

This change introduces the following requirement: the command to
be run by the avocado.utils.process functions should be given as unicode.
It brings a clearer interface, in which there's no need to deal with
encodings internally.

For simple test, it brings changes to respect the of passing unicode.
And, it changes the presentation of simple tests with unicode names
explicitly expreseed as such.

The way to deal with the process data that may be generated on
stdout/stderr is unchanged.
Signed-off-by: NCleber Rosa <crosa@redhat.com>
上级 0a9a905c
......@@ -1120,6 +1120,10 @@ class SimpleTest(Test):
self._command = None
if self.filename is not None:
self._command = pipes.quote(self.filename)
# process.run expects unicode as the command, but pipes.quote
# turns it into a "bytes" array in Python 2
if not astring.is_text(self._command):
self._command = astring.to_text(self._command, defaults.ENCODING)
@property
def filename(self):
......@@ -1145,7 +1149,6 @@ class SimpleTest(Test):
test_params = dict([(str(key), str(val)) for _, key, val in
self.params.iteritems()])
# process.run uses shlex.split(), the self.path needs to be escaped
result = process.run(self._command, verbose=True,
env=test_params, encoding=defaults.ENCODING)
......
......@@ -32,7 +32,7 @@ import threading
import time
from io import BytesIO, UnsupportedOperation
from six import PY2, string_types
from six import string_types
from . import astring
from . import gdb
......@@ -83,34 +83,12 @@ class CmdError(Exception):
self.additional_text = additional_text
def normalize_cmd(cmd, encoding=None):
"""
Normalize a command to be safe for :func:`shlex.split`
:param cmd: the command line to be passed to :func:`shlex.split`
:type cmd: str or bytes
:param encoding: the encoding to use for encode/decode operations
:type encoding: str
"""
if encoding is None:
encoding = sys.getdefaultencoding()
if PY2:
if not isinstance(cmd, str):
cmd = cmd.encode(encoding)
else:
if isinstance(cmd, bytes):
cmd = cmd.decode(encoding)
return cmd
def can_sudo(cmd=None, encoding=None):
def can_sudo(cmd=None):
"""
Check whether sudo is available (or running as root)
:param cmd: unicode string with the commands
"""
if cmd is not None:
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
if os.getuid() == 0: # Root
return True
......@@ -241,24 +219,17 @@ def get_children_pids(ppid, recursive=False):
return children
def binary_from_shell_cmd(cmd, encoding=None):
def binary_from_shell_cmd(cmd):
"""
Tries to find the first binary path from a simple shell-like command.
:note: It's a naive implementation, but for commands like:
`VAR=VAL binary -args || true` gives the right result (binary)
:param cmd: simple shell-like binary
:type cmd: unicode string
:return: first found binary from the cmd
"""
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
try:
cmds = shlex.split(cmd)
except ValueError:
log.warning("binary_from_shell_cmd: Shlex split of %s failed, using "
"using simple split.", cmd)
cmds = cmd.split(" ")
cmds = cmd_split(cmd)
for item in cmds:
if not _RE_BASH_SET_VARIABLE.match(item):
return item
......@@ -508,7 +479,6 @@ class SubProcess(object):
"""
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
if sudo:
self.cmd = self._prepend_sudo(cmd, shell)
else:
......@@ -581,7 +551,7 @@ class SubProcess(object):
if self.verbose:
log.info("Running '%s'", self.cmd)
if self.shell is False:
cmd = shlex.split(self.cmd)
cmd = cmd_split(self.cmd)
else:
cmd = self.cmd
try:
......@@ -600,12 +570,18 @@ class SubProcess(object):
self.start_time = time.time()
# The Thread to be started by the FDDrainer cannot have a name
# from a non-ascii string (this is a Python 2 internal limitation).
# To keep some relation between the command name and the Thread
# this resorts to attempting the conversion to ascii, replacing
# characters it can not convert
cmd_name = self.cmd.encode('ascii', 'replace')
# prepare fd drainers
if self.allow_output_check == 'combined':
self._combined_drainer = FDDrainer(
self._popen.stdout.fileno(),
self.result,
name="%s-combined" % self.cmd,
name="%s-combined" % cmd_name,
logger=log,
logger_prefix="[output] %s",
# FIXME, in fact, a new log has to be used here
......@@ -624,7 +600,7 @@ class SubProcess(object):
self._stdout_drainer = FDDrainer(
self._popen.stdout.fileno(),
self.result,
name="%s-stdout" % self.cmd,
name="%s-stdout" % cmd_name,
logger=log,
logger_prefix="[stdout] %s",
stream_logger=stdout_stream_logger,
......@@ -633,7 +609,7 @@ class SubProcess(object):
self._stderr_drainer = FDDrainer(
self._popen.stderr.fileno(),
self.result,
name="%s-stderr" % self.cmd,
name="%s-stderr" % cmd_name,
logger=log,
logger_prefix="[stderr] %s",
stream_logger=stderr_stream_logger,
......@@ -887,10 +863,9 @@ class GDBSubProcess(object):
"""
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
self.cmd = cmd
self.args = shlex.split(cmd)
self.args = cmd_split(cmd)
self.binary = self.args[0]
self.binary_path = os.path.abspath(self.cmd)
self.result = CmdResult(cmd, encoding=encoding)
......@@ -1175,7 +1150,7 @@ def should_run_inside_gdb(cmd):
return False
try:
args = shlex.split(cmd)
args = cmd_split(cmd)
except ValueError:
log.warning("Unable to check whether command '%s' should run inside "
"GDB, fallback to simplified method...", cmd)
......@@ -1197,7 +1172,7 @@ def should_run_inside_wrapper(cmd):
"""
global CURRENT_WRAPPER
CURRENT_WRAPPER = None
args = shlex.split(cmd)
args = cmd_split(cmd)
cmd_binary_name = args[0]
for script, cmd_expr in WRAP_PROCESS_NAMES_EXPR:
......@@ -1213,7 +1188,7 @@ def should_run_inside_wrapper(cmd):
return True
def get_sub_process_klass(cmd, encoding=None):
def get_sub_process_klass(cmd):
"""
Which sub process implementation should be used
......@@ -1221,9 +1196,6 @@ def get_sub_process_klass(cmd, encoding=None):
:param cmd: the command arguments, from where we extract the binary name
"""
if encoding is None:
encoding = sys.getdefaultencoding()
cmd = normalize_cmd(cmd, encoding)
if should_run_inside_gdb(cmd):
return GDBSubProcess
elif should_run_inside_wrapper(cmd):
......@@ -1290,7 +1262,7 @@ def run(cmd, timeout=None, verbose=True, ignore_status=False,
"""
if encoding is None:
encoding = sys.getdefaultencoding()
klass = get_sub_process_klass(cmd, encoding)
klass = get_sub_process_klass(cmd)
sp = klass(cmd=cmd, verbose=verbose,
allow_output_check=allow_output_check, shell=shell, env=env,
sudo=sudo, ignore_bg_processes=ignore_bg_processes,
......
# This Python file uses the following encoding: utf-8
import aexpect
import glob
import json
......@@ -653,7 +652,7 @@ class RunnerSimpleTest(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
self.pass_script = script.TemporaryScript(
'ʊʋʉʈɑ ʅʛʌ',
u'\u00e1 \u00e9 \u00ed \u00f3 \u00fa',
"#!/bin/sh\ntrue",
'avocado_simpletest_functional')
self.pass_script.save()
......
......@@ -229,7 +229,8 @@ class TestProcessRun(unittest.TestCase):
encoded_text = b'Avok\xc3\xa1do'
self.assertEqual(text.encode('utf-8'), encoded_text)
self.assertEqual(encoded_text.decode('utf-8'), text)
result = process.run("%s -n %s" % (ECHO_CMD, text), encoding='utf-8')
cmd = u"%s -n %s" % (ECHO_CMD, text)
result = process.run(cmd, encoding='utf-8')
self.assertEqual(result.stdout, encoded_text)
self.assertEqual(result.stdout_text, text)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册