test.py 16.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; specifically version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
11 12 13
# This code was inspired by the autotest project,
# client/shared/test.py
# Authors: Martin J Bligh <mbligh@google.com>, Andy Whitcroft <apw@shadowen.org>
14

L
Lucas Meneghel Rodrigues 已提交
15 16 17 18 19
"""
Contains the base test implementation, used as a base for the actual
framework tests.
"""

20
import inspect
L
Lucas Meneghel Rodrigues 已提交
21
import logging
22
import os
23 24 25 26 27
import sys
import time
import traceback
import unittest

28
from avocado.core import data_dir
L
Lucas Meneghel Rodrigues 已提交
29
from avocado.core import exceptions
30
from avocado.utils import path
31
from avocado.utils import process
32
from avocado.utils.params import Params
33
from avocado import sysinfo
34
from avocado.version import VERSION
L
Lucas Meneghel Rodrigues 已提交
35

36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
log = logging.getLogger("avocado.test")


def tb_info(exc_info):
    """
    Format exception info into a list of traceback lines.

    Formatting starts at the second traceback entry (``tb_next``), so the
    outermost frame of the traceback is left out of the result.

    :param exc_info: Exception info produced by sys.exc_info()
    :return: List of strings, as returned by traceback.format_exception().
    """
    exc_type, exc_value, exc_traceback = exc_info
    inner_traceback = exc_traceback.tb_next
    return traceback.format_exception(exc_type, exc_value, inner_traceback)


def log_exc_info(exc_info):
    """
    Write the formatted exception traceback to the module logger.

    Every physical line of the traceback becomes its own ERROR record, and
    the whole output is framed by one empty ERROR record before and after.

    :param exc_info: Exception info produced by sys.exc_info()
    """
    log.error('')
    for chunk in tb_info(exc_info):
        # A formatted chunk may hold several newline separated lines.
        for text_line in chunk.splitlines():
            log.error(text_line)
    log.error('')
62 63 64 65 66 67 68 69 70 71


def prepare_exc_info(exc_info):
    """
    Render the exception traceback as a single string.

    :param exc_info: Exception info produced by sys.exc_info()
    :return: The lines returned by tb_info() concatenated together.
    """
    traceback_lines = tb_info(exc_info)
    return "".join(traceback_lines)

72

73
class Test(unittest.TestCase):
74 75

    """
L
Lucas Meneghel Rodrigues 已提交
76 77 78 79
    Base implementation for the test class.

    You'll inherit from this to write your own tests. Typically you'll want
    to implement setup(), action() and cleanup() methods on your own tests.
80
    """
81
    default_params = {}
82

