# This Python file uses the following encoding: utf-8

import aexpect
import glob
import json
import os
import re
import shutil
import signal
import sys
import tempfile
import time
import xml.dom.minidom
import zipfile
import unittest
import psutil
import pkg_resources

try:
    from io import BytesIO
except ImportError:
    from BytesIO import BytesIO

try:
    from lxml import etree
    SCHEMA_CAPABLE = True
except ImportError:
    SCHEMA_CAPABLE = False

from six import iteritems
from six.moves import xrange as range

from avocado.core import exit_codes
from avocado.utils import astring
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
from avocado.utils import path as utils_path

basedir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..')
basedir = os.path.abspath(basedir)

AVOCADO = os.environ.get("UNITTEST_AVOCADO_CMD", "./scripts/avocado")

LOCAL_IMPORT_TEST_CONTENTS = '''
from avocado import Test
from mylib import hello

class LocalImportTest(Test):
    def test(self):
        self.log.info(hello())
'''
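# The triple-quoted constants in this module are test-file payloads: the
# functional tests below write them to temporary files via
# avocado.utils.script and then point the avocado runner at them.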

UNSUPPORTED_STATUS_TEST_CONTENTS = '''
from avocado import Test

class FakeStatusTest(Test):
    def run_avocado(self):
        super(FakeStatusTest, self).run_avocado()
        # Please do NOT ever use this, it's for unittesting only.
        self._Test__status = 'not supported'

    def test(self):
        pass
'''

INVALID_PYTHON_TEST = '''
from avocado import Test

class MyTest(Test):

    non_existing_variable_causing_crash

    def test_my_name(self):
        pass
'''

VALID_PYTHON_TEST_WITH_TAGS = '''
from avocado import Test

class MyTest(Test):
    def test(self):
         """
         :avocado: tags=BIG_TAG_NAME
         """
         pass
'''


REPORTS_STATUS_AND_HANG = '''
from avocado import Test
import time

class MyTest(Test):
    def test(self):
         self.runner_queue.put({"running": False})
         time.sleep(70)
'''


DIE_WITHOUT_REPORTING_STATUS = '''
from avocado import Test
import os
import signal

class MyTest(Test):
    def test(self):
         os.kill(os.getpid(), signal.SIGKILL)
'''


RAISE_CUSTOM_PATH_EXCEPTION_CONTENT = '''import os
import sys

from avocado import Test

class SharedLibTest(Test):
    def test(self):
        sys.path.append(os.path.join(os.path.dirname(__file__), "shared_lib"))
        from mylib import CancelExc
        raise CancelExc("This should not crash on unpickling in runner")
'''


def probe_binary(binary):
    try:
        return utils_path.find_command(binary)
    except utils_path.CmdNotFoundError:
        return None
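# For instance (hypothetical paths): probe_binary('true') may return
# '/bin/true' or '/usr/bin/true' depending on the platform, and None when
# the command cannot be found; the constants below rely on this.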

TRUE_CMD = probe_binary('true')
CC_BINARY = probe_binary('cc')

# On macOS, the default GNU core-utils installation (brew)
# installs the GNU utility versions with a "g" prefix. It still has the
# BSD versions of the core utilities installed on their expected paths,
# but their behavior and flags are in most cases different.
GNU_ECHO_BINARY = probe_binary('echo')
if GNU_ECHO_BINARY is not None:
    if probe_binary('man') is not None:
        echo_manpage = process.run('man %s' %
                                   os.path.basename(GNU_ECHO_BINARY)).stdout
        if b'-e' not in echo_manpage:
            GNU_ECHO_BINARY = probe_binary('gecho')
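# If the manpage of the system "echo" does not document "-e", it is assumed
# to be the BSD variant, and brew's "gecho" is probed instead (which may
# still leave GNU_ECHO_BINARY as None when GNU coreutils is not installed).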
READ_BINARY = probe_binary('read')
SLEEP_BINARY = probe_binary('sleep')


def html_capable():
    try:
        pkg_resources.require('avocado-framework-plugin-result-html')
        return True
    except pkg_resources.DistributionNotFound:
        return False
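# html_capable() is used below so that assertions about the optional HTML
# result plugin only run when the avocado-framework-plugin-result-html
# package is actually installed.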


class RunnerOperationTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(basedir)

    def test_show_version(self):
        result = process.run('%s -v' % AVOCADO, ignore_status=True)
        self.assertEqual(result.exit_status, 0)
        if sys.version_info[0] == 3:
            content = result.stdout_text
        else:
            content = result.stderr_text
        self.assertTrue(re.match(r"^Avocado \d+\.\d+$", content),
                        "Version string does not match 'Avocado \\d+\\.\\d+':\n"
                        "%r" % (content))

    def test_alternate_config_datadir(self):
        """
        Uses the "--config" flag to check that a custom configuration is
        applied

        Even on the more complex data_dir module, which adds extra checks
        to what is set on the plain settings module.
        """
        base_dir = os.path.join(self.tmpdir, 'datadir_base')
        os.mkdir(base_dir)
        mapping = {'base_dir': base_dir,
                   'test_dir': os.path.join(base_dir, 'test'),
                   'data_dir': os.path.join(base_dir, 'data'),
                   'logs_dir': os.path.join(base_dir, 'logs')}
        config = '[datadir.paths]\n'
        for key, value in iteritems(mapping):
            if not os.path.isdir(value):
                os.mkdir(value)
            config += "%s = %s\n" % (key, value)
        fd, config_file = tempfile.mkstemp(dir=self.tmpdir)
        os.write(fd, config.encode())
        os.close(fd)

        cmd = '%s --config %s config --datadir' % (AVOCADO, config_file)
        result = process.run(cmd)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn('    base     ' + mapping['base_dir'], result.stdout_text)
        self.assertIn('    data     ' + mapping['data_dir'], result.stdout_text)
        self.assertIn('    logs     ' + mapping['logs_dir'], result.stdout_text)

    def test_runner_all_ok(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py passtest.py' % (AVOCADO, self.tmpdir))
        process.run(cmd_line)
        # Also check whether jobdata contains correct parameter paths
        variants = open(os.path.join(self.tmpdir, "latest", "jobdata",
                        "variants.json")).read()
        self.assertIn('["/run/*"]', variants, "paths stored in jobdata "
                      "do not contain [\"/run/*\"]\n%s" % variants)

    def test_runner_failfast(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py failtest.py passtest.py --failfast on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b'Interrupting job (failfast).', result.stdout)
        self.assertIn(b'PASS 1 | ERROR 0 | FAIL 1 | SKIP 1', result.stdout)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL | exit_codes.AVOCADO_JOB_INTERRUPTED
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_ignore_missing_references_one_missing(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py badtest.py --ignore-missing-references on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b"Unable to resolve reference(s) 'badtest.py'", result.stderr)
        self.assertIn(b'PASS 1 | ERROR 0 | FAIL 0 | SKIP 0', result.stdout)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_ignore_missing_references_all_missing(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'badtest.py badtest2.py --ignore-missing-references on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b"Unable to resolve reference(s) 'badtest.py', 'badtest2.py'",
                      result.stderr)
        self.assertEqual(b'', result.stdout)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_test_with_local_imports(self):
        mylib = script.TemporaryScript(
            'mylib.py',
            "def hello():\n    return 'Hello world'",
            'avocado_simpletest_functional')
        mylib.save()
        mytest = script.Script(
            os.path.join(os.path.dirname(mylib.path), 'test_local_imports.py'),
            LOCAL_IMPORT_TEST_CONTENTS)
        mytest.save()
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "%s" % (AVOCADO, self.tmpdir, mytest))
        process.run(cmd_line)

    def test_unsupported_status(self):
        with script.TemporaryScript("fake_status.py",
                                    UNSUPPORTED_STATUS_TEST_CONTENTS,
                                    "avocado_unsupported_status") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s"
                              " --json -" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Runner error occurred: Test reports unsupported",
                          results["tests"][0]["fail_reason"])

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_hanged_test_with_status(self):
        """ Check that avocado handles hung tests properly """
        with script.TemporaryScript("report_status_and_hang.py",
                                    REPORTS_STATUS_AND_HANG,
                                    "hanged_test_with_status") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s "
                              "--json - --job-timeout 1" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Test reported status but did not finish",
                          results["tests"][0]["fail_reason"])
            # Currently it should finish up to 1s after the job-timeout,
            # but the prep and postprocess could take a bit longer on
            # some environments, so let's just check it finishes well
            # within 60s, which is the deadline for force-finishing the
            # test.
            self.assertLess(res.duration, 55, "Test execution took too long, "
                            "which is likely because the hanged test was not "
                            "interrupted. Results:\n%s" % res)

    def test_no_status_reported(self):
        with script.TemporaryScript("die_without_reporting_status.py",
                                    DIE_WITHOUT_REPORTING_STATUS,
                                    "no_status_reported") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s "
                              "--json -" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Test died without reporting the status",
                          results["tests"][0]["fail_reason"])

    def test_runner_tests_fail(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s passtest.py '
                    'failtest.py passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_nonexistent_test(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir '
                    '%s bogustest' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_doublefail(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - doublefail.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(b"TestError: Failing during tearDown. Yay!", result.stdout,
                      "Cleanup exception not printed to log output")
        self.assertIn(b"TestFail: This test is supposed to fail", result.stdout,
                      "Test did not fail with action exception:\n%s" % result.stdout)

    def test_uncaught_exception(self):
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "--json - uncaught_exception.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc,
                                                                result))
        self.assertIn(b'"status": "ERROR"', result.stdout)

    def test_fail_on_exception(self):
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "--json - fail_on_exception.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc,
                                                                result))
        self.assertIn(b'"status": "FAIL"', result.stdout)

    def test_exception_not_in_path(self):
        os.mkdir(os.path.join(self.tmpdir, "shared_lib"))
        mylib = script.Script(os.path.join(self.tmpdir, "shared_lib",
                                           "mylib.py"),
                              "from avocado import TestCancel\n\n"
                              "class CancelExc(TestCancel):\n"
                              "    pass")
        mylib.save()
        mytest = script.Script(os.path.join(self.tmpdir, "mytest.py"),
                               RAISE_CUSTOM_PATH_EXCEPTION_CONTENT)
        mytest.save()
        result = process.run("%s --show test run --sysinfo=off "
                             "--job-results-dir %s %s"
                             % (AVOCADO, self.tmpdir, mytest))
        self.assertIn(b"mytest.py:SharedLibTest.test -> CancelExc: This "
                      b"should not crash on unpickling in runner",
                      result.stdout)
        self.assertNotIn(b"Failed to read queue", result.stdout)

    def test_runner_timeout(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - timeouttest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        output = result.stdout
        expected_rc = exit_codes.AVOCADO_JOB_INTERRUPTED
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(b"Runner error occurred: Timeout reached", output,
                      "Timeout reached message not found in the output:\n%s" % output)
        # Ensure no test aborted error messages show up
        self.assertNotIn(b"TestAbortedError: Test aborted unexpectedly", output)

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_runner_abort(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - abort.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        excerpt = b'Test died without reporting the status.'
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(excerpt, result.stdout)

    def test_silent_output(self):
        cmd_line = ('%s --silent run --sysinfo=off --job-results-dir %s '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertEqual(result.stdout, b'')

    def test_empty_args_list(self):
        cmd_line = AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_FAIL)
        if sys.version_info[0] == 3:
            exp = b'avocado: error: the following arguments are required'
        else:
            exp = b'error: too few arguments'
        self.assertIn(exp, result.stderr)

    def test_empty_test_list(self):
        cmd_line = '%s run --sysinfo=off --job-results-dir %s' % (AVOCADO,
                                                                  self.tmpdir)
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_JOB_FAIL)
        self.assertIn(b'No test references provided nor any other arguments '
                      b'resolved into tests', result.stderr)

    def test_not_found(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s sbrubles'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_JOB_FAIL)
        self.assertIn(b'Unable to resolve reference', result.stderr)
        self.assertNotIn(b'Unable to resolve reference', result.stdout)

    def test_invalid_unique_id(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s --force-job-id '
                    'foobar passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertNotEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertIn(b'needs to be a 40 digit hex', result.stderr)
        self.assertNotIn(b'needs to be a 40 digit hex', result.stdout)

    def test_valid_unique_id(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--force-job-id 975de258ac05ce5e490648dec4753657b7ccc7d1 '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertNotIn(b'needs to be a 40 digit hex', result.stderr)
        self.assertIn(b'PASS', result.stdout)

    def test_automatic_unique_id(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    'passtest.py --json -' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        r = json.loads(result.stdout)
        int(r['job_id'], 16)  # it's a hex number
        self.assertEqual(len(r['job_id']), 40)
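        # Auto-generated job ids look like SHA-1 hashes: 40 hex digits, the
        # same format enforced by --force-job-id above.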

    def test_early_latest_result(self):
        """
        Tests that the `latest` link to the latest job results is created early
        """
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'examples/tests/passtest.py' % (AVOCADO, self.tmpdir))
        avocado_process = process.SubProcess(cmd_line)
        try:
            avocado_process.start()
            link = os.path.join(self.tmpdir, 'latest')
            for trial in range(0, 50):
                time.sleep(0.1)
                if os.path.exists(link) and os.path.islink(link):
                    avocado_process.wait()
                    break
            self.assertTrue(os.path.exists(link))
            self.assertTrue(os.path.islink(link))
        finally:
            avocado_process.wait()

    def test_dry_run(self):
        cmd = ("%s run --sysinfo=off passtest.py failtest.py "
               "gendata.py --json - --mux-inject foo:1 bar:2 baz:3 foo:foo:a"
               " foo:bar:b foo:baz:c bar:bar:bar --dry-run" % AVOCADO)
        result = json.loads(process.run(cmd).stdout)
        debuglog = result['debuglog']
        log = genio.read_file(debuglog)
        # Remove the result dir
        shutil.rmtree(os.path.dirname(os.path.dirname(debuglog)))
        self.assertIn(tempfile.gettempdir(), debuglog)   # Use tmp dir, not default location
        self.assertEqual(result['job_id'], u'0' * 40)
        # Check if all tests were cancelled
        self.assertEqual(result['cancel'], 4)
        for i in range(4):
            test = result['tests'][i]
            self.assertEqual(test['fail_reason'],
                             u'Test cancelled due to --dry-run')
        # Check if all params are listed
        # The "/:bar ==> 2" is in the tree, but not in any leaf, so it is
        # inaccessible from the test.
        for line in ("/:foo ==> 1", "/:baz ==> 3", "/foo:foo ==> a",
                     "/foo:bar ==> b", "/foo:baz ==> c", "/bar:bar ==> bar"):
            self.assertEqual(log.count(line), 4)
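        # A note on the --mux-inject entries used above: they take the form
        # [path:]key:value, so "foo:1" injects key "foo" with value "1" at
        # the tree root (hence "/:foo ==> 1"), while "foo:bar:b" injects key
        # "bar" under the "foo" node (hence "/foo:bar ==> b").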

    def test_invalid_python(self):
        test = script.make_script(os.path.join(self.tmpdir, 'test.py'),
                                  INVALID_PYTHON_TEST)
        cmd_line = ('%s --show test run --sysinfo=off '
                    '--job-results-dir %s %s') % (AVOCADO, self.tmpdir, test)
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn('1-%s:MyTest.test_my_name -> TestError' % test,
                      result.stdout_text)

    @unittest.skipIf(not READ_BINARY, "read binary not available.")
    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_read(self):
        cmd = "%s run --sysinfo=off --job-results-dir %%s %%s" % AVOCADO
        cmd %= (self.tmpdir, READ_BINARY)
        result = process.run(cmd, timeout=10, ignore_status=True)
        self.assertLess(result.duration, 8, "Duration longer than expected."
                        "\n%s" % result)
        self.assertEqual(result.exit_status, 1, "Expected exit status is 1\n%s"
                         % result)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)


class RunnerHumanOutputTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(basedir)

    def test_output_pass(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'passtest.py:PassTest.test:  PASS', result.stdout)

    def test_output_fail(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'failtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'failtest.py:FailTest.test:  FAIL', result.stdout)

    def test_output_error(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'errortest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'errortest.py:ErrorTest.test:  ERROR', result.stdout)

    def test_output_cancel(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'cancelonsetup.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'PASS 0 | ERROR 0 | FAIL 0 | SKIP 0 | WARN 0 | '
                      b'INTERRUPT 0 | CANCEL 1',
                      result.stdout)

    @unittest.skipIf(not GNU_ECHO_BINARY,
                     'GNU style echo binary not available')
    def test_ugly_echo_cmd(self):
        cmd_line = ('%s run --external-runner "%s -ne" '
                    '"foo\\\\\\n\\\'\\\\\\"\\\\\\nbar/baz" --job-results-dir %s'
                    ' --sysinfo=off  --show-job-log' %
                    (AVOCADO, GNU_ECHO_BINARY, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %s:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'[stdout] foo', result.stdout, result)
        self.assertIn(b'[stdout] \'"', result.stdout, result)
        self.assertIn(b'[stdout] bar/baz', result.stdout, result)
        self.assertIn(b'PASS 1-foo\\\\n\\\'\\"\\\\nbar/baz',
                      result.stdout, result)
        # logdir name should escape special chars (/)
        test_dirs = glob.glob(os.path.join(self.tmpdir, 'latest',
                                           'test-results', '*'))
        self.assertEqual(len(test_dirs), 1, "There are multiple directories in"
                         " test-results dir, but only one test was executed: "
                         "%s" % (test_dirs))
        self.assertEqual(os.path.basename(test_dirs[0]),
                         "1-foo__n_'____nbar_baz")

    def test_replay_skip_skipped(self):
        cmd = ("%s run --job-results-dir %s --json - "
               "cancelonsetup.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd)
        result = json.loads(result.stdout)
        jobid = str(result["job_id"])
        cmd = ("%s run --job-results-dir %s --replay %s "
               "--replay-test-status PASS" % (AVOCADO, self.tmpdir, jobid))
        process.run(cmd)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)


class RunnerSimpleTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        self.pass_script = script.TemporaryScript(
            'ʊʋʉʈɑ ʅʛʌ',
            "#!/bin/sh\ntrue",
            'avocado_simpletest_functional')
        self.pass_script.save()
        self.fail_script = script.TemporaryScript('avocado_fail.sh',
                                                  "#!/bin/sh\nfalse",
                                                  'avocado_simpletest_'
                                                  'functional')
        self.fail_script.save()
        os.chdir(basedir)

    def test_simpletest_pass(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' "%s"' % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_simpletest_fail(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' %s' % (AVOCADO, self.tmpdir, self.fail_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_runner_onehundred_fail_timing(self):
        """
        We can be pretty sure that a failtest should return immediately. Let's
        run 100 of them and ensure they take no more than 30 seconds to run.

        Notice: on a current machine this takes about 0.12s, so 30 seconds is
        considered to be pretty safe here.
        """
        one_hundred = 'failtest.py ' * 100
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off %s'
                    % (AVOCADO, self.tmpdir, one_hundred))
        initial_time = time.time()
        result = process.run(cmd_line, ignore_status=True)
        actual_time = time.time() - initial_time
        self.assertLess(actual_time, 30.0)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_runner_sleep_fail_sleep_timing(self):
        """
        Sleeptest is supposed to take 1 second, let's make a sandwich of
        100 failtests and check the test runner timing.
        """
        sleep_fail_sleep = ('sleeptest.py ' + 'failtest.py ' * 100 +
                            'sleeptest.py')
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off %s'
                    % (AVOCADO, self.tmpdir, sleep_fail_sleep))
        initial_time = time.time()
        result = process.run(cmd_line, ignore_status=True)
        actual_time = time.time() - initial_time
        self.assertLess(actual_time, 33.0)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_simplewarning(self):
        """
        simplewarning.sh uses the avocado-bash-utils
        """
        # simplewarning.sh calls "avocado" without specifying a path
        os.environ['PATH'] += ":" + os.path.join(basedir, 'scripts')
        # simplewarning.sh calls "avocado exec-path", which doesn't have
        # access to an installed location for the libexec scripts
        os.environ['PATH'] += ":" + os.path.join(basedir, 'libexec')
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    'examples/tests/simplewarning.sh --show-job-log'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %s:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'DEBUG| Debug message', result.stdout, result)
        self.assertIn(b'INFO | Info message', result.stdout, result)
        self.assertIn(b'WARN | Warning message (should cause this test to '
                      b'finish with warning)', result.stdout, result)
        self.assertIn(b'ERROR| Error message (ordinary message not changing '
                      b'the results)', result.stdout, result)

    @unittest.skipIf(not GNU_ECHO_BINARY, "Uses echo as test")
    def test_fs_unfriendly_run(self):
        os.chdir(basedir)
        commands_path = os.path.join(self.tmpdir, "commands")
        script.make_script(commands_path, "echo '\"\\/|?*<>'")
        config_path = os.path.join(self.tmpdir, "config.conf")
        script.make_script(config_path,
                           "[sysinfo.collectibles]\ncommands = %s"
                           % commands_path)
        cmd_line = ("%s --show all --config %s run --job-results-dir %s "
                    "--sysinfo=on --external-runner %s -- \"'\\\"\\/|?*<>'\""
                    % (AVOCADO, config_path, self.tmpdir, GNU_ECHO_BINARY))
        result = process.run(cmd_line)
        self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "latest",
                                                    "test-results",
                                                    "1-\'________\'/")))
        self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "latest",
                                                    "sysinfo", "pre",
                                                    "echo \'________\'")))

        if html_capable():
            with open(os.path.join(self.tmpdir, "latest",
                                   "results.html")) as html_res:
                html_results = html_res.read()
            # test results should replace odd chars with "_"
            self.assertIn(os.path.join("test-results", "1-'________'"),
                          html_results)
            # sysinfo replaces "_" with " "
            self.assertIn("echo '________'", html_results)

    def test_non_absolute_path(self):
        avocado_path = os.path.join(basedir, 'scripts', 'avocado')
        test_base_dir = os.path.dirname(self.pass_script.path)
        os.chdir(test_base_dir)
        test_file_name = os.path.basename(self.pass_script.path)
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' "%s"' % (avocado_path, self.tmpdir, test_file_name))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    @unittest.skipIf(not SLEEP_BINARY, 'sleep binary not available')
    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_kill_stopped_sleep(self):
        proc = aexpect.Expect("%s run 60 --job-results-dir %s "
                              "--external-runner %s --sysinfo=off "
                              "--job-timeout 3"
                              % (AVOCADO, self.tmpdir, SLEEP_BINARY))
        proc.read_until_output_matches([r"\(1/1\)"], timeout=3,
                                       internal_timeout=0.01)
        # We need pid of the avocado process, not the shell executing it
        avocado_shell = psutil.Process(proc.get_pid())
        avocado_proc = avocado_shell.children()[0]
        pid = avocado_proc.pid
        os.kill(pid, signal.SIGTSTP)   # This freezes the process
        deadline = time.time() + 9
        while time.time() < deadline:
            if not proc.is_alive():
                break
            time.sleep(0.1)
        else:
            proc.kill(signal.SIGKILL)
            self.fail("Avocado process still alive after the job-timeout "
                      "deadline:\n%s" % proc.get_output())
        output = proc.get_output()
        self.assertIn("ctrl+z pressed, stopping test", output, "SIGTSTP "
                      "message not in the output, test was probably not "
                      "stopped.")
        self.assertIn("TIME", output, "TIME not in the output, avocado "
                      "probably died unexpectedly")
        self.assertEqual(proc.get_status(), 8, "Avocado did not finish with "
                         "exit status 8.")

        sleep_dir = astring.string_to_safe_path("1-60")
        debug_log_path = os.path.join(self.tmpdir, "latest", "test-results",
                                      sleep_dir, "debug.log")

        debug_log = genio.read_file(debug_log_path)
        self.assertIn("Runner error occurred: Timeout reached", debug_log,
                      "Runner error occurred: Timeout reached message not "
                      "in the test's debug.log:\n%s" % debug_log)
        self.assertNotIn("Traceback (most recent", debug_log, "Traceback "
                         "present in the test's debug.log file, but it was "
                         "supposed to be stopped and unable to produce it.\n"
                         "%s" % debug_log)

    def tearDown(self):
        self.pass_script.remove()
        self.fail_script.remove()
        shutil.rmtree(self.tmpdir)


class RunnerSimpleTestStatus(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)

        self.config_file = script.TemporaryScript('avocado.conf',
                                                  "[simpletests.status]\n"
                                                  "warn_regex = ^WARN$\n"
                                                  "skip_regex = ^SKIP$\n")
        self.config_file.save()
        os.chdir(basedir)

    def test_simpletest_status(self):
        warn_script = script.TemporaryScript('avocado_warn.sh',
                                             "#!/bin/sh\necho WARN",
                                             'avocado_simpletest_'
                                             'functional')
        warn_script.save()
        cmd_line = ('%s --config %s run --job-results-dir %s --sysinfo=off'
                    ' %s --json -' % (AVOCADO, self.config_file.path,
                                      self.tmpdir, warn_script.path))
        result = process.system_output(cmd_line, ignore_status=True)
        json_results = json.loads(result)
        self.assertEqual(json_results['tests'][0]['status'], 'WARN')
        warn_script.remove()

        skip_script = script.TemporaryScript('avocado_skip.sh',
                                             "#!/bin/sh\necho SKIP",
                                             'avocado_simpletest_'
                                             'functional')
        skip_script.save()
        cmd_line = ('%s --config %s run --job-results-dir %s --sysinfo=off'
                    ' %s --json -' % (AVOCADO, self.config_file.path,
                                      self.tmpdir, skip_script.path))
        result = process.system_output(cmd_line, ignore_status=True)
        json_results = json.loads(result)
        self.assertEqual(json_results['tests'][0]['status'], 'SKIP')
        skip_script.remove()
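        # With the config above (an illustration of the simpletests.status
        # feature), simple-test output matching warn_regex or skip_regex
        # (here a line consisting solely of "WARN" or "SKIP") remaps the
        # test's result to WARN or SKIP respectively.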

    def tearDown(self):
        self.config_file.remove()
        shutil.rmtree(self.tmpdir)


class ExternalRunnerTest(unittest.TestCase):
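    # These tests exercise the --external-runner feature, where an arbitrary
    # executable (/bin/sh here) is used to run each test reference.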

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        self.pass_script = script.TemporaryScript(
            'pass',
            "exit 0",
            'avocado_externalrunner_functional')
        self.pass_script.save()
        self.fail_script = script.TemporaryScript(
            'fail',
            "exit 1",
            'avocado_externalrunner_functional')
        self.fail_script.save()
        os.chdir(basedir)

    def test_externalrunner_pass(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh %s'
                    % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_externalrunner_fail(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh %s'
                    % (AVOCADO, self.tmpdir, self.fail_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_externalrunner_chdir_no_testdir(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh --external-runner-chdir=test %s'
                    % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_output = (b'Option "--external-runner-chdir=test" requires '
                           b'"--external-runner-testdir" to be set')
        self.assertIn(expected_output, result.stderr)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_externalrunner_no_url(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=%s' % (AVOCADO, self.tmpdir, TRUE_CMD))
        result = process.run(cmd_line, ignore_status=True)
        expected_output = (b'No test references provided nor any other '
                           b'arguments resolved into tests')
        self.assertIn(expected_output, result.stderr)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def tearDown(self):
        self.pass_script.remove()
        self.fail_script.remove()
        shutil.rmtree(self.tmpdir)


class AbsPluginsTest(object):

    def setUp(self):
        self.base_outputdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(basedir)

    def tearDown(self):
        shutil.rmtree(self.base_outputdir)


class PluginsTest(AbsPluginsTest, unittest.TestCase):

    def test_sysinfo_plugin(self):
        cmd_line = '%s sysinfo %s' % (AVOCADO, self.base_outputdir)
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        sysinfo_files = os.listdir(self.base_outputdir)
        self.assertGreater(len(sysinfo_files), 0, "Empty sysinfo files dir")

    def test_list_plugin(self):
        cmd_line = '%s list' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'No tests were found on current tests dir',
                         result.stdout)

    def test_list_error_output(self):
        cmd_line = '%s list sbrubles' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b"Unable to resolve reference", result.stderr)

    def test_list_no_file_loader(self):
        cmd_line = ("%s list --loaders external --verbose -- "
                    "this-wont-be-matched" % AVOCADO)
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK,
                         "Avocado did not return rc %d:\n%s"
                         % (exit_codes.AVOCADO_ALL_OK, result))
        exp = (b"Type    Test                 Tag(s)\n"
               b"MISSING this-wont-be-matched \n\n"
               b"TEST TYPES SUMMARY\n"
               b"==================\n"
               b"EXTERNAL: 0\n"
               b"MISSING: 1\n")
        self.assertEqual(exp, result.stdout, "Stdout mismatch:\n%s\n\n%s"
                         % (exp, result))

    def test_list_verbose_tags(self):
        """
        Runs list verbosely and checks for tag-related output
        """
        test = script.make_script(os.path.join(self.base_outputdir, 'test.py'),
                                  VALID_PYTHON_TEST_WITH_TAGS)
        cmd_line = ("%s list --loaders file --verbose %s" % (AVOCADO,
                                                             test))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK,
                         "Avocado did not return rc %d:\n%s"
                         % (exit_codes.AVOCADO_ALL_OK, result))
        stdout_lines = result.stdout_text.splitlines()
        self.assertIn("Tag(s)", stdout_lines[0])
        full_test_name = "%s:MyTest.test" % test
        self.assertEqual("INSTRUMENTED %s BIG_TAG_NAME" % full_test_name,
                         stdout_lines[1])
        self.assertIn("TEST TYPES SUMMARY", stdout_lines)
        self.assertIn("INSTRUMENTED: 1", stdout_lines)
        self.assertIn("TEST TAGS SUMMARY", stdout_lines)
        self.assertEqual("BIG_TAG_NAME: 1", stdout_lines[-1])

    def test_plugin_list(self):
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        if sys.version_info[:2] >= (2, 7):
            self.assertNotIn(b'Disabled', result.stdout)

    def test_config_plugin(self):
        cmd_line = '%s config --paginator off' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'Disabled', result.stdout)

    def test_config_plugin_datadir(self):
        cmd_line = '%s config --datadir --paginator off' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'Disabled', result.stdout)

    def test_disable_plugin(self):
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b"Collect system information", result.stdout)

        config_content = "[plugins]\ndisable=['cli.cmd.sysinfo',]"
        config = script.TemporaryScript("disable_sysinfo_cmd.conf",
                                        config_content)
        with config:
            cmd_line = '%s --config %s plugins' % (AVOCADO, config)
            result = process.run(cmd_line, ignore_status=True)
            expected_rc = exit_codes.AVOCADO_ALL_OK
            self.assertEqual(result.exit_status, expected_rc,
                             "Avocado did not return rc %d:\n%s" %
                             (expected_rc, result))
            self.assertNotIn(b"Collect system information", result.stdout)

1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092
    def test_plugin_order(self):
        """
        Tests plugin order by configuration file

        First it checks if html, json, xunit and zip_archive plugins are enabled.
        Then it runs a test with zip_archive running first, which means the html,
        json and xunit output files do not make into the archive.

        Then it runs with zip_archive set to run last, which means the html,
        json and xunit output files *do* make into the archive.
        """
        def run_config(config_path):
1093
            cmd = ('%s --config %s run passtest.py --archive '
1094
                   '--job-results-dir %s --sysinfo=off'
1095
                   % (AVOCADO, config_path, self.base_outputdir))
1096 1097 1098 1099 1100 1101 1102 1103
            result = process.run(cmd, ignore_status=True)
            expected_rc = exit_codes.AVOCADO_ALL_OK
            self.assertEqual(result.exit_status, expected_rc,
                             "Avocado did not return rc %d:\n%s" %
                             (expected_rc, result))

        result_plugins = ["json", "xunit", "zip_archive"]
        result_outputs = ["results.json", "results.xml"]
1104
        if html_capable():
1105
            result_plugins.append("html")
1106
            result_outputs.append("results.html")
1107

1108
        cmd_line = '%s plugins' % AVOCADO
1109 1110 1111 1112 1113 1114
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        for result_plugin in result_plugins:
1115
            self.assertIn(result_plugin, result.stdout_text)
1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130

        config_content_zip_first = "[plugins.result]\norder=['zip_archive']"
        config_zip_first = script.TemporaryScript("zip_first.conf",
                                                  config_content_zip_first)
        with config_zip_first:
            run_config(config_zip_first)
            archives = glob.glob(os.path.join(self.base_outputdir, '*.zip'))
            self.assertEqual(len(archives), 1, "ZIP Archive not generated")
            with zipfile.ZipFile(archives[0], 'r') as zip_file:
                zip_file_list = zip_file.namelist()
            for result_output in result_outputs:
                self.assertNotIn(result_output, zip_file_list)
            os.unlink(archives[0])

        config_content_zip_last = ("[plugins.result]\norder=['html', 'json',"
                                   "'xunit', 'non_existing_plugin_is_ignored'"
                                   ",'zip_archive']")
        config_zip_last = script.TemporaryScript("zip_last.conf",
                                                 config_content_zip_last)
        with config_zip_last:
            run_config(config_zip_last)
            archives = glob.glob(os.path.join(self.base_outputdir, '*.zip'))
            self.assertEqual(len(archives), 1, "ZIP Archive not generated")
            with zipfile.ZipFile(archives[0], 'r') as zip_file:
                zip_file_list = zip_file.namelist()
            for result_output in result_outputs:
                self.assertIn(result_output, zip_file_list)

    def test_Namespace_object_has_no_attribute(self):
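        # Regression check: loading the plugins must not trigger
        # "'Namespace' object has no attribute" errors on stderr.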
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b"'Namespace' object has no attribute", result.stderr)


class ParseXMLError(Exception):
    pass


class PluginsXunitTest(AbsPluginsTest, unittest.TestCase):
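    # Exercises the xunit result plugin and validates its output
    # against the bundled junit-4.xsd schema (when lxml is available).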

    @unittest.skipUnless(SCHEMA_CAPABLE,
                         'Unable to validate schema due to missing lxml.etree library')
    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        junit_xsd = os.path.join(os.path.dirname(__file__),
                                 os.path.pardir, ".data", 'junit-4.xsd')
        self.junit = os.path.abspath(junit_xsd)
        super(PluginsXunitTest, self).setUp()

    def run_and_check(self, testname, e_rc, e_ntests, e_nerrors,
                      e_nnotfound, e_nfailures, e_nskip):
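        # The e_* arguments are the expected exit code and per-status
        # counts in the xunit output; e_nnotfound is currently unused.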
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' --xunit - %s' % (AVOCADO, self.tmpdir, testname))
        result = process.run(cmd_line, ignore_status=True)
        xml_output = result.stdout
        self.assertEqual(result.exit_status, e_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (e_rc, result))
        try:
            xunit_doc = xml.dom.minidom.parseString(xml_output)
        except Exception as detail:
            raise ParseXMLError("Failed to parse content: %s\n%s" %
                                (detail, xml_output))

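        # Validate the raw xunit output against the JUnit schema before
        # inspecting individual attributes.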
        with open(self.junit, 'rb') as f:
            xmlschema = etree.XMLSchema(etree.parse(f))

        self.assertTrue(xmlschema.validate(etree.parse(BytesIO(xml_output))),
                        "Failed to validate against %s, message:\n%s" %
                        (self.junit,
                         xmlschema.error_log.filter_from_errors()))

        testsuite_list = xunit_doc.getElementsByTagName('testsuite')
        self.assertEqual(len(testsuite_list), 1,
                         'Expected exactly one testsuite tag')

        testsuite_tag = testsuite_list[0]
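        # Sanity-check the shape of the testsuite tag before reading
        # its counters.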
        self.assertEqual(len(testsuite_tag.attributes), 7,
                         'The testsuite tag does not have 7 attributes. '
                         'XML:\n%s' % xml_output)

        n_tests = int(testsuite_tag.attributes['tests'].value)
        self.assertEqual(n_tests, e_ntests,
                         "Unexpected number of executed tests, "
                         "XML:\n%s" % xml_output)

        n_errors = int(testsuite_tag.attributes['errors'].value)
        self.assertEqual(n_errors, e_nerrors,
                         "Unexpected number of test errors, "
                         "XML:\n%s" % xml_output)

        n_failures = int(testsuite_tag.attributes['failures'].value)
        self.assertEqual(n_failures, e_nfailures,
                         "Unexpected number of test failures, "
                         "XML:\n%s" % xml_output)

        n_skip = int(testsuite_tag.attributes['skipped'].value)
        self.assertEqual(n_skip, e_nskip,
                         "Unexpected number of test skips, "
                         "XML:\n%s" % xml_output)

    def test_xunit_plugin_passtest(self):
        self.run_and_check('passtest.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 0)

    def test_xunit_plugin_failtest(self):
        self.run_and_check('failtest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 0, 0, 1, 0)

    def test_xunit_plugin_skiponsetuptest(self):
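        # A test cancelled during setUp is reported as 'skipped' in the
        # xunit output.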
        self.run_and_check('cancelonsetup.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 1)

    def test_xunit_plugin_errortest(self):
        self.run_and_check('errortest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 1, 0, 0, 0)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)
        super(PluginsXunitTest, self).tearDown()


class ParseJSONError(Exception):
    pass


class PluginsJSONTest(AbsPluginsTest, unittest.TestCase):
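    # Exercises the JSON result plugin, checking both the exit code and
    # the aggregate counters in the generated JSON document.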

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        super(PluginsJSONTest, self).setUp()

    def run_and_check(self, testname, e_rc, e_ntests, e_nerrors,
                      e_nfailures, e_nskip, e_ncancel=0, external_runner=None):
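        # When external_runner is given, 'testname' is handed to that
        # binary as an argument instead of being loaded as a test
        # reference.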
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off --json - '
                    '--archive %s' % (AVOCADO, self.tmpdir, testname))
        if external_runner is not None:
            cmd_line += " --external-runner '%s'" % external_runner
        result = process.run(cmd_line, ignore_status=True)
        json_output = result.stdout
        self.assertEqual(result.exit_status, e_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (e_rc, result))
        try:
            json_data = json.loads(json_output)
        except Exception as detail:
            raise ParseJSONError("Failed to parse content: %s\n%s" %
                                 (detail, json_output))
        self.assertTrue(json_data, "Empty JSON result:\n%s" % json_output)
        self.assertIsInstance(json_data['tests'], list,
                              "JSON result lacks 'tests' list")
        n_tests = len(json_data['tests'])
        self.assertEqual(n_tests, e_ntests,
                         "Unexpected number of executed tests")
        n_errors = json_data['errors']
        self.assertEqual(n_errors, e_nerrors,
                         "Unexpected number of test errors")
        n_failures = json_data['failures']
        self.assertEqual(n_failures, e_nfailures,
                         "Unexpected number of test failures")
        n_skip = json_data['skip']
        self.assertEqual(n_skip, e_nskip,
                         "Unexpected number of skipped tests")
        n_cancel = json_data['cancel']
        self.assertEqual(n_cancel, e_ncancel,
                         "Unexpected number of cancelled tests")
        return json_data

    def test_json_plugin_passtest(self):
        self.run_and_check('passtest.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0)

    def test_json_plugin_failtest(self):
        self.run_and_check('failtest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 0, 1, 0)

    def test_json_plugin_skiponsetuptest(self):
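        # A test cancelled during setUp shows up under 'cancel', not
        # 'skip', in the JSON results.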
        self.run_and_check('cancelonsetup.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 1)

    def test_json_plugin_errortest(self):
        self.run_and_check('errortest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 1, 0, 0)

    @unittest.skipIf(not GNU_ECHO_BINARY, 'echo binary not available')
    def test_ugly_echo_cmd(self):
        data = self.run_and_check('"-ne foo\\\\\\n\\\'\\\\\\"\\\\\\'
                                  'nbar/baz"', exit_codes.AVOCADO_ALL_OK, 1, 0,
                                  0, 0, external_runner=GNU_ECHO_BINARY)
        # The executed test should be this
        self.assertEqual(data['tests'][0]['id'],
                         '1--ne foo\\\\n\\\'\\"\\\\nbar/baz')
        # logdir name should escape special chars (/)
        self.assertEqual(os.path.basename(data['tests'][0]['logdir']),
                         "1--ne foo__n_'____nbar_baz")

    def tearDown(self):
        shutil.rmtree(self.tmpdir)
        super(PluginsJSONTest, self).tearDown()


if __name__ == '__main__':
    unittest.main()