Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
OpenHarmony
Test Xdevice
提交
898eab88
T
Test Xdevice
项目概览
OpenHarmony
/
Test Xdevice
大约 1 年 前同步成功
通知
6
Star
23
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
Test Xdevice
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
898eab88
编写于
8月 01, 2022
作者:
D
deveco_test
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix code bug
Signed-off-by:
N
deveco_test
<
liguangjie1@huawei.com
>
上级
5e275552
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
115 addition
and
187 deletion
+115
-187
plugins/ohos/src/ohos/drivers/openharmony.py
plugins/ohos/src/ohos/drivers/openharmony.py
+63
-39
plugins/ohos/src/ohos/parser/parser.py
plugins/ohos/src/ohos/parser/parser.py
+52
-148
未找到文件。
plugins/ohos/src/ohos/drivers/openharmony.py
浏览文件 @
898eab88
...
...
@@ -366,7 +366,6 @@ class OHJSUnitTestDriver(IDriver):
if
test_to_run
else
0
))
if
not
test_to_run
:
self
.
runner
.
run
(
listener
)
self
.
runner
.
notify_finished
()
else
:
self
.
_run_with_rerun
(
listener
,
test_to_run
)
...
...
@@ -393,43 +392,35 @@ class OHJSUnitTestDriver(IDriver):
expected_tests
=
TestDescription
.
remove_test
(
expected_tests
,
test_run
)
if
not
expected_tests
:
LOG
.
warning
(
"No tests to re-run twice,please check"
)
self
.
runner
.
notify_finished
()
LOG
.
debug
(
"No tests to re-run, all tests executed at least "
"once."
)
if
self
.
rerun_all
:
self
.
_rerun_all
(
expected_tests
,
listener
)
else
:
self
.
_rerun_twice
(
expected_tests
,
listener
)
else
:
LOG
.
debug
(
"Rerun once success"
)
self
.
runner
.
notify_finished
()
self
.
_rerun_serially
(
expected_tests
,
listener
)
def
_rerun_
twice
(
self
,
expected_tests
,
listener
):
def
_rerun_
all
(
self
,
expected_tests
,
listener
):
tests
=
[]
for
test
in
expected_tests
:
tests
.
append
(
"%s#%s"
%
(
test
.
class_name
,
test
.
test_name
))
self
.
runner
.
add_arg
(
"class"
,
","
.
join
(
tests
))
LOG
.
debug
(
"Ready to rerun
twice
, expect run: %s"
%
len
(
expected_tests
))
LOG
.
debug
(
"Ready to rerun
all
, expect run: %s"
%
len
(
expected_tests
))
test_run
=
self
.
_run_tests
(
listener
)
LOG
.
debug
(
"Rerun
twice
, has run: %s"
%
len
(
test_run
))
LOG
.
debug
(
"Rerun
all
, has run: %s"
%
len
(
test_run
))
if
len
(
test_run
)
<
len
(
expected_tests
):
expected_tests
=
TestDescription
.
remove_test
(
expected_tests
,
test_run
)
if
not
expected_tests
:
LOG
.
warning
(
"No tests to re-run third,please check"
)
self
.
runner
.
notify_finished
()
else
:
self
.
_rerun_third
(
expected_tests
,
listener
)
else
:
LOG
.
debug
(
"Rerun twice success"
)
self
.
runner
.
notify_finished
()
LOG
.
debug
(
"Rerun textFile success"
)
self
.
_rerun_serially
(
expected_tests
,
listener
)
def
_rerun_
third
(
self
,
expected_tests
,
listener
):
tests
=
[]
def
_rerun_
serially
(
self
,
expected_tests
,
listener
):
LOG
.
debug
(
"Rerun serially, expected run: %s"
%
len
(
expected_tests
))
for
test
in
expected_tests
:
tests
.
append
(
"%s#%s"
%
(
test
.
class_name
,
test
.
test_name
))
self
.
runner
.
add_arg
(
"class"
,
","
.
join
(
tests
))
LOG
.
debug
(
"Ready to rerun third, expect run: %s"
%
len
(
expected_tests
))
self
.
_run_tests
(
listener
)
LOG
.
debug
(
"Rerun third success"
)
self
.
runner
.
notify_finished
()
self
.
runner
.
add_arg
(
"class"
,
"%s#%s"
%
(
test
.
class_name
,
test
.
test_name
))
self
.
runner
.
rerun
(
listener
,
test
)
self
.
runner
.
remove_arg
(
"class"
)
def
__result__
(
self
):
return
self
.
result
if
os
.
path
.
exists
(
self
.
result
)
else
""
...
...
@@ -458,33 +449,66 @@ class OHJSUnitTestRunner:
command
=
self
.
_get_dry_run_command
()
self
.
config
.
device
.
execute_shell_command
(
command
,
timeout
=
self
.
config
.
timeout
,
receiver
=
handler
,
retry
=
0
)
self
.
expect_tests_dict
=
parser_instances
[
0
].
tests_dict
return
parser_instances
[
0
].
tests
def
run
(
self
,
listener
):
handler
=
self
.
_get_shell_handler
(
listener
)
parsers
=
get_plugin
(
Plugin
.
PARSER
,
CommonParserType
.
oh_jsunit
)
if
parsers
:
parsers
=
parsers
[:
1
]
parser_instances
=
[]
for
parser
in
parsers
:
parser_instance
=
parser
.
__class__
()
parser_instance
.
suite_name
=
self
.
suite_name
parser_instance
.
listeners
=
listener
parser_instances
.
append
(
parser_instance
)
handler
=
ShellHandler
(
parser_instances
)
command
=
self
.
_get_run_command
()
self
.
config
.
device
.
execute_shell_command
(
command
,
timeout
=
self
.
config
.
timeout
,
receiver
=
handler
,
retry
=
0
)
def
notify_finished
(
self
):
if
self
.
finished_observer
:
self
.
finished_observer
.
notify_task_finished
()
def
rerun
(
self
,
listener
,
test
):
handler
=
None
if
self
.
rerun_attemp
:
test_tracker
=
CollectingPassListener
()
try
:
def
_get_shell_handler
(
self
,
listener
):
listener_copy
=
listener
.
copy
()
listener_copy
.
append
(
test_tracker
)
parsers
=
get_plugin
(
Plugin
.
PARSER
,
CommonParserType
.
oh_jsunit
)
if
parsers
:
parsers
=
parsers
[:
1
]
parser_instances
=
[]
for
parser
in
parsers
:
parser_instance
=
parser
.
__class__
()
parser_instance
.
suite_name
=
self
.
suite_name
parser_instance
.
listeners
=
listener_copy
parser_instances
.
append
(
parser_instance
)
handler
=
ShellHandler
(
parser_instances
)
command
=
self
.
_get_run_command
()
self
.
config
.
device
.
execute_shell_command
(
command
,
timeout
=
self
.
config
.
timeout
,
receiver
=
handler
,
retry
=
0
)
except
ShellCommandUnresponsiveException
as
_
:
LOG
.
debug
(
"Exception: ShellCommandUnresponsiveException"
)
finally
:
if
not
len
(
test_tracker
.
get_current_run_results
()):
LOG
.
debug
(
"No test case is obtained finally"
)
self
.
rerun_attemp
-=
1
handler
.
parsers
[
0
].
mark_test_as_blocked
(
test
)
else
:
LOG
.
debug
(
"Not execute and mark as blocked finally"
)
parsers
=
get_plugin
(
Plugin
.
PARSER
,
CommonParserType
.
cpptest
)
if
parsers
:
parsers
=
parsers
[:
1
]
parser_instances
=
[]
for
parser
in
parsers
:
parser_instance
=
parser
.
__class__
()
parser_instance
.
suite_name
=
self
.
suite_name
parser_instance
.
listeners
=
listener
parser_instance
.
runner
=
self
parser_instances
.
append
(
parser_instance
)
self
.
finished_observer
=
parser_instance
handler
=
ShellHandler
(
parser_instances
)
return
handler
handler
.
parsers
[
0
].
mark_test_as_blocked
(
test
)
def
add_arg
(
self
,
name
,
value
):
if
not
name
or
not
value
:
...
...
plugins/ohos/src/ohos/parser/parser.py
浏览文件 @
898eab88
...
...
@@ -1024,8 +1024,6 @@ class OHJSUnitTestParser(IParser):
self
.
start_time
=
datetime
.
datetime
.
now
()
self
.
test_time
=
0
self
.
test_run_finished
=
False
self
.
cur_sum
=
-
1
self
.
runner
=
None
def
get_suite_name
(
self
):
return
self
.
suite_name
...
...
@@ -1042,48 +1040,14 @@ class OHJSUnitTestParser(IParser):
def
parse
(
self
,
line
):
if
not
str
(
line
).
strip
():
return
if
line
.
startswith
(
OHJSUnitPrefixes
.
SUM
.
value
):
self
.
handle_sum_line
(
line
)
elif
line
.
startswith
(
OHJSUnitPrefixes
.
STATUS
.
value
):
self
.
handle_status_line
(
line
)
if
line
.
startswith
(
OHJSUnitPrefixes
.
STATUS
.
value
):
self
.
submit_current_key_value
()
self
.
parse_key
(
line
,
len
(
OHJSUnitPrefixes
.
STATUS
.
value
))
elif
line
.
startswith
(
OHJSUnitPrefixes
.
STATUS_CODE
.
value
):
self
.
submit_current_key_value
()
self
.
parse_status_code
(
line
)
def
handle_sum_line
(
self
,
line
):
value
=
line
[
len
(
OHJSUnitPrefixes
.
SUM
.
value
):].
split
(
"="
,
1
)[
0
]
self
.
cur_sum
=
int
(
value
)
def
handle_status_line
(
self
,
line
):
self
.
parse_key
(
line
,
len
(
OHJSUnitPrefixes
.
STATUS
.
value
))
if
self
.
cur_sum
>
0
and
self
.
current_key
==
"class"
:
if
self
.
current_value
not
in
self
.
runner
.
suite_recorder
.
keys
():
current_suite
=
self
.
state_machine
.
suite
(
reset
=
True
)
current_suite
.
test_num
=
self
.
cur_sum
current_suite
.
suite_name
=
self
.
current_value
self
.
runner
.
suite_recorder
.
update
({
self
.
current_value
:
[
len
(
self
.
runner
.
suite_recorder
.
keys
()),
current_suite
]})
else
:
current_suite
=
self
.
runner
.
suite_recorder
.
get
(
self
.
current_value
)[
1
]
self
.
state_machine
.
current_suite
=
current_suite
self
.
cur_sum
=
-
1
self
.
current_key
=
None
self
.
current_value
=
None
self
.
state_machine
.
running_test_index
=
0
for
listener
in
self
.
get_listeners
():
suite
=
copy
.
copy
(
current_suite
)
listener
.
__started__
(
LifeCycle
.
TestSuite
,
suite
)
else
:
if
self
.
current_key
==
"suiteconsuming"
:
self
.
test_time
=
int
(
self
.
current_value
)
elif
line
.
startswith
(
OHJSUnitPrefixes
.
TestFinished
.
value
):
self
.
handle_suite_end
()
else
:
self
.
submit_current_key_value
()
self
.
parse_key
(
line
,
len
(
OHJSUnitPrefixes
.
STATUS
.
value
))
def
submit_current_key_value
(
self
):
if
self
.
current_key
and
self
.
current_value
:
...
...
@@ -1127,13 +1091,13 @@ class OHJSUnitTestParser(IParser):
if
not
test_info
.
test_name
or
not
test_info
.
test_class
:
LOG
.
info
(
"Invalid instrumentation status bundle"
)
return
self
.
report_test_run_started
(
test_info
)
if
test_info
.
code
==
StatusCodes
.
START
.
value
:
self
.
start_time
=
datetime
.
datetime
.
now
()
for
listener
in
self
.
get_listeners
():
result
=
copy
.
copy
(
test_info
)
listener
.
__started__
(
LifeCycle
.
TestCase
,
result
)
return
if
test_info
.
code
==
StatusCodes
.
FAILURE
.
value
:
elif
test_info
.
code
==
StatusCodes
.
FAILURE
.
value
:
self
.
state_machine
.
running_test_index
+=
1
test_info
.
current
=
self
.
state_machine
.
running_test_index
end_time
=
datetime
.
datetime
.
now
()
...
...
@@ -1173,17 +1137,15 @@ class OHJSUnitTestParser(IParser):
listener
.
__ended__
(
LifeCycle
.
TestCase
,
result
)
test_info
.
is_completed
=
True
@
classmethod
def
output_stack_trace
(
cls
,
test_info
):
if
check_pub_key_exist
():
return
if
test_info
.
stacktrace
:
stack_lines
=
test_info
.
stacktrace
.
split
(
r
"\r\n"
)
LOG
.
error
(
"Stacktrace information is:"
)
for
line
in
stack_lines
:
line
.
strip
()
if
line
:
LOG
.
error
(
line
)
def
report_test_run_started
(
self
,
test_result
):
test_suite
=
self
.
state_machine
.
suite
()
if
not
self
.
state_machine
.
suite
().
is_started
:
if
not
test_suite
.
test_num
or
not
test_suite
.
suite_name
:
test_suite
.
suite_name
=
self
.
get_suite_name
()
test_suite
.
test_num
=
test_result
.
num_tests
for
listener
in
self
.
get_listeners
():
suite_report
=
copy
.
copy
(
test_suite
)
listener
.
__started__
(
LifeCycle
.
TestSuite
,
suite_report
)
@
staticmethod
def
check_legality
(
name
):
...
...
@@ -1192,106 +1154,52 @@ class OHJSUnitTestParser(IParser):
return
True
def
__done__
(
self
):
pass
def
handle_suite_end
(
self
):
suite_result
=
self
.
state_machine
.
suite
()
suite_result
.
run_time
=
self
.
test_time
suite_result
.
is_completed
=
True
for
listener
in
self
.
get_listeners
():
suite
=
copy
.
copy
(
suite_result
)
listener
.
__ended__
(
LifeCycle
.
TestSuite
,
suite
,
is_clear
=
True
)
def
handler_suites_end
(
self
):
suite_result
=
self
.
state_machine
.
suite
()
suite_result
.
run_time
=
self
.
test_time
suite_result
.
is_completed
=
True
for
listener
in
self
.
get_listeners
():
if
listener
.
__class__
.
__name__
==
"ReportListener"
:
self
.
_cal_result
(
listener
)
suite
=
copy
.
copy
(
suite_result
)
listener
.
__ended__
(
LifeCycle
.
TestSuites
,
suite
,
suites_name
=
self
.
suite_name
)
self
.
state_machine
.
current_suite
=
None
def
_cal_result
(
self
,
report_listener
):
result_len
=
len
(
report_listener
.
result
)
suites_len
=
len
(
report_listener
.
suites
)
if
result_len
!=
suites_len
:
diff_result_tuple_list
=
report_listener
.
result
[
suites_len
:]
report_listener
.
result
=
report_listener
.
result
[:
suites_len
]
for
diff_result_tuple
in
diff_result_tuple_list
:
suite
,
case_result_list
=
diff_result_tuple
pos
=
self
.
runner
.
suite_recorder
.
get
(
suite
.
suite_name
)[
0
]
report_listener
.
result
[
pos
][
1
].
extend
(
case_result_list
)
self
.
_handle_lacking_one_testcase
(
report_listener
)
self
.
_handle_lacking_whole_suite
(
report_listener
)
def
_handle_lacking_one_testcase
(
self
,
report_listener
):
for
suite
in
report_listener
.
suites
.
values
():
test_des_list
=
self
.
runner
.
expect_tests_dict
.
get
(
suite
.
suite_name
,
[])
pos
=
self
.
runner
.
suite_recorder
.
get
(
suite
.
suite_name
)[
0
]
if
len
(
test_des_list
)
==
len
(
report_listener
.
result
[
pos
][
1
]):
continue
for
test_des
in
test_des_list
:
is_contain
=
False
for
case
in
report_listener
.
result
[
pos
][
1
]:
if
case
.
test_name
==
test_des
.
test_name
:
is_contain
=
True
break
if
not
is_contain
:
test_result
=
self
.
state_machine
.
test
(
reset
=
True
)
test_result
.
test_class
=
test_des
.
class_name
test_result
.
test_name
=
test_des
.
test_name
test_result
.
stacktrace
=
"error_msg: mark blocked"
test_result
.
num_tests
=
1
test_result
.
run_time
=
0
test_result
.
code
=
ResultCode
.
BLOCKED
.
value
report_listener
.
result
[
pos
][
1
].
append
(
test_result
)
def
_handle_lacking_whole_suite
(
self
,
report_listener
):
all_suite_set
=
set
(
self
.
runner
.
expect_tests_dict
.
keys
())
un_suite_set
=
set
()
if
len
(
all_suite_set
)
>
len
(
report_listener
.
suites
):
suite_name_set
=
set
()
for
suite
in
report_listener
.
suites
.
values
():
suite_name_set
.
add
(
suite
.
suite_name
)
un_suite_set
.
union
(
all_suite_set
.
difference
(
suite_name_set
))
for
un_suite
in
un_suite_set
:
test_des_list
=
self
.
runner
.
expect_tests_dict
.
get
(
un_suite
.
suite_name
,
[])
current_suite
=
self
.
state_machine
.
suite
(
reset
=
True
)
current_suite
.
test_num
=
len
(
test_des_list
)
current_suite
.
suite_name
=
self
.
current_value
for
listener
in
self
.
get_listeners
():
suite
=
copy
.
copy
(
current_suite
)
listener
.
__started__
(
LifeCycle
.
TestSuite
,
suite
)
for
test
in
test_des_list
:
def
mark_test_as_blocked
(
self
,
test
):
if
not
self
.
state_machine
.
current_suite
and
not
test
.
class_name
:
return
suite_name
=
self
.
state_machine
.
current_suite
.
suite_name
if
\
self
.
state_machine
.
current_suite
else
self
.
get_suite_name
()
suite_result
=
self
.
state_machine
.
suite
(
reset
=
True
)
test_result
=
self
.
state_machine
.
test
(
reset
=
True
)
suite_result
.
suite_name
=
suite_name
or
test
.
class_name
suite_result
.
suite_num
=
1
test_result
.
test_class
=
test
.
class_name
test_result
.
test_name
=
test
.
test_name
test_result
.
stacktrace
=
"error_msg: mark
blocked"
test_result
.
stacktrace
=
"error_msg: marked
blocked"
test_result
.
num_tests
=
1
test_result
.
run_time
=
0
test_result
.
current
=
self
.
state_machine
.
running_test_index
+
1
test_result
.
code
=
ResultCode
.
BLOCKED
.
value
test_result
=
copy
.
copy
(
test_result
)
for
listener
in
self
.
get_listeners
():
listener
.
__started__
(
LifeCycle
.
TestCase
,
test_result
)
suite_report
=
copy
.
copy
(
suite_result
)
listener
.
__started__
(
LifeCycle
.
TestSuite
,
suite_report
)
for
listener
in
self
.
get_listeners
():
test_result
=
copy
.
copy
(
test_result
)
listener
.
__started__
(
LifeCycle
.
TestCase
,
test_result
)
for
listener
in
self
.
get_listeners
():
test_result
=
copy
.
copy
(
test_result
)
listener
.
__ended__
(
LifeCycle
.
TestCase
,
test_result
)
current_suite
.
run_time
=
self
.
test_time
current_suite
.
is_completed
=
True
for
listener
in
self
.
get_listeners
():
suite
=
copy
.
copy
(
current_suite
)
listener
.
__ended__
(
LifeCycle
.
TestSuite
,
suite
,
is_clear
=
True
)
suite_report
=
copy
.
copy
(
suite_result
)
listener
.
__ended__
(
LifeCycle
.
TestSuite
,
suite_report
,
is_clear
=
True
)
self
.
__done__
()
def
notify_task_finished
(
self
):
self
.
handler_suites_end
()
def
handle_suite_end
(
self
):
suite_result
=
self
.
state_machine
.
suite
()
suite_result
.
run_time
=
self
.
test_time
suite_result
.
is_completed
=
True
for
listener
in
self
.
get_listeners
():
suite
=
copy
.
copy
(
suite_result
)
listener
.
__ended__
(
LifeCycle
.
TestSuite
,
suite
,
is_clear
=
True
)
@
Plugin
(
type
=
Plugin
.
PARSER
,
id
=
CommonParserType
.
oh_jsunit_list
)
...
...
@@ -1319,11 +1227,7 @@ class OHJSUnitTestListParser(IParser):
suite_dict_list
=
json
.
loads
(
self
.
json_str
).
get
(
"suites"
,
[])
for
suite_dict
in
suite_dict_list
:
for
class_name
,
test_name_dict_list
in
suite_dict
.
items
():
self
.
tests_dict
.
update
({
class_name
.
strip
():
[]})
for
test_name_dict
in
test_name_dict_list
:
for
test_name
in
test_name_dict
.
values
():
test
=
TestDescription
(
class_name
.
strip
(),
test_name
.
strip
())
self
.
tests_dict
.
get
(
class_name
.
strip
()).
append
(
test
)
test
=
TestDescription
(
class_name
,
test_name
)
self
.
tests
.
append
(
test
)
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录