Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
openeuler
avocado
提交
a7ad7f4b
A
avocado
项目概览
openeuler
/
avocado
通知
0
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
A
avocado
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
a7ad7f4b
编写于
2月 03, 2015
作者:
L
Lucas Meneghel Rodrigues
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #397 from ldoktor/remote6
avocado.plugins.remote: Unify test locations and use absolute path [v5]
上级
a745d38b
268f2585
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
369 addition
and
496 deletion
+369
-496
avocado/plugins/remote.py
avocado/plugins/remote.py
+109
-139
avocado/plugins/vm.py
avocado/plugins/vm.py
+67
-209
avocado/utils/remote.py
avocado/utils/remote.py
+16
-14
avocado/utils/virt.py
avocado/utils/virt.py
+9
-9
selftests/all/functional/avocado/output_tests.py
selftests/all/functional/avocado/output_tests.py
+4
-2
selftests/all/unit/avocado/remote_unittest.py
selftests/all/unit/avocado/remote_unittest.py
+111
-61
selftests/all/unit/avocado/vm_unittest.py
selftests/all/unit/avocado/vm_unittest.py
+53
-62
未找到文件。
avocado/plugins/remote.py
浏览文件 @
a7ad7f4b
...
...
@@ -14,22 +14,23 @@
"""Run tests on a remote machine."""
import
os
import
getpass
import
json
import
os
from
avocado.core
import
exceptions
from
avocado.core
import
status
from
avocado.core
import
data_dir
from
avocado.runner
import
TestRunner
from
avocado.result
import
TestResult
from
avocado.core
import
status
from
avocado.plugins
import
plugin
from
avocado.
utils
import
remote
from
avocado.
utils
import
archive
from
avocado.
result
import
HumanTestResult
from
avocado.
runner
import
TestRunner
from
avocado.test
import
RemoteTest
from
avocado.utils
import
archive
from
avocado.utils
import
remote
class
RemoteTestRunner
(
TestRunner
):
""" Tooled TestRunner to run on remote machine using ssh """
remote_test_dir
=
'~/avocado/tests'
def
run_test
(
self
,
urls
):
...
...
@@ -39,9 +40,12 @@ class RemoteTestRunner(TestRunner):
:param urls: a string with test URLs.
:return: a dictionary with test results.
"""
avocado_cmd
=
(
'cd %s; avocado run --force-job-id %s --json - --archive %s'
%
(
self
.
remote_test_dir
,
self
.
result
.
stream
.
job_unique_id
,
" "
.
join
(
urls
)))
result
=
self
.
result
.
remote
.
run
(
avocado_cmd
,
ignore_status
=
True
)
avocado_cmd
=
(
'cd %s; avocado run --force-job-id %s --json - '
'--archive %s'
%
(
self
.
remote_test_dir
,
self
.
result
.
stream
.
job_unique_id
,
" "
.
join
(
urls
)))
result
=
self
.
result
.
remote
.
run
(
avocado_cmd
,
ignore_status
=
True
,
timeout
=
None
)
for
json_output
in
result
.
stdout
.
splitlines
():
# We expect dictionary:
if
json_output
.
startswith
(
'{'
)
and
json_output
.
endswith
(
'}'
):
...
...
@@ -52,7 +56,7 @@ class RemoteTestRunner(TestRunner):
raise
ValueError
(
"Can't parse json out of remote's avocado output:"
"
\n
%s"
%
result
.
stdout
)
def
run_suite
(
self
,
params_list
):
def
run_suite
(
self
,
test_suite
):
"""
Run one or more tests and report with test result.
...
...
@@ -60,6 +64,7 @@ class RemoteTestRunner(TestRunner):
:return: a list of test failures.
"""
del
test_suite
# using self.result.urls instead
failures
=
[]
self
.
result
.
setup
()
results
=
self
.
run_test
(
self
.
result
.
urls
)
...
...
@@ -78,7 +83,8 @@ class RemoteTestRunner(TestRunner):
failures
.
append
(
state
[
'tagged_name'
])
local_log_dir
=
os
.
path
.
dirname
(
self
.
result
.
stream
.
debuglog
)
zip_filename
=
remote_log_dir
+
'.zip'
zip_path_filename
=
os
.
path
.
join
(
local_log_dir
,
os
.
path
.
basename
(
zip_filename
))
zip_path_filename
=
os
.
path
.
join
(
local_log_dir
,
os
.
path
.
basename
(
zip_filename
))
self
.
result
.
remote
.
receive_files
(
local_log_dir
,
zip_filename
)
archive
.
uncompress
(
zip_path_filename
,
local_log_dir
)
os
.
remove
(
zip_path_filename
)
...
...
@@ -87,7 +93,7 @@ class RemoteTestRunner(TestRunner):
return
failures
class
RemoteTestResult
(
TestResult
):
class
RemoteTestResult
(
Human
TestResult
):
"""
Remote Machine Test Result class.
...
...
@@ -100,32 +106,49 @@ class RemoteTestResult(TestResult):
:param stream: an instance of :class:`avocado.core.output.View`.
:param args: an instance of :class:`argparse.Namespace`.
"""
TestResult
.
__init__
(
self
,
stream
,
args
)
Human
TestResult
.
__init__
(
self
,
stream
,
args
)
self
.
test_dir
=
os
.
getcwd
()
self
.
remote_test_dir
=
'~/avocado/tests'
self
.
urls
=
self
.
args
.
url
self
.
remote
=
None
# Remote runner initialized during setup
self
.
output
=
'-'
self
.
command_line_arg_name
=
'--remote-hostname'
def
_copy_tests
(
self
):
"""
Gather test's directories and copy them recursively to
$remote_test_dir + $test_absolute_path.
:note: Default tests execution is translated into absolute paths too
"""
# TODO: Use `avocado.loader.TestLoader` instead
self
.
remote
.
makedir
(
self
.
remote_test_dir
)
uniq_urls
=
list
(
set
(
self
.
urls
))
for
url
in
uniq_urls
:
parent_dir
=
url
.
split
(
os
.
path
.
sep
)[
0
]
if
os
.
path
.
isdir
(
parent_dir
):
test_path
=
os
.
path
.
abspath
(
parent_dir
)
else
:
test_path
=
os
.
path
.
join
(
data_dir
.
get_test_dir
(),
"%s*"
%
url
)
self
.
remote
.
send_files
(
test_path
,
self
.
remote_test_dir
)
if
self
.
args
.
remote_no_copy
:
# Leave everytihng as is
return
paths
=
set
()
for
i
in
xrange
(
len
(
self
.
urls
)):
url
=
self
.
urls
[
i
]
if
not
os
.
path
.
exists
(
url
):
# use test_dir path + py
url
=
os
.
path
.
join
(
data_dir
.
get_test_dir
(),
'%s.py'
%
url
)
url
=
os
.
path
.
abspath
(
url
)
# always use abspath; avoid clashes
# modify url to remote_path + abspath
paths
.
add
(
os
.
path
.
dirname
(
url
))
self
.
urls
[
i
]
=
self
.
remote_test_dir
+
url
previous
=
' NOT ABSOLUTE PATH'
for
path
in
sorted
(
paths
):
if
os
.
path
.
commonprefix
((
path
,
previous
))
==
previous
:
continue
# already copied
rpath
=
self
.
remote_test_dir
+
path
self
.
remote
.
makedir
(
rpath
)
self
.
remote
.
send_files
(
path
,
os
.
path
.
dirname
(
rpath
))
previous
=
path
def
setup
(
self
):
self
.
urls
=
self
.
args
.
url
if
self
.
args
.
remote_hostname
is
None
:
e_msg
=
(
'Please set remote machine hostname with option '
'--remote-hostname.'
)
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
e_msg
)
raise
exceptions
.
TestSetupFail
(
e_msg
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"REMOTE LOGIN : %s@%s:%d"
%
(
self
.
args
.
remote_username
,
self
.
args
.
remote_hostname
,
self
.
args
.
remote_port
))
""" Setup remote environment and copy test's directories """
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
(
"REMOTE LOGIN : %s@%s:%d"
%
(
self
.
args
.
remote_username
,
self
.
args
.
remote_hostname
,
self
.
args
.
remote_port
)))
self
.
remote
=
remote
.
Remote
(
self
.
args
.
remote_hostname
,
self
.
args
.
remote_username
,
self
.
args
.
remote_password
,
...
...
@@ -134,95 +157,9 @@ class RemoteTestResult(TestResult):
self
.
_copy_tests
()
def
tear_down
(
self
):
""" Cleanup after test execution """
pass
def
start_tests
(
self
):
"""
Called once before any tests are executed.
"""
TestResult
.
start_tests
(
self
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"JOB ID : %s"
%
self
.
stream
.
job_unique_id
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"JOB LOG : %s"
%
self
.
stream
.
logfile
)
if
self
.
args
is
not
None
:
if
'html_output'
in
self
.
args
:
logdir
=
os
.
path
.
dirname
(
self
.
stream
.
logfile
)
html_file
=
os
.
path
.
join
(
logdir
,
'html'
,
'results.html'
)
self
.
stream
.
notify
(
event
=
"message"
,
msg
=
"JOB HTML : %s"
%
html_file
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"TESTS : %s"
%
self
.
tests_total
)
self
.
stream
.
set_tests_info
({
'tests_total'
:
self
.
tests_total
})
def
end_tests
(
self
):
"""
Called once after all tests are executed.
"""
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"PASS : %d"
%
len
(
self
.
passed
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"ERROR : %d"
%
len
(
self
.
errors
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"FAIL : %d"
%
len
(
self
.
failed
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"SKIP : %d"
%
len
(
self
.
skipped
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"WARN : %d"
%
len
(
self
.
warned
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"TIME : %.2f s"
%
self
.
total_time
)
def
start_test
(
self
,
test
):
"""
Called when the given test is about to run.
:param test: :class:`avocado.test.Test` instance.
"""
self
.
stream
.
add_test
(
test
)
def
end_test
(
self
,
test
):
"""
Called when the given test has been run.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
end_test
(
self
,
test
)
def
add_pass
(
self
,
test
):
"""
Called when a test succeeded.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_pass
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'PASS'
,
state
=
test
)
def
add_error
(
self
,
test
):
"""
Called when a test had a setup error.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_error
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'ERROR'
,
state
=
test
)
def
add_fail
(
self
,
test
):
"""
Called when a test fails.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_fail
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'FAIL'
,
state
=
test
)
def
add_skip
(
self
,
test
):
"""
Called when a test is skipped.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_skip
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'SKIP'
,
state
=
test
)
def
add_warn
(
self
,
test
):
"""
Called when a test had a warning.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_warn
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'WARN'
,
state
=
test
)
class
RunRemote
(
plugin
.
Plugin
):
...
...
@@ -232,34 +169,67 @@ class RunRemote(plugin.Plugin):
name
=
'run_remote'
enabled
=
True
remote_parser
=
None
def
configure
(
self
,
parser
):
if
remote
.
remote_capable
is
False
:
if
remote
.
REMOTE_CAPABLE
is
False
:
self
.
enabled
=
False
return
username
=
getpass
.
getuser
()
self
.
remote_parser
=
parser
.
runner
.
add_argument_group
(
'run on a remote machine '
'arguments'
)
self
.
remote_parser
.
add_argument
(
'--remote-hostname'
,
dest
=
'remote_hostname'
,
default
=
None
,
help
=
'Specify the hostname to login on remote machine'
)
msg
=
'run on a remote machine arguments'
self
.
remote_parser
=
parser
.
runner
.
add_argument_group
(
msg
)
self
.
remote_parser
.
add_argument
(
'--remote-hostname'
,
dest
=
'remote_hostname'
,
default
=
None
,
help
=
'Specify the hostname to login on'
' remote machine'
)
self
.
remote_parser
.
add_argument
(
'--remote-port'
,
dest
=
'remote_port'
,
default
=
22
,
type
=
int
,
help
=
'Specify the port number to login on remote machine. '
'Current: 22'
)
self
.
remote_parser
.
add_argument
(
'--remote-username'
,
dest
=
'remote_username'
,
default
=
22
,
type
=
int
,
help
=
'Specify '
'the port number to login on remote '
'machine. Current: 22'
)
self
.
remote_parser
.
add_argument
(
'--remote-username'
,
dest
=
'remote_username'
,
default
=
username
,
help
=
(
'Specify the username to login on remote machine. '
'Current: %(default)s'
))
self
.
remote_parser
.
add_argument
(
'--remote-password'
,
dest
=
'remote_password'
,
default
=
None
,
help
=
'Specify the password to login on remote machine'
)
help
=
'Specify the username to login on'
' remote machine. Current: '
'%(default)s'
)
self
.
remote_parser
.
add_argument
(
'--remote-password'
,
dest
=
'remote_password'
,
default
=
None
,
help
=
'Specify the password to login on'
' remote machine'
)
self
.
remote_parser
.
add_argument
(
'--remote-no-copy'
,
dest
=
'remote_no_copy'
,
action
=
'store_true'
,
help
=
"Don't copy tests and use the "
"exact uri on guest machine."
)
self
.
configured
=
True
@
staticmethod
def
_check_required_args
(
app_args
,
enable_arg
,
required_args
):
"""
:return: True when enable_arg enabled and all required args are set
:raise sys.exit: When missing required argument.
"""
if
(
not
hasattr
(
app_args
,
enable_arg
)
or
not
getattr
(
app_args
,
enable_arg
)):
return
False
missing
=
[]
for
arg
in
required_args
:
if
not
getattr
(
app_args
,
arg
):
missing
.
append
(
arg
)
if
missing
:
from
avocado.core
import
output
,
exit_codes
import
sys
view
=
output
.
View
(
app_args
=
app_args
,
use_paginator
=
True
)
e_msg
=
(
'Use of %s requires %s arguments to be set. Please set %s'
'.'
%
(
enable_arg
,
', '
.
join
(
required_args
),
', '
.
join
(
missing
)))
view
.
notify
(
event
=
'error'
,
msg
=
e_msg
)
return
sys
.
exit
(
exit_codes
.
AVOCADO_FAIL
)
return
True
def
activate
(
self
,
app_args
):
try
:
if
app_args
.
remote_hostname
is
not
None
:
self
.
remote_parser
.
set_defaults
(
remote_result
=
RemoteTestResult
,
test_runner
=
RemoteTestRunner
)
except
AttributeError
:
pass
if
self
.
_check_required_args
(
app_args
,
'remote_hostname'
,
(
'remote_hostname'
,)):
self
.
remote_parser
.
set_defaults
(
remote_result
=
RemoteTestResult
,
test_runner
=
RemoteTestRunner
)
avocado/plugins/vm.py
浏览文件 @
a7ad7f4b
...
...
@@ -14,111 +14,28 @@
"""Run tests on Virtual Machine."""
import
os
import
getpass
import
json
from
avocado.core
import
exceptions
from
avocado.core
import
status
from
avocado.core
import
data_dir
from
avocado.runner
import
TestRunner
from
avocado.result
import
TestResult
from
avocado.plugins
import
plugin
from
avocado.plugins.remote
import
RemoteTestResult
from
avocado.plugins.remote
import
RemoteTestRunner
from
avocado.utils
import
virt
from
avocado.utils
import
archive
from
avocado.test
import
RemoteTest
class
VMTestRunner
(
TestRunner
):
remote_test_dir
=
'~/avocado/tests'
def
run_test
(
self
,
urls
):
"""
Run tests.
:param urls: a string with test URLs.
:return: a dictionary with test results.
"""
avocado_cmd
=
(
'cd %s; avocado run --force-job-id %s --json - --archive %s'
%
(
self
.
remote_test_dir
,
self
.
result
.
stream
.
job_unique_id
,
" "
.
join
(
urls
)))
result
=
self
.
result
.
vm
.
remote
.
run
(
avocado_cmd
,
ignore_status
=
True
)
xunit_output
=
result
.
stdout
.
splitlines
()[
0
]
try
:
results
=
json
.
loads
(
xunit_output
)
except
Exception
,
details
:
raise
ValueError
(
'Error loading JSON '
'(full output below): %s
\n
"""
\n
%s
\n
"""'
%
(
details
,
result
.
stdout
))
return
results
def
run_suite
(
self
,
params_list
):
"""
Run one or more tests and report with test result.
:param params_list: a list of param dicts.
:return: a list of test failures.
"""
failures
=
[]
self
.
result
.
setup
()
results
=
self
.
run_test
(
self
.
result
.
urls
)
remote_log_dir
=
os
.
path
.
dirname
(
results
[
'debuglog'
])
self
.
result
.
start_tests
()
for
tst
in
results
[
'tests'
]:
test
=
RemoteTest
(
name
=
tst
[
'test'
],
time
=
tst
[
'time'
],
start
=
tst
[
'start'
],
end
=
tst
[
'end'
],
status
=
tst
[
'status'
])
state
=
test
.
get_state
()
self
.
result
.
start_test
(
state
)
self
.
result
.
check_test
(
state
)
if
not
status
.
mapping
[
state
[
'status'
]]:
failures
.
append
(
state
[
'tagged_name'
])
local_log_dir
=
os
.
path
.
dirname
(
self
.
result
.
stream
.
debuglog
)
zip_filename
=
remote_log_dir
+
'.zip'
zip_path_filename
=
os
.
path
.
join
(
local_log_dir
,
os
.
path
.
basename
(
zip_filename
))
self
.
result
.
vm
.
remote
.
receive_files
(
local_log_dir
,
zip_filename
)
archive
.
uncompress
(
zip_path_filename
,
local_log_dir
)
os
.
remove
(
zip_path_filename
)
self
.
result
.
end_tests
()
self
.
result
.
tear_down
()
return
failures
class
VMTestResult
(
TestResult
):
class
VMTestResult
(
RemoteTestResult
):
"""
Virtual Machine Test Result class.
"""
command_line_arg_name
=
'--vm'
def
__init__
(
self
,
stream
,
args
):
"""
Creates an instance of VMTestResult.
:param stream: an instance of :class:`avocado.core.output.View`.
:param args: an instance of :class:`argparse.Namespace`.
"""
TestResult
.
__init__
(
self
,
stream
,
args
)
self
.
test_dir
=
os
.
getcwd
()
self
.
remote_test_dir
=
'~/avocado/tests'
self
.
output
=
'-'
def
_copy_tests
(
self
):
self
.
vm
.
remote
.
makedir
(
self
.
remote_test_dir
)
uniq_urls
=
list
(
set
(
self
.
urls
))
for
url
in
uniq_urls
:
parent_dir
=
url
.
split
(
os
.
path
.
sep
)[
0
]
if
os
.
path
.
isdir
(
parent_dir
):
test_path
=
os
.
path
.
abspath
(
parent_dir
)
else
:
test_path
=
os
.
path
.
join
(
data_dir
.
get_test_dir
(),
"%s*"
%
url
)
self
.
vm
.
remote
.
send_files
(
test_path
,
self
.
remote_test_dir
)
super
(
VMTestResult
,
self
).
__init__
(
stream
,
args
)
self
.
vm
=
None
self
.
command_line_arg_name
=
'--vm-domain'
def
setup
(
self
):
self
.
urls
=
self
.
args
.
url
# Super called after VM is found and initialized
if
self
.
args
.
vm_domain
is
None
:
e_msg
=
(
'Please set Virtual Machine Domain with option '
'--vm-domain.'
)
...
...
@@ -129,127 +46,42 @@ class VMTestResult(TestResult):
'--vm-hostname.'
)
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
e_msg
)
raise
exceptions
.
TestSetupFail
(
e_msg
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"VM DOMAIN : %s"
%
self
.
args
.
vm_domain
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"VM LOGIN : %s@%s"
%
(
self
.
args
.
vm_username
,
self
.
args
.
vm_hostname
)
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"VM DOMAIN : %s"
%
self
.
args
.
vm_domain
)
self
.
vm
=
virt
.
vm_connect
(
self
.
args
.
vm_domain
,
self
.
args
.
vm_hypervisor_uri
)
if
self
.
vm
is
None
:
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
"Could not connect to VM '%s'"
%
self
.
args
.
vm_domain
)
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
"Could not connect to VM '%s'"
%
self
.
args
.
vm_domain
)
raise
exceptions
.
TestSetupFail
()
if
self
.
vm
.
start
()
is
False
:
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
"Could not start VM '%s'"
%
self
.
args
.
vm_domain
)
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
"Could not start VM '%s'"
%
self
.
args
.
vm_domain
)
raise
exceptions
.
TestSetupFail
()
assert
self
.
vm
.
domain
.
isActive
()
is
not
False
if
self
.
args
.
vm_cleanup
is
True
:
self
.
vm
.
create_snapshot
()
if
self
.
vm
.
snapshot
is
None
:
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
"Could not create snapshot on VM '%s'"
%
self
.
args
.
vm_domain
)
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
"Could not create "
"snapshot on VM '%s'"
%
self
.
args
.
vm_domain
)
raise
exceptions
.
TestSetupFail
()
try
:
self
.
vm
.
setup_login
(
self
.
args
.
vm_hostname
,
self
.
args
.
vm_username
,
self
.
args
.
vm_password
)
except
Exception
as
err
:
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
"Could not login on VM '%s': %s"
%
(
self
.
args
.
vm_hostname
,
err
))
self
.
tear_down
()
raise
exceptions
.
TestSetupFail
()
if
self
.
vm
.
logged
is
False
or
self
.
vm
.
remote
.
uptime
()
is
''
:
self
.
stream
.
notify
(
event
=
'error'
,
msg
=
"Could not login on VM '%s'"
%
self
.
args
.
vm_hostname
)
# Finish remote setup and copy the tests
self
.
args
.
remote_hostname
=
self
.
args
.
vm_hostname
self
.
args
.
remote_username
=
self
.
args
.
vm_username
self
.
args
.
remote_password
=
self
.
args
.
vm_password
self
.
args
.
remote_no_copy
=
self
.
args
.
vm_no_copy
super
(
VMTestResult
,
self
).
setup
()
except
Exception
:
self
.
tear_down
()
raise
exceptions
.
TestSetupFail
()
self
.
_copy_tests
()
raise
def
tear_down
(
self
):
super
(
VMTestResult
,
self
).
tear_down
()
if
self
.
args
.
vm_cleanup
is
True
and
self
.
vm
.
snapshot
is
not
None
:
self
.
vm
.
restore_snapshot
()
def
start_tests
(
self
):
"""
Called once before any tests are executed.
"""
TestResult
.
start_tests
(
self
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"JOB ID : %s"
%
self
.
stream
.
job_unique_id
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"JOB LOG : %s"
%
self
.
stream
.
logfile
)
if
self
.
args
is
not
None
:
if
'html_output'
in
self
.
args
:
logdir
=
os
.
path
.
dirname
(
self
.
stream
.
logfile
)
html_file
=
os
.
path
.
join
(
logdir
,
'html'
,
'results.html'
)
self
.
stream
.
notify
(
event
=
"message"
,
msg
=
"JOB HTML : %s"
%
html_file
)
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"TESTS : %s"
%
self
.
tests_total
)
self
.
stream
.
set_tests_info
({
'tests_total'
:
self
.
tests_total
})
def
end_tests
(
self
):
"""
Called once after all tests are executed.
"""
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"PASS : %d"
%
len
(
self
.
passed
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"ERROR : %d"
%
len
(
self
.
errors
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"FAIL : %d"
%
len
(
self
.
failed
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"SKIP : %d"
%
len
(
self
.
skipped
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"WARN : %d"
%
len
(
self
.
warned
))
self
.
stream
.
notify
(
event
=
'message'
,
msg
=
"TIME : %.2f s"
%
self
.
total_time
)
def
start_test
(
self
,
test
):
"""
Called when the given test is about to run.
:param test: :class:`avocado.test.Test` instance.
"""
self
.
stream
.
add_test
(
test
)
def
end_test
(
self
,
test
):
"""
Called when the given test has been run.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
end_test
(
self
,
test
)
def
add_pass
(
self
,
test
):
"""
Called when a test succeeded.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_pass
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'PASS'
,
state
=
test
)
def
add_error
(
self
,
test
):
"""
Called when a test had a setup error.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_error
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'ERROR'
,
state
=
test
)
def
add_fail
(
self
,
test
):
"""
Called when a test fails.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_fail
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'FAIL'
,
state
=
test
)
def
add_skip
(
self
,
test
):
"""
Called when a test is skipped.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_skip
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'SKIP'
,
state
=
test
)
def
add_warn
(
self
,
test
):
"""
Called when a test had a warning.
:param test: :class:`avocado.test.Test` instance.
"""
TestResult
.
add_warn
(
self
,
test
)
self
.
stream
.
set_test_status
(
status
=
'WARN'
,
state
=
test
)
class
RunVM
(
plugin
.
Plugin
):
...
...
@@ -259,27 +91,25 @@ class RunVM(plugin.Plugin):
name
=
'run_vm'
enabled
=
True
vm_parser
=
None
def
configure
(
self
,
parser
):
if
virt
.
virt_capable
is
False
:
if
virt
.
VIRT_CAPABLE
is
False
:
self
.
enabled
=
False
return
username
=
getpass
.
getuser
()
default_hypervisor_uri
=
'qemu:///system'
self
.
vm_parser
=
parser
.
runner
.
add_argument_group
(
'run on a libvirt
domain
'
'arguments'
)
self
.
vm_parser
=
parser
.
runner
.
add_argument_group
(
'run on a libvirt '
'
domain
arguments'
)
self
.
vm_parser
.
add_argument
(
'--vm'
,
action
=
'store_true'
,
default
=
False
,
help
=
(
'Run tests on a Virtual Machine '
'(Libvirt Domain)'
))
self
.
vm_parser
.
add_argument
(
'--vm-domain'
,
dest
=
'vm_domain'
,
help
=
(
'Specify Libvirt Domain Name'
))
self
.
vm_parser
.
add_argument
(
'--vm-hypervisor-uri'
,
dest
=
'vm_hypervisor_uri'
,
default
=
default_hypervisor_uri
,
help
=
(
'Specify hypervisor URI driver '
'connection. Current: %s'
%
default_hypervisor_uri
))
self
.
vm_parser
.
add_argument
(
'--vm-domain'
,
dest
=
'vm_domain'
,
help
=
(
'Specify Libvirt Domain Name'
))
self
.
vm_parser
.
add_argument
(
'--vm-hostname'
,
dest
=
'vm_hostname'
,
help
=
'Specify VM hostname to login'
)
self
.
vm_parser
.
add_argument
(
'--vm-username'
,
dest
=
'vm_username'
,
...
...
@@ -291,14 +121,42 @@ class RunVM(plugin.Plugin):
self
.
vm_parser
.
add_argument
(
'--vm-cleanup'
,
dest
=
'vm_cleanup'
,
action
=
'store_true'
,
default
=
False
,
help
=
(
'Restore VM to a previous state, before '
'running tests'
))
help
=
'Restore VM to a previous state, '
'before running tests'
)
self
.
vm_parser
.
add_argument
(
'--vm-no-copy'
,
dest
=
'vm_no_copy'
,
action
=
'store_true'
,
help
=
"Don't copy tests and use the "
"exact uri on VM machine."
)
self
.
configured
=
True
@
staticmethod
def
_check_required_args
(
app_args
,
enable_arg
,
required_args
):
"""
:return: True when enable_arg enabled and all required args are set
:raise sys.exit: When missing required argument.
"""
if
(
not
hasattr
(
app_args
,
enable_arg
)
or
not
getattr
(
app_args
,
enable_arg
)):
return
False
missing
=
[]
for
arg
in
required_args
:
if
not
getattr
(
app_args
,
arg
):
missing
.
append
(
arg
)
if
missing
:
from
avocado.core
import
output
,
exit_codes
import
sys
view
=
output
.
View
(
app_args
=
app_args
,
use_paginator
=
True
)
e_msg
=
(
'Use of %s requires %s arguments to be set. Please set %s'
'.'
%
(
enable_arg
,
', '
.
join
(
required_args
),
', '
.
join
(
missing
)))
view
.
notify
(
event
=
'error'
,
msg
=
e_msg
)
return
sys
.
exit
(
exit_codes
.
AVOCADO_FAIL
)
return
True
def
activate
(
self
,
app_args
):
try
:
if
app_args
.
vm
:
self
.
vm_parser
.
set_defaults
(
vm_result
=
VMTestResult
,
test_runner
=
VMTestRunner
)
except
AttributeError
:
pass
if
self
.
_check_required_args
(
app_args
,
'vm_domain'
,
(
'vm_domain'
,
'vm_hostname'
)):
self
.
vm_parser
.
set_defaults
(
remote_result
=
VMTestResult
,
test_runner
=
RemoteTestRunner
)
avocado/utils/remote.py
浏览文件 @
a7ad7f4b
...
...
@@ -20,20 +20,20 @@ import getpass
import
logging
import
time
log
=
logging
.
getLogger
(
'avocado.test'
)
from
avocado.core
import
exceptions
from
avocado.core
import
output
from
avocado.utils
import
process
LOG
=
logging
.
getLogger
(
'avocado.test'
)
try
:
import
fabric.api
import
fabric.operations
except
ImportError
:
remote_capable
=
False
log
.
info
(
'Remote module is disabled: could not import fabric'
)
REMOTE_CAPABLE
=
False
LOG
.
info
(
'Remote module is disabled: could not import fabric'
)
else
:
remote_capable
=
True
from
avocado.core
import
output
from
avocado.core
import
exceptions
from
avocado.utils
import
process
REMOTE_CAPABLE
=
True
class
Remote
(
object
):
...
...
@@ -70,7 +70,9 @@ class Remote(object):
connection_attempts
=
attempts
,
linewise
=
True
)
def
_setup_environment
(
self
,
**
kwargs
):
@
staticmethod
def
_setup_environment
(
**
kwargs
):
""" Setup fabric environemnt """
fabric
.
api
.
env
.
update
(
kwargs
)
def
run
(
self
,
command
,
ignore_status
=
False
,
timeout
=
60
):
...
...
@@ -84,7 +86,7 @@ class Remote(object):
:raise fabric.exceptions.CommandTimeout: When timeout exhausted.
"""
if
not
self
.
quiet
:
log
.
info
(
'[%s] Running command %s'
,
self
.
hostname
,
command
)
LOG
.
info
(
'[%s] Running command %s'
,
self
.
hostname
,
command
)
result
=
process
.
CmdResult
()
stdout
=
output
.
LoggingFile
(
logger
=
logging
.
getLogger
(
'avocado.test'
))
stderr
=
output
.
LoggingFile
(
logger
=
logging
.
getLogger
(
'avocado.test'
))
...
...
@@ -137,12 +139,12 @@ class Remote(object):
:param remote_path: the remote path.
"""
if
not
self
.
quiet
:
log
.
info
(
'[%s] Receive remote
files %s -> %s'
,
self
.
hostname
,
LOG
.
info
(
'[%s] Sending
files %s -> %s'
,
self
.
hostname
,
local_path
,
remote_path
)
with
fabric
.
context_managers
.
quiet
():
try
:
fabric
.
operations
.
put
(
local_path
,
remote_path
)
fabric
.
operations
.
put
(
local_path
,
remote_path
,
mirror_local_mode
=
True
)
except
ValueError
:
return
False
return
True
...
...
@@ -155,7 +157,7 @@ class Remote(object):
:param remote_path: the remote path.
"""
if
not
self
.
quiet
:
log
.
info
(
'[%s] Receive remote files %s -> %s'
,
self
.
hostname
,
LOG
.
info
(
'[%s] Receive remote files %s -> %s'
,
self
.
hostname
,
local_path
,
remote_path
)
with
fabric
.
context_managers
.
quiet
():
try
:
...
...
avocado/utils/virt.py
浏览文件 @
a7ad7f4b
...
...
@@ -17,23 +17,23 @@ Module to provide classes for Virtual Machines.
"""
import
logging
from
xml.dom
import
minidom
log
=
logging
.
getLogger
(
'avocado.test'
)
from
avocado.utils
import
remote
LOG
=
logging
.
getLogger
(
'avocado.test'
)
try
:
import
libvirt
except
ImportError
:
virt_capable
=
False
log
.
info
(
'Virt module is disabled: could not import libvirt'
)
VIRT_CAPABLE
=
False
LOG
.
info
(
'Virt module is disabled: could not import libvirt'
)
else
:
virt_capable
=
True
VIRT_CAPABLE
=
True
from
xml.dom
import
minidom
from
avocado.utils
import
remote
if
remote
.
remote_capable
is
False
:
virt_capable
=
False
log
.
info
(
'Virt module is disabled: remote module is disabled'
)
if
remote
.
REMOTE_CAPABLE
is
False
:
VIRT_CAPABLE
=
False
LOG
.
info
(
'Virt module is disabled: remote module is disabled'
)
class
Hypervisor
(
object
):
...
...
selftests/all/functional/avocado/output_tests.py
浏览文件 @
a7ad7f4b
...
...
@@ -60,14 +60,16 @@ class OutputPluginTest(unittest.TestCase):
def
test_output_incompatible_setup_2
(
self
):
os
.
chdir
(
basedir
)
cmd_line
=
'./scripts/avocado run --sysinfo=off --vm --json - passtest'
cmd_line
=
(
'./scripts/avocado run --sysinfo=off --vm-domain aaa '
'--vm-hostname host --json - passtest'
)
result
=
process
.
run
(
cmd_line
,
ignore_status
=
True
)
expected_rc
=
2
output
=
result
.
stdout
+
result
.
stderr
self
.
assertEqual
(
result
.
exit_status
,
expected_rc
,
"Avocado did not return rc %d:
\n
%s"
%
(
expected_rc
,
result
))
error_excerpt
=
"Options --json --vm are trying to use stdout simultaneously"
error_excerpt
=
(
"Options --json --vm-domain are trying to use "
"stdout simultaneously"
)
self
.
assertIn
(
error_excerpt
,
output
,
"Missing excerpt error message from output:
\n
%s"
%
output
)
...
...
selftests/all/unit/avocado/remote_unittest.py
浏览文件 @
a7ad7f4b
import
unittest
import
os
import
sys
import
json
import
argparse
# simple magic for using scripts within a source tree
basedir
=
os
.
path
.
dirname
(
os
.
path
.
dirname
(
os
.
path
.
abspath
(
__file__
)))
basedir
=
os
.
path
.
dirname
(
basedir
)
if
os
.
path
.
isdir
(
os
.
path
.
join
(
basedir
,
'avocado'
)):
sys
.
path
.
append
(
basedir
)
from
avocado.core
import
status
from
avocado.core
import
job_id
from
avocado.plugins
import
remote
class
_Stream
(
object
):
job_unique_id
=
job_id
.
create_unique_job_id
()
#!/usr/bin/env python
def
start_file_logging
(
self
,
param1
,
param2
):
pass
import
unittest
def
stop_file_logging
(
self
):
pass
from
flexmock
import
flexmock
,
flexmock_teardown
def
set_tests_info
(
self
,
info
):
pass
from
avocado.plugins
import
remote
def
notify
(
self
,
event
,
msg
):
pass
def
add_test
(
self
,
state
):
pass
JSON_RESULTS
=
(
'Something other than json
\n
'
'{"tests": [{"test": "sleeptest.1", "url": "sleeptest", '
'"status": "PASS", "time": 1.23, "start": 0, "end": 1.23}],'
'"debuglog": "/home/user/avocado/logs/run-2014-05-26-15.45.'
'37/debug.log", "errors": 0, "skip": 0, "time": 1.4, '
'"start": 0, "end": 1.4, "pass": 1, "failures": 0, "total": '
'1}
\n
Additional stuff other than json'
)
def
set_test_status
(
self
,
status
,
state
):
pass
class
RemoteTestRunnerTest
(
unittest
.
TestCase
):
class
RemoteResultTest
(
unittest
.
TestCase
):
""" Tests RemoteTestRunner """
def
setUp
(
self
):
args
=
argparse
.
Namespace
()
stream
=
_Stream
()
stream
.
logfile
=
'debug.log'
self
.
test_result
=
remote
.
RemoteTestResult
(
stream
,
args
)
j
=
'''{"tests": [{"test": "sleeptest.1", "url": "sleeptest", "status": "PASS",
"time": 1.23, "start": 0, "end": 1.23}],
"debuglog": "/home/user/avocado/logs/run-2014-05-26-15.45.37/debug.log",
"errors": 0, "skip": 0, "time": 1.4, "start": 0, "end": 1.4,
"pass": 1, "failures": 0, "total": 1}'''
self
.
results
=
json
.
loads
(
j
)
def
test_check
(
self
):
failures
=
[]
self
.
test_result
.
start_tests
()
for
tst
in
self
.
results
[
'tests'
]:
test
=
remote
.
RemoteTest
(
name
=
tst
[
'test'
],
time
=
tst
[
'time'
],
start
=
tst
[
'start'
],
end
=
tst
[
'end'
],
status
=
tst
[
'status'
])
self
.
test_result
.
start_test
(
test
.
get_state
())
self
.
test_result
.
check_test
(
test
.
get_state
())
if
not
status
.
mapping
[
test
.
status
]:
failures
.
append
(
test
.
tagged_name
)
self
.
test_result
.
end_tests
()
self
.
assertEqual
(
self
.
test_result
.
tests_total
,
1
)
self
.
assertEqual
(
len
(
self
.
test_result
.
passed
),
1
)
self
.
assertEqual
(
len
(
self
.
test_result
.
failed
),
0
)
self
.
assertEqual
(
len
(
failures
),
0
)
flexmock
(
remote
.
RemoteTestRunner
).
should_receive
(
'__init__'
)
self
.
remote
=
remote
.
RemoteTestRunner
(
None
,
None
)
test_results
=
flexmock
(
stdout
=
JSON_RESULTS
)
stream
=
flexmock
(
job_unique_id
=
'sleeptest.1'
,
debuglog
=
'/local/path/dirname'
)
Remote
=
flexmock
()
args
=
(
"cd ~/avocado/tests; avocado run --force-job-id sleeptest.1 "
"--json - --archive sleeptest"
)
(
Remote
.
should_receive
(
'run'
)
.
with_args
(
args
,
timeout
=
None
,
ignore_status
=
True
)
.
once
().
and_return
(
test_results
))
Results
=
flexmock
(
remote
=
Remote
,
urls
=
[
'sleeptest'
],
stream
=
stream
)
Results
.
should_receive
(
'setup'
).
once
().
ordered
()
Results
.
should_receive
(
'start_tests'
).
once
().
ordered
()
args
=
{
'status'
:
u
'PASS'
,
'whiteboard'
:
''
,
'time_start'
:
0
,
'name'
:
u
'sleeptest.1'
,
'class_name'
:
'RemoteTest'
,
'traceback'
:
'Not supported yet'
,
'text_output'
:
'Not supported yet'
,
'time_end'
:
1.23
,
'tagged_name'
:
u
'sleeptest.1'
,
'time_elapsed'
:
1.23
,
'fail_class'
:
'Not supported yet'
,
'job_unique_id'
:
''
,
'fail_reason'
:
'Not supported yet'
}
Results
.
should_receive
(
'start_test'
).
once
().
with_args
(
args
).
ordered
()
Results
.
should_receive
(
'check_test'
).
once
().
with_args
(
args
).
ordered
()
(
Remote
.
should_receive
(
'receive_files'
)
.
with_args
(
'/local/path'
,
'/home/user/avocado/logs/run-2014-05-26-'
'15.45.37.zip'
)).
once
().
ordered
()
(
flexmock
(
remote
.
archive
).
should_receive
(
'uncompress'
)
.
with_args
(
'/local/path/run-2014-05-26-15.45.37.zip'
,
'/local/path'
)
.
once
().
ordered
())
(
flexmock
(
remote
.
os
).
should_receive
(
'remove'
)
.
with_args
(
'/local/path/run-2014-05-26-15.45.37.zip'
).
once
()
.
ordered
())
Results
.
should_receive
(
'end_tests'
).
once
().
ordered
()
Results
.
should_receive
(
'tear_down'
).
once
().
ordered
()
self
.
remote
.
result
=
Results
def
tearDown
(
self
):
flexmock_teardown
()
def
test_run_suite
(
self
):
""" Test RemoteTestRunner.run_suite() """
self
.
remote
.
run_suite
(
None
)
flexmock_teardown
()
# Checks the expectations
class
RemoteTestResultTest
(
unittest
.
TestCase
):
""" Tests the RemoteTestResult """
def
setUp
(
self
):
Remote
=
flexmock
()
Stream
=
flexmock
()
(
flexmock
(
remote
.
os
).
should_receive
(
'getcwd'
)
.
and_return
(
'/current/directory'
).
ordered
())
Stream
.
should_receive
(
'notify'
).
once
().
ordered
()
remote_remote
=
flexmock
(
remote
.
remote
)
(
remote_remote
.
should_receive
(
'Remote'
)
.
with_args
(
'hostname'
,
'username'
,
'password'
,
22
,
quiet
=
True
)
.
once
().
ordered
()
.
and_return
(
Remote
))
(
Remote
.
should_receive
(
'makedir'
).
with_args
(
'~/avocado/tests'
)
.
once
().
ordered
())
(
flexmock
(
remote
.
os
.
path
).
should_receive
(
'exists'
)
.
with_args
(
'/tests/sleeptest'
).
once
().
and_return
(
True
).
ordered
())
(
flexmock
(
remote
.
os
.
path
).
should_receive
(
'exists'
)
.
with_args
(
'/tests/other/test'
).
once
().
and_return
(
True
).
ordered
())
(
flexmock
(
remote
.
os
.
path
).
should_receive
(
'exists'
)
.
with_args
(
'passtest'
).
once
().
and_return
(
False
).
ordered
())
(
flexmock
(
remote
.
data_dir
).
should_receive
(
'get_test_dir'
).
once
()
.
and_return
(
'/path/to/default/tests/location'
).
ordered
())
(
Remote
.
should_receive
(
'makedir'
)
.
with_args
(
"~/avocado/tests/path/to/default/tests/location"
)
.
once
().
ordered
())
(
Remote
.
should_receive
(
'send_files'
)
.
with_args
(
"/path/to/default/tests/location"
,
"~/avocado/tests/path/to/default/tests"
).
once
().
ordered
())
(
Remote
.
should_receive
(
'makedir'
)
.
with_args
(
"~/avocado/tests/tests"
)
.
once
().
ordered
())
(
Remote
.
should_receive
(
'send_files'
)
.
with_args
(
"/tests"
,
"~/avocado/tests"
).
once
().
ordered
())
Args
=
flexmock
(
test_result_total
=
1
,
url
=
[
'/tests/sleeptest'
,
'/tests/other/test'
,
'passtest'
],
remote_username
=
'username'
,
remote_hostname
=
'hostname'
,
remote_port
=
22
,
remote_password
=
'password'
,
remote_no_copy
=
False
)
self
.
remote
=
remote
.
RemoteTestResult
(
Stream
,
Args
)
def
tearDown
(
self
):
flexmock_teardown
()
def
test_setup
(
self
):
""" Tests RemoteTestResult.test_setup() """
self
.
remote
.
setup
()
flexmock_teardown
()
if
__name__
==
'__main__'
:
unittest
.
main
()
selftests/all/unit/avocado/vm_unittest.py
浏览文件 @
a7ad7f4b
import
unittest
import
os
import
sys
import
json
import
argparse
# simple magic for using scripts within a source tree
basedir
=
os
.
path
.
dirname
(
os
.
path
.
dirname
(
os
.
path
.
abspath
(
__file__
)))
basedir
=
os
.
path
.
dirname
(
basedir
)
if
os
.
path
.
isdir
(
os
.
path
.
join
(
basedir
,
'avocado'
)):
sys
.
path
.
append
(
basedir
)
from
avocado.core
import
status
from
avocado.core
import
job_id
from
avocado.plugins
import
vm
#!/usr/bin/env python
class
_Stream
(
object
):
job_unique_id
=
job_id
.
create_unique_job_id
()
def
start_file_logging
(
self
,
param1
,
param2
):
pass
import
unittest
def
stop_file_logging
(
self
):
pass
from
flexmock
import
flexmock
,
flexmock_teardown
def
set_tests_info
(
self
,
info
):
pass
from
avocado.plugins
import
vm
,
remote
def
notify
(
self
,
event
,
msg
):
pass
def
add_test
(
self
,
state
):
pass
JSON_RESULTS
=
(
'Something other than json
\n
'
'{"tests": [{"test": "sleeptest.1", "url": "sleeptest", '
'"status": "PASS", "time": 1.23, "start": 0, "end": 1.23}],'
'"debuglog": "/home/user/avocado/logs/run-2014-05-26-15.45.'
'37/debug.log", "errors": 0, "skip": 0, "time": 1.4, '
'"start": 0, "end": 1.4, "pass": 1, "failures": 0, "total": '
'1}
\n
Additional stuff other than json'
)
def
set_test_status
(
self
,
status
,
state
):
pass
class
VMTestResultTest
(
unittest
.
TestCase
):
class
VMResultTest
(
unittest
.
TestCase
):
""" Tests the VMTestResult """
def
setUp
(
self
):
args
=
argparse
.
Namespace
()
stream
=
_Stream
()
stream
.
logfile
=
'debug.log'
self
.
test_result
=
vm
.
VMTestResult
(
stream
,
args
)
j
=
'''{"tests": [{"test": "sleeptest.1", "url": "sleeptest", "status": "PASS",
"time": 1.23, "start": 0.0, "end": 1.23}],
"debuglog": "/home/user/avocado/logs/run-2014-05-26-15.45.37/debug.log",
"errors": 0, "skip": 0, "time": 1.4,
"pass": 1, "failures": 0, "total": 1}'''
self
.
results
=
json
.
loads
(
j
)
def
test_check
(
self
):
failures
=
[]
self
.
test_result
.
start_tests
()
for
tst
in
self
.
results
[
'tests'
]:
test
=
vm
.
RemoteTest
(
name
=
tst
[
'test'
],
time
=
tst
[
'time'
],
start
=
tst
[
'start'
],
end
=
tst
[
'end'
],
status
=
tst
[
'status'
])
self
.
test_result
.
start_test
(
test
.
get_state
())
self
.
test_result
.
check_test
(
test
.
get_state
())
if
not
status
.
mapping
[
test
.
status
]:
failures
.
append
(
test
.
tagged_name
)
self
.
test_result
.
end_tests
()
self
.
assertEqual
(
self
.
test_result
.
tests_total
,
1
)
self
.
assertEqual
(
len
(
self
.
test_result
.
passed
),
1
)
self
.
assertEqual
(
len
(
self
.
test_result
.
failed
),
0
)
self
.
assertEqual
(
len
(
failures
),
0
)
# remote.RemoteTestResult.__init__()
Stream
=
flexmock
()
(
flexmock
(
remote
.
os
).
should_receive
(
'getcwd'
)
.
and_return
(
'/current/directory'
).
once
().
ordered
())
# vm.VMTestResult.setup()
(
Stream
.
should_receive
(
'notify'
)
.
with_args
(
msg
=
"VM DOMAIN : domain"
,
event
=
"message"
))
mock_vm
=
flexmock
(
snapshot
=
True
,
domain
=
flexmock
(
isActive
=
lambda
:
True
))
virt
=
flexmock
(
vm
.
virt
)
virt
.
should_receive
(
'vm_connect'
).
and_return
(
mock_vm
).
once
().
ordered
()
mock_vm
.
should_receive
(
'start'
).
and_return
(
True
).
once
().
ordered
()
mock_vm
.
should_receive
(
'create_snapshot'
).
once
().
ordered
()
RemoteTestResult
=
flexmock
(
remote
.
RemoteTestResult
)
RemoteTestResult
.
should_receive
(
'setup'
).
once
().
ordered
()
# vm.RemoteTestResult()
Args
=
flexmock
(
test_result_total
=
1
,
url
=
[
'/tests/sleeptest'
,
'/tests/other/test'
,
'passtest'
],
vm_domain
=
'domain'
,
vm_username
=
'username'
,
vm_hostname
=
'hostname'
,
vm_port
=
22
,
vm_password
=
'password'
,
vm_cleanup
=
True
,
vm_no_copy
=
False
,
vm_hypervisor_uri
=
'my_hypervisor_uri'
)
self
.
remote
=
vm
.
VMTestResult
(
Stream
,
Args
)
# vm.RemoteTestResult.tear_down()
RemoteTestResult
.
should_receive
(
'tear_down'
).
once
().
ordered
()
mock_vm
.
should_receive
(
'restore_snapshot'
).
once
().
ordered
()
def
tearDown
(
self
):
flexmock_teardown
()
def
test_setup
(
self
):
""" Tests VMTestResult.test_setup() """
self
.
remote
.
setup
()
self
.
remote
.
tear_down
()
flexmock_teardown
()
if
__name__
==
'__main__'
:
unittest
.
main
()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录