Commit 483680bc authored by Cleber Rosa, committed by clebergnu

Merge pull request #130 from lmr/test-feedback-2

Test feedback 2
......@@ -83,6 +83,14 @@ class TestTimeoutError(TestBaseException):
status = "ERROR"
class TestAbortError(TestBaseException):
"""
Indicates that the test was prematurely aborted.
"""
status = "ERROR"
class TestNAError(TestBaseException):
"""
......
......@@ -72,7 +72,17 @@ def add_console_handler(logger):
logger.addHandler(console_handler)
class TermColors(object):
class TermSupport(object):
COLOR_BLUE = '\033[94m'
COLOR_GREEN = '\033[92m'
COLOR_YELLOW = '\033[93m'
COLOR_RED = '\033[91m'
CONTROL_END = '\033[0m'
MOVE_BACK = '\033[1D'
MOVE_FORWARD = '\033[1C'
"""
Class to help applications colorize their output for terminals.
......@@ -85,18 +95,13 @@ class TermColors(object):
'screen-256color']
def __init__(self):
self.blue = '\033[94m'
self.green = '\033[92m'
self.yellow = '\033[93m'
self.red = '\033[91m'
self.end = '\033[0m'
self.HEADER = self.blue
self.PASS = self.green
self.SKIP = self.yellow
self.FAIL = self.red
self.ERROR = self.red
self.WARN = self.yellow
self.ENDC = self.end
self.HEADER = self.COLOR_BLUE
self.PASS = self.COLOR_GREEN
self.SKIP = self.COLOR_YELLOW
self.FAIL = self.COLOR_RED
self.ERROR = self.COLOR_RED
self.WARN = self.COLOR_YELLOW
self.ENDC = self.CONTROL_END
term = os.environ.get("TERM")
if (not os.isatty(1)) or (term not in self.allowed_terms):
self.disable()
......@@ -105,11 +110,6 @@ class TermColors(object):
"""
Disable colors from the strings output by this class.
"""
self.blue = ''
self.green = ''
self.yellow = ''
self.red = ''
self.end = ''
self.HEADER = ''
self.PASS = ''
self.SKIP = ''
......@@ -134,13 +134,21 @@ class TermColors(object):
"""
return self.FAIL + msg + self.ENDC
def healthy_str(self, msg):
"""
Print a healthy string (green colored).
If the output does not support colors, just return the original string.
"""
return self.PASS + msg + self.ENDC
def pass_str(self):
"""
Print a pass string (green colored).
If the output does not support colors, just return the original string.
"""
return self.PASS + 'PASS' + self.ENDC
return self.MOVE_BACK + self.PASS + 'PASS' + self.ENDC
def skip_str(self):
"""
......@@ -148,7 +156,7 @@ class TermColors(object):
If the output does not support colors, just return the original string.
"""
return self.SKIP + 'SKIP' + self.ENDC
return self.MOVE_BACK + self.SKIP + 'SKIP' + self.ENDC
def fail_str(self):
"""
......@@ -156,7 +164,7 @@ class TermColors(object):
If the output does not support colors, just return the original string.
"""
return self.FAIL + 'FAIL' + self.ENDC
return self.MOVE_BACK + self.FAIL + 'FAIL' + self.ENDC
def error_str(self):
"""
......@@ -164,7 +172,7 @@ class TermColors(object):
If the output does not support colors, just return the original string.
"""
return self.ERROR + 'ERROR' + self.ENDC
return self.MOVE_BACK + self.ERROR + 'ERROR' + self.ENDC
def warn_str(self):
"""
......@@ -172,10 +180,10 @@ class TermColors(object):
If the output does not support colors, just return the original string.
"""
return self.WARN + 'WARN' + self.ENDC
return self.MOVE_BACK + self.WARN + 'WARN' + self.ENDC
colors = TermColors()
term_support = TermSupport()
class OutputManager(object):
......@@ -184,8 +192,22 @@ class OutputManager(object):
Takes care of both disk logs and stdout/err logs.
"""
THROBBER_STEPS = ['-', '\\', '|', '/']
THROBBER_MOVES = [term_support.MOVE_BACK + THROBBER_STEPS[0],
term_support.MOVE_BACK + THROBBER_STEPS[1],
term_support.MOVE_BACK + THROBBER_STEPS[2],
term_support.MOVE_BACK + THROBBER_STEPS[3]]
def __init__(self, logger_name='avocado.app'):
self.console_log = logging.getLogger('avocado.app')
self.throbber_pos = 0
def throbber_progress(self):
self.log_healthy(self.THROBBER_MOVES[self.throbber_pos], True)
if self.throbber_pos == (len(self.THROBBER_MOVES)-1):
self.throbber_pos = 0
else:
self.throbber_pos += 1
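The MOVE_BACK escape ('\033[1D', the ANSI "cursor back one column" control) is what lets the throbber animate in place: each step, like the final PASS/FAIL/ERROR strings above, first moves the cursor one column to the left so the new character overwrites the previous spinner glyph. A minimal standalone sketch of the effect (illustrative only, not part of this commit):

import sys
import time

MOVE_BACK = '\033[1D'   # ANSI: move cursor one column left
STEPS = ['-', '\\', '|', '/']

# Draw a spinner that keeps redrawing itself in a single screen cell,
# then overwrite the last glyph with a final status word.
sys.stdout.write(STEPS[0])
for i in range(1, 20):
    time.sleep(0.1)
    sys.stdout.write(MOVE_BACK + STEPS[i % len(STEPS)])
    sys.stdout.flush()
sys.stdout.write(MOVE_BACK + 'PASS\n')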
def _log(self, msg, level=logging.INFO, skip_newline=False):
"""
......@@ -247,13 +269,21 @@ class OutputManager(object):
"""
self._log(msg, level=logging.ERROR)
def log_healthy(self, msg, skip_newline=False):
"""
Log a message that indicates something healthy is going on.
:param msg: Message to write.
:param skip_newline: Whether to suppress the trailing newline.
"""
self.info(term_support.healthy_str(msg), skip_newline)
def log_header(self, msg):
"""
Log a header message.
:param msg: Message to write.
"""
self.info(colors.header_str(msg))
self.info(term_support.header_str(msg))
def log_fail_header(self, msg):
"""
......@@ -261,7 +291,7 @@ class OutputManager(object):
:param msg: Message to write.
"""
self.info(colors.fail_header_str(msg))
self.info(term_support.fail_header_str(msg))
def log_pass(self, t_elapsed):
"""
......@@ -269,7 +299,7 @@ class OutputManager(object):
:param t_elapsed: Time it took for the operation to complete.
"""
normal_pass_msg = colors.pass_str() + " (%.2f s)" % t_elapsed
normal_pass_msg = term_support.pass_str() + " (%.2f s)" % t_elapsed
self.info(normal_pass_msg)
def log_error(self, t_elapsed):
......@@ -278,7 +308,7 @@ class OutputManager(object):
:param t_elapsed: Time it took for the operation to complete.
"""
normal_error_msg = colors.error_str() + " (%.2f s)" % t_elapsed
normal_error_msg = term_support.error_str() + " (%.2f s)" % t_elapsed
self.error(normal_error_msg)
def log_fail(self, t_elapsed):
......@@ -287,7 +317,7 @@ class OutputManager(object):
:param t_elapsed: Time it took for the operation to complete.
"""
normal_fail_msg = colors.fail_str() + " (%.2f s)" % t_elapsed
normal_fail_msg = term_support.fail_str() + " (%.2f s)" % t_elapsed
self.error(normal_fail_msg)
def log_skip(self, t_elapsed):
......@@ -296,7 +326,7 @@ class OutputManager(object):
:param t_elapsed: Time it took for the operation to complete.
"""
normal_skip_msg = colors.skip_str()
normal_skip_msg = term_support.skip_str()
self.info(normal_skip_msg)
def log_warn(self, t_elapsed):
......@@ -305,5 +335,5 @@ class OutputManager(object):
:param t_elapsed: Time it took for the operation to complete.
"""
normal_warn_msg = colors.warn_str() + " (%.2f s)" % t_elapsed
normal_warn_msg = term_support.warn_str() + " (%.2f s)" % t_elapsed
self.error(normal_warn_msg)
......@@ -26,3 +26,25 @@ mapping = {"TEST_NA": True,
"ALERT": False,
"RUNNING": False,
"NOSTATUS": False}
feedback = {
# Test did not advertise its current status, but the process running the
# test is known to be still running
'.': 'Process Running',
# Test advertised its current status explicitly (by means of a formal test
# API), so the user can be sure the test not only has a process running,
# but is performing its intended tasks
'T': 'Test Running',
# The process is paused because a binary was run under a debugger and hit
# a breakpoint. The breakpoint may be one explicitly set by the user, or a
# signal that is caught automatically, such as SIGSEGV
'D': 'Paused for debugging',
# The test has ended and either passed or failed. After this message, a
# proper test result should be passed along so that it can be presented to
# the user and handed to the other result plugins.
'P': 'Passed (ended)',
'F': 'Failed (ended)'
}
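These single-character messages form a tiny wire protocol between the process running the test and the runner that watches it. A hedged sketch of how a consumer might translate such a stream (the function and the sample stream are illustrative; the commit itself only defines the mapping):

def report_feedback(stream, feedback):
    # Translate a sequence of one-byte status messages into
    # human-readable progress lines.
    for ch in stream:
        print('%s -> %s' % (ch, feedback.get(ch, 'Unknown status byte')))

# A test that keeps its process alive, reports itself running, then passes:
report_feedback('.T.P', {'.': 'Process Running',
                         'T': 'Test Running',
                         'P': 'Passed (ended)'})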
......@@ -22,11 +22,12 @@ import imp
import logging
import multiprocessing
import os
import signal
import sys
import signal
import time
import traceback
import uuid
import Queue
from avocado.core import data_dir
from avocado.core import output
......@@ -50,6 +51,7 @@ class TestRunner(object):
"""
A test runner class that displays tests results.
"""
DEFAULT_TIMEOUT = 60 * 60 * 24
def __init__(self, job, test_result):
"""
......@@ -65,6 +67,8 @@ class TestRunner(object):
"""
Resolve and load the test url from the test shortname.
This method should now be called by the test runner process.
:param params: Dictionary with test params.
:type params: dict
:return: an instance of :class:`avocado.test.Test`.
......@@ -109,7 +113,7 @@ class TestRunner(object):
return test_instance
def run_test(self, instance, queue):
def run_test(self, params, queue):
"""
Run a test instance in a subprocess.
......@@ -122,11 +126,32 @@ class TestRunner(object):
e_msg = "Timeout reached waiting for %s to end" % instance
raise exceptions.TestTimeoutError(e_msg)
instance = self.load_test(params)
queue.put(instance.get_state())
signal.signal(signal.SIGUSR1, timeout_handler)
self.result.start_test(instance.get_state())
try:
instance.run_avocado()
finally:
queue.put(instance)
queue.put(instance.get_state())
def _fill_aborted_test_state(self, test_state):
"""
Fill details necessary to process aborted tests.
:param test_state: Test state.
:type test_state: dict
:return: the test state dictionary, filled in with abort details
"""
test_state['fail_reason'] = 'Test process aborted'
test_state['status'] = exceptions.TestAbortError.status
test_state['fail_class'] = exceptions.TestAbortError.__name__
test_state['traceback'] = 'Traceback not available'
with open(test_state['logfile'], 'r') as log_file_obj:
test_state['text_output'] = log_file_obj.read()
return test_state
def run(self, params_list):
"""
......@@ -136,41 +161,56 @@ class TestRunner(object):
:return: a list of test failures.
"""
def send_signal(p, sig):
if p.exitcode is None:
os.kill(p.pid, sig)
time.sleep(0.1)
failures = []
self.result.start_tests()
q = multiprocessing.Queue()
for params in params_list:
test_instance = self.load_test(params)
self.result.start_test(test_instance)
p = multiprocessing.Process(target=self.run_test,
args=(test_instance, q,))
args=(params, q,))
cycle_timeout = 1
time_started = time.time()
should_quit = False
test_state = None
p.start()
# The test timeout can come from:
# 1) Test params dict (params)
# 2) Test default params dict (test_instance.params.timeout)
timeout = params.get('timeout')
if timeout is None:
if hasattr(test_instance.params, 'timeout'):
timeout = test_instance.params.timeout
if timeout is not None:
timeout = float(timeout)
# Wait for the test to end for [timeout] s
try:
test_instance = q.get(timeout=timeout)
except Exception:
# If there's nothing inside the queue after timeout, the process
# must be terminated.
send_signal(p, signal.SIGUSR1)
test_instance = q.get()
self.result.check_test(test_instance)
if not status.mapping[test_instance.status]:
failures.append(test_instance.name)
early_state = q.get()
# At this point, the test is already initialized and we know
# for sure if there's a timeout set.
if 'timeout' in early_state['params'].keys():
timeout = float(early_state['params']['timeout'])
else:
timeout = self.DEFAULT_TIMEOUT
time_deadline = time_started + timeout - cycle_timeout
while not should_quit:
try:
if time.time() >= time_deadline:
os.kill(p.pid, signal.SIGUSR1)
should_quit = True
test_state = q.get(timeout=cycle_timeout)
except Queue.Empty:
if p.is_alive():
self.job.result_proxy.throbber_progress()
else:
should_quit = True
if should_quit:
p.terminate()
# If test_state is None, the test was aborted before it ended.
if test_state is None:
early_state['time_elapsed'] = time.time() - time_started
test_state = self._fill_aborted_test_state(early_state)
test_log = logging.getLogger('avocado.test')
test_log.error('ERROR %s -> TestAbortError: '
'Test aborted unexpectedly', test_state['name'])
self.result.check_test(test_state)
if not status.mapping[test_state['status']]:
failures.append(test_state['name'])
self.result.end_tests()
return failures
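This loop is the heart of the change: the child posts an "early state" as soon as the test is loaded (so the parent learns the effective timeout), then the parent polls the result queue in one-second slices, advancing the throbber on every empty read and sending SIGUSR1 (which triggers the child's timeout_handler) once the deadline passes. A condensed, hedged sketch of that supervisor pattern, with simplified names and the Python 2 Queue module used by this file:

import multiprocessing
import os
import signal
import time
import Queue   # Python 2, as in the runner's imports


def supervise(target, timeout, on_tick):
    # Run target(queue) in a child process and poll its result queue in
    # 1 s slices; call on_tick() while the child is still busy, and send
    # SIGUSR1 (handled by the child as a timeout) past the deadline.
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=target, args=(q,))
    p.start()
    deadline = time.time() + timeout
    state = None
    while True:
        try:
            if time.time() >= deadline:
                os.kill(p.pid, signal.SIGUSR1)   # ask the child to bail out
                break
            state = q.get(timeout=1)             # child reported a result
            break
        except Queue.Empty:
            if p.is_alive():
                on_tick()                        # e.g. advance the throbber
            else:
                break                            # child died without reporting
    p.terminate()
    return state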
......
......@@ -23,15 +23,13 @@ from avocado.result import TestResult
JOURNAL_FILENAME = ".journal.sqlite"
JOB_INFO_SCHEMA = ("CREATE TABLE job_info ("
"unique_id TEXT)")
TEST_JOURNAL_SCHEMA = ("CREATE TABLE test_journal ("
"tag TEXT, "
"time TEXT, "
"action TEXT, "
"status TEXT, "
"flushed BOOLEAN DEFAULT 0)")
SCHEMA = {'job_info': 'CREATE TABLE job_info (unique_id TEXT UNIQUE)',
'test_journal': ("CREATE TABLE test_journal ("
"tag TEXT, "
"time TEXT, "
"action TEXT, "
"status TEXT, "
"flushed BOOLEAN DEFAULT 0)")}
class TestResultJournal(TestResult):
......@@ -58,29 +56,40 @@ class TestResultJournal(TestResult):
self.journal_path = os.path.join(logdir, JOURNAL_FILENAME)
self.journal = sqlite3.connect(self.journal_path)
self.journal_cursor = self.journal.cursor()
self.journal_cursor.execute(JOB_INFO_SCHEMA)
self.journal_cursor.execute(TEST_JOURNAL_SCHEMA)
for table in SCHEMA:
res = self.journal_cursor.execute("PRAGMA table_info('%s')" % table)
if res.fetchone() is None:
self.journal_cursor.execute(SCHEMA[table])
self.journal.commit()
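The PRAGMA table_info query is SQLite's cheap existence check: it yields one row per column for a table that exists and nothing otherwise, which is what makes the schema creation above idempotent across runs. A small self-contained illustration of the same check (in-memory database, illustrative only):

import sqlite3

conn = sqlite3.connect(':memory:')
cur = conn.cursor()

# First pass: no rows come back, so the table is created.
if cur.execute("PRAGMA table_info('test_journal')").fetchone() is None:
    cur.execute("CREATE TABLE test_journal (tag TEXT, time TEXT, "
                "action TEXT, status TEXT, flushed BOOLEAN DEFAULT 0)")
conn.commit()

# Second pass: table_info now returns a row per column, so creation is skipped.
assert cur.execute("PRAGMA table_info('test_journal')").fetchone() is not None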
def lazy_init_journal(self, state):
# lazy init because we need the toplevel logdir for the job
if not self.journal_initialized:
self._init_journal(os.path.dirname(state['logdir']))
self._record_job_info(state)
self.journal_initialized = True
def _shutdown_journal(self):
self.journal.close()
def _record_job_info(self, test):
sql = "INSERT INTO job_info (unique_id) VALUES (?)"
self.journal_cursor.execute(sql, (test.job.unique_id, ))
self.journal.commit()
def _record_job_info(self, state):
res = self.journal_cursor.execute("SELECT unique_id FROM job_info")
if res.fetchone() is None:
sql = "INSERT INTO job_info (unique_id) VALUES (?)"
self.journal_cursor.execute(sql, (state['job_unique_id'], ))
self.journal.commit()
def _record_status(self, test, action):
def _record_status(self, state, action):
sql = "INSERT INTO test_journal (tag, time, action, status) VALUES (?, ?, ?, ?)"
# This shouldn't be required
if action == "ENDED":
status = test.status
status = state['status']
else:
status = None
self.journal_cursor.execute(sql,
(test.tagged_name,
(state['tagged_name'],
datetime.datetime(1, 1, 1).now().isoformat(),
action,
status))
......@@ -94,19 +103,15 @@ class TestResultJournal(TestResult):
# Journal does not need an output option
self.output_option = None
def start_test(self, test):
# lazy init because we need the toplevel logdir for the job
if not self.journal_initialized:
self._init_journal(os.path.dirname(test.logdir))
self._record_job_info(test)
self.journal_initialized = True
TestResult.start_test(self, test)
self._record_status(test, "STARTED")
def start_test(self, state):
self.lazy_init_journal(state)
TestResult.start_test(self, state)
self._record_status(state, "STARTED")
def end_test(self, test):
TestResult.end_test(self, test)
self._record_status(test, "ENDED")
def end_test(self, state):
self.lazy_init_journal(state)
TestResult.end_test(self, state)
self._record_status(state, "ENDED")
def end_tests(self):
self._shutdown_journal()
......
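Since the journal is plain SQLite, the resulting .journal.sqlite file can be inspected after (or even during) a job. A hedged example of reading it back, assuming the schema above (the path is illustrative):

import sqlite3

journal = sqlite3.connect('.journal.sqlite')   # illustrative path
rows = journal.execute("SELECT tag, time, action, status FROM test_journal")
for tag, when, action, status in rows:
    # STARTED rows carry a NULL status; ENDED rows carry the final one.
    print('%-30s %-8s %s' % (tag, action, status or '-'))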
......@@ -42,18 +42,19 @@ class JSONTestResult(TestResult):
self.json = {'debuglog': self.stream.logfile,
'tests': []}
def end_test(self, test):
def end_test(self, state):
"""
Called when the given test has been run.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
TestResult.end_test(self, test)
t = {'test': test.tagged_name,
'url': test.name,
'time': test.time_elapsed,
'status': test.status,
'whiteboard': test.whiteboard,
TestResult.end_test(self, state)
t = {'test': state['tagged_name'],
'url': state['name'],
'time': state['time_elapsed'],
'status': state['status'],
'whiteboard': state['whiteboard'],
}
self.json['tests'].append(t)
......
......@@ -46,7 +46,7 @@ class Multiplexer(plugin.Plugin):
self.configured = True
def multiplex(self, args):
bcolors = output.colors
bcolors = output.term_support
pipe = output.get_paginator()
if not args.multiplex_file:
......
......@@ -45,6 +45,18 @@ class Test(object):
self.text_output = note
self.fail_reason = note
self.whiteboard = ''
self.job_unique_id = ''
def get_state(self):
"""
Serialize the attributes representing the test state.
:returns: a dictionary containing relevant test state data
:rtype: dict
"""
d = self.__dict__.copy()
d['class_name'] = self.__class__.__name__
return d
class VMTestRunner(TestRunner):
......@@ -75,7 +87,7 @@ class VMTestRunner(TestRunner):
:return: a list of test failures.
"""
failures = []
urls = [x['shortname'] for x in params_list]
urls = [x['id'] for x in params_list]
self.result.urls = urls
self.result.setup()
results = self.run_test(' '.join(urls))
......@@ -86,7 +98,7 @@ class VMTestRunner(TestRunner):
time=tst['time'],
status=tst['status'])
self.result.start_test(test)
self.result.check_test(test)
self.result.check_test(test.get_state())
if not status.mapping[test.status]:
failures.append(test.tagged_name)
self.result.end_tests()
......@@ -225,7 +237,7 @@ class VMTestResult(TestResult):
:param test: result of :class:`avocado.test.Test.get_state` (a dict).
"""
TestResult.add_pass(self, test)
self.stream.log_pass(test.time_elapsed)
self.stream.log_pass(test['time_elapsed'])
def add_error(self, test):
"""
......
......@@ -83,68 +83,72 @@ class XmlResult(object):
self.xml.append(tc)
self.xml.append('</testsuite>')
def add_success(self, test):
def add_success(self, state):
"""
Add a testcase node of kind succeed.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
tc = '\t<testcase classname={class} name={name} time="{time}"/>'
values = {'class': self._escape_attr(test.__class__.__name__),
'name': self._escape_attr(test.tagged_name),
'time': test.time_elapsed}
values = {'class': self._escape_attr(state['class_name']),
'name': self._escape_attr(state['tagged_name']),
'time': state['time_elapsed']}
self.testcases.append(tc.format(**values))
def add_skip(self, test):
def add_skip(self, state):
"""
Add a testcase node of kind skipped.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
tc = '''\t<testcase classname={class} name={name} time="{time}">
\t\t<skipped />
\t</testcase>'''
values = {'class': self._escape_attr(test.__class__.__name__),
'name': self._escape_attr(test.tagged_name),
'time': test.time_elapsed}
values = {'class': self._escape_attr(state['class_name']),
'name': self._escape_attr(state['tagged_name']),
'time': state['time_elapsed']}
self.testcases.append(tc.format(**values))
def add_failure(self, test):
def add_failure(self, state):
"""
Add a testcase node of kind failed.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
tc = '''\t<testcase classname={class} name={name} time="{time}">
\t\t<failure type={type} message={reason}><![CDATA[{traceback}]]></failure>
\t\t<system-out><![CDATA[{systemout}]]></system-out>
\t</testcase>'''
values = {'class': self._escape_attr(test.__class__.__name__),
'name': self._escape_attr(test.tagged_name),
'time': test.time_elapsed,
'type': self._escape_attr(test.fail_class),
'traceback': self._escape_cdata(test.traceback),
'systemout': self._escape_cdata(test.text_output),
'reason': self._escape_attr(str(test.fail_reason))}
values = {'class': self._escape_attr(state['class_name']),
'name': self._escape_attr(state['tagged_name']),
'time': state['time_elapsed'],
'type': self._escape_attr(state['fail_class']),
'traceback': self._escape_cdata(state['traceback']),
'systemout': self._escape_cdata(state['text_output']),
'reason': self._escape_attr(str(state['fail_reason']))}
self.testcases.append(tc.format(**values))
def add_error(self, test):
def add_error(self, state):
"""
Add a testcase node of kind error.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
tc = '''\t<testcase classname={class} name={name} time="{time}">
\t\t<error type={type} message={reason}><![CDATA[{traceback}]]></error>
\t\t<system-out><![CDATA[{systemout}]]></system-out>
\t</testcase>'''
values = {'class': self._escape_attr(test.__class__.__name__),
'name': self._escape_attr(test.tagged_name),
'time': test.time_elapsed,
'type': self._escape_attr(test.fail_class),
'traceback': self._escape_cdata(test.traceback),
'systemout': self._escape_cdata(test.text_output),
'reason': self._escape_attr(str(test.fail_reason))}
values = {'class': self._escape_attr(state['class_name']),
'name': self._escape_attr(state['tagged_name']),
'time': state['time_elapsed'],
'type': self._escape_attr(state['fail_class']),
'traceback': self._escape_cdata(state['traceback']),
'systemout': self._escape_cdata(state['text_output']),
'reason': self._escape_attr(str(state['fail_reason']))}
self.testcases.append(tc.format(**values))
......@@ -183,19 +187,22 @@ class xUnitTestResult(TestResult):
"""
TestResult.start_test(self, test)
def end_test(self, test):
def end_test(self, state):
"""
Record an end test event, according to the given test status.
"""
TestResult.end_test(self, test)
if test.status == 'PASS':
self.xml.add_success(test)
if test.status == 'TEST_NA':
self.xml.add_skip(test)
if test.status == 'FAIL':
self.xml.add_failure(test)
if test.status == 'ERROR':
self.xml.add_error(test)
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
TestResult.end_test(self, state)
if state['status'] == 'PASS':
self.xml.add_success(state)
elif state['status'] == 'TEST_NA':
self.xml.add_skip(state)
elif state['status'] == 'FAIL':
self.xml.add_failure(state)
elif state['status'] == 'ERROR':
self.xml.add_error(state)
def end_tests(self):
"""
......
......@@ -38,6 +38,11 @@ class TestResultProxy(object):
else:
return None
def throbber_progress(self):
for output_plugin in self.output_plugins:
if hasattr(output_plugin, 'throbber_progress'):
output_plugin.throbber_progress()
def add_output_plugin(self, plugin):
if not isinstance(plugin, TestResult):
raise InvalidOutputPlugin("Object %s is not an instance of "
......@@ -52,37 +57,37 @@ class TestResultProxy(object):
for output_plugin in self.output_plugins:
output_plugin.end_tests()
def start_test(self, test):
def start_test(self, state):
for output_plugin in self.output_plugins:
output_plugin.start_test(test)
output_plugin.start_test(state)
def end_test(self, test):
def end_test(self, state):
for output_plugin in self.output_plugins:
output_plugin.end_test(test)
output_plugin.end_test(state)
def add_pass(self, test):
def add_pass(self, state):
for output_plugin in self.output_plugins:
output_plugin.add_pass(test)
output_plugin.add_pass(state)
def add_error(self, test):
def add_error(self, state):
for output_plugin in self.output_plugins:
output_plugin.add_error(test)
output_plugin.add_error(state)
def add_fail(self, test):
def add_fail(self, state):
for output_plugin in self.output_plugins:
output_plugin.add_fail(test)
output_plugin.add_fail(state)
def add_skip(self, test):
def add_skip(self, state):
for output_plugin in self.output_plugins:
output_plugin.add_skip(test)
output_plugin.add_skip(state)
def add_warn(self, test):
def add_warn(self, state):
for output_plugin in self.output_plugins:
output_plugin.add_warn(test)
output_plugin.add_warn(state)
def check_test(self, test):
def check_test(self, state):
for output_plugin in self.output_plugins:
output_plugin.check_test(test)
output_plugin.check_test(state)
class TestResult(object):
......@@ -149,77 +154,83 @@ class TestResult(object):
"""
pass
def start_test(self, test):
def start_test(self, state):
"""
Called when the given test is about to run.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
pass
def end_test(self, test):
def end_test(self, state):
"""
Called when the given test has been run.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
self.tests_run += 1
self.total_time += test.time_elapsed
self.total_time += state['time_elapsed']
def add_pass(self, test):
def add_pass(self, state):
"""
Called when a test succeeded.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
self.passed.append(test)
self.passed.append(state)
def add_error(self, test):
def add_error(self, state):
"""
Called when a test had a setup error.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
self.errors.append(test)
self.errors.append(state)
def add_fail(self, test):
def add_fail(self, state):
"""
Called when a test fails.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
self.failed.append(test)
self.failed.append(state)
def add_skip(self, test):
def add_skip(self, state):
"""
Called when a test is skipped.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
self.skipped.append(test)
self.skipped.append(state)
def add_warn(self, test):
def add_warn(self, state):
"""
Called when a test had a warning.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
self.warned.append(test)
self.warned.append(state)
def check_test(self, test):
def check_test(self, state):
"""
Called once for a test to check status and report.
:param test: an instance of :class:`avocado.test.Test`.
:param state: a dict with the test's internal state
"""
status_map = {'PASS': self.add_pass,
'ERROR': self.add_error,
'FAIL': self.add_fail,
'TEST_NA': self.add_skip,
'WARN': self.add_warn}
add = status_map[test.status]
add(test)
self.end_test(test)
add = status_map[state['status']]
add(state)
self.end_test(state)
class HumanTestResult(TestResult):
......@@ -247,66 +258,76 @@ class HumanTestResult(TestResult):
self.stream.log_header("TOTAL WARNED: %d" % len(self.warned))
self.stream.log_header("ELAPSED TIME: %.2f s" % self.total_time)
def start_test(self, test):
def start_test(self, state):
"""
Called when the given test is about to run.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
self.test_label = '(%s/%s) %s: ' % (self.tests_run,
self.tests_total,
test.tagged_name)
self.test_label = '(%s/%s) %s: ' % (self.tests_run,
self.tests_total,
state['tagged_name'])
self.stream.info(msg=self.test_label, skip_newline=True)
def end_test(self, test):
def end_test(self, state):
"""
Called when the given test has been run.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
TestResult.end_test(self, test)
TestResult.end_test(self, state)
def add_pass(self, test):
def add_pass(self, state):
"""
Called when a test succeeded.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
TestResult.add_pass(self, test)
self.stream.log_pass(test.time_elapsed)
TestResult.add_pass(self, state)
self.stream.log_pass(state['time_elapsed'])
def add_error(self, test):
def add_error(self, state):
"""
Called when a test had a setup error.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
TestResult.add_error(self, test)
self.stream.log_error(test.time_elapsed)
TestResult.add_error(self, state)
self.stream.log_error(state['time_elapsed'])
def add_fail(self, test):
def add_fail(self, state):
"""
Called when a test fails.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
TestResult.add_fail(self, test)
self.stream.log_fail(test.time_elapsed)
TestResult.add_fail(self, state)
self.stream.log_fail(state['time_elapsed'])
def add_skip(self, test):
def add_skip(self, state):
"""
Called when a test is skipped.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
TestResult.add_skip(self, test)
self.stream.log_skip(test.time_elapsed)
TestResult.add_skip(self, state)
self.stream.log_skip(state['time_elapsed'])
def add_warn(self, test):
def add_warn(self, state):
"""
Called when a test had a warning.
:param test: an instance of :class:`avocado.test.Test`.
:param state: result of :class:`avocado.test.Test.get_state`.
:type state: dict
"""
TestResult.add_warn(self, test)
self.stream.log_warn(test.time_elapsed)
TestResult.add_warn(self, state)
self.stream.log_warn(state['time_elapsed'])
def throbber_progress(self):
self.stream.throbber_progress()
......@@ -187,12 +187,12 @@ class Test(unittest.TestCase):
def __repr__(self):
return "Test(%r)" % self.tagged_name
def __getstate__(self):
def get_state(self):
"""
Pickle only selected attributes of the class for serialization.
Serialize selected attributes representing the test state.
The fact we serialize the class means you'll have to modify this
class if you intend to make significant changes to its structure.
:returns: a dictionary containing relevant test state data
:rtype: dict
"""
orig = dict(self.__dict__)
d = {}
......@@ -204,7 +204,9 @@ class Test(unittest.TestCase):
for key in sorted(orig):
if key in preserve_attr:
d[key] = orig[key]
d['params'] = orig['_raw_params']
d['params'] = dict(orig['params'])
d['class_name'] = self.__class__.__name__
d['job_unique_id'] = self.job.unique_id
return d
def _set_default(self, key, default):
......
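The switch from pickling whole Test instances (__getstate__) to shipping plain state dictionaries is what allows results to cross the multiprocessing.Queue between runner and test process: a dict of primitives always pickles, while a live test object (open log files, logger handlers, the job reference) often does not. A minimal sketch of the same pattern, with an illustrative class rather than the commit's code:

import multiprocessing


class Worker(object):
    def __init__(self):
        self.name = 'sleeptest.1'
        self.status = 'PASS'
        self.log = open('/dev/null')   # unpicklable attribute

    def get_state(self):
        # Copy only the attributes that serialize cleanly.
        preserve = ('name', 'status')
        d = dict((k, v) for k, v in self.__dict__.items() if k in preserve)
        d['class_name'] = self.__class__.__name__
        return d


def child(queue):
    # Putting Worker() itself on the queue would raise a pickling error.
    queue.put(Worker().get_state())


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    print(q.get())   # {'status': 'PASS', 'name': 'sleeptest.1', 'class_name': 'Worker'}
    p.join()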
......@@ -102,6 +102,17 @@ class RunnerOperationTest(unittest.TestCase):
output,
"Test did not fail with timeout exception")
def test_runner_abort(self):
os.chdir(basedir)
cmd_line = './scripts/avocado --xunit run abort'
result = process.run(cmd_line, ignore_status=True)
expected_rc = 1
unexpected_rc = 3
self.assertNotEqual(result.exit_status, unexpected_rc,
"Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
self.assertEqual(result.exit_status, expected_rc,
"Avocado did not return rc %d:\n%s" % (expected_rc, result))
class RunnerDropinTest(unittest.TestCase):
......
......@@ -29,6 +29,7 @@ if os.path.isdir(os.path.join(basedir, 'avocado')):
from avocado.plugins import jsonresult
from avocado import test
from avocado import job
class _Stream(object):
......@@ -53,7 +54,7 @@ class JSONResultTest(unittest.TestCase):
self.test_result = jsonresult.JSONTestResult(stream, args)
self.test_result.filename = self.tmpfile[1]
self.test_result.start_tests()
self.test1 = test.Test()
self.test1 = test.Test(job=job.Job())
self.test1.status = 'PASS'
self.test1.time_elapsed = 1.23
......@@ -63,7 +64,7 @@ class JSONResultTest(unittest.TestCase):
def testAddSuccess(self):
self.test_result.start_test(self.test1)
self.test_result.end_test(self.test1)
self.test_result.end_test(self.test1.get_state())
self.test_result.end_tests()
self.assertTrue(self.test_result.json)
with open(self.test_result.filename) as fp:
......
......@@ -69,7 +69,7 @@ class VMResultTest(unittest.TestCase):
time=tst['time'],
status=tst['status'])
self.test_result.start_test(test)
self.test_result.check_test(test)
self.test_result.check_test(test.get_state())
if not status.mapping[test.status]:
failures.append(test.tagged_name)
self.test_result.end_tests()
......
......@@ -29,6 +29,7 @@ if os.path.isdir(os.path.join(basedir, 'avocado')):
from avocado.plugins import xunit
from avocado import test
from avocado import job
class ParseXMLError(Exception):
......@@ -43,7 +44,7 @@ class xUnitSucceedTest(unittest.TestCase):
args.xunit_output = self.tmpfile[1]
self.test_result = xunit.xUnitTestResult(args=args)
self.test_result.start_tests()
self.test1 = test.Test()
self.test1 = test.Test(job=job.Job())
self.test1.status = 'PASS'
self.test1.time_elapsed = 1.23
......@@ -53,7 +54,7 @@ class xUnitSucceedTest(unittest.TestCase):
def testAddSuccess(self):
self.test_result.start_test(self.test1)
self.test_result.end_test(self.test1)
self.test_result.end_test(self.test1.get_state())
self.test_result.end_tests()
self.assertTrue(self.test_result.xml)
with open(self.test_result.output) as fp:
......
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2014
# Author: Ruda Moura <rmoura@redhat.com>
import os
from avocado import test
from avocado import job
class abort(test.Test):
"""
A test that just calls os.abort() (and therefore aborts).
"""
def action(self):
os.abort()
if __name__ == "__main__":
job.main()