# usb_storage.py
import logging
import re
import uuid

import aexpect

from autotest.client.shared import error

from virttest import utils_test, utils_misc


@error.context_aware
def run(test, params, env):
    """
    Test usb storage devices in the guest.

    1) Create an image file by qemu-img
    2) Boot up a guest with this image attached as a usb device
    3) Check usb device information via monitor
    4) Check usb information by executing guest command
    5) Check usb serial option (optional)
    6) Check usb removable option (optional)
    7) Check usb min_io_size/opt_io_size option (optional)

    Parameters read from ``params``: ``main_vm``, ``login_timeout``
    (defaults to 360), ``chk_usb_info_cmd``, ``chk_usb_info_keyword``
    (comma separated), and the switches ``check_serial_option``,
    ``check_removable_option`` and ``check_io_size_option`` (each enabled
    by the value "yes").

    :param test: QEMU test object
    :param params: Dictionary with the test parameters
    :param env: Dictionary with test environment.
    :raise error.TestFail: If the mass storage device is missing from the
                           monitor output or a checked value differs from
                           the expected one.
    :raise error.TestError: If a checked pattern matches nothing.
    """
    vm = env.get_vm(params["main_vm"])
    vm.verify_alive()
    login_timeout = int(params.get("login_timeout", 360))

    @error.context_aware
    def _verify_string(regex_str, string, expect_result, search_opt=0):
        """
        Search ``string`` with ``regex_str`` and compare the first match
        against ``expect_result``.

        :param regex_str: Regex for checking command output
        :param string: The string which will be checked
        :param expect_result: List of expected strings. When the regex has
                              several groups, entry N is compared with
                              group N of the first match; otherwise only
                              the first entry is used.
        :param search_opt: Search option for re module. If it includes
                           re.I the comparison ignores case as well.
        :raise error.TestError: If the regex matches nothing.
        :raise error.TestFail: If a matched value differs from the
                               expected one.
        """
        def _compare_str(act, exp, ignore_case):
            """Return a mismatch description, or "" when both are equal."""
            if ignore_case:
                act, exp = act.lower(), exp.lower()
            if act != exp:
                return "Expected: '%s', Actual: '%s'" % (exp, act)
            return ""

        ignore_case = (search_opt & re.I) == re.I

        error.context("Finding matched sub-string with regex pattern '%s'" %
                      regex_str, logging.info)
        matches = re.findall(regex_str, string, search_opt)
        if not matches:
            logging.debug(string)
            raise error.TestError("Could not find matched sub-string")

        error.context("Verify matched string is same as expected")
        actual_result = matches[0]
        if "removable" in regex_str:
            # The matched value may be spelled on/yes/true or off/no/false;
            # fold every spelling into on/off before comparing.
            if actual_result in ["on", "yes", "true"]:
                actual_result = "on"
            if actual_result in ["off", "no", "false"]:
                actual_result = "off"

        fail_log = []
        if isinstance(actual_result, tuple):
            # re.findall yields tuples when the pattern has several groups.
            for index, expected in enumerate(expect_result):
                ret = _compare_str(actual_result[index], expected,
                                   ignore_case)
                if ret:
                    fail_log.append(ret)
        else:
            ret = _compare_str(actual_result, expect_result[0], ignore_case)
            if ret:
                fail_log.append(ret)

        if fail_log:
            logging.debug(string)
            raise error.TestFail("Could not find expected string:\n %s" %
                                 ("\n".join(fail_log)))

    def _do_io_test_guest(session):
        """
        Run the "format_disk" sub test.

        :param session: Guest session; not used by this helper, kept so
                        existing call sites stay unchanged.
        """
        utils_test.run_virt_sub_test(test, params, env, "format_disk")

    @error.context_aware
    def _restart_vm(options):
        """
        Recreate the VM after applying ``options`` on top of ``params``.

        :param options: Dictionary of parameter names and values that
                        are written into ``params`` before the VM is
                        created again.
        """
        if vm.is_alive():
            vm.destroy()

        # items() instead of iteritems(): available on Python 2 and 3.
        for option, value in options.items():
            params[option] = value
        error.context("Restarting VM")
        vm.create(params=params)
        vm.verify_alive()

    def _login():
        """Return a new guest session, waiting up to ``login_timeout``."""
        return vm.wait_for_login(timeout=login_timeout)

    def _get_usb_disk_name_in_guest(session):
        """
        Find the name of the usb disk inside the guest.

        :param session: Guest session used to list /dev/disk/by-path.
        :return: First "sdX" name found in the usb entries of the
                 listing, or "sda" when none is found.
        """
        def _get_output():
            cmd = "ls -l /dev/disk/by-path/* | grep usb"
            try:
                return session.cmd(cmd).strip()
            except aexpect.ShellCmdError:
                return ""

        output = utils_misc.wait_for(_get_output, login_timeout, step=5,
                                     text="Wait for getting USB disk name")
        # NOTE(review): wait_for presumably hands back None once it gives
        # up; searching None would raise TypeError, so fall back to an
        # empty listing and let the "sda" default apply.
        devname = re.findall(r"sd\w", output or "")
        if devname:
            return devname[0]
        return "sda"

    @error.context_aware
    def _check_serial_option(serial, regex_str, expect_str):
        """
        Restart the VM with the given serial and verify it in the monitor
        and in the guest.

        :param serial: Value appended to "serial=" in
                       ``blk_extra_params_stg``.
        :param regex_str: Regex applied to the monitor "qtree" output.
        :param expect_str: Value the regex match is expected to equal.
        """
        error.context("Set serial option to '%s'" % serial, logging.info)
        _restart_vm({"blk_extra_params_stg": "serial=" + serial})

        error.context("Check serial option in monitor", logging.info)
        output = str(vm.monitor.info("qtree"))
        _verify_string(regex_str, output, [expect_str], re.S)

        error.context("Check serial option in guest", logging.info)
        session = _login()
        try:
            output = session.cmd("lsusb -v")
            if serial not in ["EMPTY_STRING", "NO_EQUAL_STRING"]:
                # Verify in guest when serial is set to empty/null is
                # meaningless.
                _verify_string(serial, output, [serial])
            _do_io_test_guest(session)
        finally:
            # Close the session even when a check above fails.
            session.close()

    @error.context_aware
    def _check_removable_option(removable, expect_str):
        """
        Restart the VM with the given removable setting and verify it in
        the monitor and in the guest kernel log.

        :param removable: Value written to ``removable_stg``.
        :param expect_str: Text expected in the guest's dmesg lines for
                           the usb disk (matched ignoring case).
        """
        error.context("Set removable option to '%s'" % removable, logging.info)
        _restart_vm({"removable_stg": removable})

        error.context("Check removable option in monitor", logging.info)
        output = str(vm.monitor.info("qtree"))
        regex_str = r'usb-storage.*?removable = (.*?)\s'
        _verify_string(regex_str, output, [removable], re.S)

        error.context("Check removable option in guest", logging.info)
        session = _login()
        try:
            cmd = "dmesg | grep %s" % _get_usb_disk_name_in_guest(session)
            output = session.cmd(cmd)
            _verify_string(expect_str, output, [expect_str], re.I)
            _do_io_test_guest(session)
        finally:
            # Close the session even when a check above fails.
            session.close()

    @error.context_aware
    def _check_io_size_option(min_io_size="512", opt_io_size="0"):
        """
        Restart the VM with the given io sizes and verify them in the
        monitor and in the guest's sysfs queue attributes.

        :param min_io_size: Value written to ``min_io_size_stg``.
        :param opt_io_size: Value written to ``opt_io_size_stg``.
        """
        error.context("Set min_io_size to %s, opt_io_size to %s" %
                      (min_io_size, opt_io_size), logging.info)
        _restart_vm({"min_io_size_stg": min_io_size,
                     "opt_io_size_stg": opt_io_size})

        error.context("Check min/opt io_size option in monitor", logging.info)
        output = str(vm.monitor.info("qtree"))
        regex_str = r"usb-storage.*?min_io_size = (\d+).*?opt_io_size = (\d+)"
        _verify_string(regex_str, output, [min_io_size, opt_io_size], re.S)

        error.context("Check min/opt io_size option in guest", logging.info)
        session = _login()
        try:
            disk = _get_usb_disk_name_in_guest(session)
            cmd = ("cat /sys/block/%s/queue/{minimum,optimal}_io_size" % disk)

            output = session.cmd(cmd)
            # Note: If set min_io_size = 0, guest min_io_size would be set
            # to 512 by default.
            if min_io_size != "0":
                expected_min_size = min_io_size
            else:
                expected_min_size = "512"
            _verify_string(
                r"(\d+)\n(\d+)", output, [expected_min_size, opt_io_size])
            _do_io_test_guest(session)
        finally:
            # Close the session even when a check above fails.
            session.close()

    error.context("Check usb device information in monitor", logging.info)
    output = str(vm.monitor.info("usb"))
    if "Product QEMU USB MSD" not in output:
        logging.debug(output)
        raise error.TestFail("Could not find mass storage device")

    error.context("Check usb device information in guest", logging.info)
    session = _login()
    try:
        output = session.cmd(params["chk_usb_info_cmd"])
        # No bus specified, default using "usb.0" for "usb-storage"
        for keyword in params["chk_usb_info_keyword"].split(","):
            _verify_string(keyword, output, [keyword])
        _do_io_test_guest(session)
    finally:
        # Close the session even when a check above fails.
        session.close()

    if params.get("check_serial_option") == "yes":
        error.context("Check usb serial option", logging.info)
        serial = str(uuid.uuid4())
        regex_str = r'usb-storage.*?serial = "(.*?)"\s'
        _check_serial_option(serial, regex_str, serial)

        logging.info("Check this option with some illegal string")
        logging.info("Set usb serial to a empty string")
        # An empty string, ""
        serial = "EMPTY_STRING"
        regex_str = r'usb-storage.*?serial = (.*?)\s'
        _check_serial_option(serial, regex_str, '""')

        logging.info("Leave usb serial option blank")
        serial = "NO_EQUAL_STRING"
        regex_str = r'usb-storage.*?serial = (.*?)\s'
        _check_serial_option(serial, regex_str, '"on"')

    if params.get("check_removable_option") == "yes":
        error.context("Check usb removable option", logging.info)
        removable = "on"
        expect_str = "Attached SCSI removable disk"
        _check_removable_option(removable, expect_str)

        removable = "off"
        expect_str = "Attached SCSI disk"
        _check_removable_option(removable, expect_str)

    if params.get("check_io_size_option") == "yes":
        error.context("Check usb min/opt io_size option", logging.info)
        _check_io_size_option("0", "0")
        # The guest does not currently report the values set below
        # correctly, so these cases stay disabled for now.
        # _check_io_size_option("1024", "1024")
        # _check_io_size_option("4096", "4096")