未验证 提交 d1044e27 编写于 作者: O openharmony_ci 提交者: Gitee

!141 分布式测试框架适配

Merge pull request !141 from buckwheat/master
......@@ -3,7 +3,7 @@ OpenHarmony为开发者提供了一套全面的自测试框架,开发者可根
本文从基础环境构建,用例开发,编译以及执行等方面介绍OpenHarmony测试框架如何运行和使用。
## 基础环境构建
测试框架依赖于python运行环境在使用测试框架之前可参阅以下方式进行配置。
测试框架依赖于python运行环境,python版本为3.8.X,在使用测试框架之前可参阅以下方式进行配置。
- [环境配置](https://gitee.com/openharmony/docs/tree/master/zh-cn/device-dev/subsystems/subsys-testguide-envbuild.md)
- [源码获取](https://gitee.com/openharmony/docs/blob/master/zh-cn/device-dev/get-code/sourcecode-acquire.md)
......
......@@ -207,8 +207,7 @@ int DistributedAgent::DoCmdServer(int serverSockFd)
auto pclinereturn = reinterpret_cast<DistributedMsg *>(returnValue);
pclinereturn->no = pcline->no;
pclinereturn->cmdTestType = htons(DST_COMMAND_CALL);
(void)sprintf_s(pclinereturn->alignmentCmd, (MAX_BUFF_LEN - DST_COMMAND_HEAD_LEN),
"%d", nresult) < 0);
sprintf_s(pclinereturn->alignmentCmd, (MAX_BUFF_LEN - DST_COMMAND_HEAD_LEN), "%d", nresult);
rlen = strlen(pclinereturn->alignmentCmd) + 1;
pclinereturn->len = htons(rlen);
HiLog::Info(DistributedAgent::LABEL, "agent get message :%s .\n",
......@@ -351,7 +350,6 @@ int DistributedAgent::OnProcessCmd(const std::string &strCommand, int cmdLen,
return nresult;
}
errno_t ret = EOK;
ret = memcpy_s(alignmentCmd, sizeof(alignmentCmd), strCommand.c_str(), cmdNo);
if (ret != EOK) {
return -1;
......
......@@ -21,6 +21,8 @@ import os
##############################################################################
##############################################################################
import time
def get_result_dir(testsuit_path):
result_rootpath = os.environ.get('PYTEST_RESULTPATH')
......@@ -41,6 +43,7 @@ def get_result_dir(testsuit_path):
if not os.path.exists(result_path):
os.makedirs(result_path)
return result_path
......
......@@ -15,8 +15,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import os
import shutil
import time
import platform
import subprocess
......@@ -26,12 +27,16 @@ import subprocess
__all__ = ["DeviceAdapter", "HDCDeviceAdapter"]
import zipfile
if platform.system() != 'Windows':
QUOTATION_MARKS = "'"
else:
QUOTATION_MARKS = "\""
USB_TOOLS = "hdc"
HDC_TOOLS = "hdc"
USB_HDC = "HDC"
USB_TOOLS = "hdc_std"
HDC_TOOLS = "hdc_std"
##############################################################################
......@@ -85,12 +90,18 @@ class DeviceAdapter:
def __init__(self, remote_ip="", repote_port="", device_sn="", name=""):
self.device_sn = device_sn
self.name = name
self.test_path = "/%s/%s" % ("data", "test")
self.test_path = "/data/test"
self.device_para = self.get_device_para(
remote_ip,
repote_port,
device_sn)
self.init_device()
self.device_hdc_para = self.get_device_hdc_para(
device_sn
)
if len(self.device_para) == 3:
self.init_device()
if len(self.device_hdc_para) == 1:
self.init_device_hdc()
###############################################################
###############################################################
......@@ -102,10 +113,21 @@ class DeviceAdapter:
self.shell('chmod 777 %s' % self.test_path)
self.shell("mount -o rw,remount,rw /%s" % "system")
def init_device_hdc(self):
self.target_mount()
self.hdc_shell('rm -rf %s' % self.test_path)
self.hdc_shell('mkdir -p %s' % self.test_path)
self.hdc_shell('chmod 777 %s' % self.test_path)
self.hdc_shell("mount -o rw,remount,rw /%s" % "system")
def remount(self):
command = "%s %s remount" % (USB_TOOLS, self.device_para)
self.execute_command(command)
def target_mount(self):
command = "%s %s target mount" % (HDC_TOOLS, self.device_hdc_para)
self.execute_command(command)
def push_file(self, srcpath, despath):
command = "%s %s push %s %s" % (
USB_TOOLS,
......@@ -114,6 +136,14 @@ class DeviceAdapter:
despath)
return self.execute_command(command)
def push_hdc_file(self,srcpath, despath):
command = "%s %s file send %s %s" % (
USB_TOOLS,
self.device_hdc_para,
srcpath,
despath)
return self.execute_command(command)
def pull_file(self, srcpath, despath):
command = "%s %s pull %s %s" % (
USB_TOOLS,
......@@ -122,6 +152,14 @@ class DeviceAdapter:
despath)
return self.execute_command(command)
def pull_hdc_file(self, srcpath, despath):
command = "%s %s file recv %s %s" % (
HDC_TOOLS,
self.device_hdc_para,
srcpath,
despath)
return self.execute_command(command)
def unlock_screen(self):
self.shell("svc power stayon true")
......@@ -179,7 +217,6 @@ class DeviceAdapter:
time.sleep(1)
return return_code
###############################################################
###############################################################
......@@ -195,11 +232,21 @@ class DeviceAdapter:
if "" == device_sn:
device_para = "-H %s -P %s" % (remote_ip, remote_port)
else:
device_para = "-H %s -P %s -s %s" % (
device_para = "-H %s -P %s -t %s" % (
remote_ip, remote_port, device_sn)
return device_para
def execute_command(self, command, print_flag=True, timeout=900):
@classmethod
def get_device_hdc_para(cls, device_sn=""):
if " " == device_sn:
device_hdc_para = ""
else:
device_hdc_para = "-t %s" % device_sn
return device_hdc_para
@classmethod
def execute_command(cls, command, print_flag=True, timeout=900):
try:
if print_flag:
print("command: " + command)
......@@ -211,7 +258,8 @@ class DeviceAdapter:
print("results: failed")
return False
def execute_command_with_output(self, command, print_flag=True):
@classmethod
def execute_command_with_output(cls, command, print_flag=True):
if print_flag:
print("command: " + command)
......@@ -237,6 +285,14 @@ class DeviceAdapter:
command,
QUOTATION_MARKS))
def hdc_shell(self, command=""):
return self.execute_command("%s %s shell %s%s%s" % (
USB_HDC,
self.device_hdc_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS))
def execute_shell_command(self, command):
return self.shell(command)
......@@ -248,7 +304,17 @@ class DeviceAdapter:
command,
QUOTATION_MARKS))
def check_path_legal(self, path):
def hdc_std_shell_with(self, command=""):
return self.execute_command_with_output("%s %s shell %s%s%s" % (
USB_TOOLS,
self.device_hdc_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS
))
@classmethod
def check_path_legal(cls, path):
if path and " " in path:
return "\"%s\"" % path
return path
......@@ -267,12 +333,18 @@ class HDCDeviceAdapter:
def __init__(self, remote_ip="", repote_port="", device_sn="", name=""):
self.device_sn = device_sn
self.name = name
self.test_path = "/%s/%s/" % ("data", "test")
self.test_path = "/data/test/"
self.device_para = self.get_device_para(
remote_ip,
repote_port,
device_sn)
self.init_device()
self.device_hdc_para = self.get_device_hdc_para(
device_sn
)
if len(self.device_para) == 3:
self.init_device()
if len(self.device_hdc_para) == 1:
self.init_device_hdc()
###############################################################
###############################################################
......@@ -284,6 +356,13 @@ class HDCDeviceAdapter:
self.shell('chmod 777 %s' % self.test_path)
self.shell("mount -o rw,remount,rw /%s" % "system")
def init_device_hdc(self):
self.remount()
self.shell('rm -rf %s' % self.test_path)
self.shell('mkdir -p %s' % self.test_path)
self.shell('chmod 777 %s' % self.test_path)
self.shell("mount -o rw,remount,rw /%s" % "system")
def remount(self):
command = "%s %s target mount" % (HDC_TOOLS)
self.execute_command(command)
......@@ -318,7 +397,6 @@ class HDCDeviceAdapter:
self.unlock_screen()
self.unlock_device()
###############################################################
###############################################################
......@@ -338,7 +416,17 @@ class HDCDeviceAdapter:
remote_ip, remote_port, device_sn)
return device_para
def execute_command(self, command, print_flag=True, timeout=900):
@classmethod
def get_device_hdc_para(cls, device_sn=""):
if " " == device_sn:
device_para = ""
else:
device_para = "-t %s" % device_sn
return device_para
@classmethod
def execute_command(cls, command, print_flag=True, timeout=900):
try:
if print_flag:
print("command: " + command)
......@@ -350,7 +438,8 @@ class HDCDeviceAdapter:
print("results: failed")
return False
def execute_command_with_output(self, command, print_flag=True):
@classmethod
def execute_command_with_output(cls, command, print_flag=True):
if print_flag:
print("command: " + command)
......@@ -384,7 +473,17 @@ class HDCDeviceAdapter:
command,
QUOTATION_MARKS))
def check_path_legal(self, path):
def hdc_std_shell_with(self, command=""):
return self.execute_command_with_output("%s %s shell %s%s%s" % (
HDC_TOOLS,
self.device_hdc_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS
))
@classmethod
def check_path_legal(cls, path):
if path and " " in path:
return "\"%s\"" % path
return path
......
......@@ -69,15 +69,40 @@ class ITestDriver:
##############################################################################
class DexTestDriver:
__metaclass__ = ABCMeta
@abstractmethod
def execute(self, suite_file, push_flag=False):
pass
##############################################################################
##############################################################################
class HapTestDriver:
__metaclass__ = ABCMeta
@abstractmethod
def execute(self, suite_file, push_flag=False):
pass
##############################################################################
##############################################################################
class CppTestDriver(ITestDriver):
def __init__(self, device):
def __init__(self, device, hdc_tool):
self.device = device
self.hdc_tool = hdc_tool
def execute(self, suite_file, background=False):
def execute(self, suite_file, result_path, background=False):
file_name = os.path.basename(suite_file)
long_command_path = tempfile.mkdtemp(prefix="long_command_",
dir=os.path.join(os.environ.get('PYTEST_RESULTPATH'), "temp"))
dir=os.path.join(result_path, "temp"))
command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s" % (
DEVICE_TEST_PATH,
file_name,
......@@ -87,7 +112,10 @@ class CppTestDriver(ITestDriver):
sh_file_name, file_path = make_long_command_file(command,
long_command_path,
file_name)
self.device.push_file(file_path, DEVICE_TEST_PATH)
if self.hdc_tool != "hdc":
self.device.push_file(file_path, DEVICE_TEST_PATH)
else:
self.device.push_hdc_file(file_path, DEVICE_TEST_PATH)
# push resource files
resource_manager = ResourceManager()
......@@ -103,9 +131,10 @@ class CppTestDriver(ITestDriver):
sh_command = "sh %s" % (
os.path.join(DEVICE_TEST_PATH, sh_file_name))
return self.device.shell(sh_command)
if self.hdc_tool != "hdc":
return self.device.shell(sh_command)
else:
return self.device.hdc_shell(sh_command)
##############################################################################
##############################################################################
......@@ -17,6 +17,7 @@
#
import os
from distributed.common.devices import DeviceAdapter
from distributed.common.devices import HDCDeviceAdapter
......@@ -25,13 +26,13 @@ from distributed.common.devices import HDCDeviceAdapter
##############################################################################
class DeviceManager:
def __init__(self):
def __init__(self, result_path):
self.has_hdc_tool = False
self.phone_device_list = []
self.ivi_device_list = []
self.tv_device_list = []
self.watch_device_list = []
self.make_device_list()
self.make_device_list(result_path)
def make_device_adapter(self, device_info_list, device_name):
if self.has_hdc_tool:
......@@ -46,9 +47,8 @@ class DeviceManager:
name=device_name)
return device
def make_device_list(self):
device_info_list = self.get_device_info_list()
print(device_info_list)
def make_device_list(self, result_path):
device_info_list = self.get_device_info_list(result_path)
for item in device_info_list:
if len(item) != 4:
......@@ -79,9 +79,10 @@ class DeviceManager:
setattr(self, device.name, device)
return
def get_device_info_list(self):
@staticmethod
def get_device_info_list(result):
device_info_list = []
tmp_path = os.path.join(os.environ.get('PYTEST_RESULTPATH'), "temp")
tmp_path = os.path.join(result, "temp")
device_info_file_path = os.path.join(tmp_path,
"device_info_file.txt")
......@@ -95,7 +96,5 @@ class DeviceManager:
device_info_list.append(temp)
return device_info_list
##############################################################################
##############################################################################
......@@ -21,8 +21,10 @@ import sys
import re
import json
import time
import platform
# insert src path for loading xdevice modules
sys.framework_src_dir = os.path.abspath(os.path.dirname(
os.path.dirname(__file__)))
sys.path.insert(1, sys.framework_src_dir)
......@@ -46,6 +48,7 @@ sys.adapter_dir = os.path.abspath(os.path.join(
"python"))
sys.path.insert(4, sys.adapter_dir)
from distributed.common.common import create_empty_result_file
from distributed.common.common import get_resource_dir
from distributed.common.drivers import CppTestDriver
......@@ -59,13 +62,13 @@ DEVICE_INFO_TEMPLATE = "agentlist:%s\nagentport:%s,\ndevicesuuid:%s"
##############################################################################
def get_current_driver(device, target_name):
def get_current_driver(device, target_name, hdc_tool):
driver = None
_, suffix_name = os.path.splitext(target_name)
if suffix_name == "":
driver = CppTestDriver(device)
driver = CppTestDriver(device, hdc_tool)
elif suffix_name == ".bin":
driver = CppTestDriver(device)
driver = CppTestDriver(device, hdc_tool)
elif suffix_name == ".dex":
driver = DexTestDriver(device)
elif suffix_name == ".hap":
......@@ -78,47 +81,64 @@ def get_current_driver(device, target_name):
class Distribute:
def __init__(self, suite_dir, major, agent_list):
def __init__(self, suite_dir, major, agent_list, hdc_tool):
self.suite_dir = suite_dir
self.major = major
self.agent_list = agent_list
self.hdc_tool = hdc_tool
def exec_agent(self, device, target_name):
driver = get_current_driver(device, target_name)
def exec_agent(self, device, target_name, result_path):
driver = get_current_driver(device, target_name, self.hdc_tool)
if driver is None:
print("Error: driver is None.")
return False
resource_dir = get_resource_dir(self.suite_dir, device.name)
self._make_agent_desc_file(device)
device.push_file(os.path.join(self.suite_dir, "agent.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
if self.hdc_tool != "hdc":
device.push_file(os.path.join(self.suite_dir, "agent.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
else:
device.push_hdc_file(os.path.join(self.suite_dir, "agent.desc"),
device.test_path)
device.push_hdc_file(os.path.join(resource_dir, target_name),
device.test_path)
suite_path = os.path.join(self.suite_dir, target_name)
driver.execute(suite_path, background=True)
driver.execute(suite_path, result_path, background=True)
return self._check_thread(device, target_name)
def exec_major(self, device, target_name):
driver = get_current_driver(device, target_name)
def exec_major(self, device, target_name, result_path):
driver = get_current_driver(device, target_name, self.hdc_tool)
if driver is None:
print("Error: driver is None.")
return False
resource_dir = get_resource_dir(self.suite_dir, device.name)
self._make_major_desc_file()
device.push_file(os.path.join(self.suite_dir, "major.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
if self.hdc_tool != "hdc":
device.push_file(os.path.join(self.suite_dir, "major.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
else:
device.push_hdc_file(os.path.join(self.suite_dir, "major.desc"),
device.test_path)
device.push_hdc_file(os.path.join(resource_dir, target_name),
device.test_path)
suite_path = os.path.join(self.suite_dir, target_name)
return driver.execute(suite_path, background=False)
return driver.execute(suite_path, result_path, background=False)
def pull_result(self, device, source_path, result_save_path):
_, file_name = os.path.split(source_path)
device.pull_file(source_path, result_save_path)
if self.hdc_tool != "hdc":
device.pull_file(source_path, result_save_path)
else:
device.pull_hdc_file(source_path, result_save_path)
if not os.path.exists(os.path.join(result_save_path, file_name)):
create_empty_result_file(result_save_path, file_name)
return
......@@ -126,10 +146,13 @@ class Distribute:
def _check_thread(self, device, thread_name):
check_command = "ps -A | grep %s" % thread_name
checksum = 0
while checksum < 100: # check 100 times
while checksum < 100:
checksum += 1
print("check thread:%s %s times" % (thread_name, checksum))
output = device.shell_with_output(check_command)
if self.hdc_tool != "hdc":
output = device.shell_with_output(check_command)
else:
output = device.hdc_std_shell_with(check_command)
if output == "":
time.sleep(0.1)
else:
......@@ -140,31 +163,75 @@ class Distribute:
def _make_agent_desc_file(self, device):
agent_ip_list = ""
device_uuid_list = ""
device_uuid_list += self._query_device_uuid(self.major) + ","
agent_ip_list += self._query_device_ip(device) + ","
for agent in self.agent_list:
device_uuid_list += self._query_device_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_agent_file = os.path.join(self.suite_dir, "agent.desc")
self._write_device_config(config_info, config_agent_file)
if self.hdc_tool != "hdc":
if self._query_device_uuid(self.major) != '':
device_uuid_list += self._query_device_uuid(self.major) + ","
if self._query_device_ip(device) != "":
agent_ip_list += self._query_device_ip(device) + ","
for agent in self.agent_list:
if self._query_device_uuid(agent):
device_uuid_list += self._query_device_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_agent_file = os.path.join(self.suite_dir, "agent.desc")
self._write_device_config(config_info, config_agent_file)
else:
if self._query_device_hdc_uuid(self.major):
device_uuid_list += self._query_device_hdc_uuid(self.major) + ","
if self._query_device_hdc_ip(device):
agent_ip_list += self._query_device_hdc_ip(device) + ","
for agent in self.agent_list:
if self._query_device_hdc_uuid(agent):
device_uuid_list += self._query_device_hdc_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_agent_file = os.path.join(self.suite_dir, "agent.desc")
self._write_device_config(config_info, config_agent_file)
def _make_major_desc_file(self):
agent_ip_list = ""
device_uuid_list = ""
device_uuid_list += self._query_device_uuid(self.major) + ","
for agent in self.agent_list:
agent_ip_list += self._query_device_ip(agent) + ","
device_uuid_list += self._query_device_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
if self.hdc_tool != "hdc":
if self._query_device_uuid(self.major) != "NoneTyple":
device_uuid_list += self._query_device_uuid(self.major) + ","
for agent in self.agent_list:
if self._query_device_ip(agent) != "" and self._query_device_uuid(agent) != "":
agent_ip_list += self._query_device_ip(agent) + ","
device_uuid_list += self._query_device_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_major_file = os.path.join(self.suite_dir, "major.desc")
self._write_device_config(config_info, config_major_file)
config_major_file = os.path.join(self.suite_dir, "major.desc")
self._write_device_config(config_info, config_major_file)
else:
if self._query_device_hdc_uuid(self.major):
device_uuid_list += self._query_device_hdc_uuid(self.major) + ","
for agent in self.agent_list:
if self._query_device_hdc_uuid(agent):
agent_ip_list += self._query_device_hdc_ip(agent) + ","
device_uuid_list += self._query_device_hdc_uuid(agent) + ","
config_info = DEVICE_INFO_TEMPLATE % (agent_ip_list, "8888",
device_uuid_list)
config_major_file = os.path.join(self.suite_dir, "major.desc")
self._write_device_config(config_info, config_major_file)
def _query_device_ip(self, device):
@classmethod
def _query_device_ip(cls, device):
output = device.shell_with_output("getprop ro.hardware")
if output == "":
return ""
......@@ -185,7 +252,28 @@ class Distribute:
return ipaddress[0]
def _query_device_uuid(self, device):
@classmethod
def _query_device_hdc_ip(cls, device):
output = device.hdc_std_shell_with("param get ohos.boot.hardware")
if output == "":
return ""
isemulator = re.findall('readonly', str(output))
output = device.hdc_std_shell_with("ifconfig")
if output == "":
return ""
if 0 != len(isemulator):
ipaddress = re.findall(r"\b10\.0\.2\.[0-9]{1,3}\b", output)
else:
ip_template = r"\b192\.168\.(?:[0-9]{1,3}\.)[0-9]{1,3}\b"
ipaddress = re.findall(ip_template, output)
if len(ipaddress) == 0:
return ""
return ipaddress[0]
@classmethod
def _query_device_uuid(cls, device):
"""
1. Run the dumpsys DdmpDeviceMonitorService command to query the value
of dev_nodeid.
......@@ -209,7 +297,33 @@ class Distribute:
dev_nodeid_info = device_info[begin:end].replace('"', "")
return dev_nodeid_info.split(":")[1]
def _write_device_config(self, device_info, file_path):
@classmethod
def _query_device_hdc_uuid(cls, device):
"""
1. Run the dumpsys DdmpDeviceMonitorService command to query the value
of dev_nodeid.
2. The dump information reported by the soft bus. Local device info,
should be placed first.
Note: The dump information may not comply with the JSON format.
"""
if platform.system() == "Windows":
file_path = os.path.normpath(os.path.join(sys.framework_root_dir, "../SoftBusdumpdeviceInfo"))
else:
linux_file_path = "out/rk3568/communication/dsoftbus_standard/SoftBusdumpdeviceInfo"
file_path = os.path.normpath(os.path.join("../../", sys.framework_root_dir, linux_file_path))
device.push_hdc_file(file_path, "/data/test")
chmod_device_file_path = "SoftBusdumpdeviceInfo"
device_info = device.hdc_std_shell_with("/data/test/%s" % chmod_device_file_path)
if device_info == "":
return ""
dev_nodeid_info = re.findall(r"Uuid = (.*?\r\n)", device_info)
if dev_nodeid_info:
return str(dev_nodeid_info[0])
@classmethod
def _write_device_config(cls, device_info, file_path):
file_dir, file_name = os.path.split(file_path)
final_file = os.path.join(file_dir, file_name.split('.')[0] + ".desc")
if os.path.exists(final_file):
......
#!/usr/bin/env python3
# coding=utf-8
#
# Copyright (c) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
from distributed.common.manager import DeviceManager
from distributed.distribute.distribute import Distribute
from distributed.common.common import create_empty_result_file
class DbinderTest(unittest.TestCase):
def __init__(self, result_path, suits_dir):
self.result_path = result_path
self.suits_dir = suits_dir
def setUp(self):
print('setUp')
self.manager = DeviceManager(self.result_path)
self.major = self.manager.PHONE1
self.angent_list = [self.manager.PHONE2]
self.hdc = "hdc"
def test_distribute(self, major_target_name="", agent_target_name=""):
major_target_name = major_target_name
agent_target_name = agent_target_name
distribute = Distribute(self.suits_dir, self.major, self.angent_list, self.hdc)
for agent in self.angent_list:
if not distribute.exec_agent(agent, agent_target_name, self.result_path):
print(agent, agent_target_name)
create_empty_result_file(self.result_path, major_target_name)
return
distribute.exec_major(self.major, major_target_name, self.result_path)
source_path = "%s/%s.xml" % (self.major.test_path, major_target_name)
distribute.pull_result(self.major, source_path, self.result_path)
def tearDown(self):
print('tearDown')
......@@ -40,6 +40,8 @@ def execute_distribute_test_file(py_test_file, result_rootpath):
shell=False,
env=environ_dic)
print("proc", proc)
try:
while proc.poll() is None:
line = proc.stdout.readline()
......@@ -153,11 +155,11 @@ def check_zdn_network(device, device_ip=""):
def query_device_ip(device):
output = device.execute_shell_command("getprop ro.hardware")
output = device.execute_shell_command("getprop ohos.boot.hardware")
if output == "":
return ""
isemulator = re.findall(r"ranchu", output)
isemulator = re.findall(r"read only", output)
output = device.execute_shell_command("ifconfig")
if output == "":
return ""
......@@ -172,3 +174,18 @@ def query_device_ip(device):
% device.device_sn)
return ""
return ipaddress[0]
def get_test_case(test_case):
major_test_case = []
agent_test_case = []
for test in test_case:
test_case = test.split("\\")[-1]
test_agent = test_case.split("Test")
if "Agent" in test_agent:
agent_test_case.append(test_case)
else:
major_test_case.append(test_case)
return major_test_case, agent_test_case
......@@ -31,6 +31,7 @@ from core.utils import scan_support_product
from core.utils import is_lite_product
from core.common import is_open_source_product
from core.command.parameter import Parameter
from core.command.distribute_execute import DbinderTest
from core.testcase.testcase_manager import TestCaseManager
from core.config.config_manager import UserConfigManager
from core.config.parse_parts_config import ParsePartsConfig
......@@ -41,20 +42,17 @@ LOG = platform_logger("Run")
class Run(object):
def process_command_run(self, command, options):
para = Parameter()
# 过滤options中的test type参数,确保都是合法的,最终的数据结构为:[{unittest:300,moduletest:300}]
test_type_list = para.get_testtype_list(options.testtype)
if len(test_type_list) == 0:
LOG.error("The testtype parameter is incorrect.")
return
options.testtype = test_type_list
# 初始化ParsePartsConfig,根据productform获取对应的所有子系统名称列表、子系统下的部件名列表
parser = ParsePartsConfig(options.productform)
# 获取部件名列表
partname_list = parser.get_part_list(
options.subsystem,
options.testpart)
options.partname_list = partname_list
# 获取编译结果输出目录
options.coverage_outpath = self.get_coverage_outpath(options)
LOG.info("")
......@@ -71,31 +69,31 @@ class Run(object):
LOG.info("partname_list = %s" % str(options.partname_list))
LOG.info("------------------------------------")
LOG.info("")
# options参数校验
if not para.check_run_parameter(options):
LOG.error("Input parameter is incorrect.")
return
# 编译测试用例
if not self._build_test_cases(options):
LOG.error("Build test cases failed.")
return
if "actstest" in options.testtype:
test_dict = self.get_acts_test_dict(options)
else:
test_dict = self.get_test_dict(options)
test_case_path = self.get_tests_out_path(options.productform)
if not os.path.exists(test_case_path):
LOG.error("%s is not exist." % test_case_path)
return
test_dict = TestCaseManager().get_test_files(test_case_path, options)
if not self._check_test_dictionary(test_dict):
LOG.error("The test file list is empty.")
return
if ("distributedtest" in options.testtype and
len(options.testtype) == 1):
from core.command.distribute_utils import get_test_case
from core.command.distribute_utils \
import check_ditributetest_environment
from core.command.distribute_utils import make_device_info_file
from core.command.distribute_utils \
import execute_distribute_test_file
from core.command.distribute_utils import make_reports
local_time = time.localtime()
......@@ -105,10 +103,13 @@ class Run(object):
if not check_ditributetest_environment():
return
output_test = get_test_case(test_dict["CXX"])
if not output_test:
return
result_rootpath = os.path.join(sys.framework_root_dir,
"reports",
create_time)
print(result_rootpath)
log_path = os.path.join(result_rootpath, "log")
tmp_path = os.path.join(result_rootpath, "temp")
......@@ -118,11 +119,22 @@ class Run(object):
Scheduler.start_task_log(log_path)
make_device_info_file(tmp_path)
pyfile_list = test_dict["PYT"]
for index, element in enumerate(pyfile_list):
LOG.info("[%s / %s] Executing: %s" % (index + 1,
len(pyfile_list), element))
execute_distribute_test_file(element, result_rootpath)
major_test = output_test[0]
agent_test = output_test[1]
suits_dir = os.path.dirname(test_dict["CXX"][0])
for major in major_test:
major_target_name = major
major_lift = major.split("Test")
for agent in agent_test:
agent_lift = agent.split("Test")
if major_lift[0] == agent_lift[0]:
agent_target_name = agent
manager = DbinderTest(result_rootpath, suits_dir)
manager.setUp()
manager.test_distribute(major_target_name, agent_target_name)
manager.tearDown()
make_reports(result_rootpath, start_time)
Scheduler.stop_task_log()
......@@ -141,8 +153,6 @@ class Run(object):
options.testcases_path = options.target_outpath
options.resource_path = os.path.abspath(os.path.join(
sys.framework_root_dir, "..", "resource"))
print(options.testcases_path)
print(options.resource_path)
if options.productform.find("wifiiot") != -1:
scheduler.update_test_type_in_source(".bin",
DeviceTestType.ctest_lite)
......@@ -172,14 +182,13 @@ class Run(object):
if options.coverage:
LOG.info("Coverage testing, no need to compile testcases")
return True
# 从user_config.xml文件获取<testcase>的配置,判断是否编译测试用例
is_build_testcase = UserConfigManager().get_user_config_flag(
"build", "testcase")
project_root_path = sys.source_code_root_path
if is_build_testcase and project_root_path != "":
from core.build.build_manager import BuildManager
build_manager = BuildManager()
# 实际编译调用逻辑
return build_manager.build_testcases(project_root_path, options)
else:
return True
......@@ -220,14 +229,28 @@ class Run(object):
return testcase_path
@classmethod
def get_acts_tests_out_path(cls, product_form):
acts_testcase_path = os.path.abspath(os.path.join(
get_build_output_path(product_form),
"suites",
"acts",
"testcases"))
LOG.info("acts_testcase_path=%s" % acts_testcase_path)
return acts_testcase_path
def get_tests_output_path(cls, product_form):
testcase_path = UserConfigManager().get_test_case_output_dir()
if testcase_path == "":
all_product_list = scan_support_product()
if product_form in all_product_list:
if is_open_source_product(product_form):
testcase_path = os.path.abspath(os.path.join(
get_build_output_path(product_form),
"packages",
"phone",
"tests"))
else:
testcase_path = os.path.abspath(os.path.join(
get_build_output_path(product_form),
"packages",
product_form,
"tests"))
else:
testcase_path = os.path.join(
get_build_output_path(product_form), "test")
LOG.info("testcase_path=%s" % testcase_path)
return testcase_path
@classmethod
def get_coverage_outpath(cls, options):
......@@ -241,18 +264,4 @@ class Run(object):
LOG.error("Coverage test: coverage_outpath is empty.")
return coverage_out_path
def get_acts_test_dict(self, options):
# 获取测试用例编译结果路径
acts_test_case_path = self.get_acts_tests_out_path(options.productform)
acts_test_dict = TestCaseManager().get_acts_test_files(acts_test_case_path, options)
return acts_test_dict
def get_test_dict(self, options):
# 获取测试用例编译结果路径
test_case_path = self.get_tests_out_path(options.productform)
if not os.path.exists(test_case_path):
LOG.error("%s is not exist." % test_case_path)
return
test_dict = TestCaseManager().get_test_files(test_case_path, options)
return test_dict
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册