test_basic.py 63.3 KB
Newer Older
1 2
import aexpect
import glob
3
import json
4
import os
5
import re
6
import shutil
7
import signal
8
import sys
9
import tempfile
10
import time
11
import xml.dom.minidom
12
import zipfile
13
import unittest
14
import psutil
15

16 17
try:
    from io import BytesIO
18
except ImportError:
19
    from BytesIO import BytesIO
20

21 22 23 24 25 26
try:
    from lxml import etree
    SCHEMA_CAPABLE = True
except ImportError:
    SCHEMA_CAPABLE = False

27
from six import iteritems
28
from six.moves import xrange as range
29

30
from avocado.core import exit_codes
31
from avocado.utils import astring
32
from avocado.utils import genio
33 34
from avocado.utils import process
from avocado.utils import script
35
from avocado.utils import path as utils_path
36

37
from .. import AVOCADO, BASEDIR, python_module_available
38

39

40 41 42 43 44 45 46 47 48
# Source of a test that imports a helper module ("mylib") living next to the
# test file itself.
LOCAL_IMPORT_TEST_CONTENTS = '''
from avocado import Test
from mylib import hello

class LocalImportTest(Test):
    def test(self):
        self.log.info(hello())
'''

# Source of a test that overwrites its own status with a value the functional
# tests expect the runner to report as an ERROR.
UNSUPPORTED_STATUS_TEST_CONTENTS = '''
from avocado import Test

class FakeStatusTest(Test):
    def run_avocado(self):
        super(FakeStatusTest, self).run_avocado()
        # Please do NOT ever use this, it's for unittesting only.
        self._Test__status = 'not supported'

    def test(self):
        pass
'''

# Source of a test file whose class body references an undefined name, so the
# class cannot be created when the file is executed.
INVALID_PYTHON_TEST = '''
from avocado import Test

class MyTest(Test):

    non_existing_variable_causing_crash

    def test_my_name(self):
        pass
'''


# Source of a valid test carrying an ":avocado: tags=" docstring directive.
VALID_PYTHON_TEST_WITH_TAGS = '''
from avocado import Test

class MyTest(Test):
    def test(self):
         """
         :avocado: tags=BIG_TAG_NAME
         """
         pass
'''


# Source of a test that puts a {"running": False} message on the runner queue
# and then keeps sleeping instead of returning.
REPORTS_STATUS_AND_HANG = '''
from avocado import Test
import time

class MyTest(Test):
    def test(self):
         self.runner_queue.put({"running": False})
         time.sleep(70)
'''


# Source of a test that SIGKILLs its own process before any status is sent.
DIE_WITHOUT_REPORTING_STATUS = '''
from avocado import Test
import os
import signal

class MyTest(Test):
    def test(self):
         os.kill(os.getpid(), signal.SIGKILL)
'''


# Source of a test raising an exception class imported from a directory that
# the test itself appends to sys.path at run time.
RAISE_CUSTOM_PATH_EXCEPTION_CONTENT = '''import os
import sys

from avocado import Test

class SharedLibTest(Test):
    def test(self):
        sys.path.append(os.path.join(os.path.dirname(__file__), "shared_lib"))
        from mylib import CancelExc
        raise CancelExc("This should not crash on unpickling in runner")
'''


A
Amador Pahim 已提交
121
def probe_binary(binary):
    """
    Look up a command, tolerating its absence.

    :param binary: name of the command handed to utils_path.find_command()
    :returns: whatever utils_path.find_command() returns for the command,
              or None when it raises utils_path.CmdNotFoundError
    """
    try:
        return utils_path.find_command(binary)
    except utils_path.CmdNotFoundError:
        return None

L
Lukáš Doktor 已提交
127

128
# Optional helper commands, probed once at import time.  Each constant holds
# the result of probe_binary(), i.e. None when the command was not found.
TRUE_CMD = probe_binary('true')
CC_BINARY = probe_binary('cc')

# On macOS, the default GNU core-utils installation (brew)
# installs the gnu utility versions with a g prefix. It still has the
# BSD versions of the core utilities installed on their expected paths
# but their behavior and flags are in most cases different.
GNU_ECHO_BINARY = probe_binary('echo')
if GNU_ECHO_BINARY is not None:
    if probe_binary('man') is not None:
        echo_cmd = 'man %s' % os.path.basename(GNU_ECHO_BINARY)
        # ignore_status=True: a failing "man" call (for instance when the
        # manual page is not installed) must not abort the import of this
        # module; the empty output then simply triggers the "gecho" probe.
        echo_manpage = process.run(echo_cmd, env={'LANG': 'C'},
                                   encoding='ascii',
                                   ignore_status=True).stdout
        # No "-e" mentioned in the manual page: look for the g-prefixed
        # binary instead.
        if b'-e' not in echo_manpage:
            GNU_ECHO_BINARY = probe_binary('gecho')
READ_BINARY = probe_binary('read')
SLEEP_BINARY = probe_binary('sleep')
145 146


