提交 f38cd747 编写于 作者: B Balamuruhan S 提交者: GitHub

Merge pull request #937 from hjydxy/master

fix qmp_command.qmp_query-cpus
import logging
import re
from autotest.client.shared import utils
from autotest.client.shared import error
from avocado.core import exceptions
from avocado.utils import process
from virttest import utils_misc
from virttest import qemu_monitor
@error.context_aware
def run(test, params, env):
"""
Test qmp event notification, this case will:
......@@ -44,29 +43,29 @@ def run(test, params, env):
if result_check == "equal":
value = output
if value != str(qmp_o):
raise error.TestFail("QMP command return value does not match "
"the expect result. Expect result: '%s'\n"
"Actual result: '%s'" % (value, qmp_o))
raise exceptions.TestFail("QMP command return value does not match "
"the expect result. Expect result: '%s'\n"
"Actual result: '%s'" % (value, qmp_o))
elif result_check == "contain":
values = output.split(';')
for value in values:
if value in exception_list:
continue
if value.strip() not in str(qmp_o):
raise error.TestFail("QMP command output does not contain "
"expect result. Expect result: '%s'\n"
"Actual result: '%s'"
% (value, qmp_o))
raise exceptions.TestFail("QMP command output does not contain "
"expect result. Expect result: '%s'\n"
"Actual result: '%s'"
% (value, qmp_o))
elif result_check == "not_contain":
values = output.split(';')
for value in values:
if value in exception_list:
continue
if value in str(qmp_o):
raise error.TestFail("QMP command output contains unexpect"
" result. Unexpect result: '%s'\n"
"Actual result: '%s'"
% (value, qmp_o))
raise exceptions.TestFail("QMP command output contains unexpect"
" result. Unexpect result: '%s'\n"
"Actual result: '%s'"
% (value, qmp_o))
elif result_check == "m_equal_q":
msg = "QMP command ouput is not equal to in human monitor command."
msg += "\nQMP command output: '%s'" % qmp_o
......@@ -78,9 +77,9 @@ def run(test, params, env):
len_o = len(qmp_o)
if len(res) != len_o:
if res[0].startswith(' '):
raise error.TestFail("Human command starts with ' ', "
"there is probably some garbage in "
"the output.\n" + msg)
raise exceptions.TestFail("Human command starts with ' ', "
"there is probably some garbage in "
"the output.\n" + msg)
res_tmp = []
#(qemu)info block in RHEL7 divided into 3 lines
for line in res:
......@@ -90,7 +89,7 @@ def run(test, params, env):
res_tmp[-1] += line
res = res_tmp
if len(res) != len_o:
raise error.TestFail(msg)
raise exceptions.TestFail(msg)
re_str = r'([^ \t\n\r\f\v=]*)=([^ \t\n\r\f\v=]*)'
for i in range(len(res)):
if qmp_cmd == "query-version":
......@@ -106,7 +105,7 @@ def run(test, params, env):
package = package.strip()
hmp_version = hmp_version.strip()
if version != hmp_version or package != hmp_package:
raise error.TestFail(msg)
raise exceptions.TestFail(msg)
else:
matches = re.findall(re_str, res[i])
for key, val in matches:
......@@ -114,10 +113,21 @@ def run(test, params, env):
continue
if '0x' in val:
val = long(val, 16)
if val != qmp_o[i][key]:
msg += "\nValue in human monitor: '%s'" % val
val_str = str(bin(val))
com_str = ""
for p in range(3, len(val_str)):
if val_str[p] == '1':
com_str += '0'
else:
com_str += '1'
com_str = "0b" + com_str
value = eval(com_str) + 1
if val_str[2] == '1':
value = -value
if value != qmp_o[i][key]:
msg += "\nValue in human monitor: '%s'" % value
msg += "\nValue in qmp: '%s'" % qmp_o[i][key]
raise error.TestFail(msg)
raise exceptions.TestFail(msg)
elif qmp_cmd == "query-block":
cmp_str = "u'%s': u'%s'" % (key, val)
cmp_s = "u'%s': %s" % (key, val)
......@@ -133,19 +143,19 @@ def run(test, params, env):
msg += ("\nCan not find '%s', '%s' or '%s' in "
" QMP command output."
% (cmp_s, cmp_str_b, cmp_str))
raise error.TestFail(msg)
raise exceptions.TestFail(msg)
elif qmp_cmd == "query-balloon":
if (int(val) * 1024 * 1024 != qmp_o[key] and
val not in str(qmp_o[key])):
msg += ("\n'%s' is not in QMP command output"
% val)
raise error.TestFail(msg)
raise exceptions.TestFail(msg)
else:
if (val not in str(qmp_o[i][key]) and
str(bool(int(val))) not in str(qmp_o[i][key])):
msg += ("\n'%s' is not in QMP command output"
% val)
raise error.TestFail(msg)
raise exceptions.TestFail(msg)
elif result_check == "m_in_q":
res = output.splitlines(True)
msg = "Key value from human monitor command is not in"
......@@ -167,12 +177,12 @@ def run(test, params, env):
match_flag = True
for i in qmp_o:
if output is None:
raise error.TestError("QMP output pattern is missing")
raise exceptions.TestError("QMP output pattern is missing")
if re.match(output.strip(), str(i)) is None:
match_flag = False
if not match_flag:
msg = "Output does not match the pattern: '%s'" % output
raise error.TestFail(msg)
raise exceptions.TestFail(msg)
def qmp_cpu_check(output):
""" qmp_cpu test check """
......@@ -180,37 +190,37 @@ def run(test, params, env):
for out in output:
cpu = out.get('CPU')
if cpu is None:
raise error.TestFail("'CPU' index is missing in QMP output "
"'%s'" % out)
raise exceptions.TestFail("'CPU' index is missing in QMP output "
"'%s'" % out)
else:
current = out.get('current')
if current is None:
raise error.TestFail("'current' key is missing in QMP "
"output '%s'" % out)
raise exceptions.TestFail("'current' key is missing in QMP "
"output '%s'" % out)
elif cpu < last_cpu:
if current is False:
pass
else:
raise error.TestFail("Attribute 'current' should be "
"'False', but is '%s' instead.\n"
"'%s'" % (current, out))
raise exceptions.TestFail("Attribute 'current' should be "
"'False', but is '%s' instead.\n"
"'%s'" % (current, out))
elif cpu == last_cpu:
if current is True:
pass
else:
raise error.TestFail("Attribute 'current' should be "
"'True', but is '%s' instead.\n"
"'%s'" % (current, out))
raise exceptions.TestFail("Attribute 'current' should be "
"'True', but is '%s' instead.\n"
"'%s'" % (current, out))
elif cpu <= last_cpu:
continue
else:
raise error.TestFail("Incorrect CPU index '%s' (corrupted "
"or higher than no_cpus).\n%s"
% (cpu, out))
raise exceptions.TestFail("Incorrect CPU index '%s' (corrupted "
"or higher than no_cpus).\n%s"
% (cpu, out))
qemu_binary = utils_misc.get_qemu_binary(params)
if not utils_misc.qemu_has_option("qmp", qemu_binary):
raise error.TestNAError("Host qemu does not support qmp.")
raise exceptions.TestSkipError("Host qemu does not support qmp.")
vm = env.get_vm(params["main_vm"])
vm.verify_alive()
......@@ -219,20 +229,20 @@ def run(test, params, env):
module = params.get("modprobe_module")
if module:
error.context("modprobe the module %s" % module, logging.info)
logging.info("modprobe the module %s", module)
session.cmd("modprobe %s" % module)
qmp_ports = vm.get_monitors_by_type('qmp')
if qmp_ports:
qmp_port = qmp_ports[0]
else:
raise error.TestError("Incorrect configuration, no QMP monitor found.")
raise exceptions.TestError("Incorrect configuration, no QMP monitor found.")
hmp_ports = vm.get_monitors_by_type('human')
if hmp_ports:
hmp_port = hmp_ports[0]
else:
raise error.TestError("Incorrect configuration, no QMP monitor found.")
callback = {"host_cmd": utils.system_output,
raise exceptions.TestError("Incorrect configuration, no QMP monitor found.")
callback = {"host_cmd": process.system_output,
"guest_cmd": session.get_command_output,
"monitor_cmd": hmp_port.send_args_cmd,
"qmp_cmd": qmp_port.send_args_cmd}
......@@ -242,7 +252,7 @@ def run(test, params, env):
if cmd_type in callback.keys():
return callback[cmd_type](cmd)
else:
raise error.TestError("cmd_type is not supported")
raise exceptions.TestError("cmd_type is not supported")
pre_cmd = params.get("pre_cmd")
qmp_cmd = params.get("qmp_cmd")
......@@ -258,12 +268,12 @@ def run(test, params, env):
# Pre command
if pre_cmd is not None:
error.context("Run prepare command '%s'." % pre_cmd, logging.info)
logging.info("Run prepare command '%s'.", pre_cmd)
pre_o = send_cmd(pre_cmd)
logging.debug("Pre-command: '%s'\n Output: '%s'", pre_cmd, pre_o)
try:
# Testing command
error.context("Run qmp command '%s'." % qmp_cmd, logging.info)
logging.info("Run qmp command '%s'.", qmp_cmd)
output = qmp_port.send_args_cmd(qmp_cmd)
logging.debug("QMP command: '%s' \n Output: '%s'", qmp_cmd, output)
except qemu_monitor.QMPCmdError, err:
......@@ -273,24 +283,24 @@ def run(test, params, env):
if params.get("negative_check_pattern"):
check_pattern = params.get("negative_check_pattern")
if check_pattern not in str(err):
raise error.TestFail("'%s' not in exception '%s'"
% (check_pattern, err))
raise exceptions.TestFail("'%s' not in exception '%s'"
% (check_pattern, err))
else:
raise error.TestFail(err)
raise exceptions.TestFail(err)
except qemu_monitor.MonitorProtocolError, err:
raise error.TestFail(err)
raise exceptions.TestFail(err)
except Exception, err:
raise error.TestFail(err)
raise exceptions.TestFail(err)
# Post command
if post_cmd is not None:
error.context("Run post command '%s'." % post_cmd, logging.info)
logging.info("Run post command '%s'.", post_cmd)
post_o = send_cmd(post_cmd)
logging.debug("Post-command: '%s'\n Output: '%s'", post_cmd, post_o)
if result_check is not None:
txt = "Verify that qmp command '%s' works as designed." % qmp_cmd
error.context(txt, logging.info)
logging.info(txt)
if result_check == 'qmp_cpu':
qmp_cpu_check(output)
elif result_check == "equal" or result_check == "contain":
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册