From 4afde2e0bbe798f779cc1f9d6c9a7bbff92fa689 Mon Sep 17 00:00:00 2001 From: alex__hold Date: Wed, 29 Sep 2021 10:18:13 +0800 Subject: [PATCH] codex issue fix Signed-off-by: alex__hold --- .../src/xdevice_extension/_core/constants.py | 4 - .../xdevice_extension/_core/driver/drivers.py | 442 +----------------- .../xdevice_extension/_core/testkit/kit.py | 60 +-- 3 files changed, 3 insertions(+), 503 deletions(-) diff --git a/extension/src/xdevice_extension/_core/constants.py b/extension/src/xdevice_extension/_core/constants.py index ac4ee17..9916642 100644 --- a/extension/src/xdevice_extension/_core/constants.py +++ b/extension/src/xdevice_extension/_core/constants.py @@ -127,8 +127,6 @@ class DeviceTestType(object): DeviceTestType enumeration """ cpp_test = "CppTest" - dex_test = "DexTest" - dex_junit_test = "DexJUnitTest" hap_test = "HapTest" junit_test = "JUnitTest" jsunit_test = "JSUnitTest" @@ -158,10 +156,8 @@ class HostDrivenTestType(object): TEST_DRIVER_SET = { DeviceTestType.cpp_test, - DeviceTestType.dex_test, DeviceTestType.hap_test, DeviceTestType.junit_test, - DeviceTestType.dex_junit_test, DeviceTestType.jsunit_test, DeviceTestType.cpp_test_lite, DeviceTestType.ctest_lite, diff --git a/extension/src/xdevice_extension/_core/driver/drivers.py b/extension/src/xdevice_extension/_core/driver/drivers.py index e35d36c..c31ec4c 100644 --- a/extension/src/xdevice_extension/_core/driver/drivers.py +++ b/extension/src/xdevice_extension/_core/driver/drivers.py @@ -51,7 +51,6 @@ from xdevice_extension._core.exception import HapNotSupportTest from xdevice_extension._core.exception import HdcCommandRejectedException from xdevice_extension._core.exception import HdcError from xdevice_extension._core.testkit.kit import junit_para_parse -from xdevice_extension._core.testkit.kit import junit_dex_para_parse from xdevice_extension._core.testkit.kit import reset_junit_para from xdevice_extension._core.utils import get_filename_extension from xdevice_extension._core.utils import start_standing_subprocess @@ -60,7 +59,7 @@ from xdevice_extension._core.testkit.kit import gtest_para_parse from xdevice_extension._core.environment.dmlib import process_command_ret -__all__ = ["CppTestDriver", "DexTestDriver", "HapTestDriver", +__all__ = ["CppTestDriver", "HapTestDriver", "JSUnitTestDriver", "JUnitTestDriver", "RemoteTestRunner", "RemoteDexRunner", "disable_keyguard"] LOG = platform_logger("Drivers") @@ -904,7 +903,6 @@ class JUnitTestDriver(IDriver): """ handler the include-tests parameters in json file of current module. the main approach is to inject new dict into "testargs". - then leave it to the method of "junit_dex_para_parse" to processing. """ if not self.config.include_tests: return @@ -1114,251 +1112,6 @@ class RemoteTestRunner: return handler -@Plugin(type=Plugin.DRIVER, id=DeviceTestType.dex_junit_test) -class DexJunitDriver(IDriver): - """ - DexJunitDriver is a Test that runs a junit test package on given device. - """ - - def __init__(self): - self.result = "" - self.error_message = "" - self.config = None - self.rerun = True - self.runner = None - self.rerun_using_test_file = True - - def __check_environment__(self, device_options): - pass - - def __check_config__(self, config): - pass - - def __execute__(self, request): - try: - LOG.debug("Start execute xdevice extension Dex Junit Test") - - self.config = request.config - self.config.device = request.config.environment.devices[0] - - config_file = request.root.source.config_file - self.result = os.path.join(request.config.report_path, "result", - '.'.join((request.get_module_name(), - "xml"))) - - hilog = get_device_log_file( - request.config.report_path, - request.config.device.__get_serial__(), - "device_hilog") - hilog_open = os.open(hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, - 0o755) - with os.fdopen(hilog_open, "a") as hilog_file_pipe: - self.config.device.start_catch_device_log(hilog_file_pipe) - self._run_dex_junit(config_file, listeners=request.listeners, - request=request) - hilog_file_pipe.flush() - - except Exception as exception: - self.error_message = exception - if not getattr(exception, "error_no", ""): - setattr(exception, "error_no", "03406") - LOG.exception(self.error_message, exc_info=False, error_no="03406") - raise exception - - finally: - self.config.device.stop_catch_device_log() - self.result = check_result_report( - request.config.report_path, self.result, self.error_message) - - def _run_dex_junit(self, config_file, listeners, request): - try: - if not os.path.exists(config_file): - LOG.error("Error: Test cases don't exit %s." % config_file) - raise ParamError( - "Error: Test cases don't exit %s." % config_file, - error_no="00102") - - json_config = JsonParser(config_file) - kits = get_kit_instances(json_config, self.config.resource_path, - self.config.testcases_path) - - for listener in listeners: - listener.device_sn = self.config.device.device_sn - - self._get_driver_config(json_config) - do_module_kit_setup(request, kits) - self.runner = RemoteDexRunner(self.config) - self.runner.suite_name = request.get_module_name() - self._get_runner_config(json_config) - - if hasattr(self.config, "history_report_path") and \ - self.config.testargs.get("test"): - self._do_test_retry(listeners, self.config.testargs) - else: - self.runner.junit_para = junit_dex_para_parse( - self.config.device, self.config.testargs) - self._do_test_run(listeners) - finally: - do_module_kit_teardown(request) - if self.runner and self.runner.junit_para and ( - self.runner.junit_para.find("testFile") != -1 - or self.runner.junit_para.find("notTestFile") != -1): - self._junit_clear() - - def _do_test_retry(self, listener, testargs): - for test in testargs.get("test"): - test_item = test.split("#") - if len(test_item) != 2: - continue - self.runner.class_name = test_item[0] - self.runner.test_name = test_item[1] - self.runner.run(listener) - - def _do_test_run(self, listener): - test_to_run = self._collect_test_to_run() - LOG.debug("collected test count is: %s" % len(test_to_run)) - if not test_to_run: - self.runner.run(listener) - else: - self._run_with_rerun(listener, test_to_run) - - def _run_tests(self, listener): - test_tracker = CollectingTestListener() - try: - listener_copy = listener.copy() - listener_copy.append(test_tracker) - self.runner.run(listener_copy) - except ShellCommandUnresponsiveException: - LOG.debug("run test exception: ShellCommandUnresponsiveException") - finally: - test_run = test_tracker.get_current_run_results() - return test_run - - def _run_with_rerun(self, listener, expected_tests): - LOG.debug("ready to run with rerun, expect run: %s" - % len(expected_tests)) - test_run = self._run_tests(listener) - LOG.debug("run with rerun, has run: %s" % len(expected_tests)) - if len(test_run) < len(expected_tests): - expected_tests = TestDescription.remove_test(expected_tests, - test_run) - if not expected_tests: - LOG.debug("No tests to re-run, all tests executed at least " - "once.") - if self.rerun_using_test_file: - self._rerun_file(expected_tests, listener) - else: - self._rerun_serially(expected_tests, listener) - - def _make_test_file(self, expected_tests, test_file_path): - file_name = 'xdevice_testFile_%s.txt' % self.runner.suite_name - file_path = os.path.join(test_file_path, file_name) - try: - file_path_open = os.open(file_path, os.O_WRONLY | os.O_CREAT | - os.O_APPEND, 0o755) - with os.fdopen(file_path_open, "a") as file_desc: - for test in expected_tests: - file_desc.write("%s#%s" % (test.class_name, - test.test_name)) - file_desc.write("\n") - file_desc.flush() - except(IOError, ValueError) as err_msg: - LOG.exception("Error for make long command file: ", err_msg, - exc_info=False, error_no="03200") - return file_name, file_path - - def _rerun_file(self, expected_tests, listener): - test_file_path = tempfile.mkdtemp(prefix="test_file_", - dir=self.config.report_path) - file_name, file_path = self._make_test_file( - expected_tests, test_file_path) - self.config.device.push_file(file_path, ON_DEVICE_TEST_DIR_LOCATION) - file_path_on_device = ''.join((ON_DEVICE_TEST_DIR_LOCATION, file_name)) - self.runner.add_instrumentation_arg("testFile", file_path_on_device) - self.runner.junit_para = "" - LOG.debug("ready to rerun file, expect run: %s" % len(expected_tests)) - test_run = self._run_tests(listener) - LOG.debug("rerun file, has run: %s" % len(test_run)) - self.config.device.execute_shell_command("rm %s" % file_path_on_device) - if len(test_run) < len(expected_tests): - expected_tests = TestDescription.remove_test(expected_tests, - test_run) - if not expected_tests: - LOG.debug("Rerun textFile success") - self._rerun_serially(expected_tests, listener) - shutil.rmtree(test_file_path) - - def _rerun_serially(self, expected_tests, listener): - LOG.debug("rerun serially, expected run: %s" % len(expected_tests)) - self.runner.remove_instrumentation_arg("testFile") - for test in expected_tests: - self.runner.class_name = test.class_name - self.runner.test_name = test.test_name - self.runner.rerun(listener, test) - - def _collect_test_to_run(self): - if self.rerun: - self.runner.set_test_collection(True) - tests = self._collect_test_and_retry() - self.runner.set_test_collection(False) - return tests - return None - - def _collect_test_and_retry(self): - collector = CollectingTestListener() - listener = [collector] - self.runner.run(listener) - run_results = collector.get_current_run_results() - return run_results - - def _junit_clear(self): - _lock_screen(self.config.device) - self.config.device.execute_shell_command( - "rm -r /%s/%s/%s/%s" % ("data", "local", "tmp", "ajur")) - - def _get_driver_config(self, json_config): - self.config.remote_path = get_config_value( - 'device-test-path', json_config.get_driver(), - default="/%s/%s" % ("data", "tmp"), is_list=False) - module_name = get_config_value( - 'module-name', json_config.get_driver(), False) - if module_name: - self.config.module_name = module_name - else: - raise ParamError("Can't find module_name.", error_no="03201") - - rerun = get_config_value('rerun', json_config.get_driver(), False) - if isinstance(rerun, bool): - self.rerun = rerun - elif rerun == "False" or rerun == "false": - self.rerun = False - - timeout_config = get_config_value('shell-timeout', - json_config.get_driver(), False) - if timeout_config: - self.config.timeout = int(timeout_config) - else: - self.config.timeout = TIME_OUT - - def _get_runner_config(self, json_config): - test_timeout = get_config_value('test-timeout', - json_config.get_driver(), False) - if test_timeout: - self.runner.add_instrumentation_arg("timeout_msec", - int(test_timeout)) - - def _query_runner_name(self, package): - stdout = self.config.device.hdc_command( - "shell pm list instrumentation") - packages = stdout.split("\n") - for item in packages: - if package in item: - return item.split("/")[1].split(" ")[0] - - def __result__(self): - return self.result if os.path.exists(self.result) else "" - - class RemoteDexRunner: def __init__(self, config): self.arg_list = {} @@ -1476,199 +1229,6 @@ class RemoteDexRunner: return handler -@Plugin(type=Plugin.DRIVER, id=DeviceTestType.dex_test) -class DexTestDriver(IDriver): - """ - DexTestDriver is a Test that runs a native test package on given device. - """ - # test driver config - config = None - result = "" - - def __check_environment__(self, device_options): - pass - - def __check_config__(self, config): - pass - - def __execute__(self, request): - try: - LOG.debug("Start execute xdevice extension DexTest") - - self.config = request.config - self.config.target_test_path = DEFAULT_TEST_PATH - self.config.device = request.config.environment.devices[0] - - suite_file = request.root.source.source_file - if not suite_file: - LOG.error("test source '%s' not exists" % - request.root.source.source_string, error_no="00110") - return - - LOG.debug("Testsuite FilePath: %s" % suite_file) - serial = request.config.device.__get_serial__() - device_log_file = get_device_log_file(request.config.report_path, - serial) - device_log_file_open = os.open(device_log_file, os.O_WRONLY | - os.O_CREAT | os.O_APPEND, 0o755) - with os.fdopen(device_log_file_open, "a") as file_pipe: - self.config.device.start_catch_device_log(file_pipe) - self._init_junit_test() - self._run_junit_test(suite_file) - file_pipe.flush() - finally: - self.config.device.stop_catch_device_log() - - def _init_junit_test(self): - cmd = "target mount" \ - if self.config.device.usb_type == DeviceConnectorType.hdc \ - else "remount" - self.config.device.hdc_command(cmd) - self.config.device.execute_shell_command( - "rm -rf %s" % self.config.target_test_path) - self.config.device.execute_shell_command( - "mkdir -p %s" % self.config.target_test_path) - self.config.device.execute_shell_command( - "mount -o rw,remount,rw /%s" % "system") - - def _run_junit_test(self, suite_file): - filename = os.path.basename(suite_file) - suitefile_target_test_path = self.config.target_test_path - junit_test_para = self._get_junit_test_para(filename, suite_file) - is_coverage_test = True if self.config.coverage == "coverage" else \ - False - - # push testsuite file - self.config.device.push_file(suite_file, self.config.target_test_path) - - # push resource files - resource_manager = ResourceManager() - resource_data_dic, resource_dir = \ - resource_manager.get_resource_data_dic(suite_file) - resource_manager.process_preparer_data(resource_data_dic, resource_dir, - self.config.device) - - # execute testcase - return_message = self._execute_suitefile_junittest( - filename, junit_test_para, suitefile_target_test_path) - result = ResultManager(suite_file, self.config.report_path, - self.config.device, - self.config.target_test_path) - result.set_is_coverage(is_coverage_test) - self.result = result.get_test_results(return_message) - - resource_manager.process_cleaner_data(resource_data_dic, resource_dir, - self.config.device) - - def _get_junit_test_para(self, filename, suite_file): - exec_info = get_java_test_para(self.config.testcase, - self.config.testlevel) - java_test_file = get_execute_java_test_files(suite_file) - junit_test_para = self._get_dex_test_para(filename, java_test_file, - exec_info) - return junit_test_para - - def _get_dex_test_para(self, filename, java_test_file, exec_info): - exec_class, exec_method, exec_level = exec_info - dex_test_para = "%s %s%s %s%s %s%s %s%s %s%s %s%s" % ( - ZunitConst.z_unit_app, ZunitConst.output_dir, - self.config.target_test_path, - ZunitConst.output_file, filename, - ZunitConst.test_class, java_test_file, - ZunitConst.exec_class, exec_class, - ZunitConst.exec_method, exec_method, - ZunitConst.exec_level, exec_level) - if self.config.coverage == "coverage": - dex_test_para = ''.join((dex_test_para, ' ', - ZunitConst.jacoco_exec_file, filename, - ".exec")) - return dex_test_para - - def _execute_suitefile_junittest(self, filename, testpara, - target_test_path): - return_message = self._execute_dexfile_junittest(filename, testpara, - target_test_path) - return return_message - - def _execute_dexfile_junittest(self, filename, testpara, target_test_path): - from xdevice import Variables - long_command_path = tempfile.mkdtemp(prefix="long_command_", - dir=self.config.report_path) - if self.config.coverage == "coverage": - if Variables.source_code_rootpath == "": - LOG.error("source_code_rootpath is empty.", error_no="03202") - strip_num = 0 - else: - build_variant_outpath = os.path.join( - Variables.source_code_rootpath, "out", - self.config.build_variant) - strip_num = len(build_variant_outpath.split(os.sep)) - 1 - - command = "cd %s; rm -rf %s.xml; chmod +x *; " \ - "export BOOTCLASSPATH=%s%s:$BOOTCLASSPATH;" \ - "export GCOV_PREFIX=%s; export GCOV_PREFIX_STRIP=%d;" \ - " app_process %s%s %s" \ - % ( - target_test_path, filename, target_test_path, - filename, - target_test_path, strip_num, - target_test_path, filename, testpara) - else: - command = "cd %s; rm -rf %s.xml; chmod +x *; " \ - "export BOOTCLASSPATH=%s%s:$BOOTCLASSPATH;" \ - " app_process %s%s %s" \ - % ( - target_test_path, filename, target_test_path, - filename, - target_test_path, filename, testpara) - - LOG.info("command: %s" % command) - sh_file_name, file_path = \ - self._make_long_command_file(command, long_command_path, filename) - remote_command_dir = os.path.join(target_test_path, - ZunitConst.remote_command_dir) - self.config.device.execute_shell_command( - "mkdir -p %s" % remote_command_dir) - cmd = "file send" \ - if self.config.device.usb_type == DeviceConnectorType.hdc \ - else "push" - self.config.device.hdc_command( - "%s %s %s" % (cmd, file_path, remote_command_dir)) - try: - display_receiver = DisplayOutputReceiver() - self.config.device.execute_shell_command( - "sh %s/%s" % (remote_command_dir, sh_file_name), - receiver=display_receiver, timeout=TIME_OUT) - return_message = display_receiver.output - if display_receiver.output: - time.sleep(1) - except (ExecuteTerminate, HdcCommandRejectedException, - ShellCommandUnresponsiveException, HdcError) as exception: - if not getattr("exception", "error_no", ""): - setattr(exception, "error_no", "03203") - return_message = str(exception.args) - shutil.rmtree(long_command_path) - return return_message - - @staticmethod - def _make_long_command_file(command, longcommand_path, filename): - sh_file_name = '%s.sh' % filename - file_path = os.path.join(longcommand_path, sh_file_name) - try: - file_path_open = os.open(file_path, os.O_WRONLY | os.O_CREAT | - os.O_APPEND, 0o755) - with os.fdopen(file_path_open, "a") as file_desc: - file_desc.write(command) - file_desc.flush() - except(IOError, ValueError) as err_msg: - LOG.exception("Error for make long command file: ", err_msg, - exc_info=False, error_no="03200") - return sh_file_name, file_path - - def __result__(self): - return self.result if os.path.exists(self.result) else "" - - @Plugin(type=Plugin.DRIVER, id=DeviceTestType.hap_test) class HapTestDriver(IDriver): """ diff --git a/extension/src/xdevice_extension/_core/testkit/kit.py b/extension/src/xdevice_extension/_core/testkit/kit.py index d2ca530..107aa46 100644 --- a/extension/src/xdevice_extension/_core/testkit/kit.py +++ b/extension/src/xdevice_extension/_core/testkit/kit.py @@ -48,7 +48,7 @@ from xdevice_extension._core.utils import convert_serial __all__ = ["STSKit", "PushKit", "PropertyCheckKit", "ShellKit", "WifiKit", "ConfigKit", "AppInstallKit", "junit_para_parse", - "gtest_para_parse", "junit_dex_para_parse", "reset_junit_para"] + "gtest_para_parse", "reset_junit_para"] LOG = platform_logger("Kit") @@ -648,7 +648,7 @@ class AppInstallKit(ITestKit): def remount(device): - cmd = "target mount" \ + cmd = "mount -o rw,remount /" \ if device.usb_type == DeviceConnectorType.hdc else "remount" device.hdc_command(cmd) @@ -753,62 +753,6 @@ def junit_para_parse(device, junit_paras, prefix_char="-e"): return " ".join(ret_str) - -def junit_dex_para_parse(device, junit_paras, prefix_char="--"): - """To parse the para of junit - Args: - device: the device running - junit_paras: the para dict of junit - prefix_char: the prefix char of parsed cmd - Returns: - the new para using in a command like -e testFile xxx - -e coverage true... - """ - ret_str = [] - path = "/%s/%s/%s/%s" % ("data", "local", "tmp", "ajur") - include_file = "%s/%s" % (path, "includes.txt") - exclude_file = "%s/%s" % (path, "excludes.txt") - - if not isinstance(junit_paras, dict): - LOG.warning("The para of junit is not the dict format as required") - return "" - # Disable screen keyguard - disable_key_guard = junit_paras.get('disable-keyguard') - if not disable_key_guard or disable_key_guard[0].lower() != 'false': - from xdevice_extension._core.driver.drivers import disable_keyguard - disable_keyguard(device) - - for para_name in junit_paras.keys(): - path = "/%s/%s/%s/%s/" % ("data", "local", "tmp", "ajur") - if para_name.strip() == 'test-file-include-filter': - for file_name in junit_paras[para_name]: - device.push_file(file_name, include_file) - device.execute_shell_command( - 'chown -R shell:shell %s' % path) - ret_str.append(prefix_char + " ".join(['testFile', include_file])) - elif para_name.strip() == "test-file-exclude-filter": - for file_name in junit_paras[para_name]: - device.push_file(file_name, include_file) - device.execute_shell_command( - 'chown -R shell:shell %s' % path) - ret_str.append(prefix_char + " ".join(['notTestFile', - exclude_file])) - elif para_name.strip() == "test" or para_name.strip() == "class": - result = _get_class(junit_paras, prefix_char, para_name.strip()) - ret_str.append(result) - elif para_name.strip() == "include-annotation": - ret_str.append(prefix_char + " ".join( - ['annotation', ",".join(junit_paras[para_name])])) - elif para_name.strip() == "exclude-annotation": - ret_str.append(prefix_char + " ".join( - ['notAnnotation', ",".join(junit_paras[para_name])])) - else: - ret_str.append(prefix_char + " ".join( - [para_name, ",".join(junit_paras[para_name])])) - - return " ".join(ret_str) - - def timeout_callback(proc): try: LOG.error("Error: execute command timeout.") -- GitLab