147 148
class RunnerOperationTest(unittest.TestCase):
    """
    Functional checks that run the avocado command line and inspect its
    exit status, its standard streams and the job results directory.

    Every test gets a fresh job results directory (self.tmpdir), removed
    again in tearDown().
    """

    def setUp(self):
        """Create the per-test results directory and move to BASEDIR."""
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(BASEDIR)

    def test_show_version(self):
        """The "-v" switch prints "Avocado <major>.<minor>" and exits 0."""
        result = process.run('%s -v' % AVOCADO, ignore_status=True)
        self.assertEqual(result.exit_status, 0)
        # The version text is read from stdout on Python 3 and from stderr
        # otherwise.
        if sys.version_info[0] == 3:
            content = result.stdout_text
        else:
            content = result.stderr_text
        self.assertTrue(re.match(r"^Avocado \d+\.\d+$", content),
                        "Version string does not match 'Avocado \\d+\\.\\d+'\n"
                        "%r" % (content,))

    def test_alternate_config_datadir(self):
        """
        Uses the "--config" flag to check custom configuration is applied

        Even on the more complex data_dir module, which adds extra checks
        to what is set on the plain settings module.
        """
        base_dir = os.path.join(self.tmpdir, 'datadir_base')
        os.mkdir(base_dir)
        mapping = {'base_dir': base_dir,
                   'test_dir': os.path.join(base_dir, 'test'),
                   'data_dir': os.path.join(base_dir, 'data'),
                   'logs_dir': os.path.join(base_dir, 'logs')}
        config = '[datadir.paths]\n'
        for key, value in iteritems(mapping):
            if not os.path.isdir(value):
                os.mkdir(value)
            config += "%s = %s\n" % (key, value)
        fd, config_file = tempfile.mkstemp(dir=self.tmpdir)
        # Wrapping the descriptor guarantees it is closed even if the write
        # itself fails.
        with os.fdopen(fd, 'wb') as config_fileobj:
            config_fileobj.write(config.encode())

        cmd = '%s --config %s config --datadir' % (AVOCADO, config_file)
        result = process.run(cmd)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn('    base     ' + mapping['base_dir'], result.stdout_text)
        self.assertIn('    data     ' + mapping['data_dir'], result.stdout_text)
        self.assertIn('    logs     ' + mapping['logs_dir'], result.stdout_text)

    def test_runner_all_ok(self):
        """Two passing tests run fine and jobdata records the "/run/*" path."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py passtest.py' % (AVOCADO, self.tmpdir))
        process.run(cmd_line)
        # Also check whether jobdata contains correct parameter paths
        variants_path = os.path.join(self.tmpdir, "latest", "jobdata",
                                     "variants.json")
        with open(variants_path) as variants_file:
            variants = variants_file.read()
        self.assertIn('["/run/*"]', variants, "paths stored in jobdata "
                      "does not contains [\"/run/*\"]\n%s" % variants)

    def test_runner_failfast(self):
        """With "--failfast on" the job is interrupted after the first FAIL."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py failtest.py passtest.py --failfast on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b'Interrupting job (failfast).', result.stdout)
        self.assertIn(b'PASS 1 | ERROR 0 | FAIL 1 | SKIP 1', result.stdout)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL | exit_codes.AVOCADO_JOB_INTERRUPTED
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_ignore_missing_references_one_missing(self):
        """One unresolvable reference is reported but the job still passes."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py badtest.py --ignore-missing-references on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b"Unable to resolve reference(s) 'badtest.py'", result.stderr)
        self.assertIn(b'PASS 1 | ERROR 0 | FAIL 0 | SKIP 0', result.stdout)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_ignore_missing_references_all_missing(self):
        """When no reference can be resolved the job fails with no stdout."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'badtest.py badtest2.py --ignore-missing-references on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b"Unable to resolve reference(s) 'badtest.py', 'badtest2.py'",
                      result.stderr)
        self.assertEqual(b'', result.stdout)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_test_with_local_imports(self):
        """A test can import a module stored in its own directory."""
        # The context manager is used, as in the other tests of this class,
        # so the temporary script is cleaned up even when the run fails.
        with script.TemporaryScript(
                'mylib.py',
                "def hello():\n    return 'Hello world'",
                'avocado_simpletest_functional') as mylib:
            mytest = script.Script(
                os.path.join(os.path.dirname(mylib.path),
                             'test_local_imports.py'),
                LOCAL_IMPORT_TEST_CONTENTS)
            mytest.save()
            cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                        "%s" % (AVOCADO, self.tmpdir, mytest))
            process.run(cmd_line)

    def test_unsupported_status(self):
        """A test reporting an unknown status is turned into an ERROR."""
        with script.TemporaryScript("fake_status.py",
                                    UNSUPPORTED_STATUS_TEST_CONTENTS,
                                    "avocado_unsupported_status") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s"
                              " --json -" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout_text)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Runner error occurred: Test reports unsupported",
                          results["tests"][0]["fail_reason"])

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping test that take a long time to run, are "
                     "resource intensive or time sensitve")
    def test_hanged_test_with_status(self):
        """Check that avocado handles hanged tests properly."""
        with script.TemporaryScript("report_status_and_hang.py",
                                    REPORTS_STATUS_AND_HANG,
                                    "hanged_test_with_status") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s "
                              "--json - --job-timeout 1" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout_text)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Test reported status but did not finish",
                          results["tests"][0]["fail_reason"])
            # Currently it should finish up to 1s after the job-timeout
            # but the prep and postprocess could take a bit longer on
            # some environments, so let's just check it does not take
            # > 60s, which is the deadline for force-finishing the test.
            self.assertLess(res.duration, 55, "Test execution took too long, "
                            "which is likely because the hanged test was not "
                            "interrupted. Results:\n%s" % res)

    def test_no_status_reported(self):
        """A test killing itself without reporting a status is an ERROR."""
        with script.TemporaryScript("die_without_reporting_status.py",
                                    DIE_WITHOUT_REPORTING_STATUS,
                                    "no_status_reported") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s "
                              "--json -" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout_text)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Test died without reporting the status",
                          results["tests"][0]["fail_reason"])

    def test_runner_tests_fail(self):
        """A failing test among passing ones yields AVOCADO_TESTS_FAIL."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s passtest.py '
                    'failtest.py passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_nonexistent_test(self):
        """An unknown reference fails the job instead of crashing avocado."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir '
                    '%s bogustest' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_doublefail(self):
        """Both the test failure and the tearDown error reach the output."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - doublefail.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(b"TestError: Failing during tearDown. Yay!", result.stdout,
                      "Cleanup exception not printed to log output")
        self.assertIn(b"TestFail: This test is supposed to fail", result.stdout,
                      "Test did not fail with action exception:\n%s" % result.stdout)

    def test_uncaught_exception(self):
        """uncaught_exception.py is reported with status ERROR."""
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "--json - uncaught_exception.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc,
                                                                result))
        self.assertIn(b'"status": "ERROR"', result.stdout)

    def test_fail_on_exception(self):
        """fail_on_exception.py is reported with status FAIL."""
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "--json - fail_on_exception.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc,
                                                                result))
        self.assertIn(b'"status": "FAIL"', result.stdout)

    def test_assert_raises(self):
        """assert.py yields one PASS and one FAIL in the human output."""
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "-- assert.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc,
                                                                result))
        self.assertIn(b'Assert.test_assert_raises:  PASS', result.stdout)
        self.assertIn(b'Assert.test_fails_to_raise:  FAIL', result.stdout)
        self.assertIn(b'PASS 1 | ERROR 0 | FAIL 1 ', result.stdout)

    def test_exception_not_in_path(self):
        """An exception class only importable by the test is still reported."""
        os.mkdir(os.path.join(self.tmpdir, "shared_lib"))
        mylib = script.Script(os.path.join(self.tmpdir, "shared_lib",
                                           "mylib.py"),
                              "from avocado import TestCancel\n\n"
                              "class CancelExc(TestCancel):\n"
                              "    pass")
        mylib.save()
        mytest = script.Script(os.path.join(self.tmpdir, "mytest.py"),
                               RAISE_CUSTOM_PATH_EXCEPTION_CONTENT)
        mytest.save()
        result = process.run("%s --show test run --sysinfo=off "
                             "--job-results-dir %s %s"
                             % (AVOCADO, self.tmpdir, mytest))
        self.assertIn(b"mytest.py:SharedLibTest.test -> CancelExc: This "
                      b"should not crash on unpickling in runner",
                      result.stdout)
        self.assertNotIn(b"Failed to read queue", result.stdout)

    def test_runner_timeout(self):
        """timeouttest.py interrupts the job with a timeout message."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - timeouttest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        output = result.stdout
        expected_rc = exit_codes.AVOCADO_JOB_INTERRUPTED
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(b"Runner error occurred: Timeout reached", output,
                      "Timeout reached message not found in the output:\n%s" % output)
        # Ensure no test aborted error messages show up
        self.assertNotIn(b"TestAbortError: Test aborted unexpectedly", output)

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping test that take a long time to run, are "
                     "resource intensive or time sensitve")
    def test_runner_abort(self):
        """abort.py is reported as having died without reporting a status."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - abort.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        excerpt = b'Test died without reporting the status.'
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(excerpt, result.stdout)

    def test_silent_output(self):
        """With "--silent" a passing job writes nothing to stdout."""
        cmd_line = ('%s --silent run --sysinfo=off --job-results-dir %s '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertEqual(result.stdout, b'')

    def test_empty_args_list(self):
        """Running avocado with no arguments fails with a usage error."""
        cmd_line = AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_FAIL)
        # The expected wording differs between Python 3 and earlier versions.
        if sys.version_info[0] == 3:
            exp = b'avocado: error: the following arguments are required'
        else:
            exp = b'error: too few arguments'
        self.assertIn(exp, result.stderr)

    def test_empty_test_list(self):
        """"run" without any test reference fails the job."""
        cmd_line = '%s run --sysinfo=off --job-results-dir %s' % (AVOCADO,
                                                                  self.tmpdir)
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_JOB_FAIL)
        self.assertIn(b'No test references provided nor any other arguments '
                      b'resolved into tests', result.stderr)

    def test_not_found(self):
        """The "unable to resolve" message goes to stderr, not stdout."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s sbrubles'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_JOB_FAIL)
        self.assertIn(b'Unable to resolve reference', result.stderr)
        self.assertNotIn(b'Unable to resolve reference', result.stdout)

    def test_invalid_unique_id(self):
        """A forced job id that is not 40 hex digits is rejected on stderr."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s --force-job-id '
                    'foobar passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertNotEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertIn(b'needs to be a 40 digit hex', result.stderr)
        self.assertNotIn(b'needs to be a 40 digit hex', result.stdout)

    def test_valid_unique_id(self):
        """A forced job id made of 40 hex digits is accepted."""
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--force-job-id 975de258ac05ce5e490648dec4753657b7ccc7d1 '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertNotIn(b'needs to be a 40 digit hex', result.stderr)
        self.assertIn(b'PASS', result.stdout)

    def test_automatic_unique_id(self):
        """The job id reported in the JSON result is a 40 digit hex number."""
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    'passtest.py --json -' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        r = json.loads(result.stdout_text)
        int(r['job_id'], 16)  # it's an hex number
        self.assertEqual(len(r['job_id']), 40)

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping test that take a long time to run, are "
                     "resource intensive or time sensitve")
    def test_early_latest_result(self):
        """
        Tests that the `latest` link to the latest job results is created early
        """
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'examples/tests/passtest.py' % (AVOCADO, self.tmpdir))
        avocado_process = process.SubProcess(cmd_line)
        try:
            avocado_process.start()
            link = os.path.join(self.tmpdir, 'latest')
            # Poll for the link: up to 50 checks, 0.1s apart.
            for _ in range(0, 50):
                time.sleep(0.1)
                if os.path.exists(link) and os.path.islink(link):
                    avocado_process.wait()
                    break
            self.assertTrue(os.path.exists(link))
            self.assertTrue(os.path.islink(link))
        finally:
            avocado_process.wait()

    def test_dry_run(self):
        """"--dry-run" cancels every test but still logs the injected params."""
        cmd = ("%s run --sysinfo=off --dry-run --dry-run-no-cleanup --json - "
               "--mux-inject foo:1 bar:2 baz:3 foo:foo:a "
               "foo:bar:b foo:baz:c bar:bar:bar "
               "-- passtest.py failtest.py gendata.py " % AVOCADO)
        result = json.loads(process.run(cmd).stdout_text)
        debuglog = result['debuglog']
        log = genio.read_file(debuglog)
        # Remove the result dir
        shutil.rmtree(os.path.dirname(os.path.dirname(debuglog)))
        self.assertIn(tempfile.gettempdir(), debuglog)   # Use tmp dir, not default location
        self.assertEqual(result['job_id'], u'0' * 40)
        # Check if all tests were skipped
        self.assertEqual(result['cancel'], 4)
        for i in range(4):
            test = result['tests'][i]
            self.assertEqual(test['fail_reason'],
                             u'Test cancelled due to --dry-run')
        # Check if all params are listed
        # The "/:bar ==> 2 is in the tree, but not in any leave so inaccessible
        # from test.
        for line in ("/:foo ==> 1", "/:baz ==> 3", "/foo:foo ==> a",
                     "/foo:bar ==> b", "/foo:baz ==> c", "/bar:bar ==> bar"):
            self.assertEqual(log.count(line), 4,
                             "Avocado log count for param '%s' not as expected:\n%s" % (line, log))

    def test_invalid_python(self):
        """A test file that fails while being loaded shows up as TestError."""
        test = script.make_script(os.path.join(self.tmpdir, 'test.py'),
                                  INVALID_PYTHON_TEST)
        cmd_line = ('%s --show test run --sysinfo=off '
                    '--job-results-dir %s %s') % (AVOCADO, self.tmpdir, test)
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn('1-%s:MyTest.test_my_name -> TestError' % test,
                      result.stdout_text)

    @unittest.skipIf(not READ_BINARY, "read binary not available.")
    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping test that take a long time to run, are "
                     "resource intensive or time sensitve")
    def test_read(self):
        """Running the "read" binary as a test ends quickly with status 1."""
        cmd = "%s run --sysinfo=off --job-results-dir %%s %%s" % AVOCADO
        cmd %= (self.tmpdir, READ_BINARY)
        result = process.run(cmd, timeout=10, ignore_status=True)
        self.assertLess(result.duration, 8, "Duration longer than expected."
                        "\n%s" % result)
        self.assertEqual(result.exit_status, 1, "Expected exit status is 1\n%s"
                         % result)

    def test_runner_test_parameters(self):
        """A parameter given with "-p" reaches sleeptest.py."""
        cmd_line = ('%s --show=test run --sysinfo=off --job-results-dir %s '
                    '-p "sleep_length=0.01" -- sleeptest.py ' % (AVOCADO,
                                                                 self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(b"PARAMS (key=sleep_length, path=*, default=1) => '0.01'",
                      result.stdout)
        self.assertIn(b"Sleeping for 0.01 seconds", result.stdout)

    def tearDown(self):
        """Remove the per-test results directory."""
        shutil.rmtree(self.tmpdir)

576

577 578 579
class RunnerHumanOutputTest(unittest.TestCase):
    """
    Functional checks on the human readable output of "avocado run".

    Every test gets a fresh job results directory (self.tmpdir), removed
    again in tearDown().
    """

    def setUp(self):
        """Create the per-test results directory and move to BASEDIR."""
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(BASEDIR)

    def test_output_pass(self):
        """passtest.py is listed with a PASS status."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'passtest.py:PassTest.test:  PASS', result.stdout)

    def test_output_fail(self):
        """failtest.py is listed with a FAIL status."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'failtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'failtest.py:FailTest.test:  FAIL', result.stdout)

    def test_output_error(self):
        """errortest.py is listed with an ERROR status."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'errortest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'errortest.py:ErrorTest.test:  ERROR', result.stdout)

    def test_output_cancel(self):
        """cancelonsetup.py is counted as CANCEL and the job still succeeds."""
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'cancelonsetup.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'PASS 0 | ERROR 0 | FAIL 0 | SKIP 0 | WARN 0 | '
                      b'INTERRUPT 0 | CANCEL 1',
                      result.stdout)

    @unittest.skipIf(not GNU_ECHO_BINARY,
                     'GNU style echo binary not available')
    def test_ugly_echo_cmd(self):
        """Special characters in a test reference are escaped consistently."""
        cmd_line = ('%s run --external-runner "%s -ne" '
                    '"foo\\\\\\n\\\'\\\\\\"\\\\\\nbar/baz" --job-results-dir %s'
                    ' --sysinfo=off  --show-job-log' %
                    (AVOCADO, GNU_ECHO_BINARY, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %s:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'[stdout] foo', result.stdout, result)
        self.assertIn(b'[stdout] \'"', result.stdout, result)
        self.assertIn(b'[stdout] bar/baz', result.stdout, result)
        self.assertIn(b'PASS 1-foo\\\\n\\\'\\"\\\\nbar/baz',
                      result.stdout, result)
        # logdir name should escape special chars (/)
        test_dirs = glob.glob(os.path.join(self.tmpdir, 'latest',
                                           'test-results', '*'))
        self.assertEqual(len(test_dirs), 1, "There are multiple directories in"
                         " test-results dir, but only one test was executed: "
                         "%s" % (test_dirs))
        self.assertEqual(os.path.basename(test_dirs[0]),
                         "1-foo__n_'____nbar_baz")

    def test_replay_skip_skipped(self):
        """Replaying a job filtered by PASS status runs without error."""
        cmd = ("%s run --job-results-dir %s --json - "
               "cancelonsetup.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd)
        result = json.loads(result.stdout_text)
        jobid = str(result["job_id"])
        cmd = ("%s run --job-results-dir %s --replay %s "
               "--replay-test-status PASS" % (AVOCADO, self.tmpdir, jobid))
        process.run(cmd)

    def tearDown(self):
        """Remove the per-test results directory."""
        shutil.rmtree(self.tmpdir)

664

665
class RunnerSimpleTest(unittest.TestCase):
    """
    Functional tests that drive avocado with simple (executable) tests.

    Each test gets a fresh job results directory plus one passing and one
    failing shell script; tearDown() removes all of them again.
    """

    def setUp(self):
        """
        Create the results dir and the pass/fail scripts, chdir to BASEDIR.
        """
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        # The passing script deliberately has a non-ASCII name with spaces
        self.pass_script = script.TemporaryScript(
            u'\u00e1 \u00e9 \u00ed \u00f3 \u00fa',
            "#!/bin/sh\ntrue",
            'avocado_simpletest_functional')
        self.pass_script.save()
        self.fail_script = script.TemporaryScript('avocado_fail.sh',
                                                  "#!/bin/sh\nfalse",
                                                  'avocado_simpletest_'
                                                  'functional')
        self.fail_script.save()
        os.chdir(BASEDIR)

    def test_simpletest_pass(self):
        """
        A script exiting with 0 must yield AVOCADO_ALL_OK.
        """
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' "%s"' % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_simpletest_fail(self):
        """
        A script exiting with non-zero must yield AVOCADO_TESTS_FAIL.
        """
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' %s' % (AVOCADO, self.tmpdir, self.fail_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping test that take a long time to run, are "
                     "resource intensive or time sensitve")
    def test_runner_onehundred_fail_timing(self):
        """
        We can be pretty sure that a failtest should return immediately.
        Let's run 100 of them and make sure they do not take more than
        30 seconds to run.

        Notice: on a current machine this takes about 0.12s, so 30 seconds is
        considered to be pretty safe here.
        """
        one_hundred = 'failtest.py ' * 100
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off %s'
                    % (AVOCADO, self.tmpdir, one_hundred))
        initial_time = time.time()
        result = process.run(cmd_line, ignore_status=True)
        actual_time = time.time() - initial_time
        self.assertLess(actual_time, 30.0)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s"
                         % (expected_rc, result))

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping test that take a long time to run, are "
                     "resource intensive or time sensitve")
    def test_runner_sleep_fail_sleep_timing(self):
        """
        Sleeptest is supposed to take 1 second, let's make a sandwich of
        100 failtests and check the test runner timing.
        """
        sleep_fail_sleep = ('sleeptest.py ' + 'failtest.py ' * 100 +
                            'sleeptest.py')
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off %s'
                    % (AVOCADO, self.tmpdir, sleep_fail_sleep))
        initial_time = time.time()
        result = process.run(cmd_line, ignore_status=True)
        actual_time = time.time() - initial_time
        self.assertLess(actual_time, 33.0)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s"
                         % (expected_rc, result))

    def test_simplewarning(self):
        """
        simplewarning.sh uses the avocado-bash-utils
        """
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    'examples/tests/simplewarning.sh --show-job-log'
                    % (AVOCADO, self.tmpdir))
        # PATH is only extended for the duration of this command, so the
        # modification does not leak into the tests that run afterwards.
        saved_path = os.environ['PATH']
        try:
            # simplewarning.sh calls "avocado" without specifying a path
            # let's add the path that was defined at the global module
            # scope here
            os.environ['PATH'] += ":" + os.path.dirname(AVOCADO)
            # simplewarning.sh calls "avocado exec-path" which hasn't
            # access to an installed location for the libexec scripts
            os.environ['PATH'] += ":" + os.path.join(BASEDIR, 'libexec')
            result = process.run(cmd_line, ignore_status=True)
        finally:
            os.environ['PATH'] = saved_path
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %s:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'DEBUG| Debug message', result.stdout, result)
        self.assertIn(b'INFO | Info message', result.stdout, result)
        self.assertIn(b'WARN | Warning message (should cause this test to '
                      b'finish with warning)', result.stdout, result)
        self.assertIn(b'ERROR| Error message (ordinary message not changing '
                      b'the results)', result.stdout, result)
        self.assertIn(b'Test passed but there were warnings', result.stdout,
                      result)

    @unittest.skipIf(not GNU_ECHO_BINARY, "Uses echo as test")
    def test_fs_unfriendly_run(self):
        """
        Characters that are unfriendly to file systems must be replaced.

        Both the test reference and the configured sysinfo command contain
        quote, slash, pipe and wildcard characters; the resulting test
        results dir, sysinfo file and (when the HTML plugin is available)
        HTML report are expected to use "_" instead.
        """
        os.chdir(BASEDIR)
        commands_path = os.path.join(self.tmpdir, "commands")
        script.make_script(commands_path, "echo '\"\\/|?*<>'")
        config_path = os.path.join(self.tmpdir, "config.conf")
        script.make_script(config_path,
                           "[sysinfo.collectibles]\ncommands = %s"
                           % commands_path)
        cmd_line = ("%s --show all --config %s run --job-results-dir %s "
                    "--sysinfo=on --external-runner %s -- \"'\\\"\\/|?*<>'\""
                    % (AVOCADO, config_path, self.tmpdir, GNU_ECHO_BINARY))
        process.run(cmd_line)
        self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "latest",
                                                    "test-results",
                                                    "1-\'________\'/")))
        self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "latest",
                                                    "sysinfo", "pre",
                                                    "echo \'________\'")))

        if python_module_available('avocado-framework-plugin-result-html'):
            with open(os.path.join(self.tmpdir, "latest",
                                   "results.html")) as html_res:
                html_results = html_res.read()
            # test results should replace odd chars with "_"
            # HTML could contain either the literal char, or an entity
            # reference
            test1_href = (os.path.join("test-results",
                                       "1-'________'") in html_results or
                          os.path.join("test-results",
                                       "1-&#39;________&#39;") in html_results)
            self.assertTrue(test1_href)
            # the sysinfo entry also has the odd chars replaced with "_"
            # (the space of the command line is kept)
            sysinfo = ("echo '________'" in html_results or
                       "echo &#39;________&#39;" in html_results)
            self.assertTrue(sysinfo)

    def test_non_absolute_path(self):
        """
        A test given by its bare file name must be found in the current dir.
        """
        avocado_path = os.path.join(BASEDIR, 'scripts', 'avocado')
        test_base_dir = os.path.dirname(self.pass_script.path)
        os.chdir(test_base_dir)
        test_file_name = os.path.basename(self.pass_script.path)
        cmd_line = ('%s %s run --job-results-dir %s --sysinfo=off'
                    ' "%s"' % (sys.executable, avocado_path, self.tmpdir,
                               test_file_name))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    @unittest.skipIf(not SLEEP_BINARY, 'sleep binary not available')
    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping test that take a long time to run, are "
                     "resource intensive or time sensitve")
    def test_kill_stopped_sleep(self):
        """
        A job stopped with SIGTSTP must still be ended by --job-timeout.

        Starts "sleep 60" through the external runner with a 3 second job
        timeout, sends SIGTSTP to the avocado process once the test has
        started, and then checks the output, the exit status and the
        test's debug.log.
        """
        proc = aexpect.Expect("%s run 60 --job-results-dir %s "
                              "--external-runner %s --sysinfo=off "
                              "--job-timeout 3"
                              % (AVOCADO, self.tmpdir, SLEEP_BINARY))
        proc.read_until_output_matches([r"\(1/1\)"], timeout=3,
                                       internal_timeout=0.01)
        # We need pid of the avocado process, not the shell executing it
        avocado_shell = psutil.Process(proc.get_pid())
        avocado_proc = avocado_shell.children()[0]
        pid = avocado_proc.pid
        os.kill(pid, signal.SIGTSTP)   # This freezes the process
        grace_period = 9
        deadline = time.time() + grace_period
        while time.time() < deadline:
            if not proc.is_alive():
                break
            time.sleep(0.1)
        else:
            # The loop ran out of time without avocado finishing
            proc.kill(signal.SIGKILL)
            self.fail("Avocado process still alive %ss after being stopped "
                      "(job-timeout is 3s):\n%s"
                      % (grace_period, proc.get_output()))
        output = proc.get_output()
        self.assertIn("ctrl+z pressed, stopping test", output, "SIGTSTP "
                      "message not in the output, test was probably not "
                      "stopped.")
        self.assertIn("TIME", output, "TIME not in the output, avocado "
                      "probably died unexpectadly")
        # NOTE(review): 8 is presumably the "job interrupted" exit code;
        # confirm against avocado.core.exit_codes.
        self.assertEqual(proc.get_status(), 8, "Avocado did not finish with "
                         "8.")

        sleep_dir = astring.string_to_safe_path("1-60")
        debug_log_path = os.path.join(self.tmpdir, "latest", "test-results",
                                      sleep_dir, "debug.log")

        debug_log = genio.read_file(debug_log_path)
        self.assertIn("Runner error occurred: Timeout reached", debug_log,
                      "Runner error occurred: Timeout reached message not "
                      "in the test's debug.log:\n%s" % debug_log)
        self.assertNotIn("Traceback (most recent", debug_log, "Traceback "
                         "present in the test's debug.log file, but it was "
                         "suppose to be stopped and unable to produce it.\n"
                         "%s" % debug_log)

    def tearDown(self):
        """
        Remove both temporary scripts and the job results directory.
        """
        self.pass_script.remove()
        self.fail_script.remove()
        shutil.rmtree(self.tmpdir)
