提交 caee7ea9 编写于 作者: L Lukáš Doktor 提交者: Lucas Meneghel Rodrigues

qemu.tests.virtio_console: Prevent infinite loop in some test on failure

Some tests were voulnurable to infinite loop due of not finished
background thread. Use funcatexit to register function, which sets
the exit_event and causes the background threads to finish.
Signed-off-by: NLukáš Doktor <ldoktor@redhat.com>
上级 f3e1d9ba
...@@ -19,6 +19,19 @@ from subprocess import Popen ...@@ -19,6 +19,19 @@ from subprocess import Popen
from autotest.client import utils from autotest.client import utils
from autotest.client.shared import error from autotest.client.shared import error
from virttest import qemu_virtio_port, env_process, utils_test, utils_misc from virttest import qemu_virtio_port, env_process, utils_test, utils_misc
from virttest import funcatexit
EXIT_EVENT = threading.Event()
def __set_exit_event():
"""
Sets global EXIT_EVENT
:note: Used in cleanup by funcatexit in some tests
"""
logging.warn("Executing __set_exit_event()")
EXIT_EVENT.set()
@error.context_aware @error.context_aware
...@@ -628,17 +641,18 @@ def run_virtio_console(test, params, env): ...@@ -628,17 +641,18 @@ def run_virtio_console(test, params, env):
guest_worker.cmd("virt.loopback(['%s'], [%s], %d, virt.LOOP_POLL)" guest_worker.cmd("virt.loopback(['%s'], [%s], %d, virt.LOOP_POLL)"
% (send_pt.name, tmp, buf_len[-1]), 10) % (send_pt.name, tmp, buf_len[-1]), 10)
exit_event = threading.Event() global EXIT_EVENT
funcatexit.register(env, params.get('type'), __set_exit_event)
# TEST # TEST
thread = qemu_virtio_port.ThSendCheck(send_pt, exit_event, queues, thread = qemu_virtio_port.ThSendCheck(send_pt, EXIT_EVENT, queues,
buf_len[0]) buf_len[0])
thread.start() thread.start()
threads.append(thread) threads.append(thread)
for i in range(len(recv_pts)): for i in range(len(recv_pts)):
thread = qemu_virtio_port.ThRecvCheck(recv_pts[i], queues[i], thread = qemu_virtio_port.ThRecvCheck(recv_pts[i], queues[i],
exit_event, buf_len[i + 1]) EXIT_EVENT, buf_len[i + 1])
thread.start() thread.start()
threads.append(thread) threads.append(thread)
...@@ -666,7 +680,8 @@ def run_virtio_console(test, params, env): ...@@ -666,7 +680,8 @@ def run_virtio_console(test, params, env):
break break
time.sleep(1) time.sleep(1)
exit_event.set() EXIT_EVENT.set()
funcatexit.unregister(env, params.get('type'), __set_exit_event)
# TEST END # TEST END
workaround_unfinished_threads = False workaround_unfinished_threads = False
logging.debug('Joining %s', threads[0]) logging.debug('Joining %s', threads[0])
...@@ -700,12 +715,10 @@ def run_virtio_console(test, params, env): ...@@ -700,12 +715,10 @@ def run_virtio_console(test, params, env):
for thread in threads: for thread in threads:
if thread.isAlive(): if thread.isAlive():
vm.destroy() vm.destroy()
del exit_event
del threads[:] del threads[:]
raise error.TestError("Not all threads finished.") raise error.TestError("Not all threads finished.")
if workaround_unfinished_threads: if workaround_unfinished_threads:
logging.debug("All threads finished at this point.") logging.debug("All threads finished at this point.")
del exit_event
del threads[:] del threads[:]
if not vm.is_alive(): if not vm.is_alive():
raise error.TestFail("VM died, can't continue the test loop. " raise error.TestFail("VM died, can't continue the test loop. "
...@@ -899,7 +912,6 @@ def run_virtio_console(test, params, env): ...@@ -899,7 +912,6 @@ def run_virtio_console(test, params, env):
"RECONNECT_NONE)" "RECONNECT_NONE)"
% (send_pt.name, recv_pt.name, buflen), 10) % (send_pt.name, recv_pt.name, buflen), 10)
exit_event = threading.Event()
send_resume_ev = None send_resume_ev = None
recv_resume_ev = None recv_resume_ev = None
...@@ -949,6 +961,8 @@ def run_virtio_console(test, params, env): ...@@ -949,6 +961,8 @@ def run_virtio_console(test, params, env):
raise error.TestNAError("virtio_console_interruption = '%s' " raise error.TestNAError("virtio_console_interruption = '%s' "
"is unknown." % interruption) "is unknown." % interruption)
funcatexit.register(env, params.get('type'), __set_exit_event)
threads.append( threads.append(
qemu_virtio_port.ThSendCheck(send_pt, exit_event, queues, qemu_virtio_port.ThSendCheck(send_pt, exit_event, queues,
buflen, send_resume_ev)) buflen, send_resume_ev))
...@@ -1005,7 +1019,7 @@ def run_virtio_console(test, params, env): ...@@ -1005,7 +1019,7 @@ def run_virtio_console(test, params, env):
vm.monitor.info('qtree') vm.monitor.info('qtree')
except Exception, inst: except Exception, inst:
logging.warn("Failed to get info from qtree: %s", inst) logging.warn("Failed to get info from qtree: %s", inst)
exit_event.set() EXIT_EVENT.set()
vm.verify_kernel_crash() vm.verify_kernel_crash()
raise error.TestFail('No data transferred after' raise error.TestFail('No data transferred after'
'interruption.') 'interruption.')
...@@ -1015,7 +1029,8 @@ def run_virtio_console(test, params, env): ...@@ -1015,7 +1029,8 @@ def run_virtio_console(test, params, env):
inst) inst)
error.context("Stopping loopback", logging.info) error.context("Stopping loopback", logging.info)
exit_event.set() EXIT_EVENT.set()
funcatexit.unregister(env, params.get('type'), __set_exit_event)
workaround_unfinished_threads = False workaround_unfinished_threads = False
threads[0].join(5) threads[0].join(5)
if threads[0].isAlive(): if threads[0].isAlive():
...@@ -1050,13 +1065,11 @@ def run_virtio_console(test, params, env): ...@@ -1050,13 +1065,11 @@ def run_virtio_console(test, params, env):
for thread in threads: for thread in threads:
if thread.isAlive(): if thread.isAlive():
vm.destroy() vm.destroy()
del exit_event
del threads[:] del threads[:]
raise error.TestError("Not all threads finished.") raise error.TestError("Not all threads finished.")
if workaround_unfinished_threads: if workaround_unfinished_threads:
logging.debug("All threads finished at this point.") logging.debug("All threads finished at this point.")
del exit_event
del threads[:] del threads[:]
cleanup(env.get_vm(params["main_vm"]), guest_worker) cleanup(env.get_vm(params["main_vm"]), guest_worker)
...@@ -1130,13 +1143,14 @@ def run_virtio_console(test, params, env): ...@@ -1130,13 +1143,14 @@ def run_virtio_console(test, params, env):
for _ in range(buf_len): for _ in range(buf_len):
data += "%c" % random.randrange(255) data += "%c" % random.randrange(255)
exit_event = threading.Event() funcatexit.register(env, params.get('type'), __set_exit_event)
time_slice = float(duration) / 100 time_slice = float(duration) / 100
# HOST -> GUEST # HOST -> GUEST
guest_worker.cmd('virt.loopback(["%s"], [], %d, virt.LOOP_NONE)' guest_worker.cmd('virt.loopback(["%s"], [], %d, virt.LOOP_NONE)'
% (port.name, buf_len), 10) % (port.name, buf_len), 10)
thread = qemu_virtio_port.ThSend(port.sock, data, exit_event) thread = qemu_virtio_port.ThSend(port.sock, data, EXIT_EVENT)
stats = array.array('f', []) stats = array.array('f', [])
loads = utils.SystemLoad([(os.getpid(), 'autotest'), loads = utils.SystemLoad([(os.getpid(), 'autotest'),
(vm.get_pid(), 'VM'), 0]) (vm.get_pid(), 'VM'), 0])
...@@ -1150,7 +1164,7 @@ def run_virtio_console(test, params, env): ...@@ -1150,7 +1164,7 @@ def run_virtio_console(test, params, env):
_time = time.time() - _time - duration _time = time.time() - _time - duration
logging.info("\n" + loads.get_cpu_status_string()[:-1]) logging.info("\n" + loads.get_cpu_status_string()[:-1])
logging.info("\n" + loads.get_mem_status_string()[:-1]) logging.info("\n" + loads.get_mem_status_string()[:-1])
exit_event.set() EXIT_EVENT.set()
thread.join() thread.join()
if thread.ret_code: if thread.ret_code:
no_errors += 1 no_errors += 1
...@@ -1177,11 +1191,11 @@ def run_virtio_console(test, params, env): ...@@ -1177,11 +1191,11 @@ def run_virtio_console(test, params, env):
del thread del thread
# GUEST -> HOST # GUEST -> HOST
exit_event.clear() EXIT_EVENT.clear()
stats = array.array('f', []) stats = array.array('f', [])
guest_worker.cmd("virt.send_loop_init('%s', %d)" guest_worker.cmd("virt.send_loop_init('%s', %d)"
% (port.name, buf_len), 30) % (port.name, buf_len), 30)
thread = qemu_virtio_port.ThRecv(port.sock, exit_event, thread = qemu_virtio_port.ThRecv(port.sock, EXIT_EVENT,
buf_len) buf_len)
thread.start() thread.start()
loads.start() loads.start()
...@@ -1194,7 +1208,7 @@ def run_virtio_console(test, params, env): ...@@ -1194,7 +1208,7 @@ def run_virtio_console(test, params, env):
logging.info("\n" + loads.get_cpu_status_string()[:-1]) logging.info("\n" + loads.get_cpu_status_string()[:-1])
logging.info("\n" + loads.get_mem_status_string()[:-1]) logging.info("\n" + loads.get_mem_status_string()[:-1])
guest_worker.cmd("virt.exit_threads()", 10) guest_worker.cmd("virt.exit_threads()", 10)
exit_event.set() EXIT_EVENT.set()
thread.join() thread.join()
if thread.ret_code: if thread.ret_code:
no_errors += 1 no_errors += 1
...@@ -1217,19 +1231,18 @@ def run_virtio_console(test, params, env): ...@@ -1217,19 +1231,18 @@ def run_virtio_console(test, params, env):
loads.stop() loads.stop()
try: try:
guest_worker.cmd("virt.exit_threads()", 10) guest_worker.cmd("virt.exit_threads()", 10)
exit_event.set() EXIT_EVENT.set()
thread.join() thread.join()
raise inst raise inst
except Exception, inst: except Exception, inst:
logging.error("test_perf: Critical failure, killing VM %s", logging.error("test_perf: Critical failure, killing VM %s",
inst) inst)
exit_event.set() EXIT_EVENT.set()
vm.destroy() vm.destroy()
del thread del thread
del exit_event
raise inst raise inst
funcatexit.unregister(env, params.get('type'), __set_exit_event)
del thread del thread
del exit_event
cleanup(vm, guest_worker) cleanup(vm, guest_worker)
if no_errors: if no_errors:
msg = ("test_perf: %d errors occurred while executing test, " msg = ("test_perf: %d errors occurred while executing test, "
...@@ -1285,10 +1298,10 @@ def run_virtio_console(test, params, env): ...@@ -1285,10 +1298,10 @@ def run_virtio_console(test, params, env):
guest_worker.cmd("virt.loopback(['%s'], [%s], %d, virt.LOOP_POLL)" guest_worker.cmd("virt.loopback(['%s'], [%s], %d, virt.LOOP_POLL)"
% (ports[0].name, tmp, blocklen), 10) % (ports[0].name, tmp, blocklen), 10)
exit_event = threading.Event() funcatexit.register(env, params.get('type'), __set_exit_event)
# TEST # TEST
thread = qemu_virtio_port.ThSendCheck(ports[0], exit_event, queues, thread = qemu_virtio_port.ThSendCheck(ports[0], EXIT_EVENT, queues,
blocklen, blocklen,
migrate_event=threading.Event()) migrate_event=threading.Event())
thread.start() thread.start()
...@@ -1296,7 +1309,7 @@ def run_virtio_console(test, params, env): ...@@ -1296,7 +1309,7 @@ def run_virtio_console(test, params, env):
for i in range(len(ports[1:])): for i in range(len(ports[1:])):
thread = qemu_virtio_port.ThRecvCheck(ports[1:][i], queues[i], thread = qemu_virtio_port.ThRecvCheck(ports[1:][i], queues[i],
exit_event, blocklen, EXIT_EVENT, blocklen,
sendlen=sendlen, sendlen=sendlen,
migrate_event=threading.Event()) migrate_event=threading.Event())
thread.start() thread.start()
...@@ -1343,20 +1356,20 @@ def run_virtio_console(test, params, env): ...@@ -1343,20 +1356,20 @@ def run_virtio_console(test, params, env):
i += 1 i += 1
time.sleep(2) time.sleep(2)
if not threads[0].isAlive(): if not threads[0].isAlive():
if exit_event.isSet(): if EXIT_EVENT.isSet():
raise error.TestFail("Exit event emitted, check the log for" raise error.TestFail("Exit event emitted, check the log for"
"send/recv thread failure.") "send/recv thread failure.")
else: else:
exit_event.set() EXIT_EVENT.set()
raise error.TestFail("Send thread died unexpectedly in " raise error.TestFail("Send thread died unexpectedly in "
"migration %d" % (j + 1)) "migration %d" % (j + 1))
for i in range(0, len(ports[1:])): for i in range(0, len(ports[1:])):
if not threads[i + 1].isAlive(): if not threads[i + 1].isAlive():
exit_event.set() EXIT_EVENT.set()
raise error.TestFail("Recv thread %d died unexpectedly in " raise error.TestFail("Recv thread %d died unexpectedly in "
"migration %d" % (i, (j + 1))) "migration %d" % (i, (j + 1)))
if verified[i] == threads[i + 1].idx: if verified[i] == threads[i + 1].idx:
exit_event.set() EXIT_EVENT.set()
raise error.TestFail("No new data in %d console were " raise error.TestFail("No new data in %d console were "
"transferred after migration %d" "transferred after migration %d"
% (i, (j + 1))) % (i, (j + 1)))
...@@ -1369,7 +1382,8 @@ def run_virtio_console(test, params, env): ...@@ -1369,7 +1382,8 @@ def run_virtio_console(test, params, env):
# TODO detect recv-thread failure and throw out whole test # TODO detect recv-thread failure and throw out whole test
# FINISH # FINISH
exit_event.set() EXIT_EVENT.set()
funcatexit.unregister(env, params.get('type'), __set_exit_event)
# Send thread might fail to exit when the guest stucks # Send thread might fail to exit when the guest stucks
workaround_unfinished_threads = False workaround_unfinished_threads = False
threads[0].join(5) threads[0].join(5)
...@@ -1402,12 +1416,10 @@ def run_virtio_console(test, params, env): ...@@ -1402,12 +1416,10 @@ def run_virtio_console(test, params, env):
for thread in threads: for thread in threads:
if thread.isAlive(): if thread.isAlive():
vm.destroy() vm.destroy()
del exit_event
del threads[:] del threads[:]
raise error.TestError("Not all threads finished.") raise error.TestError("Not all threads finished.")
if workaround_unfinished_threads: if workaround_unfinished_threads:
logging.debug("All threads finished at this point.") logging.debug("All threads finished at this point.")
del exit_event
del threads[:] del threads[:]
cleanup(vm, guest_worker) cleanup(vm, guest_worker)
...@@ -1539,10 +1551,11 @@ def run_virtio_console(test, params, env): ...@@ -1539,10 +1551,11 @@ def run_virtio_console(test, params, env):
guest_worker.cmd("virt.loopback(['%s'], ['%s'], 1024," guest_worker.cmd("virt.loopback(['%s'], ['%s'], 1024,"
"virt.LOOP_POLL)" % (consoles[0][0].name, "virt.LOOP_POLL)" % (consoles[0][0].name,
consoles[1][0].name), 10) consoles[1][0].name), 10)
exit_event = threading.Event() funcatexit.register(env, params.get('type'), __set_exit_event)
send = qemu_virtio_port.ThSend(consoles[0][0].sock, "Data", exit_event,
send = qemu_virtio_port.ThSend(consoles[0][0].sock, "Data", EXIT_EVENT,
quiet=True) quiet=True)
recv = qemu_virtio_port.ThRecv(consoles[1][0].sock, exit_event, recv = qemu_virtio_port.ThRecv(consoles[1][0].sock, EXIT_EVENT,
quiet=True) quiet=True)
send.start() send.start()
time.sleep(2) time.sleep(2)
...@@ -1555,7 +1568,8 @@ def run_virtio_console(test, params, env): ...@@ -1555,7 +1568,8 @@ def run_virtio_console(test, params, env):
if ret != "": if ret != "":
logging.error(ret) logging.error(ret)
exit_event.set() EXIT_EVENT.set()
funcatexit.unregister(env, params.get('type'), __set_exit_event)
send.join() send.join()
recv.join() recv.join()
guest_worker.cmd("virt.exit_threads()", 10) guest_worker.cmd("virt.exit_threads()", 10)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册