83
    def __init__(self, methodName='runTest', name=None, params=None,
                 base_logdir=None, tag=None, job=None, runner_queue=None):
        """
        Initializes the test.

        Besides storing the arguments, this creates the work, source, log,
        output and sysinfo directories and logs the effective parameters.

        :param methodName: Name of the main method to run. For the sake of
                           compatibility with the original unittest class,
                           you should not set this. It is forwarded to
                           :class:`unittest.TestCase`.
        :param name: Pretty name of the test name. For normal tests, written
                     with the avocado API, this should not be set, this is
                     reserved for running random executables as tests.
                     When None, the class name is used.
        :param params: Dictionary with the test parameters. It is wrapped in
                       a ``Params`` object (``self.params``); the object
                       passed in is kept as ``self._raw_params``. When None,
                       an empty dictionary is used.
        :param base_logdir: Directory where test logs should go. If None
                            provided, it'll use
                            :func:`avocado.core.data_dir.get_job_logs_dir`.
                            A ``test-results`` subdirectory is always
                            appended to it.
        :param tag: Tag that differentiates 2 executions of the same test name.
                    Example: 'long', 'short', so we can differentiate
                    'sleeptest.long' and 'sleeptest.short'. When not given,
                    everything after the first dot of the ``shortname``
                    parameter (if any) is used instead.
        :param job: The job that this test is part of.
        :param runner_queue: Stored as ``self.runner_queue``; it is not used
                             anywhere else in the code visible here.
        """
        if name is not None:
            self.name = name
        else:
            self.name = self.__class__.__name__

        if params is None:
            params = {}
        self.params = Params(params)
        self._raw_params = params

        # Derive a fallback tag from "name.some.tag" style shortnames.
        shortname = self.params.get('shortname')
        s_tag = None
        if shortname:
            split_shortname = shortname.split('.')
            if len(split_shortname) > 1:
                s_tag = ".".join(split_shortname[1:])
        self.tag = tag or s_tag
        self.job = job

        basename = os.path.basename(self.name)

        self.basedir = os.path.dirname(inspect.getfile(self.__class__))
        self.datadir = os.path.join(self.basedir, '%s.data' % basename)
        self.workdir = path.init_dir(data_dir.get_tmp_dir(), basename)
        self.srcdir = path.init_dir(self.workdir, 'src')
        if base_logdir is None:
            base_logdir = data_dir.get_job_logs_dir()
        base_logdir = os.path.join(base_logdir, 'test-results')
        # Must run before the log dir is created: the tagged name is chosen
        # by checking which directories already exist under base_logdir.
        self.tagged_name = self.get_tagged_name(base_logdir)

        self.logdir = path.init_dir(base_logdir, self.tagged_name)
        self.logfile = os.path.join(self.logdir, 'debug.log')
        self.outputdir = path.init_dir(self.logdir, 'data')
        self.sysinfodir = path.init_dir(self.logdir, 'sysinfo')
        self.sysinfo_logger = sysinfo.SysInfo(basedir=self.sysinfodir)

        self.log = logging.getLogger("avocado.test")

        self.log.info('START %s', self.tagged_name)
        self.log.debug('')
        self.log.debug('Test instance parameters:')

        # Set the helper set_default to the params object
        setattr(self.params, 'set_default', self._set_default)

        # Apply what comes from the params dict
        for key in sorted(self.params.keys()):
            self.log.debug('    %s = %s', key, self.params.get(key))
        self.log.debug('')

        # Apply what comes from the default_params dict
        self.log.debug('Default parameters:')
        for key in sorted(self.default_params.keys()):
            self.log.debug('    %s = %s', key, self.default_params.get(key))
            self.params.set_default(key, self.default_params[key])
        self.log.debug('')
        self.log.debug('Test instance params override defaults whenever available')
        self.log.debug('')

        # If there's a timeout set, log a timeout reminder
        if self.params.timeout:
            self.log.info('Test timeout set. Will wait %.2f s for '
                          'PID %s to end',
                          float(self.params.timeout), os.getpid())
            self.log.info('')

        self.debugdir = None
        self.resultsdir = None
        self.status = None
        self.fail_reason = None
        self.fail_class = None
        self.traceback = None
        self.text_output = None

        self.whiteboard = ''

        self.running = False
        self.time_start = None
        self.time_end = None

        self.runner_queue = runner_queue

        self.time_elapsed = None
        # Forward methodName so the unittest machinery honours the argument
        # instead of silently falling back to its own default.
        unittest.TestCase.__init__(self, methodName=methodName)

    def __str__(self):
        return str(self.name)

    def __repr__(self):
        return "Test(%r)" % self.tagged_name
192

193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
    def tag_start(self):
        self.running = True
        self.time_start = time.time()

    def tag_end(self):
        self.running = False
        self.time_end = time.time()
        # for consistency sake, always use the same stupid method
        self.update_time_elapsed(self.time_end)

    def update_time_elapsed(self, current_time=None):
        if current_time is None:
            current_time = time.time()
        self.time_elapsed = current_time - self.time_start

208
    def get_state(self):
209
        """
210
        Serialize selected attributes representing the test state
211

212 213
        :returns: a dictionary containing relevant test state data
        :rtype: dict
214
        """
