提交 5e275552 编写于 作者: D deveco_test

fix code

Signed-off-by: Ndeveco_test <liguangjie1@huawei.com>
上级 f9cd29f2
......@@ -63,7 +63,6 @@ class DeviceLiteKernel(object):
@dataclass
class CKit:
push = "PushKit"
install = "ApkInstallKit"
liteinstall = "LiteAppInstallKit"
command = "CommandKit"
config = "ConfigKit"
......
......@@ -53,8 +53,11 @@ from xdevice import HdcError
from xdevice import DeviceConnectorType
from xdevice import get_filename_extension
from xdevice import junit_para_parse
from xdevice import reset_junit_para
from xdevice import gtest_para_parse
from xdevice import reset_junit_para
from xdevice import disable_keyguard
from xdevice import unlock_screen
from xdevice import unlock_device
from ohos.environment.dmlib import process_command_ret
from ohos.environment.dmlib import DisplayOutputReceiver
......@@ -62,7 +65,7 @@ from ohos.testkit.kit import junit_dex_para_parse
__all__ = ["CppTestDriver", "DexTestDriver", "HapTestDriver",
"JSUnitTestDriver", "JUnitTestDriver", "RemoteTestRunner",
"RemoteDexRunner", "disable_keyguard"]
"RemoteDexRunner"]
LOG = platform_logger("Drivers")
DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test")
ON_DEVICE_TEST_DIR_LOCATION = "/%s/%s/%s/" % ("data", "local", "tmp")
......@@ -1921,8 +1924,8 @@ class HapTestDriver(IDriver):
return return_message
def _execute_hapfile_junittest(self, filename, testpara, target_test_path):
_unlock_screen(self.config.device)
_unlock_device(self.config.device)
unlock_screen(self.config.device)
unlock_device(self.config.device)
try:
if not filename.endswith(self.instrument_hap_file_suffix):
......@@ -2674,23 +2677,6 @@ class LTPPosixTestDriver(IDriver):
return self.result if os.path.exists(self.result) else ""
def disable_keyguard(device):
_unlock_screen(device)
_unlock_device(device)
def _unlock_screen(device):
device.execute_shell_command("svc power stayon true")
time.sleep(1)
def _unlock_device(device):
device.execute_shell_command("input keyevent 82")
time.sleep(1)
device.execute_shell_command("wm dismiss-keyguard")
time.sleep(1)
def _lock_screen(device):
device.execute_shell_command("svc power stayon false")
time.sleep(1)
......
......@@ -35,14 +35,12 @@ from xdevice import get_plugin
from xdevice import IShellReceiver
from xdevice import exec_cmd
from xdevice import get_file_absolute_path
from xdevice import ParamError
from xdevice import FilePermission
from xdevice import DeviceError
from xdevice import HdcError
from xdevice import HdcCommandRejectedException
from xdevice import ShellCommandUnresponsiveException
from xdevice import DeviceState
from xdevice import DeviceConnectorType
from xdevice import convert_serial
from xdevice import is_proc_running
from xdevice import convert_ip
......
......@@ -27,6 +27,7 @@ from xdevice import platform_logger
from xdevice import ParamError
from xdevice import ConfigConst
from xdevice import HdcCommandRejectedException
from xdevice import DeviceConnectorType
from xdevice import DeviceEvent
from xdevice import TestDeviceState
from xdevice import DeviceState
......@@ -93,7 +94,7 @@ class ManagerDevice(IDeviceManager):
device.get("usb_type"), error))
self.device_connector = DeviceConnector(
device.get("ip"), device.get("port"),
"usb-hdc")
DeviceConnectorType.hdc)
self.device_connector.add_device_change_listener(
self.managed_device_listener)
self.device_connector.start()
......
......@@ -17,10 +17,7 @@
#
import os
import platform
import re
import signal
import subprocess
import zipfile
import stat
import time
......@@ -28,7 +25,6 @@ import json
from dataclasses import dataclass
from tempfile import TemporaryDirectory
from tempfile import NamedTemporaryFile
from threading import Timer
from xdevice import ITestKit
from xdevice import platform_logger
......@@ -38,21 +34,20 @@ from xdevice import get_file_absolute_path
from xdevice import get_config_value
from xdevice import exec_cmd
from xdevice import ConfigConst
from xdevice import DeviceTestType
from xdevice import FilePermission
from xdevice import AppInstallError
from xdevice import DeviceConnectorType
from xdevice import convert_serial
from xdevice import check_path_legal
from xdevice import modify_props
from xdevice import get_app_name_by_tool
from xdevice import remount
from xdevice import disable_keyguard
from ohos.constants import CKit
from ohos.environment.dmlib import CollectingOutputReceiver
__all__ = ["STSKit", "CommandKit","PushKit", "PropertyCheckKit", "ShellKit", "WifiKit",
__all__ = ["STSKit", "CommandKit", "PushKit", "PropertyCheckKit", "ShellKit", "WifiKit",
"ConfigKit", "AppInstallKit", "ComponentKit",
"get_app_name_by_tool", "junit_para_parse", "gtest_para_parse",
"junit_dex_para_parse", "reset_junit_para", "oh_jsunit_para_parse"]
"junit_dex_para_parse", "oh_jsunit_para_parse"]
MAX_WAIT_COUNT = 4
TARGET_SDK_VERSION = 22
......@@ -644,6 +639,7 @@ class ConfigKit(ITestKit):
except KeyError:
LOG.error("Get props error.")
continue
return prop_changed
......@@ -692,19 +688,10 @@ class AppInstallKit(ITestKit):
if app_file is None:
LOG.error("The app file {} does not exist".format(app))
continue
if app_file.endswith(".hap"):
if hasattr(device, "is_harmony") and device.is_harmony:
device.connector_command("install {}".format(app_file))
else:
self.install_hap(device, app_file)
if hasattr(device, "is_harmony") and device.is_harmony:
device.connector_command("install {}".format(app_file))
else:
result = device.install_package(
app_file, get_install_args(
device, app_file, self.ex_args))
if not result or not result.startswith("Success") or "successfully" not in result:
raise AppInstallError(
"Failed to install %s on %s. Reason:%s" %
(app_file, device.__get_serial__(), result))
self.install_hap(device, app_file)
self.installed_app.add(app_file)
def __teardown__(self, device):
......@@ -721,11 +708,7 @@ class AppInstallKit(ITestKit):
(device.__get_serial__(), result))
else:
for app in self.installed_app:
if app.endswith(".hap"):
app_name = get_app_name(app)
else:
app_name = get_app_name_by_tool(app, self.paths)
app_name = get_app_name(app)
if app_name:
result = device.uninstall_package(app_name)
if result and (result.startswith("Success") or "successfully" in result):
......@@ -906,48 +889,14 @@ class ComponentKit(ITestKit):
self.cache_device.clear()
def remount(device):
device.enable_hdc_root()
cmd = "target mount" \
if device.usb_type == DeviceConnectorType.hdc else "remount"
device.connector_command(cmd)
device.execute_shell_command("remount")
device.execute_shell_command("mount -o rw,remount /cust")
device.execute_shell_command("mount -o rw,remount /product")
device.execute_shell_command("mount -o rw,remount /hw_product")
device.execute_shell_command("mount -o rw,remount /version")
device.execute_shell_command("mount -o rw,remount /%s" % "system")
def keep_screen_on(device):
device.execute_shell_command("svc power stayon true")
def get_install_args(device, app_name, original_args=None):
"""To obtain all the args of app install
Args:
original_args: the argus configure in .config file
device : the device will be installed app
app_name : the name of the app which will be installed
Returns:
All the args
"""
if original_args is None:
original_args = []
new_args = original_args[:]
try:
sdk_version = device.get_property("ro.build.version.sdk")
if int(sdk_version) > TARGET_SDK_VERSION:
new_args.append("-g")
except TypeError as type_error:
LOG.error("Obtain the sdk version failed with exception {}".format(
type_error))
except ValueError as value_error:
LOG.error("Obtain the sdk version failed with exception {}".format(
value_error))
if app_name.endswith(".apex"):
new_args.append("--apex")
return " ".join(new_args)
def run_command(device, command):
......@@ -986,7 +935,6 @@ def junit_dex_para_parse(device, junit_paras, prefix_char="--"):
# Disable screen keyguard
disable_key_guard = junit_paras.get('disable-keyguard')
if not disable_key_guard or disable_key_guard[0].lower() != 'false':
from ohos.drivers.drivers import disable_keyguard
disable_keyguard(device)
for para_name in junit_paras.keys():
......@@ -1020,67 +968,10 @@ def junit_dex_para_parse(device, junit_paras, prefix_char="--"):
return " ".join(ret_str)
def get_app_name_by_tool(app_path, paths):
"""To obtain the app name by using tool
Args:
app_path: the path of app
paths:
Returns:
The Pkg Name if found else None
"""
rex = "^package:\\s+name='(.*?)'.*$"
aapt_tool_name = "aapt.exe" if os.name == "nt" else "aapt"
if app_path:
proc_timer = None
try:
tool_file = get_file_absolute_path(os.path.join(
"tools", aapt_tool_name), paths)
LOG.debug("Aapt file is %s" % tool_file)
if platform.system() == "Linux":
if not oct(os.stat(tool_file).st_mode)[-3:] == "755":
os.chmod(tool_file, FilePermission.mode_755)
cmd = [tool_file, "dump", "badging", app_path]
timeout = 300
LOG.info("Execute command %s with %s" % (" ".join(cmd), timeout))
sub_process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc_timer = Timer(timeout, timeout_callback, [sub_process])
proc_timer.start()
# The package name must be return in first line
output = sub_process.stdout.readline()
error = sub_process.stderr.readline()
LOG.debug("The output of aapt is {}".format(output))
if error:
LOG.debug("The error of aapt is {}".format(error))
if output:
pkg_match = re.match(rex, output.decode("utf8", 'ignore'))
if pkg_match is not None:
LOG.info(
"Obtain the app name {} successfully by using "
"aapt".format(pkg_match.group(1)))
return pkg_match.group(1)
except (FileNotFoundError, ParamError) as error:
LOG.debug("Aapt error: %s", error.args)
finally:
if proc_timer:
proc_timer.cancel()
def timeout_callback(proc):
try:
LOG.error("Error: execute command timeout.")
LOG.error(proc.pid)
if platform.system() != "Windows":
os.killpg(proc.pid, signal.SIGKILL)
else:
subprocess.call(
["C:\\Windows\\System32\\taskkill", "/F", "/T", "/PID",
str(proc.pid)], shell=False)
except (FileNotFoundError, KeyboardInterrupt, AttributeError) as error:
LOG.exception("Timeout callback exception: %s" % error, exc_info=False)
def _get_class(junit_paras, prefix_char, para_name):
......
......@@ -40,11 +40,7 @@ def main():
'xdevice._core': [
'resource/*.txt',
'resource/config/*.xml',
'resource/template/*.html',
'resource/decc/data/log/*.bin',
'resource/decc/develop/file_data/*.bin',
'resource/decc/nice/*.bin',
'resource/decc/unbelievable/*.bin'
'resource/template/*.html'
]
},
entry_points={
......
......@@ -69,6 +69,12 @@ from _core.testkit.json_parser import JsonParser
from _core.testkit.kit import junit_para_parse
from _core.testkit.kit import gtest_para_parse
from _core.testkit.kit import reset_junit_para
from _core.testkit.kit import get_app_name_by_tool
from _core.testkit.kit import get_install_args
from _core.testkit.kit import remount
from _core.testkit.kit import disable_keyguard
from _core.testkit.kit import unlock_screen
from _core.testkit.kit import unlock_device
from _core.driver.parser_lite import ShellHandler
from _core.report.encrypt import check_pub_key_exist
from _core.utils import get_file_absolute_path
......@@ -181,6 +187,12 @@ __all__ = [
"junit_para_parse",
"gtest_para_parse",
"reset_junit_para",
"get_app_name_by_tool",
"get_install_args",
"remount",
"disable_keyguard",
"unlock_screen",
"unlock_device",
"ShellHandler",
"ResultCode",
"check_pub_key_exist",
......
......@@ -192,7 +192,8 @@ class UserConfigManager(object):
def get_device(self, target_name):
for node in self.config_content.findall(target_name):
data_dic = {}
if node.attrib["type"] != "usb-hdc":
if node.attrib["type"] != "usb-hdc" and \
node.attrib["type"] != "usb-adb":
continue
data_dic["usb_type"] = node.attrib["type"]
for sub in node:
......
......@@ -17,14 +17,43 @@
#
import os
import re
import stat
import json
import time
import platform
import subprocess
import signal
from threading import Timer
from _core.utils import get_file_absolute_path
from _core.logger import platform_logger
from _core.exception import ParamError
from _core.constants import DeviceTestType
from _core.constants import FilePermission
from _core.constants import DeviceConnectorType
LOG = platform_logger("Kit")
TARGET_SDK_VERSION = 22
__all__ = ["get_app_name_by_tool", "junit_para_parse", "gtest_para_parse",
"get_install_args", "reset_junit_para", "remount", "disable_keyguard",
"timeout_callback", "unlock_screen", "unlock_device"]
def remount(device):
device.enable_hdc_root()
cmd = "target mount" \
if device.usb_type == DeviceConnectorType.hdc else "remount"
device.connector_command(cmd)
device.execute_shell_command("remount")
device.execute_shell_command("mount -o rw,remount /cust")
device.execute_shell_command("mount -o rw,remount /product")
device.execute_shell_command("mount -o rw,remount /hw_product")
device.execute_shell_command("mount -o rw,remount /version")
device.execute_shell_command("mount -o rw,remount /%s" % "system")
def _get_class(junit_paras, prefix_char, para_name):
if not junit_paras.get(para_name):
......@@ -75,7 +104,6 @@ def junit_para_parse(device, junit_paras, prefix_char="-e"):
# Disable screen keyguard
disable_key_guard = junit_paras.get('disable-keyguard')
if not disable_key_guard or disable_key_guard[0].lower() != 'false':
from ohos.drivers.drivers import disable_keyguard
disable_keyguard(device)
for para_name in junit_paras.keys():
......@@ -165,3 +193,115 @@ def reset_junit_para(junit_para_str, prefix_char="-e", ignore_keys=None):
continue
normal_lines.append("{} {}".format(prefix_char, line))
return " ".join(normal_lines)
def get_install_args(device, app_name, original_args=None):
"""To obtain all the args of app install
Args:
original_args: the argus configure in .config file
device : the device will be installed app
app_name : the name of the app which will be installed
Returns:
All the args
"""
if original_args is None:
original_args = []
new_args = original_args[:]
try:
sdk_version = device.get_property("ro.build.version.sdk")
if int(sdk_version) > TARGET_SDK_VERSION:
new_args.append("-g")
except TypeError as type_error:
LOG.error("Obtain the sdk version failed with exception {}".format(
type_error))
except ValueError as value_error:
LOG.error("Obtain the sdk version failed with exception {}".format(
value_error))
if app_name.endswith(".apex"):
new_args.append("--apex")
return " ".join(new_args)
def get_app_name_by_tool(app_path, paths):
"""To obtain the app name by using tool
Args:
app_path: the path of app
paths:
Returns:
The Pkg Name if found else None
"""
rex = "^package:\\s+name='(.*?)'.*$"
aapt_tool_name = "aapt.exe" if os.name == "nt" else "aapt"
if app_path:
proc_timer = None
try:
tool_file = get_file_absolute_path(os.path.join(
"tools", aapt_tool_name), paths)
LOG.debug("Aapt file is %s" % tool_file)
if platform.system() == "Linux":
if not oct(os.stat(tool_file).st_mode)[-3:] == "755":
os.chmod(tool_file, FilePermission.mode_755)
cmd = [tool_file, "dump", "badging", app_path]
timeout = 300
LOG.info("Execute command %s with %s" % (" ".join(cmd), timeout))
sub_process = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc_timer = Timer(timeout, timeout_callback, [sub_process])
proc_timer.start()
# The package name must be return in first line
output = sub_process.stdout.readline()
error = sub_process.stderr.readline()
LOG.debug("The output of aapt is {}".format(output))
if error:
LOG.debug("The error of aapt is {}".format(error))
if output:
pkg_match = re.match(rex, output.decode("utf8", 'ignore'))
if pkg_match is not None:
LOG.info(
"Obtain the app name {} successfully by using "
"aapt".format(pkg_match.group(1)))
return pkg_match.group(1)
return None
except (FileNotFoundError, ParamError) as error:
LOG.debug("Aapt error: %s", error.args)
return None
finally:
if proc_timer:
proc_timer.cancel()
else:
LOG.error("get_app_name_by_tool error.")
return None
def timeout_callback(proc):
try:
LOG.error("Error: execute command timeout.")
LOG.error(proc.pid)
if platform.system() != "Windows":
os.killpg(proc.pid, signal.SIGKILL)
else:
subprocess.call(
["C:\\Windows\\System32\\taskkill", "/F", "/T", "/PID",
str(proc.pid)], shell=False)
except (FileNotFoundError, KeyboardInterrupt, AttributeError) as error:
LOG.exception("Timeout callback exception: %s" % error, exc_info=False)
def disable_keyguard(device):
unlock_screen(device)
unlock_device(device)
def unlock_screen(device):
device.execute_shell_command("svc power stayon true")
time.sleep(1)
def unlock_device(device):
device.execute_shell_command("input keyevent 82")
time.sleep(1)
device.execute_shell_command("wm dismiss-keyguard")
time.sleep(1)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册