# This Python file uses the following encoding: utf-8
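"""Functional tests for the avocado command line runner and basic plugins."""
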
import glob
import json
import os
import re
import shutil
import signal
import sys
import tempfile
import time
import unittest
import xml.dom.minidom
import zipfile

import aexpect
import pkg_resources
import psutil

from io import BytesIO

try:
    from lxml import etree
    SCHEMA_CAPABLE = True
except ImportError:
    SCHEMA_CAPABLE = False

from six import iteritems
from six.moves import xrange as range

from avocado.core import exit_codes
from avocado.utils import astring
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
from avocado.utils import path as utils_path

basedir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..')
basedir = os.path.abspath(basedir)

AVOCADO = os.environ.get("UNITTEST_AVOCADO_CMD", "./scripts/avocado")

LOCAL_IMPORT_TEST_CONTENTS = '''
from avocado import Test
from mylib import hello

class LocalImportTest(Test):
    def test(self):
        self.log.info(hello())
'''

UNSUPPORTED_STATUS_TEST_CONTENTS = '''
from avocado import Test

class FakeStatusTest(Test):
    def run_avocado(self):
        super(FakeStatusTest, self).run_avocado()
        # Please do NOT ever use this, it's for unittesting only.
        self._Test__status = 'not supported'

    def test(self):
        pass
'''

INVALID_PYTHON_TEST = '''
from avocado import Test

class MyTest(Test):

    non_existing_variable_causing_crash

    def test_my_name(self):
        pass
'''


VALID_PYTHON_TEST_WITH_TAGS = '''
from avocado import Test

class MyTest(Test):
    def test(self):
         """
         :avocado: tags=BIG_TAG_NAME
         """
         pass
'''


REPORTS_STATUS_AND_HANG = '''
from avocado import Test
import time

class MyTest(Test):
    def test(self):
         self.runner_queue.put({"running": False})
         time.sleep(70)
'''


DIE_WITHOUT_REPORTING_STATUS = '''
from avocado import Test
import os
import signal

class MyTest(Test):
    def test(self):
         os.kill(os.getpid(), signal.SIGKILL)
'''


RAISE_CUSTOM_PATH_EXCEPTION_CONTENT = '''import os
import sys

from avocado import Test

class SharedLibTest(Test):
    def test(self):
        sys.path.append(os.path.join(os.path.dirname(__file__), "shared_lib"))
        from mylib import CancelExc
        raise CancelExc("This should not crash on unpickling in runner")
'''


def probe_binary(binary):
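    """Return the full path of the given binary, or None when not found."""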
    try:
        return utils_path.find_command(binary)
    except utils_path.CmdNotFoundError:
        return None


TRUE_CMD = probe_binary('true')
CC_BINARY = probe_binary('cc')

# On macOS, the default GNU coreutils installation (via brew) installs
# the GNU versions of the utilities with a "g" prefix. The BSD versions
# of the core utilities remain on their expected paths, but their
# behavior and flags differ in most cases.
GNU_ECHO_BINARY = probe_binary('echo')
if GNU_ECHO_BINARY is not None:
    if probe_binary('man') is not None:
        echo_manpage = process.run('man %s' % os.path.basename(GNU_ECHO_BINARY)).stdout
        if b'-e' not in echo_manpage:
            GNU_ECHO_BINARY = probe_binary('gecho')
READ_BINARY = probe_binary('read')
SLEEP_BINARY = probe_binary('sleep')


def html_capable():
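    """Check whether the avocado HTML result plugin is installed."""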
    try:
        pkg_resources.require('avocado-framework-plugin-result-html')
        return True
    except pkg_resources.DistributionNotFound:
        return False


class RunnerOperationTest(unittest.TestCase):
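    """Exercises the avocado runner through its command line interface."""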

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(basedir)

    def test_show_version(self):
        result = process.run('%s -v' % AVOCADO, ignore_status=True)
        self.assertEqual(result.exit_status, 0)
        self.assertTrue(re.match(r"^Avocado \d+\.\d+$", result.stderr_text),
                        "Version string does not match '^Avocado \\d+\\.\\d+$':\n"
                        "%r" % (result.stderr_text))

    def test_alternate_config_datadir(self):
        """
        Uses the "--config" flag to check custom configuration is applied

        Even on the more complex data_dir module, which adds extra checks
        to what is set on the plain settings module.
        """
        base_dir = os.path.join(self.tmpdir, 'datadir_base')
        os.mkdir(base_dir)
        mapping = {'base_dir': base_dir,
                   'test_dir': os.path.join(base_dir, 'test'),
                   'data_dir': os.path.join(base_dir, 'data'),
                   'logs_dir': os.path.join(base_dir, 'logs')}
        config = '[datadir.paths]\n'
        for key, value in iteritems(mapping):
            if not os.path.isdir(value):
                os.mkdir(value)
            config += "%s = %s\n" % (key, value)
        fd, config_file = tempfile.mkstemp(dir=self.tmpdir)
        os.write(fd, config.encode())
        os.close(fd)
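        # The config written above should look like:
        #   [datadir.paths]
        #   base_dir = <tmpdir>/datadir_base
        #   test_dir = <tmpdir>/datadir_base/test
        #   ...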

        cmd = '%s --config %s config --datadir' % (AVOCADO, config_file)
        result = process.run(cmd)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn('    base     ' + mapping['base_dir'], result.stdout_text)
        self.assertIn('    data     ' + mapping['data_dir'], result.stdout_text)
        self.assertIn('    logs     ' + mapping['logs_dir'], result.stdout_text)

    def test_runner_all_ok(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py passtest.py' % (AVOCADO, self.tmpdir))
        process.run(cmd_line)
        # Also check whether jobdata contains correct parameter paths
        with open(os.path.join(self.tmpdir, "latest", "jobdata",
                               "variants.json")) as variants_file:
            variants = variants_file.read()
        self.assertIn('["/run/*"]', variants, "paths stored in jobdata "
                      "do not contain [\"/run/*\"]\n%s" % variants)

    def test_runner_failfast(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py failtest.py passtest.py --failfast on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b'Interrupting job (failfast).', result.stdout)
        self.assertIn(b'PASS 1 | ERROR 0 | FAIL 1 | SKIP 1', result.stdout)
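        # avocado exit codes are bit flags, so a test failure plus a job
        # interruption is reported as the OR of the two codes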
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL | exit_codes.AVOCADO_JOB_INTERRUPTED
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_ignore_missing_references_one_missing(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py badtest.py --ignore-missing-references on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b"Unable to resolve reference(s) 'badtest.py'", result.stderr)
        self.assertIn(b'PASS 1 | ERROR 0 | FAIL 0 | SKIP 0', result.stdout)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_ignore_missing_references_all_missing(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'badtest.py badtest2.py --ignore-missing-references on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b"Unable to resolve reference(s) 'badtest.py', 'badtest2.py'",
                      result.stderr)
        self.assertEqual(b'', result.stdout)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_test_with_local_imports(self):
        mylib = script.TemporaryScript(
            'mylib.py',
            "def hello():\n    return 'Hello world'",
            'avocado_simpletest_functional')
        mylib.save()
        mytest = script.Script(
            os.path.join(os.path.dirname(mylib.path), 'test_local_imports.py'),
            LOCAL_IMPORT_TEST_CONTENTS)
        mytest.save()
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "%s" % (AVOCADO, self.tmpdir, mytest))
        process.run(cmd_line)

    def test_unsupported_status(self):
        with script.TemporaryScript("fake_status.py",
                                    UNSUPPORTED_STATUS_TEST_CONTENTS,
                                    "avocado_unsupported_status") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s"
                              " --json -" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Runner error occurred: Test reports unsupported",
                          results["tests"][0]["fail_reason"])

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_hanged_test_with_status(self):
        """ Check that avocado handles hanged tests properly """
        with script.TemporaryScript("report_status_and_hang.py",
                                    REPORTS_STATUS_AND_HANG,
                                    "hanged_test_with_status") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s "
                              "--json - --job-timeout 1" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Test reported status but did not finish",
                          results["tests"][0]["fail_reason"])
            # Currently the job should finish up to 1s after the
            # job-timeout, but prep and postprocess can take a bit longer
            # in some environments, so just check it stays safely below
            # 60s, the deadline for force-finishing the test.
            self.assertLess(res.duration, 55, "Test execution took too long, "
                            "which is likely because the hung test was not "
                            "interrupted. Results:\n%s" % res)

    def test_no_status_reported(self):
        with script.TemporaryScript("die_without_reporting_status.py",
                                    DIE_WITHOUT_REPORTING_STATUS,
                                    "no_status_reported") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s "
                              "--json -" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Test died without reporting the status",
                          results["tests"][0]["fail_reason"])

    def test_runner_tests_fail(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s passtest.py '
                    'failtest.py passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_nonexistent_test(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir '
                    '%s bogustest' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_doublefail(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - doublefail.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(b"TestError: Failing during tearDown. Yay!", result.stdout,
                      "Cleanup exception not printed to log output")
        self.assertIn(b"TestFail: This test is supposed to fail", result.stdout,
                      "Test did not fail with action exception:\n%s" % result.stdout)

    def test_uncaught_exception(self):
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "--json - uncaught_exception.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc,
                                                                result))
        self.assertIn(b'"status": "ERROR"', result.stdout)

    def test_fail_on_exception(self):
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "--json - fail_on_exception.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc,
                                                                result))
        self.assertIn(b'"status": "FAIL"', result.stdout)

    def test_exception_not_in_path(self):
        os.mkdir(os.path.join(self.tmpdir, "shared_lib"))
        mylib = script.Script(os.path.join(self.tmpdir, "shared_lib",
                                           "mylib.py"),
                              "from avocado import TestCancel\n\n"
                              "class CancelExc(TestCancel):\n"
                              "    pass")
        mylib.save()
        mytest = script.Script(os.path.join(self.tmpdir, "mytest.py"),
                               RAISE_CUSTOM_PATH_EXCEPTION_CONTENT)
        mytest.save()
        result = process.run("%s --show test run --sysinfo=off "
                             "--job-results-dir %s %s"
                             % (AVOCADO, self.tmpdir, mytest))
        self.assertIn(b"mytest.py:SharedLibTest.test -> CancelExc: This "
                      b"should not crash on unpickling in runner",
                      result.stdout)
        self.assertNotIn(b"Failed to read queue", result.stdout)

    def test_runner_timeout(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - timeouttest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        output = result.stdout
        expected_rc = exit_codes.AVOCADO_JOB_INTERRUPTED
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(b"Runner error occurred: Timeout reached", output,
                      "Timeout reached message not found in the output:\n%s" % output)
        # Ensure no test aborted error messages show up
        self.assertNotIn(b"TestAbortedError: Test aborted unexpectedly", output)

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_runner_abort(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - abort.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        excerpt = b'Test died without reporting the status.'
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(excerpt, result.stdout)

    def test_silent_output(self):
        cmd_line = ('%s --silent run --sysinfo=off --job-results-dir %s '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertEqual(result.stdout, b'')

    def test_empty_args_list(self):
        cmd_line = AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_FAIL)
        self.assertIn(b'error: too few arguments', result.stderr)

    def test_empty_test_list(self):
        cmd_line = '%s run --sysinfo=off --job-results-dir %s' % (AVOCADO,
                                                                  self.tmpdir)
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_JOB_FAIL)
        self.assertIn(b'No test references provided nor any other arguments '
                      b'resolved into tests', result.stderr)

    def test_not_found(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s sbrubles'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_JOB_FAIL)
        self.assertIn(b'Unable to resolve reference', result.stderr)
        self.assertNotIn(b'Unable to resolve reference', result.stdout)

    def test_invalid_unique_id(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s --force-job-id '
                    'foobar passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertNotEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertIn(b'needs to be a 40 digit hex', result.stderr)
        self.assertNotIn(b'needs to be a 40 digit hex', result.stdout)

    def test_valid_unique_id(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--force-job-id 975de258ac05ce5e490648dec4753657b7ccc7d1 '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertNotIn(b'needs to be a 40 digit hex', result.stderr)
        self.assertIn(b'PASS', result.stdout)

    def test_automatic_unique_id(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    'passtest.py --json -' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        r = json.loads(result.stdout)
        int(r['job_id'], 16)  # it's a hex number
        self.assertEqual(len(r['job_id']), 40)

    def test_early_latest_result(self):
        """
        Tests that the `latest` link to the latest job results is created early
        """
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'examples/tests/passtest.py' % (AVOCADO, self.tmpdir))
        avocado_process = process.SubProcess(cmd_line)
        try:
            avocado_process.start()
            link = os.path.join(self.tmpdir, 'latest')
            for _ in range(50):
                time.sleep(0.1)
                if os.path.exists(link) and os.path.islink(link):
                    avocado_process.wait()
                    break
            self.assertTrue(os.path.exists(link))
            self.assertTrue(os.path.islink(link))
        finally:
            avocado_process.wait()

    def test_dry_run(self):
        cmd = ("%s run --sysinfo=off passtest.py failtest.py "
               "gendata.py --json - --mux-inject foo:1 bar:2 baz:3 foo:foo:a"
               " foo:bar:b foo:baz:c bar:bar:bar --dry-run" % AVOCADO)
        result = json.loads(process.run(cmd).stdout)
        debuglog = result['debuglog']
        log = genio.read_file(debuglog)
        # Remove the result dir
        shutil.rmtree(os.path.dirname(os.path.dirname(debuglog)))
        self.assertIn(tempfile.gettempdir(), debuglog)   # Use tmp dir, not default location
        self.assertEqual(result['job_id'], u'0' * 40)
        # Check if all tests were cancelled
        self.assertEqual(result['cancel'], 4)
        for i in range(4):
            test = result['tests'][i]
            self.assertEqual(test['fail_reason'],
                             u'Test cancelled due to --dry-run')
        # Check if all params are listed
        # The "/:bar ==> 2" is in the tree, but not in any leaf, so it is
        # inaccessible from the test.
        for line in ("/:foo ==> 1", "/:baz ==> 3", "/foo:foo ==> a",
                     "/foo:bar ==> b", "/foo:baz ==> c", "/bar:bar ==> bar"):
            self.assertEqual(log.count(line), 4)

    def test_invalid_python(self):
        test = script.make_script(os.path.join(self.tmpdir, 'test.py'),
                                  INVALID_PYTHON_TEST)
        cmd_line = ('%s --show test run --sysinfo=off '
                    '--job-results-dir %s %s') % (AVOCADO, self.tmpdir, test)
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn('1-%s:MyTest.test_my_name -> TestError' % test,
                      result.stdout_text)

    @unittest.skipIf(not READ_BINARY, "read binary not available.")
    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_read(self):
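        """Runs the "read" binary as a test; it is expected to fail fast
        rather than hang waiting for input, hence the duration check."""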
        cmd = "%s run --sysinfo=off --job-results-dir %%s %%s" % AVOCADO
        cmd %= (self.tmpdir, READ_BINARY)
        result = process.run(cmd, timeout=10, ignore_status=True)
        self.assertLess(result.duration, 8, "Duration longer than expected."
                        "\n%s" % result)
        self.assertEqual(result.exit_status, 1, "Expected exit status is 1\n%s"
                         % result)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)


class RunnerHumanOutputTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(basedir)

    def test_output_pass(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'passtest.py:PassTest.test:  PASS', result.stdout)

    def test_output_fail(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'failtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'failtest.py:FailTest.test:  FAIL', result.stdout)

    def test_output_error(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'errortest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'errortest.py:ErrorTest.test:  ERROR', result.stdout)

    def test_output_cancel(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'cancelonsetup.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'PASS 0 | ERROR 0 | FAIL 0 | SKIP 0 | WARN 0 | '
                      b'INTERRUPT 0 | CANCEL 1',
                      result.stdout)

    @unittest.skipIf(not GNU_ECHO_BINARY,
                     'GNU style echo binary not available')
    def test_ugly_echo_cmd(self):
        cmd_line = ('%s run --external-runner "%s -ne" '
                    '"foo\\\\\\n\\\'\\\\\\"\\\\\\nbar/baz" --job-results-dir %s'
                    ' --sysinfo=off  --show-job-log' %
                    (AVOCADO, GNU_ECHO_BINARY, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %s:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'[stdout] foo', result.stdout, result)
        self.assertIn(b'[stdout] \'"', result.stdout, result)
        self.assertIn(b'[stdout] bar/baz', result.stdout, result)
        self.assertIn(b'PASS 1-foo\\\\n\\\'\\"\\\\nbar/baz',
                      result.stdout, result)
        # logdir name should escape special chars (/)
        test_dirs = glob.glob(os.path.join(self.tmpdir, 'latest',
                                           'test-results', '*'))
        self.assertEqual(len(test_dirs), 1, "There are multiple directories in"
                         " test-results dir, but only one test was executed: "
                         "%s" % (test_dirs))
        self.assertEqual(os.path.basename(test_dirs[0]),
                         "1-foo__n_'____nbar_baz")

    def test_replay_skip_skipped(self):
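        """Replays a job whose only test was cancelled, restricting the
        replay to tests that ended with PASS."""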
        cmd = ("%s run --job-results-dir %s --json - "
               "cancelonsetup.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd)
        result = json.loads(result.stdout)
        jobid = str(result["job_id"])
        cmd = ("%s run --job-results-dir %s --replay %s "
               "--replay-test-status PASS" % (AVOCADO, self.tmpdir, jobid))
        process.run(cmd)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)


class RunnerSimpleTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        self.pass_script = script.TemporaryScript(
            'ʊʋʉʈɑ ʅʛʌ',
            "#!/bin/sh\ntrue",
            'avocado_simpletest_functional')
        self.pass_script.save()
        self.fail_script = script.TemporaryScript('avocado_fail.sh',
                                                  "#!/bin/sh\nfalse",
                                                  'avocado_simpletest_'
                                                  'functional')
        self.fail_script.save()
        os.chdir(basedir)

    def test_simpletest_pass(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' "%s"' % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_simpletest_fail(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' %s' % (AVOCADO, self.tmpdir, self.fail_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_runner_onehundred_fail_timing(self):
        """
        A failtest should return almost immediately. Run 100 of them and
        ensure they do not take more than 30 seconds in total.

        Notice: on a current machine this takes about 0.12s, so 30 seconds is
        considered to be pretty safe here.
        """
        one_hundred = 'failtest.py ' * 100
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off %s'
                    % (AVOCADO, self.tmpdir, one_hundred))
        initial_time = time.time()
        result = process.run(cmd_line, ignore_status=True)
        actual_time = time.time() - initial_time
        self.assertLess(actual_time, 30.0)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_runner_sleep_fail_sleep_timing(self):
        """
        A sleeptest is supposed to take 1 second. Sandwich 100 failtests
        between two of them and check the test runner's overall timing.
        """
        sleep_fail_sleep = ('sleeptest.py ' + 'failtest.py ' * 100 +
                            'sleeptest.py')
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off %s'
                    % (AVOCADO, self.tmpdir, sleep_fail_sleep))
        initial_time = time.time()
        result = process.run(cmd_line, ignore_status=True)
        actual_time = time.time() - initial_time
        self.assertLess(actual_time, 33.0)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_simplewarning(self):
        """
        simplewarning.sh uses the avocado-bash-utils
        """
        # simplewarning.sh calls "avocado" without specifying a path
        os.environ['PATH'] += ":" + os.path.join(basedir, 'scripts')
        # simplewarning.sh calls "avocado exec-path" which hasn't
        # access to an installed location for the libexec scripts
        os.environ['PATH'] += ":" + os.path.join(basedir, 'libexec')
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    'examples/tests/simplewarning.sh --show-job-log'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %s:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'DEBUG| Debug message', result.stdout, result)
        self.assertIn(b'INFO | Info message', result.stdout, result)
        self.assertIn(b'WARN | Warning message (should cause this test to '
                      b'finish with warning)', result.stdout, result)
        self.assertIn(b'ERROR| Error message (ordinary message not changing '
                      b'the results)', result.stdout, result)

    @unittest.skipIf(not GNU_ECHO_BINARY, "Uses echo as test")
    def test_fs_unfriendly_run(self):
        os.chdir(basedir)
        commands_path = os.path.join(self.tmpdir, "commands")
        script.make_script(commands_path, "echo '\"\\/|?*<>'")
        config_path = os.path.join(self.tmpdir, "config.conf")
        script.make_script(config_path,
                           "[sysinfo.collectibles]\ncommands = %s"
                           % commands_path)
        cmd_line = ("%s --show all --config %s run --job-results-dir %s "
                    "--sysinfo=on --external-runner %s -- \"'\\\"\\/|?*<>'\""
                    % (AVOCADO, config_path, self.tmpdir, GNU_ECHO_BINARY))
        process.run(cmd_line)
        self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "latest",
                                                    "test-results",
                                                    "1-\'________\'/")))
        self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "latest",
                                                    "sysinfo", "pre",
                                                    "echo \'________\'")))

        if html_capable():
            with open(os.path.join(self.tmpdir, "latest",
                                   "results.html")) as html_res:
                html_results = html_res.read()
            # test results should replace odd chars with "_"
            self.assertIn(os.path.join("test-results", "1-'________'"),
                          html_results)
            # sysinfo replaces "_" with " "
            self.assertIn("echo '________'", html_results)

    def test_non_absolute_path(self):
        avocado_path = os.path.join(basedir, 'scripts', 'avocado')
        test_base_dir = os.path.dirname(self.pass_script.path)
        os.chdir(test_base_dir)
        test_file_name = os.path.basename(self.pass_script.path)
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' "%s"' % (avocado_path, self.tmpdir, test_file_name))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    @unittest.skipIf(not SLEEP_BINARY, 'sleep binary not available')
    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_kill_stopped_sleep(self):
        proc = aexpect.Expect("%s run 60 --job-results-dir %s "
                              "--external-runner %s --sysinfo=off "
                              "--job-timeout 3"
                              % (AVOCADO, self.tmpdir, SLEEP_BINARY))
        proc.read_until_output_matches([r"\(1/1\)"], timeout=3,
                                       internal_timeout=0.01)
        # We need pid of the avocado process, not the shell executing it
        avocado_shell = psutil.Process(proc.get_pid())
        avocado_proc = avocado_shell.children()[0]
        pid = avocado_proc.pid
        os.kill(pid, signal.SIGTSTP)   # This freezes the process
        deadline = time.time() + 9
        while time.time() < deadline:
            if not proc.is_alive():
                break
            time.sleep(0.1)
        else:
            proc.kill(signal.SIGKILL)
            self.fail("Avocado process still alive after the 9s deadline:\n%s"
                      % proc.get_output())
        output = proc.get_output()
        self.assertIn("ctrl+z pressed, stopping test", output, "SIGTSTP "
                      "message not in the output, test was probably not "
                      "stopped.")
        self.assertIn("TIME", output, "TIME not in the output, avocado "
                      "probably died unexpectadly")
        self.assertEqual(proc.get_status(), 8, "Avocado did not finish with "
                         "status 8.")

        sleep_dir = astring.string_to_safe_path("1-60")
        debug_log_path = os.path.join(self.tmpdir, "latest", "test-results",
                                      sleep_dir, "debug.log")

        debug_log = genio.read_file(debug_log_path)
        self.assertIn("Runner error occurred: Timeout reached", debug_log,
                      "Runner error occurred: Timeout reached message not "
                      "in the test's debug.log:\n%s" % debug_log)
        self.assertNotIn("Traceback (most recent", debug_log, "Traceback "
                         "present in the test's debug.log file, but it was "
                         "suppose to be stopped and unable to produce it.\n"
                         "%s" % debug_log)

    def tearDown(self):
        self.pass_script.remove()
        self.fail_script.remove()
        shutil.rmtree(self.tmpdir)


class RunnerSimpleTestStatus(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)

        self.config_file = script.TemporaryScript('avocado.conf',
                                                  "[simpletests.status]\n"
                                                  "warn_regex = ^WARN$\n"
                                                  "skip_regex = ^SKIP$\n")
        self.config_file.save()
        os.chdir(basedir)

    def test_simpletest_status(self):
        warn_script = script.TemporaryScript('avocado_warn.sh',
                                             "#!/bin/sh\necho WARN",
                                             'avocado_simpletest_'
                                             'functional')
        warn_script.save()
        cmd_line = ('%s --config %s run --job-results-dir %s --sysinfo=off'
                    ' %s --json -' % (AVOCADO, self.config_file.path,
                                      self.tmpdir, warn_script.path))
        result = process.system_output(cmd_line, ignore_status=True)
        json_results = json.loads(result)
        self.assertEqual(json_results['tests'][0]['status'], 'WARN')
        warn_script.remove()

        skip_script = script.TemporaryScript('avocado_skip.sh',
                                             "#!/bin/sh\necho SKIP",
                                             'avocado_simpletest_'
                                             'functional')
        skip_script.save()
        cmd_line = ('%s --config %s run --job-results-dir %s --sysinfo=off'
                    ' %s --json -' % (AVOCADO, self.config_file.path,
                                      self.tmpdir, skip_script.path))
        result = process.system_output(cmd_line, ignore_status=True)
        json_results = json.loads(result)
        self.assertEqual(json_results['tests'][0]['status'], 'SKIP')
        skip_script.remove()

    def tearDown(self):
        self.config_file.remove()
        shutil.rmtree(self.tmpdir)


class ExternalRunnerTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        self.pass_script = script.TemporaryScript(
            'pass',
            "exit 0",
            'avocado_externalrunner_functional')
        self.pass_script.save()
        self.fail_script = script.TemporaryScript(
            'fail',
            "exit 1",
            'avocado_externalrunner_functional')
        self.fail_script.save()
        os.chdir(basedir)

    def test_externalrunner_pass(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh %s'
                    % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_externalrunner_fail(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh %s'
                    % (AVOCADO, self.tmpdir, self.fail_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_externalrunner_chdir_no_testdir(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh --external-runner-chdir=test %s'
                    % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_output = (b'Option "--external-runner-chdir=test" requires '
                           b'"--external-runner-testdir" to be set')
        self.assertIn(expected_output, result.stderr)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_externalrunner_no_url(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=%s' % (AVOCADO, self.tmpdir, TRUE_CMD))
        result = process.run(cmd_line, ignore_status=True)
        expected_output = (b'No test references provided nor any other '
                           b'arguments resolved into tests')
        self.assertIn(expected_output, result.stderr)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def tearDown(self):
        self.pass_script.remove()
        self.fail_script.remove()
        shutil.rmtree(self.tmpdir)


class AbsPluginsTest(object):
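    """Base mixin providing a temporary output directory for plugin tests."""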

    def setUp(self):
        self.base_outputdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(basedir)

    def tearDown(self):
        shutil.rmtree(self.base_outputdir)


class PluginsTest(AbsPluginsTest, unittest.TestCase):

    def test_sysinfo_plugin(self):
        cmd_line = '%s sysinfo %s' % (AVOCADO, self.base_outputdir)
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        sysinfo_files = os.listdir(self.base_outputdir)
        self.assertGreater(len(sysinfo_files), 0, "Empty sysinfo files dir")

    def test_list_plugin(self):
        cmd_line = '%s list' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'No tests were found on current tests dir',
                         result.stdout)

    def test_list_error_output(self):
        cmd_line = '%s list sbrubles' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b"Unable to resolve reference", result.stderr)

    def test_list_no_file_loader(self):
        cmd_line = ("%s list --loaders external --verbose -- "
                    "this-wont-be-matched" % AVOCADO)
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK,
                         "Avocado did not return rc %d:\n%s"
                         % (exit_codes.AVOCADO_ALL_OK, result))
        exp = (b"Type    Test                 Tag(s)\n"
               b"MISSING this-wont-be-matched \n\n"
               b"TEST TYPES SUMMARY\n"
               b"==================\n"
               b"EXTERNAL: 0\n"
               b"MISSING: 1\n")
        self.assertEqual(exp, result.stdout, "Stdout mismatch:\n%s\n\n%s"
                         % (exp, result))

    def test_list_verbose_tags(self):
        """
        Runs list verbosely and check for tag related output
        """
        test = script.make_script(os.path.join(self.base_outputdir, 'test.py'),
                                  VALID_PYTHON_TEST_WITH_TAGS)
        cmd_line = ("%s list --loaders file --verbose %s" % (AVOCADO,
                                                             test))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK,
                         "Avocado did not return rc %d:\n%s"
                         % (exit_codes.AVOCADO_ALL_OK, result))
        stdout_lines = result.stdout_text.splitlines()
        self.assertIn("Tag(s)", stdout_lines[0])
        full_test_name = "%s:MyTest.test" % test
        self.assertEqual("INSTRUMENTED %s BIG_TAG_NAME" % full_test_name,
                         stdout_lines[1])
        self.assertIn("TEST TYPES SUMMARY", stdout_lines)
        self.assertIn("INSTRUMENTED: 1", stdout_lines)
        self.assertIn("TEST TAGS SUMMARY", stdout_lines)
        self.assertEqual("BIG_TAG_NAME: 1", stdout_lines[-1])

    def test_plugin_list(self):
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        if sys.version_info[:2] >= (2, 7):
            self.assertNotIn(b'Disabled', result.stdout)

    def test_config_plugin(self):
        cmd_line = '%s config --paginator off' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'Disabled', result.stdout)

    def test_config_plugin_datadir(self):
        cmd_line = '%s config --datadir --paginator off' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'Disabled', result.stdout)

    def test_disable_plugin(self):
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b"Collect system information", result.stdout)

        config_content = "[plugins]\ndisable=['cli.cmd.sysinfo',]"
        config = script.TemporaryScript("disable_sysinfo_cmd.conf",
                                        config_content)
        with config:
            cmd_line = '%s --config %s plugins' % (AVOCADO, config)
            result = process.run(cmd_line, ignore_status=True)
            expected_rc = exit_codes.AVOCADO_ALL_OK
            self.assertEqual(result.exit_status, expected_rc,
                             "Avocado did not return rc %d:\n%s" %
                             (expected_rc, result))
            self.assertNotIn(b"Collect system information", result.stdout)

    def test_plugin_order(self):
        """
        Tests plugin order by configuration file

        First it checks if html, json, xunit and zip_archive plugins are enabled.
        Then it runs a test with zip_archive running first, which means the html,
        json and xunit output files do not make into the archive.

        Then it runs with zip_archive set to run last, which means the html,
        json and xunit output files *do* make into the archive.
        """
        def run_config(config_path):
            cmd = ('%s --config %s run passtest.py --archive '
                   '--job-results-dir %s --sysinfo=off'
                   % (AVOCADO, config_path, self.base_outputdir))
            result = process.run(cmd, ignore_status=True)
            expected_rc = exit_codes.AVOCADO_ALL_OK
            self.assertEqual(result.exit_status, expected_rc,
                             "Avocado did not return rc %d:\n%s" %
                             (expected_rc, result))

        result_plugins = ["json", "xunit", "zip_archive"]
        result_outputs = ["results.json", "results.xml"]
        if html_capable():
            result_plugins.append("html")
            result_outputs.append("results.html")

        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        for result_plugin in result_plugins:
            self.assertIn(result_plugin, result.stdout_text)

        config_content_zip_first = "[plugins.result]\norder=['zip_archive']"
        config_zip_first = script.TemporaryScript("zip_first.conf",
                                                  config_content_zip_first)
        with config_zip_first:
            run_config(config_zip_first)
            archives = glob.glob(os.path.join(self.base_outputdir, '*.zip'))
            self.assertEqual(len(archives), 1, "ZIP Archive not generated")
            zip_file = zipfile.ZipFile(archives[0], 'r')
            zip_file_list = zip_file.namelist()
            for result_output in result_outputs:
                self.assertNotIn(result_output, zip_file_list)
            os.unlink(archives[0])

        config_content_zip_last = ("[plugins.result]\norder=['html', 'json',"
                                   "'xunit', 'non_existing_plugin_is_ignored'"
                                   ",'zip_archive']")
        config_zip_last = script.TemporaryScript("zip_last.conf",
                                                 config_content_zip_last)
        with config_zip_last:
            run_config(config_zip_last)
            archives = glob.glob(os.path.join(self.base_outputdir, '*.zip'))
            self.assertEqual(len(archives), 1, "ZIP Archive not generated")
            zip_file = zipfile.ZipFile(archives[0], 'r')
            zip_file_list = zip_file.namelist()
            for result_output in result_outputs:
                self.assertIn(result_output, zip_file_list)

    def test_Namespace_object_has_no_attribute(self):
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b"'Namespace' object has no attribute", result.stderr)


class ParseXMLError(Exception):
    pass


class PluginsXunitTest(AbsPluginsTest, unittest.TestCase):

    @unittest.skipUnless(SCHEMA_CAPABLE,
                         'Unable to validate schema due to missing lxml.etree library')
    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        junit_xsd = os.path.join(os.path.dirname(__file__),
                                 os.path.pardir, ".data", 'junit-4.xsd')
        self.junit = os.path.abspath(junit_xsd)
        super(PluginsXunitTest, self).setUp()

    def run_and_check(self, testname, e_rc, e_ntests, e_nerrors,
                      e_nnotfound, e_nfailures, e_nskip):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' --xunit - %s' % (AVOCADO, self.tmpdir, testname))
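        # '--xunit -' directs the xunit XML to stdout, so it can be parsed
        # straight from the process output below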
        result = process.run(cmd_line, ignore_status=True)
        xml_output = result.stdout
        self.assertEqual(result.exit_status, e_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (e_rc, result))
        try:
            xunit_doc = xml.dom.minidom.parseString(xml_output)
        except Exception as detail:
            raise ParseXMLError("Failed to parse content: %s\n%s" %
                                (detail, xml_output))

        with open(self.junit, 'rb') as f:
            xmlschema = etree.XMLSchema(etree.parse(f))

        self.assertTrue(xmlschema.validate(etree.parse(BytesIO(xml_output))),
                        "Failed to validate against %s, message:\n%s" %
                        (self.junit,
                         xmlschema.error_log.filter_from_errors()))

        testsuite_list = xunit_doc.getElementsByTagName('testsuite')
        self.assertEqual(len(testsuite_list), 1, 'More than one testsuite tag')

        testsuite_tag = testsuite_list[0]
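        # Illustrative shape of the single <testsuite> element checked below;
        # only the 'tests', 'errors', 'failures' and 'skipped' attributes are
        # parsed here, the remaining attribute names shown are assumptions:
        #
        #   <testsuite name="..." tests="1" errors="0" failures="0"
        #              skipped="0" time="..." timestamp="...">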
        self.assertEqual(len(testsuite_tag.attributes), 7,
                         'The testsuite tag does not have 7 attributes. '
                         'XML:\n%s' % xml_output)

        n_tests = int(testsuite_tag.attributes['tests'].value)
        self.assertEqual(n_tests, e_ntests,
                         "Unexpected number of executed tests, "
                         "XML:\n%s" % xml_output)

        n_errors = int(testsuite_tag.attributes['errors'].value)
        self.assertEqual(n_errors, e_nerrors,
                         "Unexpected number of test errors, "
                         "XML:\n%s" % xml_output)

        n_failures = int(testsuite_tag.attributes['failures'].value)
        self.assertEqual(n_failures, e_nfailures,
                         "Unexpected number of test failures, "
                         "XML:\n%s" % xml_output)

        n_skip = int(testsuite_tag.attributes['skipped'].value)
        self.assertEqual(n_skip, e_nskip,
                         "Unexpected number of test skips, "
                         "XML:\n%s" % xml_output)

    def test_xunit_plugin_passtest(self):
        self.run_and_check('passtest.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 0)

    def test_xunit_plugin_failtest(self):
        self.run_and_check('failtest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 0, 0, 1, 0)

    def test_xunit_plugin_skiponsetuptest(self):
        self.run_and_check('cancelonsetup.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 1)

    def test_xunit_plugin_errortest(self):
        self.run_and_check('errortest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 1, 0, 0, 0)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)
        super(PluginsXunitTest, self).tearDown()


class ParseJSONError(Exception):
    pass


class PluginsJSONTest(AbsPluginsTest, unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        super(PluginsJSONTest, self).setUp()

    def run_and_check(self, testname, e_rc, e_ntests, e_nerrors,
                      e_nfailures, e_nskip, e_ncancel=0, external_runner=None):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off --json - '
                    '--archive %s' % (AVOCADO, self.tmpdir, testname))
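        # '--json -' directs the JSON results to stdout (parsed below),
        # while '--archive' additionally produces a zip of the job results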
        if external_runner is not None:
            cmd_line += " --external-runner '%s'" % external_runner
        result = process.run(cmd_line, ignore_status=True)
        json_output = result.stdout
        self.assertEqual(result.exit_status, e_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (e_rc, result))
        try:
            json_data = json.loads(json_output)
        except Exception as detail:
            raise ParseJSONError("Failed to parse content: %s\n%s" %
                                 (detail, json_output))
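        # Illustrative shape of the parsed result, limited to the keys
        # asserted in this class:
        #
        #   {"tests": [{"id": "...", "logdir": "...", ...}],
        #    "errors": 0, "failures": 0, "skip": 0, "cancel": 0}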
        self.assertTrue(json_data, "Empty JSON result:\n%s" % json_output)
        self.assertIsInstance(json_data['tests'], list,
                              "JSON result lacks 'tests' list")
        n_tests = len(json_data['tests'])
        self.assertEqual(n_tests, e_ntests,
                         "Unexpected number of executed tests")
        n_errors = json_data['errors']
        self.assertEqual(n_errors, e_nerrors,
                         "Unexpected number of test errors")
        n_failures = json_data['failures']
        self.assertEqual(n_failures, e_nfailures,
                         "Unexpected number of test failures")
        n_skip = json_data['skip']
        self.assertEqual(n_skip, e_nskip,
                         "Unexpected number of skipped tests")
        n_cancel = json_data['cancel']
        self.assertEqual(n_cancel, e_ncancel,
                         "Unexpected number of cancelled tests")
        return json_data

    def test_json_plugin_passtest(self):
        self.run_and_check('passtest.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0)

    def test_json_plugin_failtest(self):
        self.run_and_check('failtest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 0, 1, 0)

    def test_json_plugin_skiponsetuptest(self):
        self.run_and_check('cancelonsetup.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 1)

    def test_json_plugin_errortest(self):
        self.run_and_check('errortest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 1, 0, 0)

    @unittest.skipIf(not GNU_ECHO_BINARY, 'echo binary not available')
    def test_ugly_echo_cmd(self):
        data = self.run_and_check('"-ne foo\\\\\\n\\\'\\\\\\"\\\\\\'
                                  'nbar/baz"', exit_codes.AVOCADO_ALL_OK, 1, 0,
                                  0, 0, external_runner=GNU_ECHO_BINARY)
        # the test id should carry the whole command line, with the
        # shell-level escaping resolved
        self.assertEqual(data['tests'][0]['id'],
                         '1--ne foo\\\\n\\\'\\"\\\\nbar/baz')
        # the logdir name should escape special chars (like '/') to '_'
        self.assertEqual(os.path.basename(data['tests'][0]['logdir']),
                         "1--ne foo__n_'____nbar_baz")

    def tearDown(self):
        shutil.rmtree(self.tmpdir)
        super(PluginsJSONTest, self).tearDown()


if __name__ == '__main__':
    unittest.main()