870 871


A
Amador Pahim 已提交
872 873 874 875 876 877 878 879
class RunnerSimpleTestStatus(unittest.TestCase):
    """
    Checks how the output of simple tests is mapped to WARN/SKIP statuses.

    A configuration file with a ``[simpletests.status]`` section is created
    for every test and handed to avocado through ``--config``.
    """

    def setUp(self):
        """
        Create the results dir and the config file, then chdir to BASEDIR.
        """
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)

        self.config_file = script.TemporaryScript('avocado.conf',
                                                  "[simpletests.status]\n"
                                                  "warn_regex = ^WARN$\n"
                                                  "skip_regex = ^SKIP$\n"
                                                  "skip_location = stdout\n")
        self.config_file.save()
        os.chdir(BASEDIR)

    def _assert_script_status(self, name, content, expected_status):
        """
        Run a temporary script through avocado and check its JSON status.

        :param name: file name of the temporary script
        :param content: content of the temporary script
        :param expected_status: status expected for the first (only) test
                                of the job's JSON result
        """
        test_script = script.TemporaryScript(name, content,
                                             'avocado_simpletest_functional')
        test_script.save()
        try:
            cmd_line = ('%s --config %s run --job-results-dir %s --sysinfo=off'
                        ' %s --json -' % (AVOCADO, self.config_file.path,
                                          self.tmpdir, test_script.path))
            result = process.run(cmd_line, ignore_status=True)
            json_results = json.loads(result.stdout_text)
            self.assertEqual(json_results['tests'][0]['status'],
                             expected_status)
        finally:
            # Remove the script even when the run or the assertion fails
            test_script.remove()

    def test_simpletest_status(self):
        """
        WARN on stderr and SKIP on stdout are honored, SKIP on stderr is not.
        """
        # Multi-line warning in STDERR should by default be handled
        self._assert_script_status('avocado_warn.sh',
                                   '#!/bin/sh\n'
                                   '>&2 echo -e "\\n\\nWARN\\n"',
                                   'WARN')
        # Skip in STDOUT should be handled because of config
        self._assert_script_status('avocado_skip.sh',
                                   "#!/bin/sh\necho SKIP",
                                   'SKIP')
        # STDERR skip should not be handled
        self._assert_script_status('avocado_skip.sh',
                                   "#!/bin/sh\n>&2 echo SKIP",
                                   'PASS')

    def tearDown(self):
        """
        Remove the config file and the job results directory.
        """
        self.config_file.remove()
        shutil.rmtree(self.tmpdir)


