提交 71eb5cb5 编写于 作者: C cuixucui

Code specification rectification

上级 2f1a0028
......@@ -23,13 +23,22 @@ except ImportError:
from urllib2 import urlopen, Request, HTTPError
class Client:
class Client(object):
"""
upload client
"""
def __init__(self, host, id):
self.host = host
self.id = id
self.form = {}
def upload(self, file, server='localhost'):
"""
upload client request
:param file:
:param server:
:return:
"""
filename = os.path.basename(file)
try:
job = filename.split('.')[0]
......
......@@ -16,12 +16,19 @@ import sys
import readline
class CommandUI:
class CommandUI(object):
"""
Command user interface selection
"""
def __init__(self, echoResponses=False):
self.echo = echoResponses
def printPipe(self, pipe):
"""
print pipe data
:param pipe:
:return:
"""
while 1:
line = pipe.readline()
if line:
......@@ -30,6 +37,12 @@ class CommandUI:
return pipe.close()
def prompt(self, question, choices=None):
"""
choice test item
:param question:
:param choices:
:return:
"""
while True:
sys.stdout.write(question)
if choices:
......@@ -45,6 +58,12 @@ class CommandUI:
sys.stdout.write("Please enter a choice\n")
def prompt_integer(self, question, choices=None):
"""
choice test item
:param question:
:param choices:
:return:
"""
while True:
sys.stdout.write(question)
if choices:
......@@ -62,6 +81,11 @@ class CommandUI:
sys.stdout.write("Please enter an integer.\n")
def prompt_confirm(self, question):
"""
Command interface displays confirmation information
:param question:
:return:
"""
YES = "y"
SAMEASYES = ["y", "yes"]
NO = "n"
......@@ -72,9 +96,17 @@ class CommandUI:
return True
if reply.lower() in SAMEASNO:
return False
sys.stdout.write("Please reply %s or %s.\n" %(YES, NO))
sys.stdout.write("Please reply %s or %s.\n" % (YES, NO))
def prompt_edit(self, label, value, choices=None):
@staticmethod
def prompt_edit(label, value, choices=None):
"""
prompt choice edit
:param label:
:param value:
:param choices:
:return:
"""
if not value:
value = ""
if choices:
......
......@@ -29,7 +29,7 @@ def filter_char(string):
if c in ascii_blacklist or (type(string) != unicode and ord(c) >= 128):
if start < i:
filtered += string[start:i]
start = i+1
start = i + 1
filtered += string[start:]
return filtered
......@@ -42,6 +42,10 @@ class CertDevice(object):
self.devices = None
def get_devices(self):
"""
get devices information
:return:
"""
self.devices = list()
try:
pipe = Command("udevadm info --export-db")
......
......@@ -67,6 +67,10 @@ class CertDocument(Document):
self.documemt = document
def new(self):
"""
new document object
:return:
"""
try:
pipe = Command("/usr/sbin/dmidecode -t 1")
pipe.start()
......@@ -148,6 +152,10 @@ class FactoryDocument(Document):
self.document.append(element)
def get_factory(self):
"""
Get factory parameter information
:return:
"""
factory = list()
for element in self.document:
test = dict()
......@@ -205,6 +213,11 @@ class ConfigFile(object):
return False
def remove_parameter(self, name):
"""
Update configuration information
:param name:
:return:
"""
if self.getParameter(name):
del self.parameters[name]
newconfig = list()
......@@ -221,6 +234,10 @@ class ConfigFile(object):
self.save()
def save(self):
"""
Save the config property value to a file
:return:
"""
fp = open(self.filename, "w")
for line in self.config:
fp.write(line)
......
......@@ -13,7 +13,10 @@
# Create: 2020-04-01
class CertEnv:
class CertEnv(object):
"""
Certification file path
"""
environmentfile = "/etc/oech.json"
releasefile = "/etc/os-release"
datadirectory = "/var/oech"
......
......@@ -27,7 +27,9 @@ from .reboot import Reboot
class Job(object):
"""
Test task management
"""
def __init__(self, args=None):
"""
Creates an instance of Job class.
......@@ -98,6 +100,11 @@ class Job(object):
return None
def create_test_suite(self, subtests_filter=None):
"""
Create test suites
:param subtests_filter:
:return:
"""
if self.test_suite:
return
......@@ -121,6 +128,10 @@ class Job(object):
print("No test found")
def check_test_depends(self):
"""
Install dependency packages
:return: depending
"""
required_rpms = []
for tests in self.test_suite:
for pkg in tests["test"].requirements:
......@@ -143,6 +154,12 @@ class Job(object):
return True
def _run_test(self, testcase, subtests_filter=None):
"""
Start a testing item
:param testcase:
:param subtests_filter:
:return:
"""
name = testcase["name"]
if testcase["device"].get_name():
name = testcase["name"] + "-" + testcase["device"].get_name()
......@@ -178,6 +195,11 @@ class Job(object):
return return_code
def run_tests(self, subtests_filter=None):
"""
Start testing
:param subtests_filter:
:return:
"""
if not len(self.test_suite):
print("No test to run.")
return
......@@ -190,6 +212,10 @@ class Job(object):
testcase["status"] = "FAIL"
def run(self):
"""
Test entrance
:return:
"""
logger = Logger("job.log", self.job_id, sys.stdout, sys.stderr)
logger.start()
self.create_test_suite(self.subtests_filter)
......@@ -202,6 +228,10 @@ class Job(object):
self.show_summary()
def show_summary(self):
"""
Command line interface display summary
:return:
"""
print("------------- Summary -------------")
for test in self.test_factory:
if test["run"]:
......@@ -215,6 +245,10 @@ class Job(object):
print("")
def save_result(self):
"""
Get test status
:return:
"""
for test in self.test_factory:
for testcase in self.test_suite:
if test["name"] == testcase["name"] and test["device"].path == testcase["device"].path:
......
......@@ -20,7 +20,9 @@ from .env import CertEnv
class Log(object):
"""
Read and write log
"""
def __init__(self, logname='oech.log', logdir='__temp__'):
if not logdir:
curtime = datetime.datetime.now().isoformat()
......
......@@ -19,8 +19,10 @@ from .env import CertEnv
from .command import Command, CertCommandError
class Reboot:
class Reboot(object):
"""
Special for restart tasks, so that the test can be continued after the machine is restarted
"""
def __init__(self, testname, job, rebootup):
self.testname = testname
self.rebootup = rebootup
......@@ -28,6 +30,10 @@ class Reboot:
self.reboot = dict()
def clean(self):
"""
Remove reboot file
:return:
"""
if not (self.job and self.testname):
return
......@@ -39,6 +45,10 @@ class Reboot:
Command("systemctl disable oech").run(ignore_errors=True)
def setup(self):
"""
Reboot setuping
:return:
"""
if not (self.job and self.testname):
print("Error: invalid reboot input.")
return False
......@@ -70,6 +80,10 @@ class Reboot:
return True
def check(self):
"""
Reboot file check
:return:
"""
doc = Document(CertEnv.rebootfile)
if not doc.load():
print("Error: reboot file load fail.")
......
......@@ -34,6 +34,11 @@ class SysInfo(object):
self.load(filename)
def load(self, filename):
"""
Collect system information
:param filename:
:return:
"""
try:
f = open(filename)
text = f.read()
......@@ -62,5 +67,9 @@ class SysInfo(object):
self.kernel_version = self.kernel.split('-')[0]
def get_version(self):
"""
Get system version information
:return:
"""
return self.version
......@@ -13,8 +13,10 @@
# Create: 2020-04-01
class Test:
class Test(object):
"""
Test set template
"""
def __init__(self, name=None):
self.pri = 0
self.requirements = list()
......
......@@ -74,7 +74,7 @@ def get_results():
@app.route('/results/<host>/<oec_id>/<job>')
def get_job(host, oec_id, job):
"""
获取job信息
get job information
:param host:
:param oec_id:
:param job:
......
......@@ -17,7 +17,9 @@ from hwcompatible.command import Command, CertCommandError
class AcpiTest(Test):
"""
acpi test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["acpica-tools"]
......@@ -29,5 +31,4 @@ class AcpiTest(Test):
return True
except CertCommandError as e:
print(e)
return False
return False
\ No newline at end of file
......@@ -24,7 +24,9 @@ from hwcompatible.command import Command, CertCommandError
class CDRomTest(Test):
"""
CDRom Test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["dvd+rw-tools", "genisoimage", "wodim", "util-linux"]
......@@ -36,12 +38,21 @@ class CDRomTest(Test):
self.test_dir = "/usr/share/doc"
def setup(self, args=None):
"""
The Setup before testing
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(args, "device", None)
self.type = self.get_type(self.device)
self.get_mode(self.type)
def test(self):
"""
Test case
:return:
"""
if not (self.method and self.device and self.type):
return False
......@@ -68,6 +79,11 @@ class CDRomTest(Test):
@staticmethod
def get_type(device):
"""
Get the type of CDROM
:param device:
:return:
"""
if not device:
return None
......@@ -77,17 +93,22 @@ class CDRomTest(Test):
for bd_type in bd_types:
if device.get_property("ID_CDROM_" + bd_type) == "1":
return bd_type
for bd_type in dvd_types:
if device.get_ertpropy("ID_CDROM_" + bd_type) == "1":
return bd_type
for bd_type in cd_types:
if device.get_property("ID_CDROM_" + bd_type) == "1":
return bd_type
for dvd_type in dvd_types:
if device.get_ertpropy("ID_CDROM_" + dvd_type) == "1":
return dvd_type
for cd_type in cd_types:
if device.get_property("ID_CDROM_" + cd_type) == "1":
return cd_type
print("Can not find pr)oper test-type for %s." % device.get_name())
return None
def get_mode(self, device_type):
"""
Get the read-write mode of CDROM
:param device_type:
:return:
"""
if not device_type:
return
......@@ -99,6 +120,10 @@ class CDRomTest(Test):
self.method = "read_test"
def rw_test(self):
"""
RW mode test of CDROM
:return:
"""
try:
devname = self.device.get_property("DEVNAME")
Command("umount %s" % devname).run(ignore_errors=True)
......@@ -127,6 +152,10 @@ class CDRomTest(Test):
return False
def write_test(self):
"""
Write mode test of CDROM
:return:
"""
try:
devname = self.device.get_property("DEVNAME")
Command("umount %s" % devname).run(ignore_errors=True)
......@@ -161,6 +190,10 @@ class CDRomTest(Test):
return False
def read_test(self):
"""
Read mode test of CDROM
:return:
"""
try:
devname = self.device.get_property("DEVNAME")
if os.path.exists("mnt_cdrom"):
......@@ -199,6 +232,12 @@ class CDRomTest(Test):
@staticmethod
def cmp_tree(dir1, dir2):
"""
Compare the differences between the two directories
:param dir1:
:param dir2:
:return:
"""
if not (dir1 and dir2):
print("Error: invalid input dir.")
return False
......@@ -210,6 +249,11 @@ class CDRomTest(Test):
return False
def reload_disc(self, device):
"""
Reloading the media
:param device:
:return:
"""
if not device:
return False
......
......@@ -15,14 +15,20 @@
import os
from hwcompatible.test import Test
from hwcompatible.command import Command
clock_dir = os.path.dirname(os.path.realpath(__file__))
class ClockTest(Test):
"""
Clock Test
"""
@staticmethod
def test():
"""
Clock test case
:return:
"""
return 0 == os.system("cd %s; ./clock" % clock_dir)
......
......@@ -20,7 +20,7 @@ from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
class CPU:
class CPU(object):
def __init__(self):
self.cpu = None
self.nums = None
......@@ -32,6 +32,10 @@ class CPU:
self.min_freq = None
def get_info(self):
"""
Get CPU info
:return:
"""
cmd = Command("lscpu")
try:
nums = cmd.get_str(r'^CPU\S*:\s+(?P<cpus>\d+)$', 'cpus', False)
......@@ -61,6 +65,12 @@ class CPU:
@staticmethod
def set_freq(freq, cpu='all'):
"""
Set CPU frequency
:param freq:
:param cpu:
:return:
"""
cmd = Command("cpupower -c %s frequency-set --freq %s" % (cpu, freq))
try:
cmd.run()
......@@ -71,6 +81,11 @@ class CPU:
@staticmethod
def get_freq(cpu):
"""
Get CPU frequency
:param cpu:
:return:
"""
cmd = Command("cpupower -c %s frequency-info -w" % cpu)
try:
return int(cmd.get_str(r'.* frequency: (?P<freq>\d+) .*', 'freq', False))
......@@ -80,6 +95,12 @@ class CPU:
@staticmethod
def set_governor(governor, cpu='all'):
"""
Set the frequency governor mode of CPU
:param governor:
:param cpu:
:return:
"""
cmd = Command("cpupower -c %s frequency-set --governor %s" % (cpu, governor))
try:
cmd.run()
......@@ -90,6 +111,11 @@ class CPU:
@staticmethod
def get_governor(cpu):
"""
Get cpu governor
:param cpu:
:return:
"""
cmd = Command("cpupower -c %s frequency-info -p" % cpu)
try:
return cmd.get_str(r'.* governor "(?P<governor>\w+)".*', 'governor', False)
......@@ -99,6 +125,12 @@ class CPU:
@staticmethod
def find_path(parent_dir, target_name):
"""
Find the target path from the specified directory
:param parent_dir:
:param target_name:
:return:
"""
cmd = Command("find %s -name %s" % (parent_dir, target_name))
try:
cmd.run()
......@@ -109,15 +141,26 @@ class CPU:
class Load:
"""
Let a program run on a specific CPU
"""
def __init__(self, cpu):
self.cpu = cpu
self.process = Command("taskset -c {} python -u {}/cpufreq/cal.py".format(self.cpu, CertEnv.testdirectoy))
self.returncode = None
def run(self):
self.process.start() ## background
"""
Process started
:return:
"""
self.process.start() # background
def get_runtime(self):
"""
Get the running time of the process
:return:
"""
if not self.process:
return None
......@@ -131,6 +174,9 @@ class Load:
class CPUFreqTest(Test):
"""
CPU frequency test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ['util-linux', 'kernel-tools']
......@@ -138,6 +184,10 @@ class CPUFreqTest(Test):
self.original_governor = self.cpu.get_governor(0)
def test_userspace(self):
"""
userspace mode of testing CPU frequency
:return:
"""
target_cpu = randint(0, self.cpu.nums-1)
target_freq = randint(self.cpu.min_freq, self.cpu.max_freq)
if self.cpu.set_freq(target_freq, cpu=target_cpu) != 0:
......@@ -150,12 +200,12 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'userspace':
print("[X] The governor of CPU%s(%s) is not userspace." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
## min_freq -> max_runtime
# min_freq -> max_runtime
self.cpu.set_freq(self.cpu.min_freq)
load_list = []
runtime_list = []
......@@ -171,9 +221,9 @@ class CPUFreqTest(Test):
print("[X] Max average time is 0.")
return False
print("[.] Max average time of all CPUs userspace load test: %.2f" %
max_average_runtime)
max_average_runtime)
## max_freq -> min_runtime
# max_freq -> min_runtime
self.cpu.set_freq(self.cpu.max_freq)
load_list = []
runtime_list = []
......@@ -189,7 +239,7 @@ class CPUFreqTest(Test):
print("[X] Min average time is 0.")
return False
print("[.] Min average time of all CPUs userspace load test: %.2f" %
min_average_runtime)
min_average_runtime)
measured_speedup = 1.0 * max_average_runtime / min_average_runtime
expected_speedup = 1.0 * self.cpu.max_freq / self.cpu.min_freq
......@@ -198,14 +248,18 @@ class CPUFreqTest(Test):
max_speedup = expected_speedup + (expected_speedup - 1.0) * tolerance
if not min_speedup < measured_speedup < max_speedup:
print("[X] The speedup(%.2f) is not between %.2f and %.2f" %
(measured_speedup, min_speedup, max_speedup))
(measured_speedup, min_speedup, max_speedup))
return False
print("[.] The speedup(%.2f) is between %.2f and %.2f" %
(measured_speedup, min_speedup, max_speedup))
(measured_speedup, min_speedup, max_speedup))
return True
def test_ondemand(self):
"""
ondemand mode of testing CPU frequency
:return:
"""
if self.cpu.set_governor('powersave') != 0:
print("[X] Set governor of all CPUs to powersave failed.")
return False
......@@ -220,10 +274,10 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'ondemand':
print("[X] The governor of CPU%s(%s) is not ondemand." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
load_test = Load(target_cpu)
load_test.run()
......@@ -231,25 +285,29 @@ class CPUFreqTest(Test):
target_cpu_freq = self.cpu.get_freq(target_cpu)
if target_cpu_freq != self.cpu.max_freq:
print("[X] The freq of CPU%s(%d) is not scaling_max_freq(%d)." %
(target_cpu, target_cpu_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.max_freq))
return False
print("[.] The freq of CPU%s is scaling_max_freq(%d)." %
(target_cpu, target_cpu_freq))
(target_cpu, target_cpu_freq))
load_test_time = load_test.get_runtime()
print("[.] Time of CPU%s ondemand load test: %.2f" %
(target_cpu, load_test_time))
(target_cpu, load_test_time))
target_cpu_freq = self.cpu.get_freq(target_cpu)
if not target_cpu_freq <= self.cpu.max_freq:
print("[X] The freq of CPU%s(%d) is not less equal than %d." %
(target_cpu, target_cpu_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.max_freq))
return False
print("[.] The freq of CPU%s(%d) is less equal than %d." %
(target_cpu, target_cpu_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.max_freq))
return True
def test_conservative(self):
"""
conservative mode of testing CPU frequency
:return:
"""
if self.cpu.set_governor('powersave') != 0:
print("[X] Set governor of all CPUs to powersave failed.")
return False
......@@ -264,7 +322,7 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'conservative':
print("[X] The governor of CPU%s(%s) is not conservative." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
......@@ -275,20 +333,24 @@ class CPUFreqTest(Test):
target_cpu_freq = self.cpu.get_freq(target_cpu)
if not self.cpu.min_freq < target_cpu_freq < self.cpu.max_freq:
print("[X] The freq of CPU%s(%d) is not between %d~%d." %
(target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
return False
print("[.] The freq of CPU%s(%d) is between %d~%d." %
(target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.min_freq, self.cpu.max_freq))
load_test_time = load_test.get_runtime()
print("[.] Time of CPU%s conservative load test: %.2f" %
(target_cpu, load_test_time))
(target_cpu, load_test_time))
target_cpu_freq = self.cpu.get_freq(target_cpu)
print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
return True
def test_powersave(self):
"""
powersave mode of testing CPU frequency
:return:
"""
if self.cpu.set_governor('powersave') != 0:
print("[X] Set governor of all CPUs to powersave failed.")
return False
......@@ -298,15 +360,15 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'powersave':
print("[X] The governor of CPU%s(%s) is not powersave." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
target_cpu_freq = self.cpu.get_freq(target_cpu)
if target_cpu_freq != self.cpu.min_freq:
print("[X] The freq of CPU%s(%d) is not scaling_min_freq(%d)." %
(target_cpu, target_cpu_freq, self.cpu.min_freq))
(target_cpu, target_cpu_freq, self.cpu.min_freq))
return False
print("[.] The freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
......@@ -314,13 +376,17 @@ class CPUFreqTest(Test):
load_test.run()
load_test_time = load_test.get_runtime()
print("[.] Time of CPU%s powersave load test: %.2f" %
(target_cpu, load_test_time))
(target_cpu, load_test_time))
target_cpu_freq = self.cpu.get_freq(target_cpu)
print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
return True
def test_performance(self):
"""
Performance mode of testing CPU frequency
:return:
"""
if self.cpu.set_governor('performance') != 0:
print("[X] Set governor of all CPUs to performance failed.")
return False
......@@ -330,15 +396,15 @@ class CPUFreqTest(Test):
target_cpu_governor = self.cpu.get_governor(target_cpu)
if target_cpu_governor != 'performance':
print("[X] The governor of CPU%s(%s) is not performance." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
return False
print("[.] The governor of CPU%s is %s." %
(target_cpu, target_cpu_governor))
(target_cpu, target_cpu_governor))
target_cpu_freq = self.cpu.get_freq(target_cpu)
if target_cpu_freq != self.cpu.max_freq:
print("[X] The freq of CPU%s(%d) is not scaling_max_freq(%d)." %
(target_cpu, target_cpu_freq, self.cpu.max_freq))
(target_cpu, target_cpu_freq, self.cpu.max_freq))
return False
print("[.] The freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
......@@ -346,16 +412,20 @@ class CPUFreqTest(Test):
load_test.run()
load_test_time = load_test.get_runtime()
print("[.] Time of CPU%s performance load test: %.2f" %
(target_cpu, load_test_time))
(target_cpu, load_test_time))
target_cpu_freq = self.cpu.get_freq(target_cpu)
print("[.] Current freq of CPU%s is %d." % (target_cpu, target_cpu_freq))
return True
def test(self):
"""
Test case
:return:
"""
if not self.cpu.get_info():
print("[X] Fail to get CPU info." \
" Please check if the CPU supports cpufreq.")
print("[X] Fail to get CPU info."
" Please check if the CPU supports cpufreq.")
return False
ret = True
......
......@@ -17,33 +17,47 @@ from hwcompatible.command import Command, CertCommandError
class IpmiTest(Test):
"""
Intelligent Platform Management Interface test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["OpenIPMI", "ipmitool"]
@staticmethod
def start_ipmi():
"""
Start IPMI test
:return:
"""
try:
Command("systemctl start ipmi").run()
Command("systemctl status ipmi.service").get_str(regex="Active: active", single_line=False)
except CertCommandError as e:
except CertCommandError:
print("ipmi service cant't be started")
return False
return True
@staticmethod
def ipmitool():
cmd_list = ["ipmitool fru","ipmitool sensor"]
"""
Testing with iptool tools
:return:
"""
cmd_list = ["ipmitool fru", "ipmitool sensor"]
for cmd in cmd_list:
try:
Command(cmd).echo()
except CertCommandError as e:
except CertCommandError:
print("%s return error." % cmd)
return False
return True
def test(self):
"""
Test case
:return:
"""
if not self.start_ipmi():
return False
if not self.ipmitool():
......@@ -54,4 +68,3 @@ class IpmiTest(Test):
if __name__ == "__main__":
i = IpmiTest()
i.test()
......@@ -23,7 +23,9 @@ from hwcompatible.command import Command, CertCommandError
class KdumpTest(Test):
"""
Kdump Test
"""
def __init__(self):
Test.__init__(self)
self.pri = 9
......@@ -34,6 +36,10 @@ class KdumpTest(Test):
self.requirements = ["crash", "kernel-debuginfo", "kexec-tools"]
def test(self):
"""
Test case
:return:
"""
try:
Command("cat /proc/cmdline").get_str(r"crashkernel=[^\ ]*")
except (OSError, ValueError):
......@@ -75,11 +81,17 @@ class KdumpTest(Test):
return False
def verify_vmcore(self):
"""
Verify vmcore
:return:
"""
config = ConfigFile(self.kdump_conf)
if config.get_parameter("path"):
self.vmcore_path = config.get_parameter("path")
dir_pattern = re.compile(r"(?P<ipaddr>[0-9]+\.[0-9]+\.[0-9]+)-(?P<date>[0-9]+(-|\.)[0-9]+(-|\.)[0-9]+)-(?P<time>[0-9]+:[0-9]+:[0-9]+)")
dir_pattern = re.compile(r'(?P<ipaddr>[0-9]+\.[0-9]+\.[0-9]+)-(?P<date>[0-9]+[-.][0-9]+[-.][0-9]+)-'
r'(?P<time>[0-9]+:[0-9]+:[0-9]+)')
vmcore_dirs = list()
for (root, dirs, files) in os.walk(self.vmcore_path):
for eve_dir in dirs:
......@@ -96,4 +108,3 @@ class KdumpTest(Test):
print("Error: could not verify kdump image %s" % vmcore_file)
print(e)
return False
......@@ -13,16 +13,16 @@
# Create: 2020-04-01
import os
import sys
import time
import re
import string
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
class MemoryTest(Test):
"""
Memory Test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["libhugetlbfs-utils"]
......@@ -36,10 +36,19 @@ class MemoryTest(Test):
self.retry_list = list()
self.test_dir = os.path.dirname(os.path.realpath(__file__))
def setup(self):
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.get_memory()
def test(self):
"""
test case
:return:
"""
if not self.memory_rw():
return False
......@@ -55,6 +64,10 @@ class MemoryTest(Test):
return True
def get_memory(self):
"""
Get memory
:return:
"""
proc_meminfo = open("/proc/meminfo", "r")
self.free_memory = 0
self.system_memory = 0
......@@ -86,6 +99,10 @@ class MemoryTest(Test):
self.free_memory = self.free_memory/1024
def memory_rw(self):
"""
Test memory request
:return:
"""
if not self.system_memory:
print("Error: get system memory fail.")
return False
......@@ -100,6 +117,10 @@ class MemoryTest(Test):
return True
def eat_memory(self):
"""
Eat memory test
:return:
"""
print("\nEat memory testing...")
print("System Memory: %u MB" % self.system_memory)
print("Free Memory: %u MB" % self.free_memory)
......@@ -124,6 +145,10 @@ class MemoryTest(Test):
return True
def hugetlb_test(self):
"""
hugetlb test
:return:
"""
print("\nHugetlb testing...")
self.get_memory()
print("HugePages Total: %u" % self.hugepage_total)
......@@ -135,10 +160,10 @@ class MemoryTest(Test):
if not self.hugepage_total:
os.system("hugeadm --create-mounts")
os.system("hugeadm --pool-pages-min %dMB:%d" %
(self.hugepage_size, self.huge_pages))
(self.hugepage_size, self.huge_pages))
elif self.hugepage_free < self.huge_pages:
os.system("hugeadm --pool-pages-min %dMB:%d" %
(self.hugepage_size, self.hugepage_total + self.huge_pages))
(self.hugepage_size, self.hugepage_total + self.huge_pages))
else:
update_hugepage = 0
......@@ -164,6 +189,10 @@ class MemoryTest(Test):
@staticmethod
def hot_plug_verify():
"""
Verify hot plug
:return:
"""
kernel = Command("uname -r").read()
config_file = "/boot/config-" + kernel
if not os.path.exists(config_file):
......@@ -175,6 +204,11 @@ class MemoryTest(Test):
return True
def hotplug_memory_test(self, memory_path):
"""
Hotplug memory test
:param memory_path:
:return:
"""
print("Keep %s online before test." % memory_path)
if not self.online_memory(memory_path):
return False
......@@ -201,25 +235,39 @@ class MemoryTest(Test):
@staticmethod
def online_memory(memory_path):
"""
Set memory online
:param memory_path:
:return:
"""
try:
Command("echo 1 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("online")
return True
except CertCommandError as e:
except CertCommandError:
print("Error: fail to online %s." % memory_path)
return False
@staticmethod
def offline_memory(memory_path):
"""
Set memory offline
:param memory_path:
:return:
"""
try:
Command("echo 0 > %s/online" % memory_path).run()
Command("cat %s/state" % memory_path).get_str("offline")
return True
except CertCommandError as e:
except CertCommandError:
print("Error: fail to online %s." % memory_path)
return False
def memory_hotplug(self):
"""
Memory hotplug test
:return:
"""
print("\nMemory hotplug testing...")
if not self.hot_plug_verify():
print("Warning: memory hotplug test skipped.")
......@@ -247,7 +295,7 @@ class MemoryTest(Test):
Command("cat %s/removable" % memory_path).get_str("1")
print("%s is removable, start testing..." % os.path.basename(memory_path))
test_flag = 1
except CertCommandError as e:
except CertCommandError:
continue
if not self.hotplug_memory_test(memory_path):
print("%s hotplug test fail." % os.path.basename(memory_path))
......
......@@ -15,13 +15,16 @@
import os
import argparse
from hwcompatible.test import Test
from hwcompatible.env import CertEnv
from hwcompatible.document import CertDocument
from rdma import RDMATest
class EthernetTest(RDMATest):
"""
Ethernet Test
"""
def __init__(self):
RDMATest.__init__(self)
self.args = None
......@@ -32,6 +35,10 @@ class EthernetTest(RDMATest):
self.target_bandwidth_percent = 0.75
def is_RoCE(self):
"""
Judge whether ethernet is roce
:return:
"""
path_netdev = ''.join(['/sys', self.device.get_property("DEVPATH")])
path_pci = path_netdev.split('net')[0]
cmd = "ls %s | grep -q infiniband" % path_pci
......@@ -39,6 +46,11 @@ class EthernetTest(RDMATest):
return 0 == os.system(cmd)
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(self.args, 'device', None)
self.interface = self.device.get_property("INTERFACE")
......@@ -50,7 +62,7 @@ class EthernetTest(RDMATest):
input = raw_input
except NameError:
from builtins import input
choice = input("[!] RoCE interface found. " \
choice = input("[!] RoCE interface found. "
"Run RDMA tests instead? [y/N] ")
if choice.lower() != "y":
return
......@@ -61,6 +73,10 @@ class EthernetTest(RDMATest):
self.test_eth_link, self.test_icmp, self.test_rdma]
def test(self):
"""
Test case
:return:
"""
for subtest in self.subtests:
if not subtest():
return False
......
......@@ -21,6 +21,9 @@ from rdma import RDMATest
class InfiniBandTest(RDMATest):
"""
InfiniBand Test
"""
def __init__(self):
RDMATest.__init__(self)
self.link_layer = 'InfiniBand'
......@@ -30,6 +33,10 @@ class InfiniBandTest(RDMATest):
self.target_bandwidth_percent = 0.5
def test_ib_link(self):
"""
IB Link test
:return:
"""
if 'LinkUp' not in self.phys_state:
print("[X] Device is not LinkUp.")
......
......@@ -13,7 +13,6 @@
# Create: 2020-04-01
import os
import re
import time
import argparse
import base64
......@@ -32,6 +31,9 @@ from hwcompatible.env import CertEnv
class NetworkTest(Test):
"""
Network Test
"""
def __init__(self):
Test.__init__(self)
self.args = None
......@@ -50,6 +52,11 @@ class NetworkTest(Test):
@staticmethod
def ifdown(interface):
"""
Judge whether the specified interface is closed successfully
:param interface:
:return:
"""
os.system("ip link set down %s" % interface)
for _ in range(5):
if 0 == os.system("ip link show %s | grep 'state DOWN'" % interface):
......@@ -59,6 +66,11 @@ class NetworkTest(Test):
@staticmethod
def ifup(interface):
"""
Judge whether the specified interface is enabled successfully
:param interface:
:return:
"""
os.system("ip link set up %s" % interface)
for _ in range(5):
time.sleep(1)
......@@ -68,6 +80,10 @@ class NetworkTest(Test):
@staticmethod
def get_other_interfaces():
"""
Get other interfaces
:return:
"""
ignore_interfaces = ['^lo', '^v', 'docker', 'br', 'bond']
cmd = "ip route show default | awk '/default/ {print $5}'"
c = Command(cmd)
......@@ -90,39 +106,59 @@ class NetworkTest(Test):
return []
def set_other_interfaces_down(self):
"""
Judge whether the interface is closed
:return:
"""
for interface in self.other_interfaces:
if not self.ifdown(interface):
return False
return True
def set_other_interfaces_up(self):
"""
Set other interfaces to up
:return:
"""
for interface in self.other_interfaces:
## Not ifup(), as some interfaces may not be linked
# Not ifup(), as some interfaces may not be linked
os.system("ip link set up %s" % interface)
# os.system("ip link | grep -w %s" % interface)
return True
def get_speed(self):
"""
Get speed on the interface
:return:
"""
c = Command("ethtool %s" % self.interface)
pattern = r".*Speed:\s+(?P<speed>\d+)Mb/s"
try:
speed = c.get_str(pattern, 'speed', False)
return int(speed)
except CertCommandError as e:
except CertCommandError:
print("[X] No speed found on the interface.")
return None
def get_interface_ip(self):
"""
Get interface ip
:return:
"""
c = Command("ip addr show %s" % self.interface)
pattern = r".*inet.? (?P<ip>.+)/.*"
try:
ip = c.get_str(pattern, 'ip', False)
return ip
except CertCommandError as e:
except CertCommandError:
print("[X] No available ip on the interface.")
return None
def test_icmp(self):
"""
Test ICMP
:return:
"""
count = 500
c = Command("ping -q -c %d -i 0 %s" % (count, self.server_ip))
pattern = r".*, (?P<loss>\d+\.{0,1}\d*)% packet loss.*"
......@@ -139,7 +175,14 @@ class NetworkTest(Test):
return False
def call_remote_server(self, cmd, act='start', ib_server_ip=''):
form = {}
"""
Call remote server
:param cmd:
:param act:
:param ib_server_ip:
:return:
"""
form = dict()
form['cmd'] = cmd
form['ib_server_ip'] = ib_server_ip
url = 'http://%s/api/%s' % (self.server_ip, act)
......@@ -158,6 +201,10 @@ class NetworkTest(Test):
return int(response.code) == 200
def test_udp_latency(self):
"""
Test udp latency
:return:
"""
cmd = "qperf %s udp_lat" % self.server_ip
print(cmd)
for _ in range(self.retries):
......@@ -174,6 +221,10 @@ class NetworkTest(Test):
return False
def test_tcp_bandwidth(self):
"""
Test tcp bandwidth
:return:
"""
cmd = "qperf %s tcp_bw" % self.server_ip
print(cmd)
c = Command(cmd)
......@@ -197,12 +248,20 @@ class NetworkTest(Test):
return False
def create_testfile(self):
"""
Create testfile
:return:
"""
bs = 128
count = self.speed/8
cmd = "dd if=/dev/urandom of=%s bs=%uk count=%u" % (self.testfile, bs, count)
return 0 == os.system(cmd)
def test_http_upload(self):
"""
Test http upload
:return:
"""
form = {}
size = os.path.getsize(self.testfile)
filename = os.path.basename(self.testfile)
......@@ -240,6 +299,10 @@ class NetworkTest(Test):
return True
def test_http_download(self):
"""
Test http download
:return:
"""
filename = os.path.basename(self.testfile)
url = "http://%s/files/%s" % (self.server_ip, filename)
......@@ -268,6 +331,10 @@ class NetworkTest(Test):
return True
def test_udp_tcp(self):
"""
Test udp tcp
:return:
"""
if not self.call_remote_server('qperf', 'start'):
print("[X] start qperf server failed.")
return False
......@@ -291,6 +358,10 @@ class NetworkTest(Test):
return True
def test_http(self):
"""
Test http
:return:
"""
print("[+] Creating testfile to upload...")
if not self.create_testfile():
print("[X] Create testfile failed.")
......@@ -309,6 +380,10 @@ class NetworkTest(Test):
return True
def test_eth_link(self):
"""
Test eth link
:return:
"""
self.other_interfaces = self.get_other_interfaces()
print("[+] Setting irrelevant interfaces down...")
if not self.set_other_interfaces_down():
......@@ -337,6 +412,10 @@ class NetworkTest(Test):
return True
def test_ip_info(self):
"""
Test ip info
:return:
"""
if not self.interface:
print("[X] No interface assigned.")
return False
......@@ -356,6 +435,11 @@ class NetworkTest(Test):
return True
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(self.args, 'device', None)
self.interface = self.device.get_property("INTERFACE")
......@@ -364,12 +448,20 @@ class NetworkTest(Test):
self.server_ip = self.cert.get_server()
def test(self):
"""
test case
:return:
"""
for subtest in self.subtests:
if not subtest():
return False
return True
def teardown(self):
"""
Environment recovery after test
:return:
"""
print("[.] Stop all test servers...")
self.call_remote_server('all', 'stop')
......
......@@ -16,7 +16,6 @@ import os
import re
import argparse
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.document import CertDocument
from hwcompatible.env import CertEnv
......@@ -24,6 +23,9 @@ from network import NetworkTest
class RDMATest(NetworkTest):
"""
RDMA Test
"""
def __init__(self):
NetworkTest.__init__(self)
self.args = None
......@@ -47,6 +49,10 @@ class RDMATest(NetworkTest):
self.target_bandwidth_percent = 0.5
def get_ibstatus(self):
"""
Get ibstatus
:return:
"""
path_netdev = ''.join(['/sys', self.device.get_property("DEVPATH")])
path_pci = path_netdev.split('net')[0]
path_ibdev = 'infiniband_verbs/uverb*/ibdev'
......@@ -94,6 +100,10 @@ class RDMATest(NetworkTest):
return True
def test_rping(self):
"""
Test rping
:return:
"""
if not self.call_remote_server('rping', 'start', self.server_ip):
print("start rping server failed.")
return False
......@@ -107,6 +117,10 @@ class RDMATest(NetworkTest):
return False
def test_rcopy(self):
"""
Test rcopy
:return:
"""
if not self.call_remote_server('rcopy', 'start', self.server_ip):
print("start rcopy server failed.")
return False
......@@ -118,6 +132,11 @@ class RDMATest(NetworkTest):
return 0 == ret
def test_bw(self, cmd):
"""
Test bandwidth
:param cmd:
:return:
"""
if self.link_layer == 'Ethernet':
cmd = cmd + ' -R'
......@@ -130,7 +149,7 @@ class RDMATest(NetworkTest):
c = Command(cmd)
pattern = r"\s+(\d+)\s+(\d+)\s+([\.\d]+)\s+(?P<avg_bw>[\.\d]+)\s+([\.\d]+)"
try:
avg_bw = c.get_str(pattern, 'avg_bw', False) ## MB/sec
avg_bw = c.get_str(pattern, 'avg_bw', False) # MB/sec
avg_bw = float(avg_bw) * 8
tgt_bw = self.target_bandwidth_percent * self.speed
......@@ -143,6 +162,10 @@ class RDMATest(NetworkTest):
return False
def test_rdma(self):
"""
Test Remote Direct Memory Access
:return:
"""
print("[+] Testing rping...")
if not self.test_rping():
print("[X] Test rping failed.")
......@@ -176,6 +199,10 @@ class RDMATest(NetworkTest):
return True
def test_ibstatus(self):
"""
Test ibstatus
:return:
"""
if 0 != os.system("systemctl start opensm"):
print("[X] start opensm failed.")
return False
......@@ -191,6 +218,11 @@ class RDMATest(NetworkTest):
return True
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(self.args, 'device', None)
self.interface = self.device.get_property("INTERFACE")
......@@ -199,6 +231,10 @@ class RDMATest(NetworkTest):
self.server_ip = self.cert.get_server()
def test(self):
"""
test case
:return:
"""
try:
input = raw_input
except NameError:
......
......@@ -21,7 +21,9 @@ from hwcompatible.device import CertDevice, Device
class NvmeTest(Test):
"""
Test Non-Volatile Memory express
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["nvme-cli"]
......@@ -29,11 +31,20 @@ class NvmeTest(Test):
self.device = None
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(args, "device", None)
Command("nvme list").echo(ignore_errors=True)
def test(self):
"""
test case
:return:
"""
disk = self.device.get_name()
if self.in_use(disk):
print("%s is in use now, skip this test." % disk)
......@@ -78,6 +89,11 @@ class NvmeTest(Test):
@staticmethod
def in_use(disk):
"""
Determine whether the swapon is in use
:param disk:
:return:
"""
os.system("swapon -a 2>/dev/null")
swap_file = open("/proc/swaps", "r")
swap = swap_file.read()
......
......@@ -14,11 +14,13 @@
import re
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.command import Command
class PerfTest(Test):
"""
Perf Test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["perf"]
......@@ -27,6 +29,10 @@ class PerfTest(Test):
self.perfReport = "perf report -i hwcompatible-perf.data --stdio"
def exec_perf(self):
"""
Execute perf command
:return:
"""
# record
print("Collecting the perf record using the command '%s'." % self.perfRecord)
perfRecordEcho = Command(self.perfRecord).read()
......@@ -56,6 +62,10 @@ class PerfTest(Test):
return True
def test(self):
"""
test case
:return:
"""
if not self.exec_perf():
return False
return True
......
......@@ -15,18 +15,19 @@
import os
import sys
import re
import shutil
import argparse
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.command import Command
from hwcompatible.sysinfo import SysInfo
from hwcompatible.env import CertEnv
from hwcompatible.document import Document
class SystemTest(Test):
"""
System Test
"""
def __init__(self):
Test.__init__(self)
self.pri = 1
......@@ -35,10 +36,19 @@ class SystemTest(Test):
self.logdir = None
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.logdir = getattr(args, "logdir", None)
def test(self):
"""
test case
:return:
"""
os.system("uname -a")
print("")
os.system("lsmod")
......@@ -60,6 +70,10 @@ class SystemTest(Test):
@staticmethod
def check_certrpm():
"""
Check installed cert package
:return:
"""
print("\nChecking installed cert package...")
return_code = True
for cert_package in ["oec-hardware"]:
......@@ -69,12 +83,16 @@ class SystemTest(Test):
sys.stdout.flush()
if rpm_verify.output and len(rpm_verify.output) > 0:
return_code = False
except OSError as err:
except OSError:
print("Error: files in %s have been tampered." % cert_package)
return_code = False
return return_code
def check_kernel(self):
"""
Check kernel
:return:
"""
print("\nChecking kernel...")
kernel_rpm = self.sysinfo.kernel_rpm
os_version = self.sysinfo.product + " " + self.sysinfo.get_version()
......@@ -113,16 +131,16 @@ class SystemTest(Test):
print(module)
print("")
if tainted & (1<<12):
if tainted & (1 << 12):
modules = self.get_modules("O")
print("Out-of-tree modules:")
for module in modules:
print(module)
#self.abi_check(module)
# self.abi_check(module)
return_code = False
print("")
if tainted & (1<<13):
if tainted & (1 << 13):
modules = self.get_modules("E")
print("Unsigned modules:")
for module in modules:
......@@ -152,6 +170,11 @@ class SystemTest(Test):
@staticmethod
def get_modules(sign):
"""
Get the module with signs character
:param sign:
:return:
"""
pattern = re.compile(r"^(?P<mod_name>\w+)[\s\S]+\((?P<signs>[A-Z]+)\)")
proc_modules = open("/proc/modules")
modules = list()
......@@ -164,6 +187,11 @@ class SystemTest(Test):
return modules
def abi_check(self, module):
"""
Check abi whitelist
:param module:
:return:
"""
whitelist_path = [("/lib/modules/kabi-current/kabi_whitelist_" + self.sysinfo.arch),
("/lib/modules/kabi/kabi_whitelist_" + self.sysinfo.arch),
("/usr/src/kernels/%s/kabi_whitelist" % self.sysinfo.kernel)
......@@ -212,42 +240,52 @@ class SystemTest(Test):
@staticmethod
def read_abi_whitelist(whitelist):
"""
Read abi whitelist
:param whitelist:
:return:
"""
symbols = list()
if not os.path.isfile(whitelist):
print("Error: Cannot read whitelist file")
return None
whitelistFile = open(whitelist,"r")
whitelistfile = open(whitelist, "r")
while True:
line = whitelistFile.readline()
line = whitelistfile.readline()
if line == "":
break
if line == "\n":
continue
line.split()
if line[0] == '[':
group=line[1:-2]
# group = line[1:-2]
continue
symbol=line.strip()
symbol = line.strip()
symbols.append(symbol)
return symbols
def read_module(self, module):
"""
Read module
:param module:
:return:
"""
symbols = list()
modulefile = self.get_modulefile(module)
module_file = self.get_modulefile(module)
if not modulefile:
if not module_file:
print("Error: Can not find module file for %s" % module)
return None
if not os.path.isfile(modulefile):
print("Error: Cannot read module file %s" % modulefile)
if not os.path.isfile(module_file):
print("Error: Cannot read module file %s" % module_file)
return None
if modulefile[-2:] == "ko":
nm = os.popen('modprobe --dump-modversions ' + modulefile)
if module_file[-2:] == "ko":
nm = os.popen('modprobe --dump-modversions ' + module_file)
else:
nm = open(modulefile,"r")
nm = open(module_file, "r")
while True:
line = nm.readline()
......@@ -260,6 +298,11 @@ class SystemTest(Test):
@staticmethod
def get_modulefile(module):
"""
Get module file
:param module:
:return:
"""
try:
modulefile = Command("modinfo -F filename %s" % module).get_str()
if os.path.islink(modulefile):
......@@ -271,6 +314,10 @@ class SystemTest(Test):
@staticmethod
def check_selinux():
"""
check selinux
:return:
"""
print("\nChecking selinux...")
status = os.system("/usr/sbin/sestatus | grep 'SELinux status' | grep -qw 'enabled'")
mode = os.system("/usr/sbin/sestatus | grep 'Current mode' | grep -qw 'enforcing'")
......
......@@ -16,11 +16,18 @@ import argparse
from hwcompatible.test import Test
from hwcompatible.command import Command, CertCommandError
from hwcompatible.device import CertDevice, Device
class TapeTest(Test):
"""
Tape test
"""
def setup(self, args=None):
"""
Initialization before test
:param args:
:return:
"""
self.args = args or argparse.Namespace()
self.device = getattr(args, "device", None)
self.tapeDevice = self.device.get_property("DEVNAME")
......@@ -30,6 +37,10 @@ class TapeTest(Test):
print("Found the Tape Device :\n %s" % self.tapeDevice)
def test(self):
"""
test case
:return:
"""
if not self.tapeDevice:
return False
......@@ -38,19 +49,19 @@ class TapeTest(Test):
bs = 64
# rewind the tape
try:
tapeRewind = Command("mt -f %s rewind 2>/dev/null" % self.tapeDevice).read()
print("Rewind tape : \n %s" % tapeRewind)
tape_rewind = Command("mt -f %s rewind 2>/dev/null" % self.tapeDevice).read()
print("Rewind tape : \n %s" % tape_rewind)
except CertCommandError as exception:
print(exception)
return False
# Write data
try:
tapeWriteData = Command("tar -Pcb %s -f %s /usr" % (bs, self.tapeDevice)).read()
if tapeWriteData == 0:
tapewritedata = Command("tar -Pcb %s -f %s /usr" % (bs, self.tapeDevice)).read()
if tapewritedata == 0:
print("Write data done. Start comparing ...")
# Compare data
compareData = Command("tar -Pdb %s -f %s /usr" % (bs, self.tapeDevice)).read()
if compareData == 0:
comparedata = Command("tar -Pdb %s -f %s /usr" % (bs, self.tapeDevice)).read()
if comparedata == 0:
print("Tape test on device %s passed." % self.tapeDevice)
return True
else:
......
......@@ -12,24 +12,29 @@
# See the Mulan PSL v2 for more details.
# Create: 2020-04-01
import os
import sys
import time
from hwcompatible.test import Test
from hwcompatible.commandUI import CommandUI
from hwcompatible.command import Command, CertCommandError
from hwcompatible.device import CertDevice, Device
from hwcompatible.command import Command
from hwcompatible.device import CertDevice
class UsbTest(Test):
"""
Usb test
"""
def __init__(self):
Test.__init__(self)
self.requirements = ["usbutils"]
self.ui = CommandUI()
def test(self):
"""
Test case
:return:
"""
print("USB device:")
Command("lsusb -t").echo()
print("")
......@@ -92,7 +97,12 @@ class UsbTest(Test):
if self.ui.prompt_confirm("All usb sockets have been tested?"):
return True
def get_usb(self):
@staticmethod
def get_usb():
"""
Get usb
:return:
"""
devices = CertDevice().get_devices()
usb_devices = list()
for device in devices:
......
......@@ -22,7 +22,9 @@ from hwcompatible.command import Command, CertCommandError
class WatchDogTest(Test):
"""
WatchDog Test
"""
def __init__(self):
Test.__init__(self)
self.pri = 9
......@@ -32,12 +34,17 @@ class WatchDogTest(Test):
self.test_dir = os.path.dirname(os.path.realpath(__file__))
def test(self):
"""
test case
:return:
"""
if not os.path.exists("/dev/watchdog"):
os.system("modprobe softdog")
os.chdir(self.test_dir)
try:
timeout = Command("./watchdog -g").get_str(regex="^Watchdog timeout is (?P<timeout>[0-9]*) seconds.$",regex_group="timeout")
timeout = Command("./watchdog -g").get_str(regex="^Watchdog timeout is (?P<timeout>[0-9]*) seconds.$",
regex_group="timeout")
timeout = int(timeout)
if timeout > self.max_timeout:
Command("./watchdog -s %d" % self.max_timeout).echo()
......@@ -60,5 +67,9 @@ class WatchDogTest(Test):
@staticmethod
def startup():
"""
Initialization before test
:return:
"""
print("Recover from watchdog.")
return True
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册