215 216 217
        if self.running and self.time_start:
            self.update_time_elapsed()

218 219 220 221 222 223
        orig = dict(self.__dict__)
        d = {}
        preserve_attr = ['basedir', 'debugdir', 'depsdir', 'fail_class',
                         'fail_reason', 'logdir', 'logfile', 'name',
                         'resultsdir', 'srcdir', 'status', 'sysinfodir',
                         'tag', 'tagged_name', 'text_output', 'time_elapsed',
224 225
                         'traceback', 'workdir', 'whiteboard', 'time_start',
                         'time_end', 'running']
226 227 228
        for key in sorted(orig):
            if key in preserve_attr:
                d[key] = orig[key]
229
        d['params'] = dict(orig['params'])
230
        d['class_name'] = self.__class__.__name__
C
Cleber Rosa 已提交
231
        d['job_unique_id'] = self.job.unique_id
232 233
        return d

234 235 236 237 238 239
    def _set_default(self, key, default):
        try:
            self.params[key]
        except Exception:
            self.params[key] = default

240
    def get_data_path(self, basename):
241
        """
242 243 244 245
        Find a test dependency path inside the test data dir.

        This is a short hand for an operation that will be commonly
        used on avocado tests, so we feel it deserves its own API.
246 247 248 249 250

        :param basename: Basename of the dep file. Ex: ``testsuite.tar.bz2``.

        :return: Path where dependency is supposed to be found.
        """
251
        return os.path.join(self.datadir, basename)
252

253 254 255 256 257 258 259 260 261 262 263 264 265 266
    def start_logging(self):
        """
        Simple helper for adding a file logger to the root logger.
        """
        self.file_handler = logging.FileHandler(filename=self.logfile)
        self.file_handler.setLevel(logging.DEBUG)

        fmt = '%(asctime)s %(levelname)-5.5s| %(message)s'
        formatter = logging.Formatter(fmt=fmt, datefmt='%H:%M:%S')

        self.file_handler.setFormatter(formatter)
        self.log.addHandler(self.file_handler)

    def stop_logging(self):
267 268 269
        """
        Stop the logging activity of the test by cleaning the logger handlers.
        """
270 271
        self.log.removeHandler(self.file_handler)

272
    def get_tagged_name(self, logdir):
273 274 275 276 277 278 279 280 281 282 283
        """
        Get a test tagged name.

        If a test tag is defined, just return name.tag. If tag is absent,
        it'll try to find a tag that is not already taken (so there are no
        clashes in the results directory).

        :param logdir: Log directory being in use for result storage.

        :return: String `test.tag`.
        """
284 285
        if self.name.startswith('/'):
            self.name = self.name[1:]
286
        if self.tag is not None:
287
            return "%s.%s" % (self.name, self.tag)
288 289 290 291 292 293

        tag = 0
        if tag == 0:
            tagged_name = self.name
        else:
            tagged_name = "%s.%s" % (self.name, tag)
294 295 296
        test_logdir = os.path.join(logdir, tagged_name)
        while os.path.isdir(test_logdir):
            tag += 1
297
            tagged_name = "%s.%s" % (self.name, tag)
298
            test_logdir = os.path.join(logdir, tagged_name)
299
        self.tag = str(tag)
300

301 302
        return tagged_name

303
    def setup(self):
L
Lucas Meneghel Rodrigues 已提交
304 305 306 307 308 309 310
        """
        Setup stage that the test needs before passing to the actual action.

        Must be implemented by tests if they want such an stage. Commonly we'll
        download/compile test suites, create files needed for a test, among
        other possibilities.
        """
311 312 313
        pass

    def action(self):
L
Lucas Meneghel Rodrigues 已提交
314 315 316 317 318 319 320 321 322 323
        """
        Actual test payload. Must be implemented by tests.

        In case of an existing test suite wrapper, it'll execute the suite,
        or perform a series of operations, and based in the results of the
        operations decide if the test pass (let the test complete) or fail
        (raise a test related exception).
        """
        raise NotImplementedError('Test subclasses must implement an action '
                                  'method')
