提交 7f7440d6 编写于 作者: D deveco_test

增加支持XTS部件化测试

Signed-off-by: Ndeveco_test <liguangjie1@huawei.com>
上级 746ab227
......@@ -238,7 +238,7 @@ class CKit:
shell = "ShellKit"
testbundle = "TestBundleKit"
appinstall = "AppInstallKit"
component = "ComponentKit"
@dataclass
class GTestConst(object):
......
......@@ -36,6 +36,7 @@ from xdevice import ParamError
from xdevice import get_file_absolute_path
from xdevice import get_config_value
from xdevice import exec_cmd
from xdevice import ConfigConst
from xdevice_extension._core.constants import CKit
from xdevice_extension._core.environment.dmlib import CollectingOutputReceiver
......@@ -47,7 +48,7 @@ from xdevice_extension._core.utils import convert_serial
__all__ = ["STSKit", "PushKit", "PropertyCheckKit", "ShellKit", "WifiKit",
"ConfigKit", "AppInstallKit", "junit_para_parse",
"ConfigKit", "AppInstallKit", "ComponentKit", "junit_para_parse",
"gtest_para_parse", "reset_junit_para"]
LOG = platform_logger("Kit")
......@@ -664,6 +665,89 @@ class AppInstallKit(ITestKit):
return exec_out
@Plugin(type=Plugin.TEST_KIT, id=CKit.component)
class ComponentKit(ITestKit):
def __init__(self):
self._white_list_file = ""
self._white_list = ""
self._cap_file = ""
self.paths = ""
self.cache_subsystem = set()
self.cache_part = set()
def __check_config__(self, config):
self._white_list_file =\
get_config_value('white-list', config, is_list=False)
self._cap_file = get_config_value('cap-file', config, is_list=False)
self.paths = get_config_value('paths', config)
def __setup__(self, device, **kwargs):
if hasattr(device, ConfigConst.support_component):
return
if device.label in ["phone", "watch", "car", "tv", "tablet", "ivi"]:
command = "cat %s" % self._cap_file
result = device.execute_shell_command(command)
part_set = set()
subsystem_set = set()
if "{" in result:
for item in json.loads(result).get("components", []):
part_set.add(item.get("component", ""))
subsystems, parts = self.get_white_list()
part_set.update(parts)
subsystem_set.update(subsystems)
setattr(device, ConfigConst.support_component,
(subsystem_set, part_set))
self.cache_subsystem.update(subsystem_set)
self.cache_part.update(part_set)
def get_cache(self):
return self.cache_subsystem, self.cache_part
def get_white_list(self):
if not self._white_list and self._white_list_file:
self._white_list = self._parse_white_list()
return self._white_list
def _parse_white_list(self):
subsystem = set()
part = set()
white_json_file = os.path.normpath(self._white_list_file)
if not os.path.isabs(white_json_file):
white_json_file = \
get_file_absolute_path(white_json_file, self.paths)
if os.path.isfile(white_json_file):
subsystem_list = list()
flags = os.O_RDONLY
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(white_json_file, flags, modes),
"r") as file_content:
json_result = json.load(file_content)
if "subsystems" in json_result.keys():
subsystem_list.extend(json_result["subsystems"])
for subsystem_item_list in subsystem_list:
for key, value in subsystem_item_list.items():
if key == "subsystem":
subsystem.add(value)
elif key == "components":
for component_item in value:
if "component" in component_item.keys():
part.add(
component_item["component"])
return subsystem, part
def __teardown__(self, device):
if hasattr(device, ConfigConst.support_component):
setattr(device, ConfigConst.support_component, None)
self._white_list_file = ""
self._white_list = ""
self._cap_file = ""
self.cache_subsystem.clear()
self.cache_part.clear()
self.cache_device.clear()
def remount(device):
cmd = "shell mount -o rw,remount /" \
if device.usb_type == DeviceConnectorType.hdc else "remount"
......@@ -770,6 +854,7 @@ def junit_para_parse(device, junit_paras, prefix_char="-e"):
return " ".join(ret_str)
def timeout_callback(proc):
try:
LOG.error("Error: execute command timeout.")
......
......@@ -297,6 +297,16 @@ class Console(object):
dest=ConfigConst.repeat,
help="number of times that a task is executed"
" repeatedly")
parser.add_argument("-s", "--subsystem",
dest="subsystem",
action="store",
type=str,
help="- Specify the list of subsystem")
parser.add_argument("-p", "--part",
dest="part",
action="store",
type=str,
help="- Specify the list of part")
self._params_pre_processing(para_list)
(options, unparsed) = parser.parse_known_args(para_list)
if unparsed:
......@@ -344,6 +354,12 @@ class Console(object):
config_file=options.config, env=options.test_environment).\
get_testcases_dir()
setattr(options, ConfigConst.testcases_path, testcases_path)
if options.subsystem:
subsystem_list = str(options.subsystem).split(";")
setattr(options, ConfigConst.subsystem, subsystem_list)
if options.part:
part_list = str(options.part).split(";")
setattr(options, ConfigConst.part, part_list)
def command_parser(self, args):
try:
......
......@@ -231,7 +231,7 @@ class CKit:
liteshell = "LiteShellKit"
app_install = "AppInstallKit"
deploytool = "DeployToolKit"
component = "ComponentKit"
@dataclass
class GTestConst(object):
......@@ -270,6 +270,8 @@ class ConfigConst(object):
check_device = "check_device"
configfile = "config"
repeat = "repeat"
subsystem = "subsystem"
part = "part"
# Runtime Constant
history_report_path = "history_report_path"
......@@ -281,6 +283,9 @@ class ConfigConst(object):
module_kits = "module_kits"
spt = "spt"
version = "version"
component_mapper = "_component_mapper"
component_base_kit = "component_base_kit"
support_component = "support_component"
class FilePermission(object):
......
......@@ -24,6 +24,7 @@ from _core.logger import change_logger_level
from _core.plugin import Plugin
from _core.plugin import get_plugin
from _core.utils import convert_serial
from _core.constants import ConfigConst
__all__ = ["EnvironmentManager", "DeviceSelectionOption",
"DeviceAllocationState", "Environment"]
......@@ -200,6 +201,7 @@ class DeviceSelectionOption(object):
self.test_driver = test_source.test_type
self.source_file = ""
self.extend_value = {}
self.required_component = ""
def get_label(self):
return self.label
......@@ -224,6 +226,14 @@ class DeviceSelectionOption(object):
if self.label and self.label != device.label:
return False
if self.required_component and \
hasattr(device, ConfigConst.support_component):
subsystems, parts = getattr(device, ConfigConst.support_component)
required_subsystems, require_part = self.required_component
if required_subsystems not in subsystems and \
require_part not in parts:
return False
return True
......
......@@ -154,17 +154,20 @@ class Task:
plugin_id = None
source = test_descriptor.source
ignore_test = ""
if isinstance(source, TestSource):
if source.test_type is not None:
plugin_id = source.test_type
else:
ignore_test = source.module_name
LOG.error("'%s' no test driver specified" % source.test_name,
error_no="00106")
drivers = get_plugin(plugin_type=Plugin.DRIVER, plugin_id=plugin_id)
if plugin_id is not None:
if len(drivers) == 0:
ignore_test = source.module_name
error_message = "'%s' can not find test driver '%s'" % (
source.test_name, plugin_id)
LOG.error(error_message, error_no="00106")
......@@ -187,6 +190,8 @@ class Task:
if check_result is False:
LOG.error("'%s' can not find suitable test driver '%s'" %
(source.test_name, plugin_id), error_no="00106")
if ignore_test and hasattr(self.config, ConfigConst.component_mapper):
getattr(self.config, ConfigConst.component_mapper).pop(ignore_test)
for desc in test_descriptor.children:
self._init_driver(desc)
......
......@@ -330,6 +330,13 @@ class Scheduler(object):
device_option.source_file = test_source.source_file or \
test_source.source_string
device_options.append(device_option)
if ConfigConst.component_mapper in options.keys():
required_component = options.get(ConfigConst.component_mapper). \
get(test_source.module_name, None)
for device_option in device_options:
device_option.required_component = required_component
return device_options
@staticmethod
......@@ -454,6 +461,11 @@ class Scheduler(object):
LOG.info("")
test_drivers.pop(0)
continue
if getattr(task.config, ConfigConst.component_mapper, ""):
module_name = test_driver[1].source.module_name
self.component_task_setup(task, module_name)
# get environment
try:
environment = self.__allocate_environment__(
......@@ -751,6 +763,24 @@ class Scheduler(object):
@staticmethod
def _find_test_root_descriptor(config):
if getattr(config, "task", None) or getattr(config, "testargs", None):
Scheduler._pre_component_test(config)
if getattr(config, "subsystem", "") or getattr(config, "part", "") or \
getattr(config, "component_base_kit", ""):
uid = unique_id("Scheduler", "component")
if config.subsystem or config.part:
test_set = (config.subsystem, config.part)
else:
kit = getattr(config, ConfigConst.component_base_kit)
test_set = kit.get_white_list()
root = Descriptor(uuid=uid, name="component",
source=TestSetSource(test_set),
container=True)
root.children = find_test_descriptors(config)
return root
# read test list from testdict
if getattr(config, "testdict", "") != "" and getattr(
config, "testfile", "") == "":
......@@ -1104,3 +1134,53 @@ class Scheduler(object):
def _clear_test_dict_source(cls):
TestDictSource.exe_type.clear()
TestDictSource.test_type.clear()
@classmethod
def _pre_component_test(cls, config):
if not config.kits:
return
cur_kit = None
for kit in config.kits:
if kit.__class__.__name__ == CKit.component:
cur_kit = kit
break
if not cur_kit:
return
get_white_list = getattr(cur_kit, "get_white_list", None)
if not callable(get_white_list):
return
subsystems, parts = get_white_list()
if not subsystems and not parts:
return
setattr(config, ConfigConst.component_base_kit, cur_kit)
@classmethod
def component_task_setup(cls, task, module_name):
component_kit = task.config.get(ConfigConst.component_base_kit, None)
if not component_kit:
# only -p -s .you do not care about the components that can be
# supported. you only want to run the use cases of the current
# component
return
LOG.debug("Start component task setup")
_component_mapper = task.config.get(ConfigConst.component_mapper)
_subsystem, _part = _component_mapper.get(module_name)
is_hit = False
# find in cache. if not find, update cache
cache_subsystem, cache_part = component_kit.get_cache()
if _subsystem in cache_subsystem or _part in cache_subsystem:
is_hit = True
if not is_hit:
env_manager = EnvironmentManager()
for _, manager in env_manager.managers.items():
if getattr(manager, "devices_list", []):
for device in manager.devices_list:
component_kit.__setup__(device)
cache_subsystem, cache_part = component_kit.get_cache()
if _subsystem in cache_subsystem or _part in cache_subsystem:
is_hit = True
if not is_hit:
LOG.warning("%s are skipped, no suitable component found. "
"Require subsystem=%s part=%s, no device match this"
% (module_name, _subsystem, _part))
......@@ -24,6 +24,7 @@ from collections import namedtuple
from _core.constants import DeviceTestType
from _core.constants import ModeType
from _core.constants import HostDrivenTestType
from _core.constants import ConfigConst
from _core.exception import ParamError
from _core.logger import platform_logger
from _core.utils import get_filename_extension
......@@ -51,6 +52,7 @@ EXT_TYPE_DICT = {".hap": DeviceTestType.hap_test,
PY_SUFFIX = ".py"
PYD_SUFFIX = ".pyd"
MODULE_CONFIG_SUFFIX = ".json"
MODULE_INFO_SUFFIX = ".moduleInfo"
MAX_DIR_DEPTH = 6
LOG = platform_logger("TestSource")
......@@ -128,12 +130,43 @@ def find_testdict_descriptors(config):
return test_descriptors
def _append_component_test_source(config, testcases_dir, test_sources):
subsystem_list = config.subsystem if config.subsystem else list()
part_list = config.part if config.part else list()
module_info_files = _get_component_info_file(testcases_dir)
import stat
import json
result_dict = dict()
for info_file in module_info_files:
flags = os.O_RDONLY
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(info_file, flags, modes), "r") as f_handler:
result_dict.update(json.load(f_handler))
module_name = result_dict.get("module_name", "")
part_name = result_dict.get("part_name", "")
subsystem_name = result_dict.get("subsystem", "")
if not module_name or not part_name or not subsystem_name:
continue
module_config_file = \
os.path.join(os.path.dirname(info_file), module_name)
is_append = True
if subsystem_list or part_list:
if part_name not in part_list and \
subsystem_name not in subsystem_list:
is_append = False
if is_append:
getattr(config, ConfigConst.component_mapper, dict()).update(
{module_name: (subsystem_name, part_name)})
test_sources.append(module_config_file)
def _get_test_sources(config, testcases_dirs):
test_sources = []
# get test sources from testcases_dirs
if not config.testfile and not config.testlist and not config.testcase \
and config.task:
and not config.subsystem and not config.part and not \
getattr(config, "component_base_kit", "") and config.task:
for testcases_dir in testcases_dirs:
_append_module_test_source(testcases_dir, test_sources)
return test_sources
......@@ -162,6 +195,14 @@ def _get_test_sources(config, testcases_dirs):
if test_source.strip():
test_sources.append(test_source.strip())
return test_sources
# get test sources according *.moduleInfo file
if getattr(config, "subsystem", []) or getattr(config, "part", []) or \
getattr(config, "component_base_kit", ""):
setattr(config, ConfigConst.component_mapper, dict())
for testcases_dir in testcases_dirs:
_append_component_test_source(config, testcases_dir, test_sources)
return test_sources
return test_sources
......@@ -370,6 +411,18 @@ def _get_testcase_config_file(filename):
return None
def _get_component_info_file(entry_dir):
module_files = []
if not os.path.isdir(entry_dir):
return module_files
for item in os.listdir(entry_dir):
item_path = os.path.join(entry_dir, item)
if os.path.isfile(item_path) and item_path.endswith(
MODULE_INFO_SUFFIX):
module_files.append(item_path)
return module_files
def _get_test_type(config_file, test_driver, ext):
if test_driver:
return test_driver
......
......@@ -487,8 +487,10 @@ class ResultReporter(IReporter):
allowZip64=True)
try:
LOG.info("executing compress process, please wait...")
long_size_file = []
for src_path, target_path in file_path_list:
zip_object.write(src_path, target_path)
long_size_file.append((src_path, target_path))
self._write_long_size_file(zip_object, long_size_file)
LOG.info("generate zip file: %s", zipped_file)
except zipfile.BadZipFile as bad_error:
LOG.error("zip report folder error: %s" % bad_error.args)
......@@ -644,3 +646,15 @@ class ResultReporter(IReporter):
def get_path_of_summary_report(cls):
if cls.summary_report_result:
return cls.summary_report_result[0][0]
@classmethod
def _write_long_size_file(cls, zip_object, long_size_file):
for filename, arcname in long_size_file:
zip_info = zipfile.ZipInfo.from_file(filename, arcname)
zip_info.compress_type = getattr(zip_object, "compression",
zipfile.ZIP_DEFLATED)
zip_info._compresslevel = \
getattr(zip_object, "compresslevel", None)
with open(filename, "rb") as src, \
zip_object.open(zip_info, "w") as des:
shutil.copyfileobj(src, des, 1024 * 1024 * 8)
......@@ -246,6 +246,10 @@ def get_file_absolute_path(input_name, paths=None, alt_dir=None):
_inputs.append(input_name.replace("resource/", "", 1))
elif input_name.startswith("testcases/"):
_inputs.append(input_name.replace("testcases/", "", 1))
elif input_name.startswith("resource\\"):
_inputs.append(input_name.replace("resource\\", "", 1))
elif input_name.startswith("testcases\\"):
_inputs.append(input_name.replace("testcases\\", "", 1))
for _input in _inputs:
for path in abs_paths:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册