未验证 提交 bceb7c4d 编写于 作者: M mipengwei 提交者: Gitee

Signed-off-by:mipengwei <mipengwei@huawei.com>

上级 940c4374
......@@ -17,11 +17,11 @@
#
import os
import time
##############################################################################
##############################################################################
import time
def get_result_dir(testsuit_path):
......@@ -43,7 +43,6 @@ def get_result_dir(testsuit_path):
if not os.path.exists(result_path):
os.makedirs(result_path)
return result_path
......@@ -102,4 +101,3 @@ def create_empty_result_file(savepath, filename, message=""):
##############################################################################
##############################################################################
......@@ -18,14 +18,13 @@
import json
import os
import shutil
import time
import platform
import subprocess
##############################################################################
##############################################################################
__all__ = ["DeviceAdapter", "HDCDeviceAdapter"]
__all__ = ["DeviceShell"]
import zipfile
......@@ -34,8 +33,6 @@ if platform.system() != 'Windows':
else:
QUOTATION_MARKS = "\""
USB_HDC = "HDC"
USB_TOOLS = "hdc_std"
HDC_TOOLS = "hdc_std"
......@@ -81,148 +78,28 @@ def get_package_name(hap_filepath):
return package_name
##############################################################################
##############################################################################
class DeviceAdapter:
def __init__(self, remote_ip="", repote_port="", device_sn="", name=""):
class DeviceShell:
def __init__(self, conn_type, remote_ip="", device_sn="", repote_port="", name=""):
self.conn_type = conn_type
self.device_sn = device_sn
self.name = name
self.test_path = "/data/test"
self.device_para = self.get_device_para(
remote_ip,
repote_port,
device_sn)
self.device_hdc_para = self.get_device_hdc_para(
device_sn
)
if len(self.device_para) == 3:
self.init_device()
if len(self.device_hdc_para) == 1:
self.init_device_hdc()
###############################################################
###############################################################
def init_device(self):
self.remount()
self.shell('rm -rf %s' % self.test_path)
self.shell('mkdir -p %s' % self.test_path)
self.shell('chmod 777 %s' % self.test_path)
self.shell("mount -o rw,remount,rw /%s" % "system")
def init_device_hdc(self):
self.target_mount()
self.hdc_shell('rm -rf %s' % self.test_path)
self.hdc_shell('mkdir -p %s' % self.test_path)
self.hdc_shell('chmod 777 %s' % self.test_path)
self.hdc_shell("mount -o rw,remount,rw /%s" % "system")
def remount(self):
command = "%s %s remount" % (USB_TOOLS, self.device_para)
self.execute_command(command)
def target_mount(self):
command = "%s %s target mount" % (HDC_TOOLS, self.device_hdc_para)
self.execute_command(command)
def push_file(self, srcpath, despath):
command = "%s %s push %s %s" % (
USB_TOOLS,
self.device_para,
srcpath,
despath)
return self.execute_command(command)
def push_hdc_file(self,srcpath, despath):
command = "%s %s file send %s %s" % (
USB_TOOLS,
self.device_hdc_para,
srcpath,
despath)
return self.execute_command(command)
def pull_file(self, srcpath, despath):
command = "%s %s pull %s %s" % (
USB_TOOLS,
self.device_para,
srcpath,
despath)
return self.execute_command(command)
def pull_hdc_file(self, srcpath, despath):
command = "%s %s file recv %s %s" % (
HDC_TOOLS,
self.device_hdc_para,
srcpath,
despath)
return self.execute_command(command)
def unlock_screen(self):
self.shell("svc power stayon true")
def unlock_device(self):
self.shell("input keyevent 82")
self.shell("wm dismiss-keyguard")
def lock_screen(self):
self.shell("svc power stayon false")
def disable_keyguard(self):
self.unlock_screen()
self.unlock_device()
def install_hap(self, suite_file):
file_name = os.path.basename(suite_file)
message = self.shell_with_output("bm install -p %s" % os.path.join(
self.test_path, file_name))
message = str(message).rstrip()
if message != "":
print(message)
if message == "" or "Success" in message:
return_code = True
if conn_type:
self.device_params = self.get_device_hdc_para(
device_sn
)
else:
return_code = False
time.sleep(1)
return return_code
def uninstall_hap(self, suite_file):
package_name = get_package_name(suite_file)
result = self.shell("pm uninstall %s" % package_name)
time.sleep(1)
return result
def install_app(self, file_path):
command = "%s %s install %s" % (
USB_TOOLS,
self.device_para,
file_path)
message = self.execute_command(command)
message = str(message).rstrip()
if message != "":
print(message)
if message == "" or "Success" in message:
return_code = True
else:
return_code = False
time.sleep(1)
return return_code
def uninstall_app(self, package_name):
command = "pm uninstall %s" % (package_name)
return_code = self.shell_with_output(command)
if return_code:
time.sleep(1)
return return_code
###############################################################
###############################################################
self.device_params = self.get_device_para(
remote_ip, repote_port, device_sn)
self.init_device()
@classmethod
def get_device_para(cls, remote_ip="", remote_port="",
device_sn=""):
device_sn=""):
if "" == remote_ip or "" == remote_port:
if "" == device_sn:
device_para = ""
......@@ -239,157 +116,60 @@ class DeviceAdapter:
@classmethod
def get_device_hdc_para(cls, device_sn=""):
if " " == device_sn:
device_hdc_para = ""
device_para = ""
else:
device_hdc_para = "-t %s" % device_sn
return device_hdc_para
@classmethod
def execute_command(cls, command, print_flag=True, timeout=900):
try:
if print_flag:
print("command: " + command)
if subprocess.call(command, shell=True, timeout=timeout) == 0:
print("results: successed")
return True
except Exception as error:
print("Exception: %s" % str(error))
print("results: failed")
return False
@classmethod
def execute_command_with_output(cls, command, print_flag=True):
if print_flag:
print("command: " + command)
proc = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
try:
data, _ = proc.communicate()
if isinstance(data, bytes):
data = data.decode('utf-8', 'ignore')
finally:
proc.stdout.close()
proc.stderr.close()
return data
def shell(self, command=""):
return self.execute_command("%s %s shell %s%s%s" % (
USB_TOOLS,
self.device_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS))
def hdc_shell(self, command=""):
return self.execute_command("%s %s shell %s%s%s" % (
USB_HDC,
self.device_hdc_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS))
def execute_shell_command(self, command):
return self.shell(command)
def shell_with_output(self, command=""):
return self.execute_command_with_output("%s %s shell %s%s%s" % (
USB_TOOLS,
self.device_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS))
def hdc_std_shell_with(self, command=""):
return self.execute_command_with_output("%s %s shell %s%s%s" % (
USB_TOOLS,
self.device_hdc_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS
))
@classmethod
def check_path_legal(cls, path):
if path and " " in path:
return "\"%s\"" % path
return path
def is_file_exist(self, file_path):
file_path = self.check_path_legal(file_path)
message = self.shell_with_output("ls %s" % file_path)
return False if message == "" else True
##############################################################################
##############################################################################
device_para = "-t %s" % device_sn
return device_para
class HDCDeviceAdapter:
def __init__(self, remote_ip="", repote_port="", device_sn="", name=""):
self.device_sn = device_sn
self.name = name
self.test_path = "/data/test/"
self.device_para = self.get_device_para(
remote_ip,
repote_port,
device_sn)
self.device_hdc_para = self.get_device_hdc_para(
device_sn
)
if len(self.device_para) == 3:
self.init_device()
if len(self.device_hdc_para) == 1:
self.init_device_hdc()
###############################################################
###############################################################
def remount(self):
if self.conn_type:
remount = "target mount"
else:
remount = "remount"
command = "%s %s %s" % (HDC_TOOLS, self.device_params, remount)
self.execute_command(command)
def init_device(self):
self.remount()
self.shell('rm -rf %s' % self.test_path)
self.shell('mkdir -p %s' % self.test_path)
self.shell('chmod 777 %s' % self.test_path)
self.shell('chmod 777 -R %s' % self.test_path)
self.shell("mount -o rw,remount,rw /%s" % "system")
def init_device_hdc(self):
self.remount()
self.shell('rm -rf %s' % self.test_path)
self.shell('mkdir -p %s' % self.test_path)
self.shell('chmod 777 %s' % self.test_path)
self.shell("mount -o rw,remount,rw /%s" % "system")
def unlock_screen(self):
self.shell("svc power stayon true")
def remount(self):
command = "%s %s target mount" % (HDC_TOOLS)
self.execute_command(command)
def unlock_device(self):
self.shell("input keyevent 82")
self.shell("wm dismiss-keyguard")
def push_file(self, srcpath, despath):
command = "%s %s file send %s %s" % (
if self.conn_type:
push_args = "file send"
else:
push_args = "push"
command = "%s %s %s %s %s" % (
HDC_TOOLS,
self.device_para,
self.device_params,
push_args,
srcpath,
despath)
return self.execute_command(command)
def pull_file(self, srcpath, despath):
command = "%s %s file recv %s %s" % (
if self.conn_type:
pull_args = "file recv"
else:
pull_args = "pull"
command = "%s %s %s %s %s" % (
HDC_TOOLS,
self.device_para,
self.device_params,
pull_args,
srcpath,
despath)
return self.execute_command(command)
def unlock_screen(self):
self.shell("svc power stayon true")
def unlock_device(self):
self.shell("input keyevent 82")
self.shell("wm dismiss-keyguard")
def lock_screen(self):
self.shell("svc power stayon false")
......@@ -397,33 +177,13 @@ class HDCDeviceAdapter:
self.unlock_screen()
self.unlock_device()
###############################################################
###############################################################
@classmethod
def get_device_para(cls, remote_ip="", remote_port="",
device_sn=""):
if "" == remote_ip or "" == remote_port:
if "" == device_sn:
device_para = ""
else:
device_para = "-t %s" % device_sn
else:
if "" == device_sn:
device_para = "-s tcp:%s:%s" % (remote_ip, remote_port)
else:
device_para = "-s tcp:%s:%s -t %s" % (
remote_ip, remote_port, device_sn)
return device_para
@classmethod
def get_device_hdc_para(cls, device_sn=""):
if " " == device_sn:
device_para = ""
else:
device_para = "-t %s" % device_sn
return device_para
def shell(self, command=""):
return self.execute_command("%s %s shell %s%s%s" % (
HDC_TOOLS,
self.device_params,
QUOTATION_MARKS,
command,
QUOTATION_MARKS))
@classmethod
def execute_command(cls, command, print_flag=True, timeout=900):
......@@ -438,15 +198,23 @@ class HDCDeviceAdapter:
print("results: failed")
return False
def shell_with_output(self, command=""):
return self.execute_command_with_output("%s %s shell %s%s%s" % (
HDC_TOOLS,
self.device_params,
QUOTATION_MARKS,
command,
QUOTATION_MARKS))
@classmethod
def execute_command_with_output(cls, command, print_flag=True):
if print_flag:
print("command: " + command)
proc = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
try:
data, _ = proc.communicate()
......@@ -457,43 +225,8 @@ class HDCDeviceAdapter:
proc.stderr.close()
return data
def shell(self, command=""):
return self.execute_command("%s %s shell %s%s%s" % (
HDC_TOOLS,
self.device_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS))
def shell_with_output(self, command=""):
return self.execute_command_with_output("%s %s shell %s%s%s" % (
HDC_TOOLS,
self.device_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS))
def hdc_std_shell_with(self, command=""):
return self.execute_command_with_output("%s %s shell %s%s%s" % (
HDC_TOOLS,
self.device_hdc_para,
QUOTATION_MARKS,
command,
QUOTATION_MARKS
))
@classmethod
def check_path_legal(cls, path):
if path and " " in path:
return "\"%s\"" % path
return path
def is_file_exist(self, file_path):
file_path = self.check_path_legal(file_path)
message = self.shell_with_output("ls %s" % file_path)
return False if message == "" else True
##############################################################################
##############################################################################
return path
\ No newline at end of file
......@@ -61,31 +61,7 @@ class ITestDriver:
__metaclass__ = ABCMeta
@abstractmethod
def execute(self, suite_file, push_flag=False):
pass
##############################################################################
##############################################################################
class DexTestDriver:
__metaclass__ = ABCMeta
@abstractmethod
def execute(self, suite_file, push_flag=False):
pass
##############################################################################
##############################################################################
class HapTestDriver:
__metaclass__ = ABCMeta
@abstractmethod
def execute(self, suite_file, push_flag=False):
def execute(self, suite_file, result_path, push_flag=False):
pass
......@@ -94,9 +70,9 @@ class HapTestDriver:
class CppTestDriver(ITestDriver):
def __init__(self, device, hdc_tool):
def __init__(self, device, hdc_or_adb):
self.device = device
self.hdc_tool = hdc_tool
self.hdc_or_adb = hdc_or_adb
def execute(self, suite_file, result_path, background=False):
file_name = os.path.basename(suite_file)
......@@ -109,13 +85,10 @@ class CppTestDriver(ITestDriver):
file_name)
print("command: %s" % command)
sh_file_name, file_path = make_long_command_file(command,
long_command_path,
file_name)
if self.hdc_tool != "hdc":
self.device.push_file(file_path, DEVICE_TEST_PATH)
else:
self.device.push_hdc_file(file_path, DEVICE_TEST_PATH)
sh_file_name, file_path = make_long_command_file(command,
long_command_path,
file_name)
self.device.push_file(file_path, DEVICE_TEST_PATH)
# push resource files
resource_manager = ResourceManager()
......@@ -131,10 +104,9 @@ class CppTestDriver(ITestDriver):
sh_command = "sh %s" % (
os.path.join(DEVICE_TEST_PATH, sh_file_name))
if self.hdc_tool != "hdc":
return self.device.shell(sh_command)
else:
return self.device.hdc_shell(sh_command)
return self.device.shell(sh_command)
##############################################################################
##############################################################################
......@@ -18,8 +18,7 @@
import os
from distributed.common.devices import DeviceAdapter
from distributed.common.devices import HDCDeviceAdapter
from distributed.common.devices import DeviceShell
##############################################################################
......@@ -27,7 +26,7 @@ from distributed.common.devices import HDCDeviceAdapter
class DeviceManager:
def __init__(self, result_path):
self.has_hdc_tool = False
self.is_hdc = True
self.phone_device_list = []
self.ivi_device_list = []
self.tv_device_list = []
......@@ -35,21 +34,14 @@ class DeviceManager:
self.make_device_list(result_path)
def make_device_adapter(self, device_info_list, device_name):
if self.has_hdc_tool:
device = HDCDeviceAdapter(device_sn=device_info_list[0],
remote_ip=device_info_list[2],
repote_port=device_info_list[3],
name=device_name)
else:
device = DeviceAdapter(device_sn=device_info_list[0],
remote_ip=device_info_list[2],
repote_port=device_info_list[3],
name=device_name)
device = DeviceShell(self.is_hdc, device_sn=device_info_list[0],
remote_ip=device_info_list[2],
repote_port=device_info_list[3],
name=device_name)
return device
def make_device_list(self, result_path):
device_info_list = self.get_device_info_list(result_path)
for item in device_info_list:
if len(item) != 4:
continue
......@@ -83,8 +75,8 @@ class DeviceManager:
def get_device_info_list(result):
device_info_list = []
tmp_path = os.path.join(result, "temp")
device_info_file_path = os.path.join(tmp_path,
"device_info_file.txt")
device_info_file_path = os.path.join(tmp_path,
"device_info_file.txt")
if os.path.exists(device_info_file_path):
with open(device_info_file_path, "r") as file_handle:
......
......@@ -19,7 +19,6 @@
import os
import sys
import re
import json
import time
import platform
......@@ -52,8 +51,7 @@ sys.path.insert(4, sys.adapter_dir)
from distributed.common.common import create_empty_result_file
from distributed.common.common import get_resource_dir
from distributed.common.drivers import CppTestDriver
from distributed.common.drivers import DexTestDriver
from distributed.common.drivers import HapTestDriver
DEVICE_INFO_TEMPLATE = "agentlist:%s\nagentport:%s,\ndevicesuuid:%s"
......@@ -62,17 +60,13 @@ DEVICE_INFO_TEMPLATE = "agentlist:%s\nagentport:%s,\ndevicesuuid:%s"
##############################################################################
def get_current_driver(device, target_name, hdc_tool):
def get_current_driver(device, target_name):
driver = None
_, suffix_name = os.path.splitext(target_name)
if suffix_name == "":
driver = CppTestDriver(device, hdc_tool)
driver = CppTestDriver(device)
elif suffix_name == ".bin":
driver = CppTestDriver(device, hdc_tool)
elif suffix_name == ".dex":
driver = DexTestDriver(device)
elif suffix_name == ".hap":
driver = HapTestDriver(device)
driver = CppTestDriver(device)
return driver
......@@ -81,14 +75,14 @@ def get_current_driver(device, target_name, hdc_tool):
class Distribute:
def __init__(self, suite_dir, major, agent_list, hdc_tool):
def __init__(self, suite_dir, major, agent_list, hdc_tools):
self.suite_dir = suite_dir
self.major = major
self.agent_list = agent_list
self.hdc_tool = hdc_tool
self.hdc_tools = hdc_tools
def exec_agent(self, device, target_name, result_path):
driver = get_current_driver(device, target_name, self.hdc_tool)
driver = get_current_driver(device, target_name)
if driver is None:
print("Error: driver is None.")
return False
......@@ -96,63 +90,46 @@ class Distribute:
resource_dir = get_resource_dir(self.suite_dir, device.name)
self._make_agent_desc_file(device)
if self.hdc_tool != "hdc":
device.push_file(os.path.join(self.suite_dir, "agent.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
else:
device.push_hdc_file(os.path.join(self.suite_dir, "agent.desc"),
device.test_path)
device.push_hdc_file(os.path.join(resource_dir, target_name),
device.test_path)
device.push_file(os.path.join(self.suite_dir, "agent.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
suite_path = os.path.join(self.suite_dir, target_name)
driver.execute(suite_path, result_path, background=True)
return self._check_thread(device, target_name)
def exec_major(self, device, target_name, result_path):
driver = get_current_driver(device, target_name, self.hdc_tool)
driver = get_current_driver(device, target_name)
if driver is None:
print("Error: driver is None.")
return False
resource_dir = get_resource_dir(self.suite_dir, device.name)
self._make_major_desc_file()
if self.hdc_tool != "hdc":
device.push_file(os.path.join(self.suite_dir, "major.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
else:
device.push_hdc_file(os.path.join(self.suite_dir, "major.desc"),
device.test_path)
device.push_hdc_file(os.path.join(resource_dir, target_name),
device.test_path)
device.push_file(os.path.join(self.suite_dir, "major.desc"),
device.test_path)
device.push_file(os.path.join(resource_dir, target_name),
device.test_path)
suite_path = os.path.join(self.suite_dir, target_name)
return driver.execute(suite_path, result_path, background=False)
def pull_result(self, device, source_path, result_save_path):
@staticmethod
def pull_result(device, source_path, result_save_path):
_, file_name = os.path.split(source_path)
if self.hdc_tool != "hdc":
device.pull_file(source_path, result_save_path)
else:
device.pull_hdc_file(source_path, result_save_path)
device.pull_file(source_path, result_save_path)
if not os.path.exists(os.path.join(result_save_path, file_name)):
create_empty_result_file(result_save_path, file_name)
return
def _check_thread(self, device, thread_name):
@staticmethod
def _check_thread(device, thread_name):
check_command = "ps -A | grep %s" % thread_name
checksum = 0
while checksum < 100:
while checksum < 10:
checksum += 1
print("check thread:%s %s times" % (thread_name, checksum))
if self.hdc_tool != "hdc":
output = device.shell_with_output(check_command)
else:
output = device.hdc_std_shell_with(check_command)
output = device.shell_with_output(check_command)
if output == "":
time.sleep(0.1)
else:
......@@ -164,7 +141,7 @@ class Distribute:
agent_ip_list = ""
device_uuid_list = ""
if self.hdc_tool != "hdc":
if self.hdc_tools != "hdc":
if self._query_device_uuid(self.major) != '':
device_uuid_list += self._query_device_uuid(self.major) + ","
......@@ -198,11 +175,10 @@ class Distribute:
self._write_device_config(config_info, config_agent_file)
def _make_major_desc_file(self):
agent_ip_list = ""
device_uuid_list = ""
if self.hdc_tool != "hdc":
if self.hdc_tools != "hdc":
if self._query_device_uuid(self.major) != "NoneTyple":
device_uuid_list += self._query_device_uuid(self.major) + ","
......@@ -230,8 +206,8 @@ class Distribute:
config_major_file = os.path.join(self.suite_dir, "major.desc")
self._write_device_config(config_info, config_major_file)
@classmethod
def _query_device_ip(cls, device):
@staticmethod
def _query_device_ip(device):
output = device.shell_with_output("getprop ro.hardware")
if output == "":
return ""
......@@ -252,14 +228,14 @@ class Distribute:
return ipaddress[0]
@classmethod
def _query_device_hdc_ip(cls, device):
output = device.hdc_std_shell_with("param get ohos.boot.hardware")
@staticmethod
def _query_device_hdc_ip(device):
output = device.shell_with_output("param get ohos.boot.hardware")
if output == "":
return ""
isemulator = re.findall('readonly', str(output))
output = device.hdc_std_shell_with("ifconfig")
output = device.shell_with_output("ifconfig")
if output == "":
return ""
......@@ -272,8 +248,8 @@ class Distribute:
return ""
return ipaddress[0]
@classmethod
def _query_device_uuid(cls, device):
@staticmethod
def _query_device_uuid(device):
"""
1. Run the dumpsys DdmpDeviceMonitorService command to query the value
of dev_nodeid.
......@@ -287,18 +263,18 @@ class Distribute:
return ""
begin = device_info.find("dev_nodeid")
if (begin == -1):
if begin == -1:
return ""
end = device_info.find(",", begin)
if (end == -1):
if end == -1:
return ""
dev_nodeid_info = device_info[begin:end].replace('"', "")
return dev_nodeid_info.split(":")[1]
@classmethod
def _query_device_hdc_uuid(cls, device):
@staticmethod
def _query_device_hdc_uuid(device):
"""
1. Run the dumpsys DdmpDeviceMonitorService command to query the value
of dev_nodeid.
......@@ -309,12 +285,13 @@ class Distribute:
if platform.system() == "Windows":
file_path = os.path.normpath(os.path.join(sys.framework_root_dir, "../SoftBusdumpdeviceInfo"))
else:
linux_file_path = "out/rk3568/communication/dsoftbus_standard/SoftBusdumpdeviceInfo"
file_path = os.path.normpath(os.path.join("../../", sys.framework_root_dir, linux_file_path))
softbus_file_path = "out/rk3568/communication/dsoftbus_standard/SoftBusdumpdeviceInfo"
file_path = os.path.normpath(os.path.join("../../", sys.framework_root_dir, softbus_file_path))
device.push_hdc_file(file_path, "/data/test")
device.push_file(file_path, "/data/test")
device.shell_with_output("chmod 777 -R /data/test/")
chmod_device_file_path = "SoftBusdumpdeviceInfo"
device_info = device.hdc_std_shell_with("/data/test/%s" % chmod_device_file_path)
device_info = device.shell_with_output("/data/test/%s" % chmod_device_file_path)
if device_info == "":
return ""
......@@ -322,8 +299,8 @@ class Distribute:
if dev_nodeid_info:
return str(dev_nodeid_info[0])
@classmethod
def _write_device_config(cls, device_info, file_path):
@staticmethod
def _write_device_config(device_info, file_path):
file_dir, file_name = os.path.split(file_path)
final_file = os.path.join(file_dir, file_name.split('.')[0] + ".desc")
if os.path.exists(final_file):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册