324

325
    def cleanup(self):
L
Lucas Meneghel Rodrigues 已提交
326 327 328 329 330 331 332
        """
        Cleanup stage after the action is done.

        Examples of cleanup actions are deleting temporary files, restoring
        firewall configurations or other system settings that were changed
        in setup.
        """
333
        pass
334

335 336 337
    def runTest(self, result=None):
        """
        Run test method, for compatibility with unittest.TestCase.

        Starts the file logging and the sysinfo start hook, then runs
        setup(), action() and cleanup(). cleanup() is attempted whenever
        setup() succeeded, even if action() raised. On success the status is
        set to ``'PASS'`` and the sysinfo end hook is run; the end hook is
        not reached when an exception is raised.

        :param result: Unused param, compatibility with
                       :class:`unittest.TestCase`.
        :raises exceptions.TestSetupFail: when setup() raises, or when
                                          cleanup() raises and action() did
                                          not.
        :raises Exception: whatever action() raised is raised again after
                           cleanup() has run.
        """
        self.start_logging()
        self.sysinfo_logger.start_test_hook()
        action_exception = None
        cleanup_exception = None
        try:
            self.setup()
        except Exception as details:
            log_exc_info(sys.exc_info())
            raise exceptions.TestSetupFail(details)
        try:
            self.action()
        except Exception as details:
            log_exc_info(sys.exc_info())
            # Keep it for later: cleanup() has to be given a chance first.
            action_exception = details
        finally:
            try:
                self.cleanup()
            except Exception as details:
                log_exc_info(sys.exc_info())
                cleanup_exception = details
        # An action failure takes precedence over a cleanup failure.
        # pylint: disable=E0702
        if action_exception is not None:
            raise action_exception
        elif cleanup_exception is not None:
            raise exceptions.TestSetupFail(cleanup_exception)

        self.status = 'PASS'
        self.sysinfo_logger.end_test_hook()
369

370 371 372 373 374 375 376 377 378 379 380
    def _setup_environment_variables(self):
        """
        Export the avocado version and the test directories and log file
        path as ``AVOCADO_*`` variables in the environment of this process.
        """
        exported = (
            ('AVOCADO_VERSION', VERSION),
            ('AVOCADO_TEST_BASEDIR', self.basedir),
            ('AVOCADO_TEST_DATADIR', self.datadir),
            ('AVOCADO_TEST_WORKDIR', self.workdir),
            ('AVOCADO_TEST_SRCDIR', self.srcdir),
            ('AVOCADO_TEST_LOGDIR', self.logdir),
            ('AVOCADO_TEST_LOGFILE', self.logfile),
            ('AVOCADO_TEST_OUTPUTDIR', self.outputdir),
            ('AVOCADO_TEST_SYSINFODIR', self.sysinfodir),
        )
        for variable, value in exported:
            os.environ[variable] = value

381 382 383
    def run_avocado(self, result=None):
        """
        Wraps the runTest method, for execution inside the avocado runner.

        Exceptions raised by runTest() are not propagated (only subclasses
        of Exception are handled); they are turned into ``status``,
        ``fail_class``, ``fail_reason`` and ``traceback`` attributes
        instead. In all cases the end time is recorded, the result is
        reported, the content of the log file is loaded into
        ``text_output`` and the file logging is stopped.

        :param result: Unused param, compatibility with
                       :class:`unittest.TestCase`.
        """
        self._setup_environment_variables()
        try:
            self.tag_start()
            self.runTest(result)
        except exceptions.TestBaseException as detail:
            # Test related exceptions carry the status they stand for.
            self.status = detail.status
            self.fail_class = detail.__class__.__name__
            self.fail_reason = detail
            self.traceback = prepare_exc_info(sys.exc_info())
        except AssertionError as detail:
            self.status = 'FAIL'
            self.fail_class = detail.__class__.__name__
            self.fail_reason = detail
            self.traceback = prepare_exc_info(sys.exc_info())
        except Exception as detail:
            self.status = 'FAIL'
            self.fail_class = detail.__class__.__name__
            self.fail_reason = detail
            tb_lines = tb_info(sys.exc_info())
            self.traceback = "".join(tb_lines)
            # Unlike the branches above, the traceback is also logged here.
            for e_line in tb_lines:
                self.log.error(e_line)
        finally:
            self.tag_end()
            self.report()
            self.log.info("")
            # Read the log before the file handler is removed.
            with open(self.logfile, 'r') as log_file_obj:
                self.text_output = log_file_obj.read()
            self.stop_logging()