932
class ExternalRunnerTest(unittest.TestCase):
    """
    Functional tests for the ``--external-runner`` family of options.

    /bin/sh acts as the external runner, fed with tiny "exit 0" and
    "exit 1" scripts created for every test.
    """

    def setUp(self):
        """
        Create the results dir and both scripts, then chdir to BASEDIR.
        """
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        script_dir_prefix = 'avocado_externalrunner_functional'
        self.pass_script = script.TemporaryScript('pass', "exit 0",
                                                  script_dir_prefix)
        self.pass_script.save()
        self.fail_script = script.TemporaryScript('fail', "exit 1",
                                                  script_dir_prefix)
        self.fail_script.save()
        os.chdir(BASEDIR)

    def _check_rc(self, result, expected_rc):
        """
        Assert that ``result`` carries the ``expected_rc`` exit status.
        """
        failure_note = ("Avocado did not return rc %d:\n%s"
                        % (expected_rc, result))
        self.assertEqual(result.exit_status, expected_rc, failure_note)

    def test_externalrunner_pass(self):
        """
        Running the "exit 0" script must end with AVOCADO_ALL_OK.
        """
        template = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh %s')
        command = template % (AVOCADO, self.tmpdir, self.pass_script.path)
        outcome = process.run(command, ignore_status=True)
        self._check_rc(outcome, exit_codes.AVOCADO_ALL_OK)

    def test_externalrunner_fail(self):
        """
        Running the "exit 1" script must end with AVOCADO_TESTS_FAIL.
        """
        template = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh %s')
        command = template % (AVOCADO, self.tmpdir, self.fail_script.path)
        outcome = process.run(command, ignore_status=True)
        self._check_rc(outcome, exit_codes.AVOCADO_TESTS_FAIL)

    def test_externalrunner_chdir_no_testdir(self):
        """
        "--external-runner-chdir=test" without a testdir is a job failure.
        """
        template = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh --external-runner-chdir=test %s')
        command = template % (AVOCADO, self.tmpdir, self.pass_script.path)
        outcome = process.run(command, ignore_status=True)
        complaint = (b'Option "--external-runner-chdir=test" requires '
                     b'"--external-runner-testdir" to be set')
        self.assertIn(complaint, outcome.stderr)
        self._check_rc(outcome, exit_codes.AVOCADO_JOB_FAIL)

    def test_externalrunner_chdir_runner_relative(self):
        """
        A runner given relative to "/" works with chdir=runner.

        Absolute paths for avocado and the script are computed before
        changing the current directory to "/".
        """
        absolute_parts = (os.path.abspath(part)
                          for part in AVOCADO.split(" "))
        avocado_abs = " ".join(absolute_parts)
        pass_abs = os.path.abspath(self.pass_script.path)
        os.chdir('/')
        template = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=bin/sh --external-runner-chdir=runner -- %s')
        command = template % (avocado_abs, self.tmpdir, pass_abs)
        outcome = process.run(command, ignore_status=True)
        self._check_rc(outcome, exit_codes.AVOCADO_ALL_OK)

    def test_externalrunner_no_url(self):
        """
        An external runner without any test reference is a job failure.
        """
        template = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=%s')
        command = template % (AVOCADO, self.tmpdir, TRUE_CMD)
        outcome = process.run(command, ignore_status=True)
        complaint = (b'No test references provided nor any other '
                     b'arguments resolved into tests')
        self.assertIn(complaint, outcome.stderr)
        self._check_rc(outcome, exit_codes.AVOCADO_JOB_FAIL)

    def tearDown(self):
        """
        Remove both scripts and the job results directory.
        """
        for leftover in (self.pass_script, self.fail_script):
            leftover.remove()
        shutil.rmtree(self.tmpdir)


