From 94bb99b8c9351d987343d51b8ea7f59bc9a39431 Mon Sep 17 00:00:00 2001 From: Qingtang Zhou Date: Wed, 8 May 2013 01:06:16 +0800 Subject: [PATCH] qemu.tests.cdrom: Fix coding style problem Signed-off-by: Qingtang Zhou --- qemu/tests/cdrom.py | 243 +++++++++++++++++++++++--------------------- 1 file changed, 129 insertions(+), 114 deletions(-) diff --git a/qemu/tests/cdrom.py b/qemu/tests/cdrom.py index 0e4a0ccb..515a3cc0 100644 --- a/qemu/tests/cdrom.py +++ b/qemu/tests/cdrom.py @@ -53,55 +53,56 @@ def run_cdrom(test, params, env): cdrom_prepare_timeout = int(params.get("cdrom_preapre_timeout", 360)) @error.context_aware - def create_cdrom(params, name, prepare=True, file_size=None): + def create_iso_image(params, name, prepare=True, file_size=None): """ - Creates 'new' cdrom with one file on it + Creates 'new' iso image with one file on it @param params: paramters for test - @param name: name of new cdrom file + @param name: name of new iso image file @param preapre: if True then it prepare cd images. - @param file_size: Size of CDrom in MB + @param file_size: Size of iso image in MB - @return: path to new cdrom file. + @return: path to new iso image file. """ - error.context("creating test cdrom", logging.info) + error.context("creating test iso image '%s'" % name, logging.info) cdrom_cd1 = params.get("cdrom_cd1") if not os.path.isabs(cdrom_cd1): cdrom_cd1 = os.path.join(data_dir.get_data_dir(), cdrom_cd1) - cdrom_dir = os.path.dirname(cdrom_cd1) + iso_image_dir = os.path.dirname(cdrom_cd1) if file_size is None: file_size = 10 - file_name = os.path.join(cdrom_dir, "%s.iso" % (name)) + file_name = os.path.join(iso_image_dir, "%s.iso" % (name)) if prepare: - utils.run("dd if=/dev/urandom of=%s bs=1M count=%d" % - (name, file_size)) + cmd = "dd if=/dev/urandom of=%s bs=1M count=%d" + utils.run(cmd % (name, file_size)) utils.run("mkisofs -o %s %s" % (file_name, name)) utils.run("rm -rf %s" % (name)) return file_name def cleanup_cdrom(path): - """ Removes created cdrom """ - error.context("cleaning up temp cdrom images", logging.info) + """ Removes created iso image """ + error.context("cleaning up temp iso image", logging.info) os.remove("%s" % path) - def get_cdrom_file(vm, device): + def get_cdrom_file(vm, qemu_cdrom_device): """ @param vm: VM object - @param device: qemu monitor device - @return: file associated with $device device + @param qemu_cdrom_device: qemu monitor device + @return: file associated with $qemu_cdrom_device device """ blocks = vm.monitor.info("block") cdfile = None if isinstance(blocks, str): - file_list = re.findall('%s: .*file=(\S*) ' % device, blocks) + tmp_re_str = r'%s: .*file=(\S*) ' % qemu_cdrom_device + file_list = re.findall(tmp_re_str, blocks) if file_list: cdfile = file_list[0] else: for block in blocks: - if block['device'] == device: + if block['device'] == qemu_cdrom_device: try: cdfile = block['inserted']['file'] break @@ -110,7 +111,7 @@ def run_cdrom(test, params, env): return cdfile - def _get_tray_stat_via_monitor(vm, cdrom): + def _get_tray_stat_via_monitor(vm, qemu_cdrom_device): """ Get the cdrom tray status via qemu monitor """ @@ -119,14 +120,14 @@ def run_cdrom(test, params, env): blocks = vm.monitor.info("block") if isinstance(blocks, str): for block in blocks.splitlines(): - if cdrom in block: + if qemu_cdrom_device in block: if "tray-open=1" in block: is_open, checked = (True, True) elif "tray-open=0" in block: is_open, checked = (False, True) else: for block in blocks: - if block['device'] == cdrom: + if block['device'] == qemu_cdrom_device: key = filter(lambda x: re.match(r"tray.*open", x), block.keys()) # compatible rhel6 and rhel7 diff qmp output @@ -136,12 +137,13 @@ def run_cdrom(test, params, env): return (is_open, checked) - def is_tray_opened(vm, cdrom, mode='monitor', dev_name="/dev/sr0"): + def is_tray_opened(vm, qemu_cdrom_device, mode='monitor', + dev_name="/dev/sr0"): """ Checks whether the tray is opend @param vm: VM object - @param cdrom: cdrom image file name. + @param qemu_cdrom_device: cdrom image file name. @param mode: tray status checking mode, now support: "monitor": get tray status from monitor. "session": get tray status from guest os. @@ -155,7 +157,7 @@ def run_cdrom(test, params, env): is_open, checked = (None, False) if mode in ['monitor', 'mixed']: - is_open, checked = _get_tray_stat_via_monitor(vm, cdrom) + is_open, checked = _get_tray_stat_via_monitor(vm, qemu_cdrom_device) if (mode in ['session', 'mixed']) and not checked: session = vm.wait_for_login(timeout=login_timeout) @@ -197,20 +199,21 @@ def run_cdrom(test, params, env): - def eject_cdrom(device, monitor): - """ Ejects the cdrom using kvm-monitor """ + def eject_cdrom(qemu_cdrom_device, monitor): + """ Ejects the medium using qemu-monitor """ if isinstance(monitor, qemu_monitor.HumanMonitor): - monitor.cmd("eject %s" % device) + monitor.cmd("eject %s" % qemu_cdrom_device) elif isinstance(monitor, qemu_monitor.QMPMonitor): - monitor.cmd("eject", args={'device': device}) + monitor.cmd("eject", args={'device': qemu_cdrom_device}) - def change_cdrom(device, target, monitor): - """ Changes the medium using kvm-monitor """ + def change_cdrom(qemu_cdrom_device, target, monitor): + """ Changes the medium using qemu-monitor """ if isinstance(monitor, qemu_monitor.HumanMonitor): - monitor.cmd("change %s %s" % (device, target)) + monitor.cmd("change %s %s" % (qemu_cdrom_device, target)) elif isinstance(monitor, qemu_monitor.QMPMonitor): - monitor.cmd("change", args={'device': device, 'target': target}) + args = {'device': qemu_cdrom_device, 'target': target} + monitor.cmd("change", args=args) @error.context_aware @@ -267,35 +270,36 @@ def run_cdrom(test, params, env): return device - def eject_test_via_monitor(vm, device, cdrom_dev, cdrom_orig, - cdrom_new, max_times): + def eject_test_via_monitor(vm, qemu_cdrom_device, guest_cdrom_device, + iso_image_orig, iso_image_new, max_times): """ Test cdrom eject function via qemu monitor. """ - error.context("Eject the cdrom in monitor %s times" % max_times, + error.context("Eject the iso image in monitor %s times" % max_times, logging.info) session = vm.wait_for_login(timeout=login_timeout) - cdrom = cdrom_orig + iso_image = iso_image_orig for i in range(1, max_times): - session.cmd('eject %s' % cdrom_dev) - eject_cdrom(device, vm.monitor) + session.cmd('eject %s' % guest_cdrom_device) + eject_cdrom(qemu_cdrom_device, vm.monitor) time.sleep(2) - if get_cdrom_file(vm, device) is not None: + if get_cdrom_file(vm, qemu_cdrom_device) is not None: raise error.TestFail("Device %s was not ejected" - " (round %s)" % (cdrom, i)) + " (round %s)" % (iso_image, i)) - cdrom = cdrom_new - # On even attempts, try to change the cdrom + iso_image = iso_image_new + # On even attempts, try to change the iso image if i % 2 == 0: - cdrom = cdrom_orig - change_cdrom(device, cdrom, vm.monitor) - if get_cdrom_file(vm, device) != cdrom: - raise error.TestFail("Could not change cdrom image %s" - " (round %s)" % (cdrom, i)) + iso_image = iso_image_orig + change_cdrom(qemu_cdrom_device, iso_image, vm.monitor) + if get_cdrom_file(vm, qemu_cdrom_device) != iso_image: + raise error.TestFail("Could not change iso image %s" + " (round %s)" % (iso_image, i)) time.sleep(workaround_eject_time) - def check_tray_status_test(vm, device, cdrom_dev, max_times): + def check_tray_status_test(vm, qemu_cdrom_device, guest_cdrom_device, + max_times): """ Test cdrom tray status reporting function. """ @@ -306,7 +310,7 @@ def run_cdrom(test, params, env): "deps/%s" % tray_check_src) vm.copy_files_to(tray_check_src, "/tmp") - if is_tray_opened(vm, device) is None: + if is_tray_opened(vm, qemu_cdrom_device) is None: logging.warn("Tray status reporting is not supported by qemu!") logging.warn("cdrom_test_tray_status test is skipped...") return @@ -315,53 +319,59 @@ def run_cdrom(test, params, env): logging.info) session = vm.wait_for_login(timeout=login_timeout) for i in range(1, max_times): - session.cmd('eject %s' % cdrom_dev) - if not is_tray_opened(vm, device): + session.cmd('eject %s' % guest_cdrom_device) + if not is_tray_opened(vm, qemu_cdrom_device): raise error.TestFail("Monitor reports tray closed" " when ejecting (round %s)" % i) - session.cmd('dd if=%s of=/dev/null count=1' % cdrom_dev) - if is_tray_opened(vm, device): + session.cmd('dd if=%s of=/dev/null count=1' % guest_cdrom_device) + if is_tray_opened(vm, qemu_cdrom_device): raise error.TestFail("Monitor reports tray opened when reading" " cdrom in guest (round %s)" % i) time.sleep(workaround_eject_time) - def check_tray_locked_test(vm, device, cdrom_dev): + def check_tray_locked_test(vm, qemu_cdrom_device, guest_cdrom_device): """ Test cdrom tray locked function. """ error.context("Check cdrom tray status after cdrom is locked", logging.info) session = vm.wait_for_login(timeout=login_timeout) - if is_tray_opened(vm, device, mode='mixed', dev_name=cdrom_dev) is None: + tmp_is_trap_open = is_tray_opened(vm, qemu_cdrom_device, mode='mixed', + dev_name=guest_cdrom_device) + if tmp_is_trap_open is None: logging.warn("Tray status reporting is not supported by qemu!") logging.warn("cdrom_test_locked test is skipped...") return eject_failed = False eject_failed_msg = "Tray should be closed even in locked status" - session.cmd('eject %s' % cdrom_dev) - if not is_tray_opened(vm, device, mode='mixed', dev_name=cdrom_dev): + session.cmd('eject %s' % guest_cdrom_device) + tmp_is_trap_open = is_tray_opened(vm, qemu_cdrom_device, mode='mixed', + dev_name=guest_cdrom_device) + if not tmp_is_trap_open: raise error.TestFail("Tray should not in closed status") - session.cmd('eject -i on %s' % cdrom_dev) + session.cmd('eject -i on %s' % guest_cdrom_device) try: - session.cmd('eject -t %s' % cdrom_dev) + session.cmd('eject -t %s' % guest_cdrom_device) except aexpect.ShellCmdError, e: eject_failed = True eject_failed_msg += ", eject command failed: %s" % str(e) - if (eject_failed - or is_tray_opened(vm, device, mode='mixed', dev_name=cdrom_dev)): + + tmp_is_trap_open = is_tray_opened(vm, qemu_cdrom_device, mode='mixed', + dev_name=guest_cdrom_device) + if (eject_failed or tmp_is_trap_open): raise error.TestFail(eject_failed_msg) - session.cmd('eject -i off %s' % cdrom_dev) - session.cmd('eject -t %s' % cdrom_dev) + session.cmd('eject -i off %s' % guest_cdrom_device) + session.cmd('eject -t %s' % guest_cdrom_device) - def file_operation_test(session, cdrom_dev, max_times): + def file_operation_test(session, guest_cdrom_device, max_times): """ Cdrom file operation test. """ error.context("Mounting the cdrom under /mnt", logging.info) - session.cmd("mount %s %s" % (cdrom_dev, "/mnt"), timeout=30) + session.cmd("mount %s %s" % (guest_cdrom_device, "/mnt"), timeout=30) filename = "new" @@ -378,16 +388,16 @@ def run_cdrom(test, params, env): error.context("Mount/Unmount cdrom for %s times" % max_times, logging.info) - for i in range(1, max_times): + for _ in range(1, max_times): try: - session.cmd("umount %s" % cdrom_dev) - session.cmd("mount %s /mnt" % cdrom_dev) + session.cmd("umount %s" % guest_cdrom_device) + session.cmd("mount %s /mnt" % guest_cdrom_device) except aexpect.ShellError, detail: logging.error("Mount/Unmount fail, detail: '%s'", detail) logging.debug(session.cmd("cat /etc/mtab")) raise - session.cmd("umount %s" % cdrom_dev) + session.cmd("umount %s" % guest_cdrom_device) # Test main body start. @@ -420,63 +430,67 @@ def run_cdrom(test, params, env): def test(self): if (not params.get("not_insert_at_start") or params.get("not_insert_at_start") == "no"): - self.cdrom_orig = create_cdrom(params, "orig") - self.cdrom_new = create_cdrom(params, "new") - self.cdrom_dir = os.path.dirname(self.cdrom_new) + self.iso_image_orig = create_iso_image(params, "orig") + self.iso_image_new = create_iso_image(params, "new") + self.cdrom_dir = os.path.dirname(self.iso_image_new) vm = env.get_vm(params["main_vm"]) vm.create() self.session = vm.wait_for_login(timeout=login_timeout) - if self.cdrom_orig and not os.path.isabs(self.cdrom_orig): - self.cdrom_orig = os.path.join(data_dir.get_data_dir(), - self.cdrom_orig) - cdrom = self.cdrom_orig - output = self.session.get_command_output("ls /dev/cdrom*") - cdrom_dev_list = re.findall("/dev/cdrom-\w+|/dev/cdrom\d*", output) - logging.debug("cdrom_dev_list: %s", cdrom_dev_list) + iso_image = self.iso_image_orig + error.context("Query cdrom devices in guest") + tmp_output = self.session.cmd_output("ls /dev/cdrom*") + tmp_re_str = r"/dev/cdrom-\w+|/dev/cdrom\d*" + guest_cdrom_device_list = re.findall(tmp_re_str, tmp_output) + logging.debug("guest_cdrom_device_list: '%s'", + guest_cdrom_device_list) + if params.get('not_insert_at_start') == "yes": error.context("Locked without media present", logging.info) - device = get_empty_cdrom_device() - cdrom_dev = cdrom_dev_list[0] - if vm.check_block_locked(device): - raise error.TestFail("Device should not be locked just after " - "booting up") - self.session.cmd("eject -i on %s" % cdrom_dev) - if not vm.check_block_locked(device): + #XXX: The device got from monitor might not match with the guest + # defice if there are multiple cdrom devices. + qemu_cdrom_device = get_empty_cdrom_device() + guest_cdrom_device = guest_cdrom_device_list[0] + if vm.check_block_locked(qemu_cdrom_device): + raise error.TestFail("Device should not be locked just" + " after booting up") + self.session.cmd("eject -i on %s" % guest_cdrom_device) + if not vm.check_block_locked(qemu_cdrom_device): raise error.TestFail("Device is not locked as expect.") return error.context("Detecting the existence of a cdrom (guest OS side)", logging.info) - cdrom_dev = "" + guest_cdrom_device = "" test_cmd = "dd if=%s of=/dev/null bs=1 count=1" - for d in cdrom_dev_list: + for dev in guest_cdrom_device_list: try: - output = self.session.cmd(test_cmd % d) - cdrom_dev = d + tmp_output = self.session.cmd(test_cmd % dev) + guest_cdrom_device = dev break except aexpect.ShellError: - logging.error(output) - if not cdrom_dev: + logging.error(tmp_output) + if not guest_cdrom_device: raise error.TestFail("Could not find a valid cdrom device") error.context("Detecting the existence of a cdrom (qemu side)", logging.info) - cdfile = cdrom - device = get_device(vm, cdfile) + qemu_cdrom_device = get_device(vm, iso_image) - self.session.get_command_output("umount %s" % cdrom_dev) + self.session.get_command_output("umount %s" % guest_cdrom_device) if params.get('cdrom_test_autounlock') == 'yes': error.context("Trying to unlock the cdrom", logging.info) - _f = lambda: not vm.check_block_locked(device) + _f = lambda: not vm.check_block_locked(qemu_cdrom_device) if not utils_misc.wait_for(_f, 300): raise error.TestFail("Device %s could not be" - " unlocked" % (device)) + " unlocked" % (qemu_cdrom_device)) + del _f max_times = int(params.get("max_times", 100)) if params.get("cdrom_test_eject") == "yes": - eject_test_via_monitor(vm, device, cdrom_dev, self.cdrom_orig, - self.cdrom_new, max_times) + eject_test_via_monitor(vm, qemu_cdrom_device, + guest_cdrom_device, self.iso_image_orig, + self.iso_image_new, max_times) if params.get('cdrom_test_tray_status') == 'yes': check_tray_status_test(vm, qemu_cdrom_device, @@ -491,25 +505,26 @@ def run_cdrom(test, params, env): max_test_times) error.context("Cleanup", logging.info) - # Return the self.cdrom_orig - cdfile = get_cdrom_file(vm, device) - if cdfile != self.cdrom_orig: + # Return the self.iso_image_orig + cdfile = get_cdrom_file(vm, qemu_cdrom_device) + if cdfile != self.iso_image_orig: time.sleep(workaround_eject_time) - self.session.cmd('eject %s' % cdrom_dev) - eject_cdrom(device, vm.monitor) - if get_cdrom_file(vm, device) is not None: - raise error.TestFail("Device %s was not ejected" % cdrom) - - change_cdrom(device, self.cdrom_orig, vm.monitor) - if get_cdrom_file(vm, device) != self.cdrom_orig: + self.session.cmd('eject %s' % guest_cdrom_device) + eject_cdrom(qemu_cdrom_device, vm.monitor) + if get_cdrom_file(vm, qemu_cdrom_device) is not None: + raise error.TestFail("Device %s was not ejected" + " in clearup stage" % qemu_cdrom_device) + + change_cdrom(qemu_cdrom_device, self.iso_image_orig, vm.monitor) + if get_cdrom_file(vm, qemu_cdrom_device) != self.iso_image_orig: raise error.TestFail("It wasn't possible to change" - " cdrom %s" % cdrom) + " cdrom %s" % iso_image) def clean(self): self.session.close() - cleanup_cdrom(self.cdrom_orig) - cleanup_cdrom(self.cdrom_new) + cleanup_cdrom(self.iso_image_orig) + cleanup_cdrom(self.iso_image_new) class Multihost(MiniSubtest): @@ -530,7 +545,7 @@ def run_cdrom(test, params, env): self.cdrom_size = int(params.get("cdrom_size", 10)) if self.is_src: - self.cdrom_orig = create_cdrom(params, "orig", + self.cdrom_orig = create_iso_image(params, "orig", file_size=self.cdrom_size) self.cdrom_dir = os.path.dirname(self.cdrom_orig) params["start_vm"] = "yes" @@ -540,7 +555,7 @@ def run_cdrom(test, params, env): vm = env.get_vm(self.vms[0]) vm.wait_for_login(timeout=login_timeout) else: - self.cdrom_orig = create_cdrom(params, "orig", False) + self.cdrom_orig = create_iso_image(params, "orig", False) self.cdrom_dir = os.path.dirname(self.cdrom_orig) @@ -629,7 +644,7 @@ def run_cdrom(test, params, env): super(test_multihost_ejecting, self).test() if self.is_src: # Starts in source - self.cdrom_new = create_cdrom(params, "new") + self.cdrom_new = create_iso_image(params, "new") vm = env.get_vm(self.vms[0]) session = vm.wait_for_login(timeout=login_timeout) output = session.get_command_output("ls /dev/cdrom*") -- GitLab