提交 8cc31bfb 编写于 作者: A alex__hold 提交者: alex_hold
上级 a85b082d
......@@ -127,8 +127,6 @@ class DeviceTestType(object):
DeviceTestType enumeration
"""
cpp_test = "CppTest"
dex_test = "DexTest"
dex_junit_test = "DexJUnitTest"
hap_test = "HapTest"
junit_test = "JUnitTest"
jsunit_test = "JSUnitTest"
......@@ -158,10 +156,8 @@ class HostDrivenTestType(object):
TEST_DRIVER_SET = {
DeviceTestType.cpp_test,
DeviceTestType.dex_test,
DeviceTestType.hap_test,
DeviceTestType.junit_test,
DeviceTestType.dex_junit_test,
DeviceTestType.jsunit_test,
DeviceTestType.cpp_test_lite,
DeviceTestType.ctest_lite,
......
......@@ -51,7 +51,6 @@ from xdevice_extension._core.exception import HapNotSupportTest
from xdevice_extension._core.exception import HdcCommandRejectedException
from xdevice_extension._core.exception import HdcError
from xdevice_extension._core.testkit.kit import junit_para_parse
from xdevice_extension._core.testkit.kit import junit_dex_para_parse
from xdevice_extension._core.testkit.kit import reset_junit_para
from xdevice_extension._core.utils import get_filename_extension
from xdevice_extension._core.utils import start_standing_subprocess
......@@ -60,7 +59,7 @@ from xdevice_extension._core.testkit.kit import gtest_para_parse
from xdevice_extension._core.environment.dmlib import process_command_ret
__all__ = ["CppTestDriver", "DexTestDriver", "HapTestDriver",
__all__ = ["CppTestDriver", "HapTestDriver",
"JSUnitTestDriver", "JUnitTestDriver", "RemoteTestRunner",
"RemoteDexRunner", "disable_keyguard"]
LOG = platform_logger("Drivers")
......@@ -904,7 +903,6 @@ class JUnitTestDriver(IDriver):
"""
handler the include-tests parameters in json file of current module.
the main approach is to inject new dict into "testargs".
then leave it to the method of "junit_dex_para_parse" to processing.
"""
if not self.config.include_tests:
return
......@@ -1114,251 +1112,6 @@ class RemoteTestRunner:
return handler
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.dex_junit_test)
class DexJunitDriver(IDriver):
"""
DexJunitDriver is a Test that runs a junit test package on given device.
"""
def __init__(self):
self.result = ""
self.error_message = ""
self.config = None
self.rerun = True
self.runner = None
self.rerun_using_test_file = True
def __check_environment__(self, device_options):
pass
def __check_config__(self, config):
pass
def __execute__(self, request):
try:
LOG.debug("Start execute xdevice extension Dex Junit Test")
self.config = request.config
self.config.device = request.config.environment.devices[0]
config_file = request.root.source.config_file
self.result = os.path.join(request.config.report_path, "result",
'.'.join((request.get_module_name(),
"xml")))
hilog = get_device_log_file(
request.config.report_path,
request.config.device.__get_serial__(),
"device_hilog")
hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
0o755)
with os.fdopen(hilog_open, "a") as hilog_file_pipe:
self.config.device.start_catch_device_log(hilog_file_pipe)
self._run_dex_junit(config_file, listeners=request.listeners,
request=request)
hilog_file_pipe.flush()
except Exception as exception:
self.error_message = exception
if not getattr(exception, "error_no", ""):
setattr(exception, "error_no", "03406")
LOG.exception(self.error_message, exc_info=False, error_no="03406")
raise exception
finally:
self.config.device.stop_catch_device_log()
self.result = check_result_report(
request.config.report_path, self.result, self.error_message)
def _run_dex_junit(self, config_file, listeners, request):
try:
if not os.path.exists(config_file):
LOG.error("Error: Test cases don't exit %s." % config_file)
raise ParamError(
"Error: Test cases don't exit %s." % config_file,
error_no="00102")
json_config = JsonParser(config_file)
kits = get_kit_instances(json_config, self.config.resource_path,
self.config.testcases_path)
for listener in listeners:
listener.device_sn = self.config.device.device_sn
self._get_driver_config(json_config)
do_module_kit_setup(request, kits)
self.runner = RemoteDexRunner(self.config)
self.runner.suite_name = request.get_module_name()
self._get_runner_config(json_config)
if hasattr(self.config, "history_report_path") and \
self.config.testargs.get("test"):
self._do_test_retry(listeners, self.config.testargs)
else:
self.runner.junit_para = junit_dex_para_parse(
self.config.device, self.config.testargs)
self._do_test_run(listeners)
finally:
do_module_kit_teardown(request)
if self.runner and self.runner.junit_para and (
self.runner.junit_para.find("testFile") != -1
or self.runner.junit_para.find("notTestFile") != -1):
self._junit_clear()
def _do_test_retry(self, listener, testargs):
for test in testargs.get("test"):
test_item = test.split("#")
if len(test_item) != 2:
continue
self.runner.class_name = test_item[0]
self.runner.test_name = test_item[1]
self.runner.run(listener)
def _do_test_run(self, listener):
test_to_run = self._collect_test_to_run()
LOG.debug("collected test count is: %s" % len(test_to_run))
if not test_to_run:
self.runner.run(listener)
else:
self._run_with_rerun(listener, test_to_run)
def _run_tests(self, listener):
test_tracker = CollectingTestListener()
try:
listener_copy = listener.copy()
listener_copy.append(test_tracker)
self.runner.run(listener_copy)
except ShellCommandUnresponsiveException:
LOG.debug("run test exception: ShellCommandUnresponsiveException")
finally:
test_run = test_tracker.get_current_run_results()
return test_run
def _run_with_rerun(self, listener, expected_tests):
LOG.debug("ready to run with rerun, expect run: %s"
% len(expected_tests))
test_run = self._run_tests(listener)
LOG.debug("run with rerun, has run: %s" % len(expected_tests))
if len(test_run) < len(expected_tests):
expected_tests = TestDescription.remove_test(expected_tests,
test_run)
if not expected_tests:
LOG.debug("No tests to re-run, all tests executed at least "
"once.")
if self.rerun_using_test_file:
self._rerun_file(expected_tests, listener)
else:
self._rerun_serially(expected_tests, listener)
def _make_test_file(self, expected_tests, test_file_path):
file_name = 'xdevice_testFile_%s.txt' % self.runner.suite_name
file_path = os.path.join(test_file_path, file_name)
try:
file_path_open = os.open(file_path, os.O_WRONLY | os.O_CREAT |
os.O_APPEND, 0o755)
with os.fdopen(file_path_open, "a") as file_desc:
for test in expected_tests:
file_desc.write("%s#%s" % (test.class_name,
test.test_name))
file_desc.write("\n")
file_desc.flush()
except(IOError, ValueError) as err_msg:
LOG.exception("Error for make long command file: ", err_msg,
exc_info=False, error_no="03200")
return file_name, file_path
def _rerun_file(self, expected_tests, listener):
test_file_path = tempfile.mkdtemp(prefix="test_file_",
dir=self.config.report_path)
file_name, file_path = self._make_test_file(
expected_tests, test_file_path)
self.config.device.push_file(file_path, ON_DEVICE_TEST_DIR_LOCATION)
file_path_on_device = ''.join((ON_DEVICE_TEST_DIR_LOCATION, file_name))
self.runner.add_instrumentation_arg("testFile", file_path_on_device)
self.runner.junit_para = ""
LOG.debug("ready to rerun file, expect run: %s" % len(expected_tests))
test_run = self._run_tests(listener)
LOG.debug("rerun file, has run: %s" % len(test_run))
self.config.device.execute_shell_command("rm %s" % file_path_on_device)
if len(test_run) < len(expected_tests):
expected_tests = TestDescription.remove_test(expected_tests,
test_run)
if not expected_tests:
LOG.debug("Rerun textFile success")
self._rerun_serially(expected_tests, listener)
shutil.rmtree(test_file_path)
def _rerun_serially(self, expected_tests, listener):
LOG.debug("rerun serially, expected run: %s" % len(expected_tests))
self.runner.remove_instrumentation_arg("testFile")
for test in expected_tests:
self.runner.class_name = test.class_name
self.runner.test_name = test.test_name
self.runner.rerun(listener, test)
def _collect_test_to_run(self):
if self.rerun:
self.runner.set_test_collection(True)
tests = self._collect_test_and_retry()
self.runner.set_test_collection(False)
return tests
return None
def _collect_test_and_retry(self):
collector = CollectingTestListener()
listener = [collector]
self.runner.run(listener)
run_results = collector.get_current_run_results()
return run_results
def _junit_clear(self):
_lock_screen(self.config.device)
self.config.device.execute_shell_command(
"rm -r /%s/%s/%s/%s" % ("data", "local", "tmp", "ajur"))
def _get_driver_config(self, json_config):
self.config.remote_path = get_config_value(
'device-test-path', json_config.get_driver(),
default="/%s/%s" % ("data", "tmp"), is_list=False)
module_name = get_config_value(
'module-name', json_config.get_driver(), False)
if module_name:
self.config.module_name = module_name
else:
raise ParamError("Can't find module_name.", error_no="03201")
rerun = get_config_value('rerun', json_config.get_driver(), False)
if isinstance(rerun, bool):
self.rerun = rerun
elif rerun == "False" or rerun == "false":
self.rerun = False
timeout_config = get_config_value('shell-timeout',
json_config.get_driver(), False)
if timeout_config:
self.config.timeout = int(timeout_config)
else:
self.config.timeout = TIME_OUT
def _get_runner_config(self, json_config):
test_timeout = get_config_value('test-timeout',
json_config.get_driver(), False)
if test_timeout:
self.runner.add_instrumentation_arg("timeout_msec",
int(test_timeout))
def _query_runner_name(self, package):
stdout = self.config.device.hdc_command(
"shell pm list instrumentation")
packages = stdout.split("\n")
for item in packages:
if package in item:
return item.split("/")[1].split(" ")[0]
def __result__(self):
return self.result if os.path.exists(self.result) else ""
class RemoteDexRunner:
def __init__(self, config):
self.arg_list = {}
......@@ -1476,199 +1229,6 @@ class RemoteDexRunner:
return handler
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.dex_test)
class DexTestDriver(IDriver):
"""
DexTestDriver is a Test that runs a native test package on given device.
"""
# test driver config
config = None
result = ""
def __check_environment__(self, device_options):
pass
def __check_config__(self, config):
pass
def __execute__(self, request):
try:
LOG.debug("Start execute xdevice extension DexTest")
self.config = request.config
self.config.target_test_path = DEFAULT_TEST_PATH
self.config.device = request.config.environment.devices[0]
suite_file = request.root.source.source_file
if not suite_file:
LOG.error("test source '%s' not exists" %
request.root.source.source_string, error_no="00110")
return
LOG.debug("Testsuite FilePath: %s" % suite_file)
serial = request.config.device.__get_serial__()
device_log_file = get_device_log_file(request.config.report_path,
serial)
device_log_file_open = os.open(device_log_file, os.O_WRONLY |
os.O_CREAT | os.O_APPEND, 0o755)
with os.fdopen(device_log_file_open, "a") as file_pipe:
self.config.device.start_catch_device_log(file_pipe)
self._init_junit_test()
self._run_junit_test(suite_file)
file_pipe.flush()
finally:
self.config.device.stop_catch_device_log()
def _init_junit_test(self):
cmd = "target mount" \
if self.config.device.usb_type == DeviceConnectorType.hdc \
else "remount"
self.config.device.hdc_command(cmd)
self.config.device.execute_shell_command(
"rm -rf %s" % self.config.target_test_path)
self.config.device.execute_shell_command(
"mkdir -p %s" % self.config.target_test_path)
self.config.device.execute_shell_command(
"mount -o rw,remount,rw /%s" % "system")
def _run_junit_test(self, suite_file):
filename = os.path.basename(suite_file)
suitefile_target_test_path = self.config.target_test_path
junit_test_para = self._get_junit_test_para(filename, suite_file)
is_coverage_test = True if self.config.coverage == "coverage" else \
False
# push testsuite file
self.config.device.push_file(suite_file, self.config.target_test_path)
# push resource files
resource_manager = ResourceManager()
resource_data_dic, resource_dir = \
resource_manager.get_resource_data_dic(suite_file)
resource_manager.process_preparer_data(resource_data_dic, resource_dir,
self.config.device)
# execute testcase
return_message = self._execute_suitefile_junittest(
filename, junit_test_para, suitefile_target_test_path)
result = ResultManager(suite_file, self.config.report_path,
self.config.device,
self.config.target_test_path)
result.set_is_coverage(is_coverage_test)
self.result = result.get_test_results(return_message)
resource_manager.process_cleaner_data(resource_data_dic, resource_dir,
self.config.device)
def _get_junit_test_para(self, filename, suite_file):
exec_info = get_java_test_para(self.config.testcase,
self.config.testlevel)
java_test_file = get_execute_java_test_files(suite_file)
junit_test_para = self._get_dex_test_para(filename, java_test_file,
exec_info)
return junit_test_para
def _get_dex_test_para(self, filename, java_test_file, exec_info):
exec_class, exec_method, exec_level = exec_info
dex_test_para = "%s %s%s %s%s %s%s %s%s %s%s %s%s" % (
ZunitConst.z_unit_app, ZunitConst.output_dir,
self.config.target_test_path,
ZunitConst.output_file, filename,
ZunitConst.test_class, java_test_file,
ZunitConst.exec_class, exec_class,
ZunitConst.exec_method, exec_method,
ZunitConst.exec_level, exec_level)
if self.config.coverage == "coverage":
dex_test_para = ''.join((dex_test_para, ' ',
ZunitConst.jacoco_exec_file, filename,
".exec"))
return dex_test_para
def _execute_suitefile_junittest(self, filename, testpara,
target_test_path):
return_message = self._execute_dexfile_junittest(filename, testpara,
target_test_path)
return return_message
def _execute_dexfile_junittest(self, filename, testpara, target_test_path):
from xdevice import Variables
long_command_path = tempfile.mkdtemp(prefix="long_command_",
dir=self.config.report_path)
if self.config.coverage == "coverage":
if Variables.source_code_rootpath == "":
LOG.error("source_code_rootpath is empty.", error_no="03202")
strip_num = 0
else:
build_variant_outpath = os.path.join(
Variables.source_code_rootpath, "out",
self.config.build_variant)
strip_num = len(build_variant_outpath.split(os.sep)) - 1
command = "cd %s; rm -rf %s.xml; chmod +x *; " \
"export BOOTCLASSPATH=%s%s:$BOOTCLASSPATH;" \
"export GCOV_PREFIX=%s; export GCOV_PREFIX_STRIP=%d;" \
" app_process %s%s %s" \
% (
target_test_path, filename, target_test_path,
filename,
target_test_path, strip_num,
target_test_path, filename, testpara)
else:
command = "cd %s; rm -rf %s.xml; chmod +x *; " \
"export BOOTCLASSPATH=%s%s:$BOOTCLASSPATH;" \
" app_process %s%s %s" \
% (
target_test_path, filename, target_test_path,
filename,
target_test_path, filename, testpara)
LOG.info("command: %s" % command)
sh_file_name, file_path = \
self._make_long_command_file(command, long_command_path, filename)
remote_command_dir = os.path.join(target_test_path,
ZunitConst.remote_command_dir)
self.config.device.execute_shell_command(
"mkdir -p %s" % remote_command_dir)
cmd = "file send" \
if self.config.device.usb_type == DeviceConnectorType.hdc \
else "push"
self.config.device.hdc_command(
"%s %s %s" % (cmd, file_path, remote_command_dir))
try:
display_receiver = DisplayOutputReceiver()
self.config.device.execute_shell_command(
"sh %s/%s" % (remote_command_dir, sh_file_name),
receiver=display_receiver, timeout=TIME_OUT)
return_message = display_receiver.output
if display_receiver.output:
time.sleep(1)
except (ExecuteTerminate, HdcCommandRejectedException,
ShellCommandUnresponsiveException, HdcError) as exception:
if not getattr("exception", "error_no", ""):
setattr(exception, "error_no", "03203")
return_message = str(exception.args)
shutil.rmtree(long_command_path)
return return_message
@staticmethod
def _make_long_command_file(command, longcommand_path, filename):
sh_file_name = '%s.sh' % filename
file_path = os.path.join(longcommand_path, sh_file_name)
try:
file_path_open = os.open(file_path, os.O_WRONLY | os.O_CREAT |
os.O_APPEND, 0o755)
with os.fdopen(file_path_open, "a") as file_desc:
file_desc.write(command)
file_desc.flush()
except(IOError, ValueError) as err_msg:
LOG.exception("Error for make long command file: ", err_msg,
exc_info=False, error_no="03200")
return sh_file_name, file_path
def __result__(self):
return self.result if os.path.exists(self.result) else ""
@Plugin(type=Plugin.DRIVER, id=DeviceTestType.hap_test)
class HapTestDriver(IDriver):
"""
......
......@@ -48,7 +48,7 @@ from xdevice_extension._core.utils import convert_serial
__all__ = ["STSKit", "PushKit", "PropertyCheckKit", "ShellKit", "WifiKit",
"ConfigKit", "AppInstallKit", "junit_para_parse",
"gtest_para_parse", "junit_dex_para_parse", "reset_junit_para"]
"gtest_para_parse", "reset_junit_para"]
LOG = platform_logger("Kit")
......@@ -648,7 +648,7 @@ class AppInstallKit(ITestKit):
def remount(device):
cmd = "target mount" \
cmd = "mount -o rw,remount /" \
if device.usb_type == DeviceConnectorType.hdc else "remount"
device.hdc_command(cmd)
......@@ -753,62 +753,6 @@ def junit_para_parse(device, junit_paras, prefix_char="-e"):
return " ".join(ret_str)
def junit_dex_para_parse(device, junit_paras, prefix_char="--"):
"""To parse the para of junit
Args:
device: the device running
junit_paras: the para dict of junit
prefix_char: the prefix char of parsed cmd
Returns:
the new para using in a command like -e testFile xxx
-e coverage true...
"""
ret_str = []
path = "/%s/%s/%s/%s" % ("data", "local", "tmp", "ajur")
include_file = "%s/%s" % (path, "includes.txt")
exclude_file = "%s/%s" % (path, "excludes.txt")
if not isinstance(junit_paras, dict):
LOG.warning("The para of junit is not the dict format as required")
return ""
# Disable screen keyguard
disable_key_guard = junit_paras.get('disable-keyguard')
if not disable_key_guard or disable_key_guard[0].lower() != 'false':
from xdevice_extension._core.driver.drivers import disable_keyguard
disable_keyguard(device)
for para_name in junit_paras.keys():
path = "/%s/%s/%s/%s/" % ("data", "local", "tmp", "ajur")
if para_name.strip() == 'test-file-include-filter':
for file_name in junit_paras[para_name]:
device.push_file(file_name, include_file)
device.execute_shell_command(
'chown -R shell:shell %s' % path)
ret_str.append(prefix_char + " ".join(['testFile', include_file]))
elif para_name.strip() == "test-file-exclude-filter":
for file_name in junit_paras[para_name]:
device.push_file(file_name, include_file)
device.execute_shell_command(
'chown -R shell:shell %s' % path)
ret_str.append(prefix_char + " ".join(['notTestFile',
exclude_file]))
elif para_name.strip() == "test" or para_name.strip() == "class":
result = _get_class(junit_paras, prefix_char, para_name.strip())
ret_str.append(result)
elif para_name.strip() == "include-annotation":
ret_str.append(prefix_char + " ".join(
['annotation', ",".join(junit_paras[para_name])]))
elif para_name.strip() == "exclude-annotation":
ret_str.append(prefix_char + " ".join(
['notAnnotation', ",".join(junit_paras[para_name])]))
else:
ret_str.append(prefix_char + " ".join(
[para_name, ",".join(junit_paras[para_name])]))
return " ".join(ret_str)
def timeout_callback(proc):
try:
LOG.error("Error: execute command timeout.")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册