419
    def report(self):
420 421 422
        """
        Report result to the logging system.
        """
423 424 425 426 427 428 429
        if self.fail_reason is not None:
            self.log.error("%s %s -> %s: %s", self.status,
                           self.tagged_name,
                           self.fail_reason.__class__.__name__,
                           self.fail_reason)

        else:
430 431
            if self.status is None:
                self.status = 'INTERRUPTED'
432 433
            self.log.info("%s %s", self.status,
                          self.tagged_name)
434 435 436 437 438 439 440 441


class DropinTest(Test):

    """
    Run an arbitrary command that returns either 0 (PASS) or !=0 (FAIL).
    """

    def __init__(self, path, params=None, base_logdir=None, tag=None, job=None):
        """
        Initializes the drop-in test.

        :param path: Path of the executable to run. The absolute version is
                     kept in ``self.path``; the value as given is used as
                     the test name. Note that inside this method the
                     argument hides the module level ``path`` import.
        :param params: Dictionary with the test parameters.
        :param base_logdir: Directory where test logs should go.
        :param tag: Tag that differentiates 2 executions of the same test.
        :param job: The job that this test is part of.
        """
        self.path = os.path.abspath(path)
        super(DropinTest, self).__init__(name=path, base_logdir=base_logdir,
                                         params=params, tag=tag, job=job)

    def _log_detailed_cmd_info(self, result):
        """
        Log detailed command information.

        The string form of ``result`` is logged line by line at INFO level.

        :param result: :class:`avocado.utils.process.CmdResult` instance.
        """
        run_info = str(result)
        for line in run_info.splitlines():
            self.log.info(line)

    def action(self):
        """
        Run the executable, and log its detailed execution.

        :raises exceptions.TestFail: when running the executable raises
                                     :class:`exceptions.CmdError`
                                     (presumably on a non-zero exit status;
                                     confirm against ``process.run``).
        """
        try:
            result = process.run(self.path, verbose=True)
            self._log_detailed_cmd_info(result)
        except exceptions.CmdError as details:
            # The failed command result travels inside the exception.
            self._log_detailed_cmd_info(details.result)
            raise exceptions.TestFail(details)
467 468 469 470 471 472 473 474


class MissingTest(Test):

    """
    Handle when there is no such test module in the test directory.
    """

    def __init__(self, name=None, params=None, base_logdir=None, tag=None,
                 job=None):
        """
        Initializes the placeholder for a test that could not be found.

        :param name: Name of the test that was requested.
        :param params: Dictionary with the test parameters.
        :param base_logdir: Directory where test logs should go.
        :param tag: Tag that differentiates 2 executions of the same test.
        :param job: The job that this test is part of.
        """
        # Forward params as well, so that the parameters given by the caller
        # are not silently dropped.
        super(MissingTest, self).__init__(name=name, params=params,
                                          base_logdir=base_logdir,
                                          tag=tag, job=job)

    def action(self):
        """
        Report the missing test.

        :raises exceptions.TestNotFoundError: always, with a message naming
                                              the test and the test dir.
        """
        e_msg = ('Test %s could not be found in the test dir %s '
                 '(or test path does not exist)' %
                 (self.name, data_dir.get_test_dir()))
        raise exceptions.TestNotFoundError(e_msg)