提交 9386cc93 编写于 作者: C cuixucui

整改编码规范

上级 f8769a29
......@@ -223,6 +223,7 @@ class Command:
class CertCommandError(Exception):
def __init__(self, command, message):
super(CertCommandError, self).__init__()
self.message = message
self.command = command
self.__message = None
......
......@@ -29,7 +29,7 @@ from .reboot import Reboot
from .client import Client
class EulerCertification():
class EulerCertification:
def __init__(self):
self.certification = None
......@@ -50,12 +50,12 @@ class EulerCertification():
print("All cases are passed, test end.")
return True
devices = certdevice.get_devices()
self.devices = DeviceDocument(CertEnv.devicefile, devices)
oec_devices = certdevice.get_devices()
self.devices = DeviceDocument(CertEnv.devicefile, oec_devices)
self.devices.save()
# test_factory format example: [{"name":"nvme", "device":device, "run":True, "status":"PASS", "reboot":False}]
test_factory = self.get_tests(devices)
test_factory = self.get_tests(oec_devices)
self.update_factory(test_factory)
if not self.choose_tests():
return True
......@@ -76,7 +76,7 @@ class EulerCertification():
reboot.clean()
self.save(job)
return True
except Exception as e:
except (IOError, OSError, TypeError) as e:
print(e)
return False
......@@ -86,7 +86,7 @@ class EulerCertification():
Command("rm -rf %s" % CertEnv.certificationfile).run()
Command("rm -rf %s" % CertEnv.factoryfile).run()
Command("rm -rf %s" % CertEnv.devicefile).run()
except Exception as e:
except CertCommandError as e:
print(e)
return False
return True
......@@ -176,7 +176,7 @@ class EulerCertification():
def get_tests(self, devices):
"""
获取测试项
get test items
:param devices:
:return:
"""
......@@ -221,7 +221,7 @@ class EulerCertification():
for device in devices:
if device.get_property("SUBSYSTEM") == "usb" and \
device.get_property("ID_VENDOR_FROM_DATABASE") == "Linux Foundation" and \
("2." in device.get_property("ID_MODEL_FROM_DATABASE") or \
("2." in device.get_property("ID_MODEL_FROM_DATABASE") or
"3." in device.get_property("ID_MODEL_FROM_DATABASE")):
sort_devices["usb"] = [empty_device]
continue
......@@ -328,7 +328,7 @@ class EulerCertification():
def show_tests(self):
"""
显示测试用例
show test items
:return:
"""
print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10) \
......@@ -344,7 +344,7 @@ class EulerCertification():
status = test["status"]
device = test["device"].get_name()
run = "no"
if test["run"] == True:
if test["run"] is True:
run = "yes"
num = num + 1
......@@ -410,7 +410,8 @@ class EulerCertification():
self.test_factory.sort(key=lambda k: k["name"])
FactoryDocument(CertEnv.factoryfile, self.test_factory).save()
def search_factory(self, obj_test, test_factory):
@staticmethod
def search_factory(obj_test, test_factory):
for test in test_factory:
if test["name"] == obj_test["name"] and test["device"].path == obj_test["device"].path:
return True
......
......@@ -28,6 +28,7 @@ def filter_char(str):
filtered += str[start:]
return filtered
class CertDevice:
def __init__(self):
self.devices = None
......@@ -66,6 +67,7 @@ class CertDevice:
self.devices.sort(key=lambda k: k.path)
return self.devices
class Device:
def __init__(self, properties=None):
self.path = ""
......
......@@ -21,8 +21,8 @@ from .sysinfo import SysInfo
from .env import CertEnv
class Document():
def __init__(self, filename, document=dict()):
class Document:
def __init__(self, filename, document={}):
self.document = document
self.filename = filename
......@@ -49,8 +49,10 @@ class Document():
except (IOError, json.decoder.JSONDecodeError):
return False
class CertDocument(Document):
def __init__(self, filename, document=dict()):
def __init__(self, filename, document={}):
super(CertDocument, self).__init__()
self.document = dict()
self.filename = filename
if not document:
......@@ -103,8 +105,10 @@ class CertDocument(Document):
def get_kernel(self):
return self.document["kernel"]
class DeviceDocument(Document):
def __init__(self, filename, devices=list()):
def __init__(self, filename, devices=[]):
super(DeviceDocument, self).__init__()
self.filename = filename
self.document = list()
if not devices:
......@@ -113,8 +117,10 @@ class DeviceDocument(Document):
for device in devices:
self.document.append(device.properties)
class FactoryDocument(Document):
def __init__(self, filename, factory=list()):
def __init__(self, filename, factory=[]):
super(FactoryDocument, self).__init__()
self.document = list()
self.filename = filename
if not factory:
......@@ -143,21 +149,22 @@ class FactoryDocument(Document):
class ConfigFile:
def __init__(self, filename):
super(ConfigFile, self).__init__()
self.filename = filename
self.parameters = dict()
self.config = list()
self.load()
def load(self):
file = open(self.filename)
self.config = file.readlines()
fp = open(self.filename)
self.config = fp.readlines()
for line in self.config:
if line.strip() and line.strip()[0] == "#":
continue
words = line.strip().split(" ")
if words[0]:
self.parameters[words[0]] = " ".join(words[1:])
file.close()
fp.close()
def get_parameter(self, name):
if self.parameters:
......@@ -199,7 +206,7 @@ class ConfigFile:
self.save()
def save(self):
file = open(self.filename, "w")
fp = open(self.filename, "w")
for line in self.config:
file.write(line)
file.close()
fp.write(line)
fp.close()
......@@ -50,7 +50,8 @@ class Job(object):
for parameter_name, parameter_value in self.args.test_parameters:
self.test_parameters[parameter_name] = parameter_value
def discover(self, testname, device, subtests_filter=None):
@staticmethod
def discover(testname, device, subtests_filter=None):
if not testname:
print("testname not specified, discover test failed")
return None
......
......@@ -50,6 +50,7 @@ class Log(object):
self.log.close()
self.log = None
class Logger():
def __init__(self, logname, logdir, out, err):
self.log = Log(logname, logdir)
......
......@@ -22,7 +22,8 @@ class AcpiTest(Test):
Test.__init__(self)
self.requirements = ["acpica-tools"]
def test(self):
@staticmethod
def test():
try:
Command("acpidump").echo()
return True
......
......@@ -66,7 +66,8 @@ class CDRomTest(Test):
return False
return True
def get_type(self, device):
@staticmethod
def get_type(device):
if not device:
return None
......@@ -196,7 +197,8 @@ class CDRomTest(Test):
print(e)
return False
def cmp_tree(self, dir1, dir2):
@staticmethod
def cmp_tree(dir1, dir2):
if not (dir1 and dir2):
print("Error: invalid input dir.")
return False
......
......@@ -21,7 +21,8 @@ clock_dir = os.path.dirname(os.path.realpath(__file__))
class ClockTest(Test):
def test(self):
@staticmethod
def test():
return 0 == os.system("cd %s; ./clock" % clock_dir)
......
......@@ -59,7 +59,8 @@ class CPU:
return True
def set_freq(self, freq, cpu='all'):
@staticmethod
def set_freq(freq, cpu='all'):
cmd = Command("cpupower -c %s frequency-set --freq %s" % (cpu, freq))
try:
cmd.run()
......@@ -68,7 +69,8 @@ class CPU:
print(e)
return False
def get_freq(self, cpu):
@staticmethod
def get_freq(cpu):
cmd = Command("cpupower -c %s frequency-info -w" % cpu)
try:
return int(cmd.get_str(r'.* frequency: (?P<freq>\d+) .*', 'freq', False))
......@@ -76,7 +78,8 @@ class CPU:
print(e)
return False
def set_governor(self, governor, cpu='all'):
@staticmethod
def set_governor(governor, cpu='all'):
cmd = Command("cpupower -c %s frequency-set --governor %s" % (cpu, governor))
try:
cmd.run()
......@@ -85,7 +88,8 @@ class CPU:
print(e)
return False
def get_governor(self, cpu):
@staticmethod
def get_governor(cpu):
cmd = Command("cpupower -c %s frequency-info -p" % cpu)
try:
return cmd.get_str(r'.* governor "(?P<governor>\w+)".*', 'governor', False)
......@@ -93,7 +97,8 @@ class CPU:
print(e)
return False
def find_path(self, parent_dir, target_name):
@staticmethod
def find_path(parent_dir, target_name):
cmd = Command("find %s -name %s" % (parent_dir, target_name))
try:
cmd.run()
......@@ -262,7 +267,7 @@ class CPUFreqTest(Test):
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
load_test = Load(target_cpu)
load_test.run()
......
......@@ -205,7 +205,8 @@ class DiskTest(Test):
print("#############")
return return_code
def do_fio(self, filepath, size, option):
@staticmethod
def do_fio(filepath, size, option):
if os.path.isdir(filepath):
file_opt = "-directory=%s" % filepath
else:
......
......@@ -22,7 +22,8 @@ class IpmiTest(Test):
Test.__init__(self)
self.requirements = ["OpenIPMI", "ipmitool"]
def start_ipmi(self):
@staticmethod
def start_ipmi():
try:
Command("systemctl start ipmi").run()
Command("systemctl status ipmi.service").get_str(regex="Active: active", single_line=False)
......@@ -31,7 +32,8 @@ class IpmiTest(Test):
return False
return True
def ipmitool(self):
@staticmethod
def ipmitool():
cmd_list = ["ipmitool fru","ipmitool sensor"]
for cmd in cmd_list:
try:
......@@ -48,6 +50,7 @@ class IpmiTest(Test):
return False
return True
if __name__ == "__main__":
i = IpmiTest()
i.test()
......
......@@ -162,7 +162,8 @@ class MemoryTest(Test):
return False
return True
def hot_plug_verify(self):
@staticmethod
def hot_plug_verify():
kernel = Command("uname -r").read()
config_file = "/boot/config-" + kernel
if not os.path.exists(config_file):
......@@ -198,7 +199,8 @@ class MemoryTest(Test):
if total_mem_3 != total_mem_1:
return False
def online_memory(self, memory_path):
@staticmethod
def online_memory(memory_path):
try:
Command("echo 1 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("online")
......@@ -207,7 +209,8 @@ class MemoryTest(Test):
print("Error: fail to online %s." % memory_path)
return False
def offline_memory(self, memory_path):
@staticmethod
def offline_memory(memory_path):
try:
Command("echo 0 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("offline")
......
......@@ -48,7 +48,8 @@ class NetworkTest(Test):
self.target_bandwidth_percent = 0.8
self.testfile = 'testfile'
def ifdown(self, interface):
@staticmethod
def ifdown(interface):
os.system("ip link set down %s" % interface)
for _ in range(5):
if 0 == os.system("ip link show %s | grep 'state DOWN'" % interface):
......@@ -56,7 +57,8 @@ class NetworkTest(Test):
time.sleep(1)
return False
def ifup(self, interface):
@staticmethod
def ifup(interface):
os.system("ip link set up %s" % interface)
for _ in range(5):
time.sleep(1)
......@@ -64,7 +66,8 @@ class NetworkTest(Test):
return True
return False
def get_other_interfaces(self):
@staticmethod
def get_other_interfaces():
ignore_interfaces = ['^lo', '^v', 'docker', 'br', 'bond']
cmd = "ip route show default | awk '/default/ {print $5}'"
c = Command(cmd)
......
......@@ -74,7 +74,8 @@ class NvmeTest(Test):
print(e)
return False
def in_use(self, disk):
@staticmethod
def in_use(disk):
os.system("swapon -a 2>/dev/null")
swap_file = open("/proc/swaps", "r")
swap = swap_file.read()
......
......@@ -60,6 +60,7 @@ class PerfTest(Test):
return False
return True
if __name__ == "__main__":
main = PerfTest()
main.test()
......@@ -58,7 +58,8 @@ class SystemTest(Test):
return return_code
def check_certrpm(self):
@staticmethod
def check_certrpm():
print("\nChecking installed cert package...")
return_code = True
for cert_package in ["oec-hardware"]:
......@@ -149,7 +150,8 @@ class SystemTest(Test):
return return_code
def get_modules(self, sign):
@staticmethod
def get_modules(sign):
pattern = re.compile(r"^(?P<mod_name>\w+)[\s\S]+\((?P<signs>[A-Z]+)\)")
proc_modules = open("/proc/modules")
modules = list()
......@@ -208,7 +210,8 @@ class SystemTest(Test):
print("")
return True
def read_abi_whitelist(self, whitelist):
@staticmethod
def read_abi_whitelist(whitelist):
symbols = list()
if not os.path.isfile(whitelist):
print("Error: Cannot read whitelist file")
......@@ -255,7 +258,8 @@ class SystemTest(Test):
nm.close()
return self.readSymbols(symbols)
def get_modulefile(self, module):
@staticmethod
def get_modulefile(module):
try:
modulefile = Command("modinfo -F filename %s" % module).get_str()
if os.path.islink(modulefile):
......@@ -265,7 +269,8 @@ class SystemTest(Test):
print("Error: could no find module file for %s:" % module)
return None
def check_selinux(self):
@staticmethod
def check_selinux():
print("\nChecking selinux...")
status = os.system("/usr/sbin/sestatus | grep 'SELinux status' | grep -qw 'enabled'")
mode = os.system("/usr/sbin/sestatus | grep 'Current mode' | grep -qw 'enforcing'")
......
......@@ -96,10 +96,11 @@ class UsbTest(Test):
devices = CertDevice().get_devices()
usb_devices = list()
for device in devices:
if (device.get_property("SUBSYSTEM") != "usb" or \
device.get_property("DEVTYPE") != "usb_device" or \
device.get_property("ID_BUS") != "usb" or \
device.get_property("BUSNUM") == "" or device.get_property("DEVNUM") == ""):
if (device.get_property("SUBSYSTEM") != "usb" or
device.get_property("DEVTYPE") != "usb_device" or
device.get_property("ID_BUS") != "usb" or
device.get_property("BUSNUM") == "" or
device.get_property("DEVNUM") == ""):
continue
else:
usb_devices.append(device.path)
......
......@@ -59,6 +59,7 @@ class WatchDogTest(Test):
print("")
return False
def startup(self):
@staticmethod
def startup():
print("Recover from watchdog.")
return True
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册