Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
openeuler
avocado
提交
4e8d5826
A
avocado
项目概览
openeuler
/
avocado
通知
0
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
A
avocado
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
4e8d5826
编写于
3月 26, 2018
作者:
A
Amador Pahim
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'clebergnu-avocado_python2_utf8'
Signed-off-by:
N
Amador Pahim
<
apahim@redhat.com
>
上级
3d39de31
80fba68e
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
147 addition
and
20 deletion
+147
-20
avocado/core/defaults.py
avocado/core/defaults.py
+20
-0
avocado/core/test.py
avocado/core/test.py
+2
-1
avocado/utils/process.py
avocado/utils/process.py
+80
-18
selftests/unit/test_utils_process.py
selftests/unit/test_utils_process.py
+45
-1
未找到文件。
avocado/core/defaults.py
0 → 100644
浏览文件 @
4e8d5826
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2018
# Author: Cleber Rosa <crosa@redhat.com>
"""
The Avocado core defaults
"""
#: The encoding used by default on all data input
ENCODING
=
'utf-8'
avocado/core/test.py
浏览文件 @
4e8d5826
...
...
@@ -33,6 +33,7 @@ from difflib import unified_diff
from
six
import
string_types
,
iteritems
from
.
import
data_dir
from
.
import
defaults
from
.
import
exceptions
from
.
import
output
from
.
import
parameters
...
...
@@ -1138,7 +1139,7 @@ class SimpleTest(Test):
# process.run uses shlex.split(), the self.path needs to be escaped
result
=
process
.
run
(
self
.
_command
,
verbose
=
True
,
env
=
test_params
)
env
=
test_params
,
encoding
=
defaults
.
ENCODING
)
self
.
_log_detailed_cmd_info
(
result
)
except
process
.
CmdError
as
details
:
...
...
avocado/utils/process.py
浏览文件 @
4e8d5826
...
...
@@ -32,7 +32,7 @@ import threading
import
time
from
io
import
BytesIO
,
UnsupportedOperation
from
six
import
string_types
from
six
import
PY2
,
string_types
from
.
import
gdb
from
.
import
runtime
...
...
@@ -99,10 +99,34 @@ class CmdError(Exception):
return
"CmdError"
def
can_sudo
(
cmd
=
None
):
def
normalize_cmd
(
cmd
,
encoding
=
None
):
"""
Normalize a command to be safe for :func:`shlex.split`
:param cmd: the command line to be passed to :func:`shlex.split`
:type cmd: str or bytes
:param encoding: the encoding to use for encode/decode operations
:type encoding: str
"""
if
encoding
is
None
:
encoding
=
sys
.
getdefaultencoding
()
if
PY2
:
if
not
isinstance
(
cmd
,
str
):
cmd
=
cmd
.
encode
(
encoding
)
else
:
if
isinstance
(
cmd
,
bytes
):
cmd
=
cmd
.
decode
(
encoding
)
return
cmd
def
can_sudo
(
cmd
=
None
,
encoding
=
None
):
"""
Check whether sudo is available (or running as root)
"""
if
cmd
is
not
None
:
if
encoding
is
None
:
encoding
=
sys
.
getdefaultencoding
()
cmd
=
normalize_cmd
(
cmd
,
encoding
)
if
os
.
getuid
()
==
0
:
# Root
return
True
...
...
@@ -233,7 +257,7 @@ def get_children_pids(ppid, recursive=False):
return
children
def
binary_from_shell_cmd
(
cmd
):
def
binary_from_shell_cmd
(
cmd
,
encoding
=
None
):
"""
Tries to find the first binary path from a simple shell-like command.
...
...
@@ -242,6 +266,9 @@ def binary_from_shell_cmd(cmd):
:param cmd: simple shell-like binary
:return: first found binary from the cmd
"""
if
encoding
is
None
:
encoding
=
sys
.
getdefaultencoding
()
cmd
=
normalize_cmd
(
cmd
,
encoding
)
try
:
cmds
=
shlex
.
split
(
cmd
)
except
ValueError
:
...
...
@@ -437,7 +464,7 @@ class SubProcess(object):
def
__init__
(
self
,
cmd
,
verbose
=
True
,
allow_output_check
=
None
,
shell
=
False
,
env
=
None
,
sudo
=
False
,
ignore_bg_processes
=
False
):
ignore_bg_processes
=
False
,
encoding
=
None
):
"""
Creates the subprocess object, stdout/err, reader threads and locks.
...
...
@@ -481,8 +508,16 @@ class SubProcess(object):
in missing output produced by those daemons after the
main thread finishes and also it allows those daemons
to be running after the process finishes.
:param encoding: the encoding to use for the text representation
of the command result stdout and stderr, with the
default being Python's own, that is,
(:func:`sys.getdefaultencoding`).
:type encoding: str
:raises: ValueError if incorrect values are given to parameters
"""
if
encoding
is
None
:
encoding
=
sys
.
getdefaultencoding
()
cmd
=
normalize_cmd
(
cmd
,
encoding
)
if
sudo
:
self
.
cmd
=
self
.
_prepend_sudo
(
cmd
,
shell
)
else
:
...
...
@@ -498,7 +533,7 @@ class SubProcess(object):
allow_output_check
)
raise
ValueError
(
msg
)
self
.
allow_output_check
=
allow_output_check
self
.
result
=
CmdResult
(
self
.
cmd
)
self
.
result
=
CmdResult
(
self
.
cmd
,
encoding
=
encoding
)
self
.
shell
=
shell
if
env
:
self
.
env
=
os
.
environ
.
copy
()
...
...
@@ -812,7 +847,7 @@ class WrapSubProcess(SubProcess):
def
__init__
(
self
,
cmd
,
verbose
=
True
,
allow_output_check
=
None
,
shell
=
False
,
env
=
None
,
wrapper
=
None
,
sudo
=
False
,
ignore_bg_processes
=
False
):
ignore_bg_processes
=
False
,
encoding
=
None
):
if
wrapper
is
None
and
CURRENT_WRAPPER
is
not
None
:
wrapper
=
CURRENT_WRAPPER
self
.
wrapper
=
wrapper
...
...
@@ -822,7 +857,7 @@ class WrapSubProcess(SubProcess):
cmd
=
wrapper
+
' '
+
cmd
super
(
WrapSubProcess
,
self
).
__init__
(
cmd
,
verbose
,
allow_output_check
,
shell
,
env
,
sudo
,
ignore_bg_processes
)
ignore_bg_processes
,
encoding
)
class
GDBSubProcess
(
object
):
...
...
@@ -833,7 +868,7 @@ class GDBSubProcess(object):
def
__init__
(
self
,
cmd
,
verbose
=
True
,
allow_output_check
=
None
,
shell
=
False
,
env
=
None
,
sudo
=
False
,
ignore_bg_processes
=
False
):
env
=
None
,
sudo
=
False
,
ignore_bg_processes
=
False
,
encoding
=
None
):
"""
Creates the subprocess object, stdout/err, reader threads and locks.
...
...
@@ -859,13 +894,15 @@ class GDBSubProcess(object):
implementation, since the GDB wrapping code does not have
support to run commands in that way.
"""
if
encoding
is
None
:
encoding
=
sys
.
getdefaultencoding
()
cmd
=
normalize_cmd
(
cmd
,
encoding
)
self
.
cmd
=
cmd
self
.
args
=
shlex
.
split
(
cmd
)
self
.
binary
=
self
.
args
[
0
]
self
.
binary_path
=
os
.
path
.
abspath
(
self
.
cmd
)
self
.
result
=
CmdResult
(
cmd
)
self
.
result
=
CmdResult
(
cmd
,
encoding
=
encoding
)
self
.
gdb_server
=
gdb
.
GDBServer
(
gdb
.
GDBSERVER_PATH
)
self
.
gdb
=
gdb
.
GDB
(
gdb
.
GDB_PATH
)
...
...
@@ -1185,7 +1222,7 @@ def should_run_inside_wrapper(cmd):
return
True
def
get_sub_process_klass
(
cmd
):
def
get_sub_process_klass
(
cmd
,
encoding
=
None
):
"""
Which sub process implementation should be used
...
...
@@ -1193,6 +1230,9 @@ def get_sub_process_klass(cmd):
:param cmd: the command arguments, from where we extract the binary name
"""
if
encoding
is
None
:
encoding
=
sys
.
getdefaultencoding
()
cmd
=
normalize_cmd
(
cmd
,
encoding
)
if
should_run_inside_gdb
(
cmd
):
return
GDBSubProcess
elif
should_run_inside_wrapper
(
cmd
):
...
...
@@ -1203,7 +1243,8 @@ def get_sub_process_klass(cmd):
def
run
(
cmd
,
timeout
=
None
,
verbose
=
True
,
ignore_status
=
False
,
allow_output_check
=
None
,
shell
=
False
,
env
=
None
,
sudo
=
False
,
ignore_bg_processes
=
False
):
env
=
None
,
sudo
=
False
,
ignore_bg_processes
=
False
,
encoding
=
None
):
"""
Run a subprocess, returning a CmdResult object.
...
...
@@ -1247,14 +1288,22 @@ def run(cmd, timeout=None, verbose=True, ignore_status=False,
has a sudo configuration such that a password won't be
prompted. If that's not the case, the command will
straight out fail.
:param encoding: the encoding to use for the text representation
of the command result stdout and stderr, with the
default being Python's own, that is,
(:func:`sys.getdefaultencoding`).
:type encoding: str
:return: An :class:`CmdResult` object.
:raise: :class:`CmdError`, if ``ignore_status=False``.
"""
klass
=
get_sub_process_klass
(
cmd
)
if
encoding
is
None
:
encoding
=
sys
.
getdefaultencoding
()
klass
=
get_sub_process_klass
(
cmd
,
encoding
)
sp
=
klass
(
cmd
=
cmd
,
verbose
=
verbose
,
allow_output_check
=
allow_output_check
,
shell
=
shell
,
env
=
env
,
sudo
=
sudo
,
ignore_bg_processes
=
ignore_bg_processes
)
sudo
=
sudo
,
ignore_bg_processes
=
ignore_bg_processes
,
encoding
=
encoding
)
cmd_result
=
sp
.
run
(
timeout
=
timeout
)
fail_condition
=
cmd_result
.
exit_status
!=
0
or
cmd_result
.
interrupted
if
fail_condition
and
not
ignore_status
:
...
...
@@ -1264,7 +1313,8 @@ def run(cmd, timeout=None, verbose=True, ignore_status=False,
def
system
(
cmd
,
timeout
=
None
,
verbose
=
True
,
ignore_status
=
False
,
allow_output_check
=
None
,
shell
=
False
,
env
=
None
,
sudo
=
False
,
ignore_bg_processes
=
False
):
env
=
None
,
sudo
=
False
,
ignore_bg_processes
=
False
,
encoding
=
None
):
"""
Run a subprocess, returning its exit code.
...
...
@@ -1308,6 +1358,11 @@ def system(cmd, timeout=None, verbose=True, ignore_status=False,
has a sudo configuration such that a password won't be
prompted. If that's not the case, the command will
straight out fail.
:param encoding: the encoding to use for the text representation
of the command result stdout and stderr, with the
default being Python's own, that is,
(:func:`sys.getdefaultencoding`).
:type encoding: str
:return: Exit code.
:rtype: int
...
...
@@ -1315,14 +1370,15 @@ def system(cmd, timeout=None, verbose=True, ignore_status=False,
"""
cmd_result
=
run
(
cmd
=
cmd
,
timeout
=
timeout
,
verbose
=
verbose
,
ignore_status
=
ignore_status
,
allow_output_check
=
allow_output_check
,
shell
=
shell
,
env
=
env
,
sudo
=
sudo
,
ignore_bg_processes
=
ignore_bg_processes
)
sudo
=
sudo
,
ignore_bg_processes
=
ignore_bg_processes
,
encoding
=
encoding
)
return
cmd_result
.
exit_status
def
system_output
(
cmd
,
timeout
=
None
,
verbose
=
True
,
ignore_status
=
False
,
allow_output_check
=
None
,
shell
=
False
,
env
=
None
,
sudo
=
False
,
ignore_bg_processes
=
False
,
strip_trail_nl
=
True
):
strip_trail_nl
=
True
,
encoding
=
None
):
"""
Run a subprocess, returning its output.
...
...
@@ -1370,6 +1426,11 @@ def system_output(cmd, timeout=None, verbose=True, ignore_status=False,
:type ignore_bg_processes: bool
:param strip_trail_nl: Whether to strip the trailing newline
:type strip_trail_nl: bool
:param encoding: the encoding to use for the text representation
of the command result stdout and stderr, with the
default being Python's own, that is,
(:func:`sys.getdefaultencoding`).
:type encoding: str
:return: Command output.
:rtype: bytes
...
...
@@ -1377,7 +1438,8 @@ def system_output(cmd, timeout=None, verbose=True, ignore_status=False,
"""
cmd_result
=
run
(
cmd
=
cmd
,
timeout
=
timeout
,
verbose
=
verbose
,
ignore_status
=
ignore_status
,
allow_output_check
=
allow_output_check
,
shell
=
shell
,
env
=
env
,
sudo
=
sudo
,
ignore_bg_processes
=
ignore_bg_processes
)
sudo
=
sudo
,
ignore_bg_processes
=
ignore_bg_processes
,
encoding
=
encoding
)
if
strip_trail_nl
:
return
cmd_result
.
stdout
.
rstrip
(
b
'
\n\r
'
)
return
cmd_result
.
stdout
...
...
selftests/unit/test_utils_process.py
浏览文件 @
4e8d5826
...
...
@@ -16,7 +16,15 @@ from avocado.utils import path
from
six
import
string_types
TRUE_CMD
=
path
.
find_command
(
'true'
)
def
probe_binary
(
binary
):
try
:
return
path
.
find_command
(
binary
)
except
path
.
CmdNotFoundError
:
return
None
TRUE_CMD
=
probe_binary
(
'true'
)
ECHO_CMD
=
probe_binary
(
'echo'
)
class
TestSubProcess
(
unittest
.
TestCase
):
...
...
@@ -57,6 +65,8 @@ class TestGDBProcess(unittest.TestCase):
self
.
assertFalse
(
process
.
should_run_inside_gdb
(
"foo bar baz"
))
self
.
assertFalse
(
process
.
should_run_inside_gdb
(
"foo ' "
))
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
def
test_get_sub_process_klass
(
self
):
gdb
.
GDB_RUN_BINARY_NAMES_EXPR
=
[]
self
.
assertIs
(
process
.
get_sub_process_klass
(
TRUE_CMD
),
...
...
@@ -92,6 +102,8 @@ def mock_fail_find_cmd(cmd, default=None):
class
TestProcessRun
(
unittest
.
TestCase
):
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
...
...
@@ -101,6 +113,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
SubProcess
(
cmd
=
'ls -l'
)
self
.
assertEqual
(
p
.
cmd
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
0
))
...
...
@@ -109,6 +123,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
SubProcess
(
cmd
=
'ls -l'
)
self
.
assertEqual
(
p
.
cmd
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
...
...
@@ -125,6 +141,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
SubProcess
(
cmd
=
'ls -l'
,
sudo
=
True
)
self
.
assertEqual
(
p
.
cmd
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
0
))
...
...
@@ -133,6 +151,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
SubProcess
(
cmd
=
'ls -l'
,
sudo
=
True
)
self
.
assertEqual
(
p
.
cmd
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
1000
))
...
...
@@ -148,6 +168,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
SubProcess
(
cmd
=
'ls -l'
,
sudo
=
True
,
shell
=
True
)
self
.
assertEqual
(
p
.
cmd
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
0
))
...
...
@@ -156,6 +178,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
SubProcess
(
cmd
=
'ls -l'
,
sudo
=
True
,
shell
=
True
)
self
.
assertEqual
(
p
.
cmd
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
1000
))
...
...
@@ -164,6 +188,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
run
(
cmd
=
'ls -l'
,
ignore_status
=
True
)
self
.
assertEqual
(
p
.
command
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
0
))
...
...
@@ -172,6 +198,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
run
(
cmd
=
'ls -l'
,
ignore_status
=
True
)
self
.
assertEqual
(
p
.
command
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
1000
))
...
...
@@ -187,6 +215,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
run
(
cmd
=
'ls -l'
,
sudo
=
True
,
ignore_status
=
True
)
self
.
assertEqual
(
p
.
command
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
0
))
...
...
@@ -195,6 +225,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
run
(
cmd
=
'ls -l'
,
sudo
=
True
,
ignore_status
=
True
)
self
.
assertEqual
(
p
.
command
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
1000
))
...
...
@@ -210,6 +242,8 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
run
(
cmd
=
'ls -l'
,
sudo
=
True
,
shell
=
True
,
ignore_status
=
True
)
self
.
assertEqual
(
p
.
command
,
expected_command
)
@
unittest
.
skipUnless
(
TRUE_CMD
,
'"true" binary not available'
)
@
mock
.
patch
.
object
(
path
,
'find_command'
,
mock
.
Mock
(
return_value
=
TRUE_CMD
))
@
mock
.
patch
.
object
(
os
,
'getuid'
,
mock
.
Mock
(
return_value
=
0
))
...
...
@@ -218,6 +252,16 @@ class TestProcessRun(unittest.TestCase):
p
=
process
.
run
(
cmd
=
'ls -l'
,
sudo
=
True
,
shell
=
True
,
ignore_status
=
True
)
self
.
assertEqual
(
p
.
command
,
expected_command
)
@
unittest
.
skipUnless
(
ECHO_CMD
,
"Echo command not available in system"
)
def
test_run_unicode_output
(
self
):
# Using encoded string as shlex does not support decoding
# but the behavior is exactly the same as if shell binary
# produced unicode
text
=
u
"Avok
\xe1
do"
result
=
process
.
run
(
"%s %s"
%
(
ECHO_CMD
,
text
),
encoding
=
'utf-8'
)
self
.
assertEqual
(
result
.
stdout
,
text
.
encode
(
'utf-8'
)
+
b
'
\n
'
)
self
.
assertEqual
(
result
.
stdout_text
,
text
+
'
\n
'
)
class
MiscProcessTests
(
unittest
.
TestCase
):
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录