1012
class AbsPluginsTest(object):
    """
    Mixin providing a temporary output directory to the plugin tests.

    It is combined with unittest.TestCase by the concrete test classes.
    """

    def setUp(self):
        """
        Create ``self.base_outputdir`` and change directory to BASEDIR.
        """
        outputdir_prefix = 'avocado_' + __name__
        self.base_outputdir = tempfile.mkdtemp(prefix=outputdir_prefix)
        os.chdir(BASEDIR)

    def tearDown(self):
        """
        Delete the directory created by setUp().
        """
        stale_dir = self.base_outputdir
        shutil.rmtree(stale_dir)


class PluginsTest(AbsPluginsTest, unittest.TestCase):
    """
    Functional tests for the sysinfo, list, plugins and config commands.
    """

    def test_sysinfo_plugin(self):
        """
        "avocado sysinfo <dir>" must succeed and populate the directory.
        """
        cmd_line = '%s sysinfo %s' % (AVOCADO, self.base_outputdir)
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        sysinfo_files = os.listdir(self.base_outputdir)
        self.assertGreater(len(sysinfo_files), 0, "Empty sysinfo files dir")

    def test_list_plugin(self):
        """
        "avocado list" must succeed and must find some tests.
        """
        cmd_line = '%s list' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'No tests were found on current tests dir',
                         result.stdout)

    def test_list_error_output(self):
        """
        Listing an unresolvable reference fails and explains why on stderr.
        """
        cmd_line = '%s list sbrubles' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b"Unable to resolve reference", result.stderr)

    def test_list_no_file_loader(self):
        """
        With only the external loader, the reference is listed as MISSING.
        """
        cmd_line = ("%s list --loaders external --verbose -- "
                    "this-wont-be-matched" % AVOCADO)
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK,
                         "Avocado did not return rc %d:\n%s"
                         % (exit_codes.AVOCADO_ALL_OK, result))
        exp = (b"Type    Test                 Tag(s)\n"
               b"MISSING this-wont-be-matched\n\n"
               b"TEST TYPES SUMMARY\n"
               b"==================\n"
               b"EXTERNAL: 0\n"
               b"MISSING: 1\n")
        self.assertEqual(exp, result.stdout, "Stdout mismatch:\n%s\n\n%s"
                         % (exp, result))

    def test_list_verbose_tags(self):
        """
        Runs list verbosely and check for tag related output
        """
        test = script.make_script(os.path.join(self.base_outputdir, 'test.py'),
                                  VALID_PYTHON_TEST_WITH_TAGS)
        cmd_line = ("%s list --loaders file --verbose %s" % (AVOCADO,
                                                             test))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK,
                         "Avocado did not return rc %d:\n%s"
                         % (exit_codes.AVOCADO_ALL_OK, result))
        stdout_lines = result.stdout_text.splitlines()
        self.assertIn("Tag(s)", stdout_lines[0])
        full_test_name = "%s:MyTest.test" % test
        self.assertEqual("INSTRUMENTED %s BIG_TAG_NAME" % full_test_name,
                         stdout_lines[1])
        self.assertIn("TEST TYPES SUMMARY", stdout_lines)
        self.assertIn("INSTRUMENTED: 1", stdout_lines)
        self.assertIn("TEST TAGS SUMMARY", stdout_lines)
        self.assertEqual("BIG_TAG_NAME: 1", stdout_lines[-1])

    def test_plugin_list(self):
        """
        "avocado plugins" must succeed and report no disabled plugin.
        """
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        # Compare tuples of the same length: (2, 7) >= (2, 7, 0) is False,
        # which used to silently skip this check on Python 2.7.
        if sys.version_info[:2] >= (2, 7):
            self.assertNotIn(b'Disabled', result.stdout)

    def test_config_plugin(self):
        """
        "avocado config" must succeed and not mention "Disabled".
        """
        cmd_line = '%s config --paginator off' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'Disabled', result.stdout)

    def test_config_plugin_datadir(self):
        """
        "avocado config --datadir" must succeed and not mention "Disabled".
        """
        cmd_line = '%s config --datadir --paginator off' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'Disabled', result.stdout)

    def test_disable_plugin(self):
        """
        A plugin listed in "[plugins] disable" vanishes from the listing.
        """
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b"Collect system information", result.stdout)

        config_content = "[plugins]\ndisable=['cli.cmd.sysinfo',]"
        config = script.TemporaryScript("disable_sysinfo_cmd.conf",
                                        config_content)
        with config:
            cmd_line = '%s --config %s plugins' % (AVOCADO, config)
            result = process.run(cmd_line, ignore_status=True)
            expected_rc = exit_codes.AVOCADO_ALL_OK
            self.assertEqual(result.exit_status, expected_rc,
                             "Avocado did not return rc %d:\n%s" %
                             (expected_rc, result))
            self.assertNotIn(b"Collect system information", result.stdout)

    def test_plugin_order(self):
        """
        Tests plugin order by configuration file

        First it checks if html, json, xunit and zip_archive plugins are enabled.
        Then it runs a test with zip_archive running first, which means the html,
        json and xunit output files do not make into the archive.

        Then it runs with zip_archive set to run last, which means the html,
        json and xunit output files *do* make into the archive.
        """
        def run_config(config_path):
            cmd = ('%s --config %s run passtest.py --archive '
                   '--job-results-dir %s --sysinfo=off'
                   % (AVOCADO, config_path, self.base_outputdir))
            result = process.run(cmd, ignore_status=True)
            expected_rc = exit_codes.AVOCADO_ALL_OK
            self.assertEqual(result.exit_status, expected_rc,
                             "Avocado did not return rc %d:\n%s" %
                             (expected_rc, result))

        result_plugins = ["json", "xunit", "zip_archive"]
        result_outputs = ["results.json", "results.xml"]
        if python_module_available('avocado-framework-plugin-result-html'):
            result_plugins.append("html")
            result_outputs.append("results.html")

        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        for result_plugin in result_plugins:
            self.assertIn(result_plugin, result.stdout_text)

        config_content_zip_first = "[plugins.result]\norder=['zip_archive']"
        config_zip_first = script.TemporaryScript("zip_first.conf",
                                                  config_content_zip_first)
        with config_zip_first:
            run_config(config_zip_first)
            archives = glob.glob(os.path.join(self.base_outputdir, '*.zip'))
            self.assertEqual(len(archives), 1, "ZIP Archive not generated")
            # Close the archive before it gets unlinked below
            with zipfile.ZipFile(archives[0], 'r') as zip_file:
                zip_file_list = zip_file.namelist()
            for result_output in result_outputs:
                self.assertNotIn(result_output, zip_file_list)
            # Leave a single archive around for the second run's glob
            os.unlink(archives[0])

        config_content_zip_last = ("[plugins.result]\norder=['html', 'json',"
                                   "'xunit', 'non_existing_plugin_is_ignored'"
                                   ",'zip_archive']")
        config_zip_last = script.TemporaryScript("zip_last.conf",
                                                 config_content_zip_last)
        with config_zip_last:
            run_config(config_zip_last)
            archives = glob.glob(os.path.join(self.base_outputdir, '*.zip'))
            self.assertEqual(len(archives), 1, "ZIP Archive not generated")
            with zipfile.ZipFile(archives[0], 'r') as zip_file:
                zip_file_list = zip_file.namelist()
            for result_output in result_outputs:
                self.assertIn(result_output, zip_file_list)

    def test_Namespace_object_has_no_attribute(self):
        """
        "avocado plugins" must not print a Namespace attribute error.
        """
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b"'Namespace' object has no attribute", result.stderr)
1211

