提交 feaf8242 编写于 作者: L Lucas Meneghel Rodrigues

Merge pull request #507 from ldoktor/mux3

avocado.job: Refactor how multiplexation happens [v3]
......@@ -43,8 +43,6 @@ except ImportError:
MULTIPLEX_CAPABLE = False
else:
MULTIPLEX_CAPABLE = True
if MULTIPLEX_CAPABLE:
try:
from yaml import CLoader as Loader
except ImportError:
......
......@@ -91,11 +91,6 @@ class Job(object):
self.show_job_log = getattr(self.args, 'show_job_log', False)
self.silent = getattr(self.args, 'silent', False)
if multiplexer.MULTIPLEX_CAPABLE:
self.multiplex_files = getattr(self.args, 'multiplex_files', None)
else:
self.multiplex_files = None
if self.standalone:
self.show_job_log = True
if self.args is not None:
......@@ -220,41 +215,12 @@ class Job(object):
human_plugin = result.HumanTestResult(self.view, self.args)
self.result_proxy.add_output_plugin(human_plugin)
def _multiplex_params_list(self, params_list, multiplex_files):
for mux_file in multiplex_files:
if not os.path.exists(mux_file):
e_msg = "Multiplex file %s doesn't exist." % mux_file
raise exceptions.OptionValidationError(e_msg)
result = []
for params in params_list:
try:
variants = multiplexer.multiplex_yamls(multiplex_files,
self.args.filter_only,
self.args.filter_out)
except SyntaxError:
variants = None
if variants:
tag = 1
for variant in variants:
env = {}
for t in variant:
env.update(dict(t.environment))
env.update({'tag': tag})
env.update({'id': params['id']})
result.append(env)
tag += 1
else:
result.append(params)
return result
def _run(self, urls=None, multiplex_files=None):
def _run(self, urls=None):
"""
Unhandled job method. Runs a list of test URLs to its completion.
:param urls: String with tests to run, separated by whitespace.
Optionally, a list of tests (each test a string).
:param multiplex_files: File that multiplexes a given test url.
:return: Integer with overall job status. See
:mod:`avocado.core.exit_codes` for more information.
:raise: Any exception (avocado crashed), or
......@@ -275,14 +241,7 @@ class Job(object):
params_list = self.test_loader.discover_urls(urls)
if multiplexer.MULTIPLEX_CAPABLE:
if multiplex_files is None:
multiplex_files = getattr(self.args, 'multiplex_files', None)
if multiplex_files is not None:
params_list = self._multiplex_params_list(params_list,
multiplex_files)
mux = multiplexer.Mux(self.args)
self._setup_job_results()
try:
......@@ -307,7 +266,7 @@ class Job(object):
"(Possible reasons: File ownership, permissions, typos)")
raise exceptions.OptionValidationError(e_msg)
self.args.test_result_total = len(test_suite)
self.args.test_result_total = mux.get_number_of_tests(test_suite)
self._make_test_result()
self._make_test_runner()
......@@ -320,7 +279,7 @@ class Job(object):
_TEST_LOGGER.info('')
self.view.logfile = self.logfile
failures = self.test_runner.run_suite(test_suite)
failures = self.test_runner.run_suite(test_suite, mux)
self.view.stop_file_logging()
if not self.standalone:
self._update_latest_link()
......@@ -341,7 +300,7 @@ class Job(object):
else:
return exit_codes.AVOCADO_TESTS_FAIL
def run(self, urls=None, multiplex_files=None):
def run(self, urls=None):
"""
Handled main job method. Runs a list of test URLs to its completion.
......@@ -359,14 +318,12 @@ class Job(object):
:param urls: String with tests to run, separated by whitespace.
Optionally, a list of tests (each test a string).
:param multiplex_files: File that multiplexes a given test url.
:return: Integer with overall job status. See
:mod:`avocado.core.exit_codes` for more information.
"""
runtime.CURRENT_JOB = self
try:
return self._run(urls, multiplex_files)
return self._run(urls)
except exceptions.JobBaseException, details:
self.status = details.status
fail_class = details.__class__.__name__
......
......@@ -23,6 +23,7 @@ import itertools
from avocado.core import tree
MULTIPLEX_CAPABLE = tree.MULTIPLEX_CAPABLE
......@@ -65,8 +66,8 @@ def tree2pools(node, mux=True):
return leaves, pools
def multiplex_yamls(input_yamls, filter_only=None, filter_out=None,
debug=False):
def parse_yamls(input_yamls, filter_only=None, filter_out=None,
debug=False):
if filter_only is None:
filter_only = []
if filter_out is None:
......@@ -77,4 +78,50 @@ def multiplex_yamls(input_yamls, filter_only=None, filter_out=None,
leaves, pools = tree2pools(final_tree, final_tree.multiplex)
if leaves: # Add remaining leaves (they are not variants, only endpoints
pools.extend(leaves)
return pools
def multiplex_pools(pools):
return itertools.product(*pools)
def multiplex_yamls(input_yamls, filter_only=None, filter_out=None,
debug=False):
pools = parse_yamls(input_yamls, filter_only, filter_out, debug)
return multiplex_pools(pools)
class Mux(object):
def __init__(self, args):
mux_files = getattr(args, 'multiplex_files', None)
filter_only = getattr(args, 'filter_only', None)
filter_out = getattr(args, 'filter_out', None)
if mux_files:
self.pools = parse_yamls(mux_files, filter_only, filter_out)
else: # no variants
self.pools = None
def get_number_of_tests(self, test_suite):
# Currently number of tests is symetrical
if self.pools:
return (len(test_suite) *
sum(1 for _ in multiplex_pools(self.pools)))
else:
return len(test_suite)
def itertests(self, template):
if self.pools: # Copy template and modify it's params
i = None
for i, variant in enumerate(multiplex_pools(self.pools)):
test_factory = [template[0], template[1].copy()]
params = template[1]['params'].copy()
for node in variant:
params.update(node.environment)
params.update({'tag': i})
params.update({'id': template[1]['params']['id'] + str(i)})
test_factory[1]['params'] = params
yield test_factory
if i is None: # No variants, use template
yield template
else: # No variants, use template
yield template
......@@ -111,7 +111,109 @@ class TestRunner(object):
test_state['text_output'] = log_file_obj.read()
return test_state
def run_suite(self, test_suite):
def _run_test(self, test_factory, q, failures):
p = multiprocessing.Process(target=self.run_test,
args=(test_factory, q,))
cycle_timeout = 1
time_started = time.time()
test_state = None
p.start()
early_state = q.get()
if 'load_exception' in early_state:
self.job.view.notify(event='error',
msg='Avocado crashed during test load. '
'Some reports might have not been '
'generated. Aborting...')
sys.exit(exit_codes.AVOCADO_FAIL)
# At this point, the test is already initialized and we know
# for sure if there's a timeout set.
timeout = early_state['params'].get('timeout', self.DEFAULT_TIMEOUT)
time_deadline = time_started + timeout
ctrl_c_count = 0
ignore_window = 2.0
ignore_time_started = time.time()
stage_1_msg_displayed = False
stage_2_msg_displayed = False
while True:
try:
if time.time() >= time_deadline:
os.kill(p.pid, signal.SIGUSR1)
break
wait.wait_for(lambda: not q.empty() or not p.is_alive(),
cycle_timeout, first=0.01, step=0.1)
if not q.empty():
test_state = q.get()
if not test_state['running']:
break
else:
self.job.result_proxy.notify_progress(True)
if test_state['paused']:
msg = test_state['paused_msg']
if msg:
self.job.view.notify(event='partial', msg=msg)
elif p.is_alive():
if ctrl_c_count == 0:
self.job.result_proxy.notify_progress()
else:
break
except KeyboardInterrupt:
time_elapsed = time.time() - ignore_time_started
ctrl_c_count += 1
if ctrl_c_count == 2:
if not stage_1_msg_displayed:
k_msg_1 = ("SIGINT sent to tests, waiting for their "
"reaction")
k_msg_2 = ("Ignoring Ctrl+C during the next "
"%d seconds so they can try to finish" %
ignore_window)
k_msg_3 = ("A new Ctrl+C sent after that will send a "
"SIGKILL to them")
self.job.view.notify(event='message', msg=k_msg_1)
self.job.view.notify(event='message', msg=k_msg_2)
self.job.view.notify(event='message', msg=k_msg_3)
stage_1_msg_displayed = True
ignore_time_started = time.time()
if (ctrl_c_count > 2) and (time_elapsed > ignore_window):
if not stage_2_msg_displayed:
k_msg_3 = ("Ctrl+C received after the ignore window. "
"Killing all active tests")
self.job.view.notify(event='message', msg=k_msg_3)
stage_2_msg_displayed = True
os.kill(p.pid, signal.SIGKILL)
# If test_state is None, the test was aborted before it ended.
if test_state is None:
if p.is_alive() and wait.wait_for(lambda: not q.empty(),
cycle_timeout, first=0.01, step=0.1):
test_state = q.get()
else:
early_state['time_elapsed'] = time.time() - time_started
test_state = self._fill_aborted_test_state(early_state)
test_log = logging.getLogger('avocado.test')
test_log.error('ERROR %s -> TestAbortedError: '
'Test aborted unexpectedly',
test_state['name'])
# don't process other tests from the list
if ctrl_c_count > 0:
self.job.view.notify(event='minor', msg='')
return False
self.result.check_test(test_state)
if not status.mapping[test_state['status']]:
failures.append(test_state['name'])
return True
def run_suite(self, test_suite, mux):
"""
Run one or more tests and report with test result.
......@@ -125,110 +227,14 @@ class TestRunner(object):
self.result.start_tests()
q = queues.SimpleQueue()
for test_factory in test_suite:
p = multiprocessing.Process(target=self.run_test,
args=(test_factory, q,))
cycle_timeout = 1
time_started = time.time()
test_state = None
p.start()
early_state = q.get()
if 'load_exception' in early_state:
self.job.view.notify(event='error',
msg='Avocado crashed during test load. '
'Some reports might have not been '
'generated. Aborting...')
sys.exit(exit_codes.AVOCADO_FAIL)
# At this point, the test is already initialized and we know
# for sure if there's a timeout set.
if 'timeout' in early_state['params'].keys():
timeout = float(early_state['params']['timeout'])
else:
timeout = self.DEFAULT_TIMEOUT
time_deadline = time_started + timeout
ctrl_c_count = 0
ignore_window = 2.0
ignore_time_started = time.time()
stage_1_msg_displayed = False
stage_2_msg_displayed = False
while True:
try:
if time.time() >= time_deadline:
os.kill(p.pid, signal.SIGUSR1)
break
wait.wait_for(lambda: not q.empty() or not p.is_alive(),
cycle_timeout, first=0.01, step=0.1)
if not q.empty():
test_state = q.get()
if not test_state['running']:
break
else:
self.job.result_proxy.notify_progress(True)
if test_state['paused']:
msg = test_state['paused_msg']
if msg:
self.job.view.notify(event='partial', msg=msg)
elif p.is_alive():
if ctrl_c_count == 0:
self.job.result_proxy.notify_progress()
else:
break
except KeyboardInterrupt:
time_elapsed = time.time() - ignore_time_started
ctrl_c_count += 1
if ctrl_c_count == 2:
if not stage_1_msg_displayed:
k_msg_1 = ("SIGINT sent to tests, waiting for their "
"reaction")
k_msg_2 = ("Ignoring Ctrl+C during the next "
"%d seconds so they can try to finish" %
ignore_window)
k_msg_3 = ("A new Ctrl+C sent after that will send a "
"SIGKILL to them")
self.job.view.notify(event='message', msg=k_msg_1)
self.job.view.notify(event='message', msg=k_msg_2)
self.job.view.notify(event='message', msg=k_msg_3)
stage_1_msg_displayed = True
ignore_time_started = time.time()
if (ctrl_c_count > 2) and (time_elapsed > ignore_window):
if not stage_2_msg_displayed:
k_msg_3 = ("Ctrl+C received after the ignore window. "
"Killing all active tests")
self.job.view.notify(event='message', msg=k_msg_3)
stage_2_msg_displayed = True
os.kill(p.pid, signal.SIGKILL)
# If test_state is None, the test was aborted before it ended.
if test_state is None:
if p.is_alive() and wait.wait_for(lambda: not q.empty(),
cycle_timeout, first=0.01, step=0.1):
test_state = q.get()
else:
early_state['time_elapsed'] = time.time() - time_started
test_state = self._fill_aborted_test_state(early_state)
test_log = logging.getLogger('avocado.test')
test_log.error('ERROR %s -> TestAbortedError: '
'Test aborted unexpectedly',
test_state['name'])
# don't process other tests from the list
if ctrl_c_count > 0:
self.job.view.notify(event='minor', msg='')
ctrl_c = False
for test_template in test_suite:
for test_factory in mux.itertests(test_template):
if not self._run_test(test_factory, q, failures):
ctrl_c = True
break
if ctrl_c:
break
self.result.check_test(test_state)
if not status.mapping[test_state['status']]:
failures.append(test_state['name'])
runtime.CURRENT_TEST = None
self.result.end_tests()
if self.job.sysinfo is not None:
......
......@@ -292,28 +292,23 @@ class Test(unittest.TestCase):
"""
Get a test tagged name.
If a test tag is defined, just return name.tag. If tag is absent,
it'll try to find a tag that is not already taken (so there are no
clashes in the results directory).
Combines name + tag (if present) to obtain unique name. When associated
directory already exists, appends ".$number" until unused name
is generated to avoid clashes.
:param logdir: Log directory being in use for result storage.
:return: String `test.tag`.
:return: Unique test name
"""
name = self.name
if self.tag is not None:
return "%s.%s" % (self.name, self.tag)
name += ".%s" % self.tag
tag = 0
if tag == 0:
tagged_name = self.name
else:
tagged_name = "%s.%s" % (self.name, tag)
test_logdir = os.path.join(logdir, tagged_name)
while os.path.isdir(test_logdir):
tagged_name = name
while os.path.isdir(os.path.join(logdir, tagged_name)):
tag += 1
tagged_name = "%s.%s" % (self.name, tag)
test_logdir = os.path.join(logdir, tagged_name)
self.tag = str(tag)
tagged_name = "%s.%s" % (name, tag)
self.tag = "%s.%s" % (self.tag, tag) if self.tag else str(tag)
return tagged_name
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册