diff --git a/hwcompatible/command.py b/hwcompatible/command.py index 16577cef09e4a985dbf0a9dec3521f0bcc49faa0..731ac91ecc68a5c3836a476a4adb4d2314208e08 100755 --- a/hwcompatible/command.py +++ b/hwcompatible/command.py @@ -92,28 +92,48 @@ class Command: return def print_output(self): + """ + 结果显示 + :return: + """ if self.output: for line in self.output: - sys.stdout.write( line ) + sys.stdout.write(line) sys.stdout.write("\n") sys.stdout.flush() def print_errors(self): + """ + 页面显示错误信息 + :return: + """ if self.errors: for line in self.errors: - sys.stderr.write( line ) + sys.stderr.write(line) sys.stderr.write("\n") sys.stderr.flush() def pid(self): + """ + 获取管道pid值 + :return: + """ if self.pipe: return self.pipe.pid def readline(self): + """ + 按行读取输出信息 + :return + """ if self.pipe: return self.pipe.stdout.readline() def read(self): + """ + 执行命令,并读取结果 + :return: + """ self.pipe = subprocess.Popen(self.command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) @@ -124,7 +144,7 @@ class Command: if self.pipe: return self.pipe.poll() - def _get_str(self, regex=None, regex_group=None, single_line=True, return_list=False, ignore_errors=False): + def _get_str(self, regex=None, regex_group=None, single_line=True, return_list=False): self.regex = regex self.single_line = single_line self.regex_group = regex_group @@ -203,8 +223,10 @@ class Command: class CertCommandError(Exception): def __init__(self, command, message): + super(CertCommandError, self).__init__() self.message = message self.command = command + self.__message = None def __str__(self): return "\"%s\" %s" % (self.command.command, self.message) diff --git a/hwcompatible/compatibility.py b/hwcompatible/compatibility.py index fa6398708e706c516ba7a06bdc3c6f34ff24cd3c..a221a14ad3be337411f079a6eeaf84ddcb25d89e 100755 --- a/hwcompatible/compatibility.py +++ b/hwcompatible/compatibility.py @@ -29,7 +29,7 @@ from .reboot import Reboot from .client import Client -class EulerCertification(): +class EulerCertification: def __init__(self): self.certification = None @@ -50,12 +50,12 @@ class EulerCertification(): print("All cases are passed, test end.") return True - devices = certdevice.get_devices() - self.devices = DeviceDocument(CertEnv.devicefile, devices) + oec_devices = certdevice.get_devices() + self.devices = DeviceDocument(CertEnv.devicefile, oec_devices) self.devices.save() # test_factory format example: [{"name":"nvme", "device":device, "run":True, "status":"PASS", "reboot":False}] - test_factory = self.get_tests(devices) + test_factory = self.get_tests(oec_devices) self.update_factory(test_factory) if not self.choose_tests(): return True @@ -76,7 +76,7 @@ class EulerCertification(): reboot.clean() self.save(job) return True - except Exception as e: + except (IOError, OSError, TypeError) as e: print(e) return False @@ -86,7 +86,7 @@ class EulerCertification(): Command("rm -rf %s" % CertEnv.certificationfile).run() Command("rm -rf %s" % CertEnv.factoryfile).run() Command("rm -rf %s" % CertEnv.devicefile).run() - except Exception as e: + except CertCommandError as e: print(e) return False return True @@ -104,10 +104,10 @@ class EulerCertification(): factory_doc = FactoryDocument(CertEnv.factoryfile) self.test_factory = factory_doc.get_factory() - cert_id = self.certification.get_certify() + oec_id = self.certification.get_certify() hardware_info = self.certification.get_hardware() - self.client = Client(hardware_info, cert_id) - print(" Compatibility Test ID: ".ljust(30) + cert_id) + self.client = Client(hardware_info, oec_id) + print(" Compatibility Test ID: ".ljust(30) + oec_id) print(" Hardware Info: ".ljust(30) + hardware_info) print(" Product URL: ".ljust(30) + self.certification.get_url()) print(" OS Info: ".ljust(30) + self.certification.get_os()) @@ -127,7 +127,7 @@ class EulerCertification(): cwd = os.getcwd() os.chdir(os.path.dirname(doc_dir)) dir_name = "oech-" + datetime.datetime.now().strftime("%Y%m%d%H%M%S") + "-" + job.job_id - pack_name = dir_name +".tar" + pack_name = dir_name + ".tar" cmd = Command("tar -cf %s %s" % (pack_name, dir_name)) try: os.rename(job.job_id, dir_name) @@ -146,6 +146,7 @@ class EulerCertification(): def submit(self): packages = list() pattern = re.compile("^oech-[0-9]{14}-[0-9a-zA-Z]{10}.tar$") + files = [] for (root, dirs, files) in os.walk(CertEnv.datadirectory): break packages.extend(filter(pattern.search, files)) @@ -168,12 +169,17 @@ class EulerCertification(): def upload(self, path, server): print("Uploading...") if not self.client: - cert_id = self.certification.get_certify() + oec_id = self.certification.get_certify() hardware_info = self.certification.get_hardware() - self.client = Client(hardware_info, cert_id) + self.client = Client(hardware_info, oec_id) return self.client.upload(path, server) def get_tests(self, devices): + """ + get test items + :param devices: + :return: + """ nodevice = ["cpufreq", "memory", "clock", "profiler", "system", "stress", "kdump", "perf", "acpi", "watchdog"] ethernet = ["ethernet"] infiniband = ["infiniband"] @@ -214,9 +220,9 @@ class EulerCertification(): empty_device = Device() for device in devices: if device.get_property("SUBSYSTEM") == "usb" and \ - device.get_property("ID_VENDOR_FROM_DATABASE") == "Linux Foundation" and \ - ("2." in device.get_property("ID_MODEL_FROM_DATABASE") or \ - "3." in device.get_property("ID_MODEL_FROM_DATABASE")): + device.get_property("ID_VENDOR_FROM_DATABASE") == "Linux Foundation" and \ + ("2." in device.get_property("ID_MODEL_FROM_DATABASE") or + "3." in device.get_property("ID_MODEL_FROM_DATABASE")): sort_devices["usb"] = [empty_device] continue if device.get_property("PCI_CLASS") == "30000" or device.get_property("PCI_CLASS") == "38000": @@ -229,7 +235,7 @@ class EulerCertification(): sort_devices["tape"] = [device] continue if (device.get_property("DEVTYPE") == "disk" and not device.get_property("ID_TYPE")) or \ - device.get_property("ID_TYPE") == "disk": + device.get_property("ID_TYPE") == "disk": if "nvme" in device.get_property("DEVPATH"): sort_devices["disk"] = [empty_device] try: @@ -267,9 +273,9 @@ class EulerCertification(): continue if device.get_property("ID_CDROM") == "1": types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD", \ - "BD_RE", "BD_R", "BD", "CD_RW", "CD_R", "CD"] - for type in types: - if device.get_property("ID_CDROM_" + type) == "1": + "BD_RE", "BD_R", "BD", "CD_RW", "CD_R", "CD"] + for dev_type in types: + if device.get_property("ID_CDROM_" + dev_type) == "1": try: sort_devices["cdrom"].extend([device]) except KeyError: @@ -280,7 +286,7 @@ class EulerCertification(): try: Command("dmidecode").get_str("IPMI Device Information", single_line=False) sort_devices["ipmi"] = [empty_device] - except: + except OSError as e: pass return sort_devices @@ -313,15 +319,19 @@ class EulerCertification(): try: num = int(reply) - except: + except ValueError: continue if num > 0 and num <= len(self.test_factory): - self.test_factory[num-1]["run"] = not self.test_factory[num-1]["run"] + self.test_factory[num - 1]["run"] = not self.test_factory[num - 1]["run"] continue def show_tests(self): - print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10) \ + """ + show test items + :return: + """ + print("\033[1;35m" + "No.".ljust(4) + "Run-Now?".ljust(10) \ + "Status".ljust(8) + "Class".ljust(14) + "Device\033[0m") num = 0 for test in self.test_factory: @@ -334,24 +344,28 @@ class EulerCertification(): status = test["status"] device = test["device"].get_name() run = "no" - if test["run"] == True: + if test["run"] is True: run = "yes" num = num + 1 if status == "PASS": - print("%-6d"%num + run.ljust(8) + "\033[0;32mPASS \033[0m" \ - + name.ljust(14) + "%s"%device) + print("%-6d" % num + run.ljust(8) + "\033[0;32mPASS \033[0m" \ + + name.ljust(14) + "%s" % device) elif status == "FAIL": - print("%-6d"%num + run.ljust(8) + "\033[0;31mFAIL \033[0m" \ - + name.ljust(14) + "%s"%device) + print("%-6d" % num + run.ljust(8) + "\033[0;31mFAIL \033[0m" \ + + name.ljust(14) + "%s" % device) elif status == "Force": - print("%-6d"%num + run.ljust(8) + "\033[0;33mForce \033[0m" \ - + name.ljust(14) + "%s"%device) + print("%-6d" % num + run.ljust(8) + "\033[0;33mForce \033[0m" \ + + name.ljust(14) + "%s" % device) else: - print("%-6d"%num + run.ljust(8) + "\033[0;34mNotRun \033[0m" \ - + name.ljust(14) + "%s"%device) + print("%-6d" % num + run.ljust(8) + "\033[0;34mNotRun \033[0m" \ + + name.ljust(14) + "%s" % device) def choose_tests(self): + """ + 选择测试用例 + :return: + """ for test in self.test_factory: if test["status"] == "PASS": test["run"] = False @@ -396,7 +410,8 @@ class EulerCertification(): self.test_factory.sort(key=lambda k: k["name"]) FactoryDocument(CertEnv.factoryfile, self.test_factory).save() - def search_factory(self, obj_test, test_factory): + @staticmethod + def search_factory(obj_test, test_factory): for test in test_factory: if test["name"] == obj_test["name"] and test["device"].path == obj_test["device"].path: return True diff --git a/hwcompatible/device.py b/hwcompatible/device.py index 4355e6ba2cacb16cb3603244a49edba63d4ba037..88a666d540cf30c72ffa4c897aa0687bf99e32b7 100755 --- a/hwcompatible/device.py +++ b/hwcompatible/device.py @@ -28,6 +28,7 @@ def filter_char(str): filtered += str[start:] return filtered + class CertDevice: def __init__(self): self.devices = None @@ -60,12 +61,13 @@ class CertDevice: properties["INFO"] = attribute else: break - except Exception as e: + except OSError as e: print("Warning: get devices fail") print(e) self.devices.sort(key=lambda k: k.path) return self.devices + class Device: def __init__(self, properties=None): self.path = "" diff --git a/hwcompatible/document.py b/hwcompatible/document.py index f8725e6ee06f52c3b514ac84f3e17658ff33a9e3..bede231fec58eb21fa4bf7d94ee0ac3763b0d27d 100755 --- a/hwcompatible/document.py +++ b/hwcompatible/document.py @@ -21,8 +21,8 @@ from .sysinfo import SysInfo from .env import CertEnv -class Document(): - def __init__(self, filename, document=dict()): +class Document: + def __init__(self, filename, document={}): self.document = document self.filename = filename @@ -34,7 +34,7 @@ class Document(): with open(self.filename, "w+") as save_f: json.dump(self.document, save_f, indent=4) save_f.close() - except Exception as e: + except (IOError, ValueError) as e: print("Error: doc save fail.") print(e) return False @@ -46,11 +46,13 @@ class Document(): self.document = json.load(load_f) load_f.close() return True - except: + except (IOError, json.decoder.JSONDecodeError): return False + class CertDocument(Document): - def __init__(self, filename, document=dict()): + def __init__(self, filename, document={}): + super(CertDocument, self).__init__() self.document = dict() self.filename = filename if not document: @@ -74,7 +76,7 @@ class CertDocument(Document): self.document[key] = value else: break - except Exception as e: + except OSError as e: print("Error: get hardware info fail.") print(e) @@ -103,8 +105,10 @@ class CertDocument(Document): def get_kernel(self): return self.document["kernel"] + class DeviceDocument(Document): - def __init__(self, filename, devices=list()): + def __init__(self, filename, devices=[]): + super(DeviceDocument, self).__init__() self.filename = filename self.document = list() if not devices: @@ -113,8 +117,10 @@ class DeviceDocument(Document): for device in devices: self.document.append(device.properties) + class FactoryDocument(Document): - def __init__(self, filename, factory=list()): + def __init__(self, filename, factory=[]): + super(FactoryDocument, self).__init__() self.document = list() self.filename = filename if not factory: @@ -143,21 +149,22 @@ class FactoryDocument(Document): class ConfigFile: def __init__(self, filename): + super(ConfigFile, self).__init__() self.filename = filename self.parameters = dict() self.config = list() self.load() def load(self): - file = open(self.filename) - self.config = file.readlines() + fp = open(self.filename) + self.config = fp.readlines() for line in self.config: if line.strip() and line.strip()[0] == "#": continue words = line.strip().split(" ") if words[0]: self.parameters[words[0]] = " ".join(words[1:]) - file.close() + fp.close() def get_parameter(self, name): if self.parameters: @@ -199,7 +206,7 @@ class ConfigFile: self.save() def save(self): - file = open(self.filename, "w") + fp = open(self.filename, "w") for line in self.config: - file.write(line) - file.close() + fp.write(line) + fp.close() diff --git a/hwcompatible/job.py b/hwcompatible/job.py index 608850644acb4ec904c055eae805dea839b4346e..482b2291094187aa63552921d8b457702e056c94 100755 --- a/hwcompatible/job.py +++ b/hwcompatible/job.py @@ -50,7 +50,8 @@ class Job(object): for parameter_name, parameter_value in self.args.test_parameters: self.test_parameters[parameter_name] = parameter_value - def discover(self, testname, device, subtests_filter=None): + @staticmethod + def discover(testname, device, subtests_filter=None): if not testname: print("testname not specified, discover test failed") return None diff --git a/hwcompatible/log.py b/hwcompatible/log.py index cae0add773bf823475c2909631c897132cfcdfc5..c3c3725f2f5e318df897a1d2a840c269c171f158 100755 --- a/hwcompatible/log.py +++ b/hwcompatible/log.py @@ -50,6 +50,7 @@ class Log(object): self.log.close() self.log = None + class Logger(): def __init__(self, logname, logdir, out, err): self.log = Log(logname, logdir) diff --git a/hwcompatible/reboot.py b/hwcompatible/reboot.py index 1cf0875bc2f6d15a7c43de83a42dc7f39567daac..2506d3252ad938d7d70333bb038536f1131b65fc 100755 --- a/hwcompatible/reboot.py +++ b/hwcompatible/reboot.py @@ -63,7 +63,7 @@ class Reboot: try: Command("systemctl daemon-reload").run_quiet() Command("systemctl enable oech").run_quiet() - except: + except OSError as e: print("Error: enable oech.service fail.") return False @@ -81,7 +81,7 @@ class Reboot: self.job.job_id = self.reboot["job_id"] self.job.subtests_filter = self.reboot["rebootup"] time_reboot = datetime.datetime.strptime(self.reboot["time"], "%Y%m%d%H%M%S") - except: + except KeyError: print("Error: reboot file format not as expect.") return False diff --git a/hwcompatible/sysinfo.py b/hwcompatible/sysinfo.py index 458d90ca3f7331e116c51dbac88c45e5fc905f34..158c8d9ac48b9eadcae4ce9da2b5afcb99a9542a 100755 --- a/hwcompatible/sysinfo.py +++ b/hwcompatible/sysinfo.py @@ -35,7 +35,7 @@ class SysInfo: f = open(file) text = f.read() f.close() - except: + except IOError: print("Release file not found.") return diff --git a/server/server.py b/server/server.py index 26adee9a724c9b0ac6247d6ba8829f5a33e74284..566c4a95359eaedea74490bc908f1e184ade5be3 100755 --- a/server/server.py +++ b/server/server.py @@ -65,15 +65,23 @@ def get_results(): for host in next(os.walk(dir_results))[1]: dir_host = os.path.join(dir_results, host) results[host] = {} - for id in next(os.walk(dir_host))[1]: - dir_id = os.path.join(dir_host, id) + for oec_id in next(os.walk(dir_host))[1]: + dir_id = os.path.join(dir_host, oec_id) results[host][id] = next(os.walk(dir_id))[1] return render_template('results.html', results=results) -@app.route('/results///') -def get_job(host, id, job): - dir_job = os.path.join(dir_results, host, id, job) +@app.route('/results///') +def get_job(host, oec_id, job): + """ + 获取job信息 + :param host: + :param oec_id: + :param job: + :return: + """ + + dir_job = os.path.join(dir_results, host, oec_id, job) json_info = os.path.join(dir_job, 'compatibility.json') json_results = os.path.join(dir_job, 'factory.json') try: @@ -81,19 +89,27 @@ def get_job(host, id, job): info = json.load(f) with open(json_results, 'r') as f: results = json.load(f) - except Exception as e: + except (IOError, json.decoder.JSONDecodeError) as e: abort(404) - return render_template('job.html', host=host, id=id, job=job, info=info, results=results) - - -@app.route('/results////devices/') -def get_device(host, id, job, interface): - dir_job = os.path.join(dir_results, host, id, job) + return render_template('job.html', host=host, id=oec_id, job=job, info=info, results=results) + + +@app.route('/results////devices/') +def get_device(host, oec_id, job, interface): + """ + 获取硬件设备信息 + :param host: + :param oec_id: + :param job: + :param interface: + :return: + """ + dir_job = os.path.join(dir_results, host, oec_id, job) json_results = os.path.join(dir_job, 'factory.json') try: with open(json_results, 'r') as f: results = json.load(f) - except Exception as e: + except (IOError, json.decoder.JSONDecodeError) as e: abort(404) for testcase in results: device = testcase.get('device') @@ -103,44 +119,73 @@ def get_device(host, id, job, interface): abort(404) -@app.route('/results////devices') -def get_devices(host, id, job): - dir_job = os.path.join(dir_results, host, id, job) +@app.route('/results////devices') +def get_devices(host, oec_id, job): + """ + 获取设备信息 + :param host: + :param oec_id: + :param job: + :return: + """ + dir_job = os.path.join(dir_results, host, oec_id, job) json_devices = os.path.join(dir_job, 'device.json') try: with open(json_devices, 'r') as f: devices = json.load(f) - except Exception as e: + except (IOError, json.decoder.JSONDecodeError) as e: abort(404) return render_template('devices.html', devices=devices) -@app.route('/results////attachment') -def get_attachment(host, id, job): - dir_job = os.path.join(dir_results, host, id, job) +@app.route('/results////attachment') +def get_attachment(host, oec_id, job): + """ + 发送结果附件 + :param host: + :param oec_id: + :param job: + :return: + """ + dir_job = os.path.join(dir_results, host, oec_id, job) attachment = dir_job + '.tar.gz' filedir = os.path.dirname(attachment) filename = os.path.basename(attachment) return send_from_directory(filedir, filename, as_attachment=True) -@app.route('/results////logs/') -def get_log(host, id, job, name): - dir_job = os.path.join(dir_results, host, id, job) +@app.route('/results////logs/') +def get_log(host, oec_id, job, name): + """ + 获取日志 + :param host: + :param oec_id: + :param job: + :param name: + :return: + """ + dir_job = os.path.join(dir_results, host, oec_id, job) logpath = os.path.join(dir_job, name + '.log') if not os.path.exists(logpath): logpath = os.path.join(dir_job, 'job.log') try: with open(logpath, 'r') as f: log = f.read().split('\n') - except Exception as e: + except IOError as e: abort(404) return render_template('log.html', name=name, log=log) -@app.route('/results////submit') -def submit(host, id, job): - dir_job = os.path.join(dir_results, host, id, job) +@app.route('/results////submit') +def submit(host, oec_id, job): + """ + 提交测试结果 + :param host: + :param oec_id: + :param job: + :return: + """ + dir_job = os.path.join(dir_results, host, oec_id, job) tar_job = dir_job + '.tar.gz' json_cert = os.path.join(dir_job, 'compatibility.json') try: @@ -148,7 +193,7 @@ def submit(host, id, job): cert = json.load(f) with open(tar_job, 'rb') as f: attachment = base64.b64encode(f.read()) - except Exception as e: + except (IOError, json.decoder.JSONDecodeError) as e: print(e) abort(500) @@ -180,16 +225,19 @@ def submit(host, id, job): @app.route('/api/job/upload', methods=['GET', 'POST']) def upload_job(): + """ + 上传job + :return: + """ host = request.values.get('host', '').strip().replace(' ', '-') - id = request.values.get('id', '').strip().replace(' ', '-') + oec_id = request.values.get('id', '').strip().replace(' ', '-') job = request.values.get('job', '').strip().replace(' ', '-') filetext = request.values.get('filetext', '') - - if not(all([host, id, job, filetext])): + if not(all([host, oec_id, job, filetext])): return render_template('upload.html', host=host, id=id, job=job, - filetext=filetext, ret='Failed'), 400 + filetext=filetext, ret='Failed'), 400 - dir_job = os.path.join(dir_results, host, id, job) + dir_job = os.path.join(dir_results, host, oec_id, job) tar_job = dir_job + '.tar.gz' if not os.path.exists(dir_job): os.makedirs(dir_job) @@ -197,10 +245,10 @@ def upload_job(): with open(tar_job, 'wb') as f: f.write(base64.b64decode(filetext)) os.system("tar xf '%s' -C '%s'" % (tar_job, os.path.dirname(dir_job))) - except Exception as e: + except (IOError, OSError) as e: print(e) abort(400) - return render_template('upload.html', host=host, id=id, job=job, + return render_template('upload.html', host=host, id=oec_id, job=job, filetext=filetext, ret='Successful') @@ -229,7 +277,7 @@ def upload_file(): try: with open(filepath, 'wb') as f: f.write(base64.b64decode(filetext)) - except Exception as e: + except IOError as e: print(e) abort(400) return render_template('upload.html', filename=filename, filetext=filetext, @@ -303,7 +351,7 @@ def __get_ib_dev_port(ib_server_ip): ibport = str(ibport) return ibdev, ibport - except Exception as e: + except (OSError, IndexError, ValueError) as e: print(e) return None, None diff --git a/tests/acpi/acpi.py b/tests/acpi/acpi.py index 10bacfbb634f09aa348f519a5ac29571458f703f..0b4745e22f32e12264a965cdb6f6d9b99aab6b74 100755 --- a/tests/acpi/acpi.py +++ b/tests/acpi/acpi.py @@ -13,7 +13,7 @@ # Create: 2020-04-01 from hwcompatible.test import Test -from hwcompatible.command import Command +from hwcompatible.command import Command, CertCommandError class AcpiTest(Test): @@ -22,11 +22,12 @@ class AcpiTest(Test): Test.__init__(self) self.requirements = ["acpica-tools"] - def test(self): + @staticmethod + def test(): try: Command("acpidump").echo() return True - except Exception as e: + except CertCommandError as e: print(e) return False diff --git a/tests/cdrom/cdrom.py b/tests/cdrom/cdrom.py index dc104c784bdd289d0f74d0286ff71ae193a26be7..698d090f573beae1be8cbfd078f23f08913c40ba 100755 --- a/tests/cdrom/cdrom.py +++ b/tests/cdrom/cdrom.py @@ -31,6 +31,7 @@ class CDRomTest(Test): self.method = None self.device = None self.type = None + self.args = None self.ui = CommandUI() self.test_dir = "/usr/share/doc" @@ -65,33 +66,34 @@ class CDRomTest(Test): return False return True - def get_type(self, device): + @staticmethod + def get_type(device): if not device: return None bd_types = ["BD_RE", "BD_R", "BD"] dvd_types = ["DVD_RW", "DVD_PLUS_RW", "DVD_R", "DVD_PLUS_R", "DVD"] cd_types = ["CD_RW", "CD_R", "CD"] - for type in bd_types: - if device.get_property("ID_CDROM_" + type) == "1": - return type - for type in dvd_types: - if device.get_property("ID_CDROM_" + type) == "1": - return type - for type in cd_types: - if device.get_property("ID_CDROM_" + type) == "1": - return type + for bd_type in bd_types: + if device.get_property("ID_CDROM_" + bd_type) == "1": + return bd_type + for bd_type in dvd_types: + if device.get_ertpropy("ID_CDROM_" + bd_type) == "1": + return bd_type + for bd_type in cd_types: + if device.get_property("ID_CDROM_" + bd_type) == "1": + return bd_type print("Can not find pr)oper test-type for %s." % device.get_name()) return None - def get_mode(self, type): - if not type: + def get_mode(self, device_type): + if not device_type: return - if "RW" in type or "RE" in type: + if "RW" in device_type or "RE" in device_type: self.method = "rw_test" - elif "_R" in type: + elif "_R" in device_type: self.method = "write_test" else: self.method = "read_test" @@ -195,7 +197,8 @@ class CDRomTest(Test): print(e) return False - def cmp_tree(self, dir1, dir2): + @staticmethod + def cmp_tree(dir1, dir2): if not (dir1 and dir2): print("Error: invalid input dir.") return False @@ -216,14 +219,14 @@ class CDRomTest(Test): Command("eject %s" % device).run() print("tray ejected.") sys.stdout.flush() - except: + except CertCommandError as e: pass try: Command("eject -t %s" % device).run() print("tray auto-closed.\n") sys.stdout.flush() - except: + except CertCommandError as e: print("Could not auto-close the tray, please close the tray manually.") self.ui.prompt_confirm("Done well?") diff --git a/tests/clock/clock.py b/tests/clock/clock.py index c91cf9dffbc4fd4c481c4e363e4656746249fab9..7a304d8ccb09b91037603a87a413079a226d1897 100755 --- a/tests/clock/clock.py +++ b/tests/clock/clock.py @@ -21,7 +21,8 @@ clock_dir = os.path.dirname(os.path.realpath(__file__)) class ClockTest(Test): - def test(self): + @staticmethod + def test(): return 0 == os.system("cd %s; ./clock" % clock_dir) diff --git a/tests/cpufreq/cpufreq.py b/tests/cpufreq/cpufreq.py index d950cfbc063398da213c0a40bf86228503c0fd03..7dcf38d93830989746b7a631ee5992648a7a74ce 100644 --- a/tests/cpufreq/cpufreq.py +++ b/tests/cpufreq/cpufreq.py @@ -59,7 +59,8 @@ class CPU: return True - def set_freq(self, freq, cpu='all'): + @staticmethod + def set_freq(freq, cpu='all'): cmd = Command("cpupower -c %s frequency-set --freq %s" % (cpu, freq)) try: cmd.run() @@ -68,7 +69,8 @@ class CPU: print(e) return False - def get_freq(self, cpu): + @staticmethod + def get_freq(cpu): cmd = Command("cpupower -c %s frequency-info -w" % cpu) try: return int(cmd.get_str(r'.* frequency: (?P\d+) .*', 'freq', False)) @@ -76,7 +78,8 @@ class CPU: print(e) return False - def set_governor(self, governor, cpu='all'): + @staticmethod + def set_governor(governor, cpu='all'): cmd = Command("cpupower -c %s frequency-set --governor %s" % (cpu, governor)) try: cmd.run() @@ -85,7 +88,8 @@ class CPU: print(e) return False - def get_governor(self, cpu): + @staticmethod + def get_governor(cpu): cmd = Command("cpupower -c %s frequency-info -p" % cpu) try: return cmd.get_str(r'.* governor "(?P\w+)".*', 'governor', False) @@ -93,7 +97,8 @@ class CPU: print(e) return False - def find_path(self, parent_dir, target_name): + @staticmethod + def find_path(parent_dir, target_name): cmd = Command("find %s -name %s" % (parent_dir, target_name)) try: cmd.run() @@ -262,7 +267,7 @@ class CPUFreqTest(Test): (target_cpu, target_cpu_governor)) return False print("[.] The governor of CPU%s is %s." % - (target_cpu, target_cpu_governor)) + (target_cpu, target_cpu_governor)) load_test = Load(target_cpu) load_test.run() diff --git a/tests/disk/disk.py b/tests/disk/disk.py index 6efd442c551f08aa16e3b62ce85390bfdf8a0ad8..250e1777301f2fc0d65619674d5ade2b84377f4c 100755 --- a/tests/disk/disk.py +++ b/tests/disk/disk.py @@ -32,7 +32,7 @@ class DiskTest(Test): self.filesystems = ["ext4"] self.ui = CommandUI() - def setup(self, args=None): + def setup(self): try: print("Disk Info:") Command("fdisk -l").echo(ignore_errors=True) @@ -50,7 +50,7 @@ class DiskTest(Test): Command("cat /proc/mdstat").echo(ignore_errors=True) sys.stdout.flush() print("\n") - except Exception as e: + except CertCommandError as e: print("Warning: could not get disk info") print(e) @@ -195,7 +195,7 @@ class DiskTest(Test): if not self.do_fio(path, size, opts): return_code = False break - except Exception as e: + except CertCommandError as e: print(e) return_code = False break @@ -205,7 +205,8 @@ class DiskTest(Test): print("#############") return return_code - def do_fio(self, filepath, size, option): + @staticmethod + def do_fio(filepath, size, option): if os.path.isdir(filepath): file_opt = "-directory=%s" % filepath else: diff --git a/tests/ipmi/ipmi.py b/tests/ipmi/ipmi.py index 19be5d5e260f46067bd56ad860dea8f0d2b6ea4b..4f0b949f73745257678f058c057a79bfe2c644ed 100755 --- a/tests/ipmi/ipmi.py +++ b/tests/ipmi/ipmi.py @@ -13,7 +13,7 @@ # Create: 2020-04-01 from hwcompatible.test import Test -from hwcompatible.command import Command +from hwcompatible.command import Command, CertCommandError class IpmiTest(Test): @@ -22,21 +22,23 @@ class IpmiTest(Test): Test.__init__(self) self.requirements = ["OpenIPMI", "ipmitool"] - def start_ipmi(self): + @staticmethod + def start_ipmi(): try: Command("systemctl start ipmi").run() Command("systemctl status ipmi.service").get_str(regex="Active: active", single_line=False) - except: + except CertCommandError as e: print("ipmi service cant't be started") return False return True - def ipmitool(self): + @staticmethod + def ipmitool(): cmd_list = ["ipmitool fru","ipmitool sensor"] for cmd in cmd_list: try: Command(cmd).echo() - except: + except CertCommandError as e: print("%s return error." % cmd) return False return True @@ -48,6 +50,7 @@ class IpmiTest(Test): return False return True + if __name__ == "__main__": i = IpmiTest() i.test() diff --git a/tests/kdump/kdump.py b/tests/kdump/kdump.py index 1bcc8a3df973077593bceddf761eef64fa00c329..d60b3f8318a31c0dab9b4bb85b0fa488cf1fdc07 100755 --- a/tests/kdump/kdump.py +++ b/tests/kdump/kdump.py @@ -35,8 +35,8 @@ class KdumpTest(Test): def test(self): try: - Command("cat /proc/cmdline").get_str("crashkernel=[^\ ]*") - except: + Command("cat /proc/cmdline").get_str(r"crashkernel=[^\ ]*") + except (OSError, ValueError): print("Error: no crashkernel found.") return False @@ -53,7 +53,7 @@ class KdumpTest(Test): try: Command("systemctl restart kdump").run() Command("systemctl status kdump").get_str(regex="Active: active", single_line=False) - except: + except (OSError, ValueError): print("Error: kdump service not working.") return False @@ -79,12 +79,12 @@ class KdumpTest(Test): if config.get_parameter("path"): self.vmcore_path = config.get_parameter("path") - dir_pattern = re.compile("(?P[0-9]+\.[0-9]+\.[0-9]+)-(?P[0-9]+(-|\.)[0-9]+(-|\.)[0-9]+)-(?P