1212

1213 1214 1215 1216
class ParseXMLError(Exception):
    """
    Raised when the xunit output of a job can not be parsed as XML.
    """


1217
class PluginsXunitTest(AbsPluginsTest, unittest.TestCase):
    """
    Runs example tests with ``--xunit -`` and checks the XML written to
    stdout, both against the bundled jenkins-junit.xsd schema and against
    the expected per-status counters.
    """

    @unittest.skipUnless(SCHEMA_CAPABLE,
                         'Unable to validate schema due to missing lxml.etree library')
    def setUp(self):
        """
        Create the job results directory and resolve the schema file path.
        """
        this_dir = os.path.dirname(__file__)
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        self.junit = os.path.abspath(os.path.join(this_dir, os.path.pardir,
                                                  ".data",
                                                  'jenkins-junit.xsd'))
        super(PluginsXunitTest, self).setUp()

    def run_and_check(self, testname, e_rc, e_ntests, e_nerrors,
                      e_nnotfound, e_nfailures, e_nskip):  # pylint: disable=W0613
        """
        Run avocado on ``testname`` with xunit output and verify the result.

        :param testname: test reference appended to the command line
        :param e_rc: expected avocado exit status
        :param e_ntests: expected value of the testsuite "tests" attribute
        :param e_nerrors: expected value of the "errors" attribute
        :param e_nnotfound: accepted but not checked by this method
        :param e_nfailures: expected value of the "failures" attribute
        :param e_nskip: expected value of the "skipped" attribute
        :raises ParseXMLError: when stdout can not be parsed by minidom
        """
        command = ('%s run --job-results-dir %s --sysinfo=off'
                   ' --xunit - %s' % (AVOCADO, self.tmpdir, testname))
        result = process.run(command, ignore_status=True)
        xml_output = result.stdout
        rc_complaint = "Avocado did not return rc %d:\n%s" % (e_rc, result)
        self.assertEqual(result.exit_status, e_rc, rc_complaint)

        try:
            document = xml.dom.minidom.parseString(xml_output)
        except Exception as detail:
            raise ParseXMLError("Failed to parse content: %s\n%s" %
                                (detail, xml_output))

        # pylint: disable=I1101
        with open(self.junit, 'rb') as schema_file:
            schema = etree.XMLSchema(etree.parse(schema_file))
        is_valid = schema.validate(etree.parse(BytesIO(xml_output)))
        # The error log is only read after validate() has run
        schema_complaint = ("Failed to validate against %s, message:\n%s" %
                            (self.junit,
                             schema.error_log.filter_from_errors()))
        self.assertTrue(is_valid, schema_complaint)

        suites = document.getElementsByTagName('testsuite')
        self.assertEqual(len(suites), 1, 'More than one testsuite tag')

        suite = suites[0]
        self.assertEqual(len(suite.attributes), 7,
                         'The testsuite tag does not have 7 attributes. '
                         'XML:\n%s' % xml_output)

        # (attribute name, expected value, failure message template)
        counters = (
            ('tests', e_ntests,
             "Unexpected number of executed tests, XML:\n%s"),
            ('errors', e_nerrors,
             "Unexpected number of test errors, XML:\n%s"),
            ('failures', e_nfailures,
             "Unexpected number of test failures, XML:\n%s"),
            ('skipped', e_nskip,
             "Unexpected number of test skips, XML:\n%s"),
        )
        for attribute, wanted, template in counters:
            found = int(suite.attributes[attribute].value)
            self.assertEqual(found, wanted, template % xml_output)

    def test_xunit_plugin_passtest(self):
        # One test, no errors/failures/skips, exit status ALL_OK
        self.run_and_check('passtest.py',
                           e_rc=exit_codes.AVOCADO_ALL_OK,
                           e_ntests=1, e_nerrors=0, e_nnotfound=0,
                           e_nfailures=0, e_nskip=0)

    def test_xunit_plugin_failtest(self):
        # One test counted as a failure, exit status TESTS_FAIL
        self.run_and_check('failtest.py',
                           e_rc=exit_codes.AVOCADO_TESTS_FAIL,
                           e_ntests=1, e_nerrors=0, e_nnotfound=0,
                           e_nfailures=1, e_nskip=0)

    def test_xunit_plugin_skiponsetuptest(self):
        # cancelonsetup.py is expected to show up as one skipped test
        # while the exit status stays ALL_OK
        self.run_and_check('cancelonsetup.py',
                           e_rc=exit_codes.AVOCADO_ALL_OK,
                           e_ntests=1, e_nerrors=0, e_nnotfound=0,
                           e_nfailures=0, e_nskip=1)

    def test_xunit_plugin_errortest(self):
        # One test counted as an error, exit status TESTS_FAIL
        self.run_and_check('errortest.py',
                           e_rc=exit_codes.AVOCADO_TESTS_FAIL,
                           e_ntests=1, e_nerrors=1, e_nnotfound=0,
                           e_nfailures=0, e_nskip=0)

    def tearDown(self):
        """
        Remove the job results directory created by setUp.
        """
        shutil.rmtree(self.tmpdir)
        super(PluginsXunitTest, self).tearDown()

1300 1301 1302 1303 1304

class ParseJSONError(Exception):
    """Raised when the JSON output of a job cannot be parsed."""


1305
class PluginsJSONTest(AbsPluginsTest, unittest.TestCase):
    """
    Runs example tests with ``--json -`` and checks the counters found in
    the JSON document written to stdout.
    """

    def setUp(self):
        """
        Create the job results directory used by every command line.
        """
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        super(PluginsJSONTest, self).setUp()

    def run_and_check(self, testname, e_rc, e_ntests, e_nerrors,
                      e_nfailures, e_nskip, e_ncancel=0, external_runner=None):
        """
        Run avocado on ``testname`` with JSON output and verify the result.

        :param testname: test reference appended to the command line
        :param e_rc: expected avocado exit status
        :param e_ntests: expected length of the "tests" list
        :param e_nerrors: expected value of the "errors" key
        :param e_nfailures: expected value of the "failures" key
        :param e_nskip: expected value of the "skip" key
        :param e_ncancel: expected value of the "cancel" key
        :param external_runner: when not None, passed (single quoted) as the
                                value of ``--external-runner``
        :returns: the parsed JSON result
        :raises ParseJSONError: when stdout can not be parsed as JSON
        """
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off --json - '
                    '--archive %s' % (AVOCADO, self.tmpdir, testname))
        if external_runner is not None:
            cmd_line += " --external-runner '%s'" % external_runner
        result = process.run(cmd_line, ignore_status=True)
        json_output = result.stdout_text
        self.assertEqual(result.exit_status, e_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (e_rc, result))
        try:
            json_data = json.loads(json_output)
        except Exception as detail:
            raise ParseJSONError("Failed to parse content: %s\n%s" %
                                 (detail, json_output))
        self.assertTrue(json_data, "Empty JSON result:\n%s" % json_output)
        self.assertIsInstance(json_data['tests'], list,
                              "JSON result lacks 'tests' list")
        n_tests = len(json_data['tests'])
        self.assertEqual(n_tests, e_ntests,
                         "Different number of expected tests")
        # Each counter gets its own message so that a failure names the
        # counter that actually mismatched.
        n_errors = json_data['errors']
        self.assertEqual(n_errors, e_nerrors,
                         "Different number of expected errors")
        n_failures = json_data['failures']
        self.assertEqual(n_failures, e_nfailures,
                         "Different number of expected failures")
        n_skip = json_data['skip']
        self.assertEqual(n_skip, e_nskip,
                         "Different number of skipped tests")
        n_cancel = json_data['cancel']
        self.assertEqual(n_cancel, e_ncancel,
                         "Different number of cancelled tests")
        return json_data

    def test_json_plugin_passtest(self):
        # One test, no errors/failures/skips, exit status ALL_OK
        self.run_and_check('passtest.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0)

    def test_json_plugin_failtest(self):
        # One test counted as a failure, exit status TESTS_FAIL
        self.run_and_check('failtest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 0, 1, 0)

    def test_json_plugin_skiponsetuptest(self):
        # cancelonsetup.py is expected to be counted under "cancel" (the
        # fifth counter), not under "skip", with exit status ALL_OK
        self.run_and_check('cancelonsetup.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 1)

    def test_json_plugin_errortest(self):
        # One test counted as an error, exit status TESTS_FAIL
        self.run_and_check('errortest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 1, 0, 0)

    @unittest.skipIf(not GNU_ECHO_BINARY, 'echo binary not available')
    def test_ugly_echo_cmd(self):
        # Uses the echo binary as external runner with an argument full of
        # backslashes, quotes and a slash
        data = self.run_and_check('"-ne foo\\\\\\n\\\'\\\\\\"\\\\\\'
                                  'nbar/baz"', exit_codes.AVOCADO_ALL_OK, 1, 0,
                                  0, 0, external_runner=GNU_ECHO_BINARY)
        # The executed test should be this
        self.assertEqual(data['tests'][0]['id'],
                         '1--ne foo\\\\n\\\'\\"\\\\nbar/baz')
        # logdir name should escape special chars (/)
        self.assertEqual(os.path.basename(data['tests'][0]['logdir']),
                         "1--ne foo__n_'____nbar_baz")

    def tearDown(self):
        """
        Remove the job results directory created by setUp.
        """
        shutil.rmtree(self.tmpdir)
        super(PluginsJSONTest, self).tearDown()

L
Lukáš Doktor 已提交
1378

1379 1380
# Run the tests defined in this module when the file is executed directly
if __name__ == '__main__':
    unittest.main()