# This Python file uses the following encoding: utf-8

import glob
import json
import os
import re
import shutil
import signal
import sys
import tempfile
import time
import unittest
import xml.dom.minidom
import zipfile

import aexpect
import pkg_resources
import psutil

# io.BytesIO is available on both Python 2.6+ and Python 3
from io import BytesIO

try:
    from lxml import etree
    SCHEMA_CAPABLE = True
except ImportError:
    SCHEMA_CAPABLE = False
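# lxml is optional: without it, the xunit schema validation tests below
# are skipped via the skipUnless(SCHEMA_CAPABLE, ...) decorator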

from six import iteritems
from six.moves import xrange as range

from avocado.core import exit_codes
from avocado.utils import astring
from avocado.utils import genio
from avocado.utils import process
from avocado.utils import script
from avocado.utils import path as utils_path


basedir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..')
basedir = os.path.abspath(basedir)

AVOCADO = os.environ.get("UNITTEST_AVOCADO_CMD", "./scripts/avocado")
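# (set $UNITTEST_AVOCADO_CMD to exercise an installed avocado binary
# instead of the in-tree ./scripts/avocado entry point)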

LOCAL_IMPORT_TEST_CONTENTS = '''
from avocado import Test
from mylib import hello

class LocalImportTest(Test):
    def test(self):
        self.log.info(hello())
'''

UNSUPPORTED_STATUS_TEST_CONTENTS = '''
from avocado import Test

class FakeStatusTest(Test):
    def run_avocado(self):
        super(FakeStatusTest, self).run_avocado()
        # Please do NOT ever use this, it's for unittesting only.
        self._Test__status = 'not supported'
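        # ("_Test__status" is the name-mangled spelling of Test.__status)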

    def test(self):
        pass
'''

INVALID_PYTHON_TEST = '''
from avocado import Test

class MyTest(Test):

    non_existing_variable_causing_crash

    def test_my_name(self):
        pass
'''

VALID_PYTHON_TEST_WITH_TAGS = '''
from avocado import Test

class MyTest(Test):
    def test(self):
         """
         :avocado: tags=BIG_TAG_NAME
         """
         pass
'''


REPORTS_STATUS_AND_HANG = '''
from avocado import Test
import time

class MyTest(Test):
    def test(self):
         self.runner_queue.put({"running": False})
         time.sleep(70)
'''

DIE_WITHOUT_REPORTING_STATUS = '''
from avocado import Test
import os
import signal

class MyTest(Test):
    def test(self):
         os.kill(os.getpid(), signal.SIGKILL)
'''


RAISE_CUSTOM_PATH_EXCEPTION_CONTENT = '''import os
import sys

from avocado import Test

class SharedLibTest(Test):
    def test(self):
        sys.path.append(os.path.join(os.path.dirname(__file__), "shared_lib"))
        from mylib import CancelExc
        raise CancelExc("This should not crash on unpickling in runner")
'''


def probe_binary(binary):
    try:
        return utils_path.find_command(binary)
    except utils_path.CmdNotFoundError:
        return None

TRUE_CMD = probe_binary('true')
CC_BINARY = probe_binary('cc')

# On macOS, the default GNU core-utils installation (brew) installs the
# GNU utility versions with a "g" prefix. The BSD versions of the core
# utilities are still on their expected paths, but their behavior and
# flags differ in most cases.
GNU_ECHO_BINARY = probe_binary('echo')
if GNU_ECHO_BINARY is not None:
    if probe_binary('man') is not None:
        echo_manpage = process.run('man %s' %
                                   os.path.basename(GNU_ECHO_BINARY)).stdout
        if b'-e' not in echo_manpage:
            GNU_ECHO_BINARY = probe_binary('gecho')
READ_BINARY = probe_binary('read')
SLEEP_BINARY = probe_binary('sleep')


def html_capable():
    try:
        pkg_resources.require('avocado-framework-plugin-result-html')
        return True
    except pkg_resources.DistributionNotFound:
        return False


class RunnerOperationTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(basedir)

    def test_show_version(self):
        result = process.run('%s -v' % AVOCADO, ignore_status=True)
        self.assertEqual(result.exit_status, 0)
        if sys.version_info[0] == 3:
            content = result.stdout_text
        else:
            content = result.stderr_text
        self.assertTrue(re.match(r"^Avocado \d+\.\d+$", content),
                        "Version string does not match 'Avocado \\d+\\.\\d+':\n"
                        "%r" % (content))

    def test_alternate_config_datadir(self):
        """
        Uses the "--config" flag to check custom configuration is applied

        Even on the more complex data_dir module, which adds extra checks
        to what is set on the plain settings module.
        """
        base_dir = os.path.join(self.tmpdir, 'datadir_base')
        os.mkdir(base_dir)
        mapping = {'base_dir': base_dir,
                   'test_dir': os.path.join(base_dir, 'test'),
                   'data_dir': os.path.join(base_dir, 'data'),
                   'logs_dir': os.path.join(base_dir, 'logs')}
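        # The generated config resembles (all paths rooted in self.tmpdir):
        #
        #   [datadir.paths]
        #   base_dir = .../datadir_base
        #   test_dir = .../datadir_base/test
        #   data_dir = .../datadir_base/data
        #   logs_dir = .../datadir_base/logs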
        config = '[datadir.paths]\n'
        for key, value in iteritems(mapping):
            if not os.path.isdir(value):
                os.mkdir(value)
            config += "%s = %s\n" % (key, value)
        fd, config_file = tempfile.mkstemp(dir=self.tmpdir)
        os.write(fd, config.encode())
        os.close(fd)

        cmd = '%s --config %s config --datadir' % (AVOCADO, config_file)
        result = process.run(cmd)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn('    base     ' + mapping['base_dir'], result.stdout_text)
        self.assertIn('    data     ' + mapping['data_dir'], result.stdout_text)
        self.assertIn('    logs     ' + mapping['logs_dir'], result.stdout_text)

    def test_runner_all_ok(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py passtest.py' % (AVOCADO, self.tmpdir))
        process.run(cmd_line)
        # Also check whether jobdata contains the correct parameter paths
        variants = open(os.path.join(self.tmpdir, "latest", "jobdata",
                        "variants.json")).read()
        self.assertIn('["/run/*"]', variants, "paths stored in jobdata "
                      "do not contain [\"/run/*\"]\n%s" % variants)

    def test_runner_failfast(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py failtest.py passtest.py --failfast on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b'Interrupting job (failfast).', result.stdout)
        self.assertIn(b'PASS 1 | ERROR 0 | FAIL 1 | SKIP 1', result.stdout)
        expected_rc = (exit_codes.AVOCADO_TESTS_FAIL |
                       exit_codes.AVOCADO_JOB_INTERRUPTED)
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_ignore_missing_references_one_missing(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py badtest.py --ignore-missing-references on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b"Unable to resolve reference(s) 'badtest.py'", result.stderr)
        self.assertIn(b'PASS 1 | ERROR 0 | FAIL 0 | SKIP 0', result.stdout)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_ignore_missing_references_all_missing(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'badtest.py badtest2.py --ignore-missing-references on'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertIn(b"Unable to resolve reference(s) 'badtest.py', 'badtest2.py'",
                      result.stderr)
        self.assertEqual(b'', result.stdout)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_test_with_local_imports(self):
        mylib = script.TemporaryScript(
            'mylib.py',
            "def hello():\n    return 'Hello world'",
            'avocado_simpletest_functional')
        mylib.save()
        mytest = script.Script(
            os.path.join(os.path.dirname(mylib.path), 'test_local_imports.py'),
            LOCAL_IMPORT_TEST_CONTENTS)
        mytest.save()
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "%s" % (AVOCADO, self.tmpdir, mytest))
        process.run(cmd_line)

    def test_unsupported_status(self):
        with script.TemporaryScript("fake_status.py",
                                    UNSUPPORTED_STATUS_TEST_CONTENTS,
                                    "avocado_unsupported_status") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s"
                              " --json -" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Runner error occurred: Test reports unsupported",
                          results["tests"][0]["fail_reason"])

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_hanged_test_with_status(self):
        """ Check that avocado handles hung tests properly """
        with script.TemporaryScript("report_status_and_hang.py",
                                    REPORTS_STATUS_AND_HANG,
                                    "hanged_test_with_status") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s "
                              "--json - --job-timeout 1" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Test reported status but did not finish",
                          results["tests"][0]["fail_reason"])
            # Currently it should finish up to 1s after the job-timeout,
            # but the prep and postprocess could take a bit longer on
            # some environments, so let's just check it does not take
            # > 60s, which is the deadline for force-finishing the test.
            self.assertLess(res.duration, 55, "Test execution took too long, "
                            "which is likely because the hung test was not "
                            "interrupted. Results:\n%s" % res)

    def test_no_status_reported(self):
        with script.TemporaryScript("die_without_reporting_status.py",
                                    DIE_WITHOUT_REPORTING_STATUS,
                                    "no_status_reported") as tst:
            res = process.run("%s run --sysinfo=off --job-results-dir %s %s "
                              "--json -" % (AVOCADO, self.tmpdir, tst),
                              ignore_status=True)
            self.assertEqual(res.exit_status, exit_codes.AVOCADO_TESTS_FAIL)
            results = json.loads(res.stdout)
            self.assertEqual(results["tests"][0]["status"], "ERROR",
                             "%s != %s\n%s" % (results["tests"][0]["status"],
                                               "ERROR", res))
            self.assertIn("Test died without reporting the status",
                          results["tests"][0]["fail_reason"])

    def test_runner_tests_fail(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s passtest.py '
                    'failtest.py passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_nonexistent_test(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir '
                    '%s bogustest' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_runner_doublefail(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - doublefail.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(b"TestError: Failing during tearDown. Yay!", result.stdout,
                      "Cleanup exception not printed to log output")
        self.assertIn(b"TestFail: This test is supposed to fail", result.stdout,
                      "Test did not fail with action exception:\n%s" % result.stdout)

    def test_uncaught_exception(self):
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "--json - uncaught_exception.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc,
                                                                result))
        self.assertIn(b'"status": "ERROR"', result.stdout)

    def test_fail_on_exception(self):
        cmd_line = ("%s run --sysinfo=off --job-results-dir %s "
                    "--json - fail_on_exception.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc,
                                                                result))
        self.assertIn(b'"status": "FAIL"', result.stdout)

    def test_exception_not_in_path(self):
        os.mkdir(os.path.join(self.tmpdir, "shared_lib"))
        mylib = script.Script(os.path.join(self.tmpdir, "shared_lib",
                                           "mylib.py"),
                              "from avocado import TestCancel\n\n"
                              "class CancelExc(TestCancel):\n"
                              "    pass")
        mylib.save()
        mytest = script.Script(os.path.join(self.tmpdir, "mytest.py"),
                               RAISE_CUSTOM_PATH_EXCEPTION_CONTENT)
        mytest.save()
        result = process.run("%s --show test run --sysinfo=off "
                             "--job-results-dir %s %s"
                             % (AVOCADO, self.tmpdir, mytest))
        self.assertIn(b"mytest.py:SharedLibTest.test -> CancelExc: This "
                      b"should not crash on unpickling in runner",
                      result.stdout)
        self.assertNotIn(b"Failed to read queue", result.stdout)

    def test_runner_timeout(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - timeouttest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        output = result.stdout
        expected_rc = exit_codes.AVOCADO_JOB_INTERRUPTED
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(b"Runner error occurred: Timeout reached", output,
                      "Timeout reached message not found in the output:\n%s" % output)
        # Ensure no test aborted error messages show up
        self.assertNotIn(b"TestAbortedError: Test aborted unexpectedly", output)

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_runner_abort(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    '--xunit - abort.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        excerpt = b'Test died without reporting the status.'
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        unexpected_rc = exit_codes.AVOCADO_FAIL
        self.assertNotEqual(result.exit_status, unexpected_rc,
                            "Avocado crashed (rc %d):\n%s" % (unexpected_rc, result))
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))
        self.assertIn(excerpt, result.stdout)

    def test_silent_output(self):
        cmd_line = ('%s --silent run --sysinfo=off --job-results-dir %s '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertEqual(result.stdout, b'')

    def test_empty_args_list(self):
        cmd_line = AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_FAIL)
        if sys.version_info[0] == 3:
            exp = b'avocado: error: the following arguments are required'
        else:
            exp = b'error: too few arguments'
        self.assertIn(exp, result.stderr)

    def test_empty_test_list(self):
        cmd_line = '%s run --sysinfo=off --job-results-dir %s' % (AVOCADO,
                                                                  self.tmpdir)
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_JOB_FAIL)
        self.assertIn(b'No test references provided nor any other arguments '
                      b'resolved into tests', result.stderr)

    def test_not_found(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s sbrubles'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_JOB_FAIL)
        self.assertIn(b'Unable to resolve reference', result.stderr)
        self.assertNotIn(b'Unable to resolve reference', result.stdout)

    def test_invalid_unique_id(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s --force-job-id '
                    'foobar passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertNotEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertIn(b'needs to be a 40 digit hex', result.stderr)
        self.assertNotIn(b'needs to be a 40 digit hex', result.stdout)

    def test_valid_unique_id(self):
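        # a valid job ID is a 40-character hex string (SHA-1 sized)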
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--force-job-id 975de258ac05ce5e490648dec4753657b7ccc7d1 '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        self.assertNotIn(b'needs to be a 40 digit hex', result.stderr)
        self.assertIn(b'PASS', result.stdout)

    def test_automatic_unique_id(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    'passtest.py --json -' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK)
        r = json.loads(result.stdout)
        int(r['job_id'], 16)  # it's a hex number
        self.assertEqual(len(r['job_id']), 40)

    def test_early_latest_result(self):
        """
        Tests that the `latest` link to the latest job results is created early
        """
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'examples/tests/passtest.py' % (AVOCADO, self.tmpdir))
        avocado_process = process.SubProcess(cmd_line)
        try:
            avocado_process.start()
            link = os.path.join(self.tmpdir, 'latest')
            for _ in range(0, 50):
                time.sleep(0.1)
                if os.path.exists(link) and os.path.islink(link):
                    avocado_process.wait()
                    break
            self.assertTrue(os.path.exists(link))
            self.assertTrue(os.path.islink(link))
        finally:
            avocado_process.wait()

    def test_dry_run(self):
        cmd = ("%s run --sysinfo=off passtest.py failtest.py "
               "gendata.py --json - --mux-inject foo:1 bar:2 baz:3 foo:foo:a"
               " foo:bar:b foo:baz:c bar:bar:bar --dry-run" % AVOCADO)
        result = json.loads(process.run(cmd).stdout)
        debuglog = result['debuglog']
        log = genio.read_file(debuglog)
        # Remove the result dir
        shutil.rmtree(os.path.dirname(os.path.dirname(debuglog)))
        self.assertIn(tempfile.gettempdir(), debuglog)   # Use tmp dir, not default location
        self.assertEqual(result['job_id'], u'0' * 40)
        # Check whether all tests were cancelled
        self.assertEqual(result['cancel'], 4)
        for i in range(4):
            test = result['tests'][i]
            self.assertEqual(test['fail_reason'],
                             u'Test cancelled due to --dry-run')
        # Check whether all params are listed.
        # The "/:bar ==> 2" is in the tree, but not in any leaf, so it is
        # inaccessible from the test.
        for line in ("/:foo ==> 1", "/:baz ==> 3", "/foo:foo ==> a",
                     "/foo:bar ==> b", "/foo:baz ==> c", "/bar:bar ==> bar"):
            self.assertEqual(log.count(line), 4)

    def test_invalid_python(self):
        test = script.make_script(os.path.join(self.tmpdir, 'test.py'),
                                  INVALID_PYTHON_TEST)
        cmd_line = ('%s --show test run --sysinfo=off '
                    '--job-results-dir %s %s') % (AVOCADO, self.tmpdir, test)
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn('1-%s:MyTest.test_my_name -> TestError' % test,
                      result.stdout_text)

    @unittest.skipIf(not READ_BINARY, "read binary not available.")
    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_read(self):
        cmd = "%s run --sysinfo=off --job-results-dir %%s %%s" % AVOCADO
        cmd %= (self.tmpdir, READ_BINARY)
        result = process.run(cmd, timeout=10, ignore_status=True)
        self.assertLess(result.duration, 8, "Duration longer than expected."
                        "\n%s" % result)
        self.assertEqual(result.exit_status, 1, "Expected exit status is 1\n%s"
                         % result)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)


class RunnerHumanOutputTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(basedir)

    def test_output_pass(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'passtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'passtest.py:PassTest.test:  PASS', result.stdout)

    def test_output_fail(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'failtest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'failtest.py:FailTest.test:  FAIL', result.stdout)

    def test_output_error(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'errortest.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'errortest.py:ErrorTest.test:  ERROR', result.stdout)

    def test_output_cancel(self):
        cmd_line = ('%s run --sysinfo=off --job-results-dir %s '
                    'cancelonsetup.py' % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'PASS 0 | ERROR 0 | FAIL 0 | SKIP 0 | WARN 0 | '
                      b'INTERRUPT 0 | CANCEL 1',
                      result.stdout)

    @unittest.skipIf(not GNU_ECHO_BINARY,
                     'GNU style echo binary not available')
    def test_ugly_echo_cmd(self):
        cmd_line = ('%s run --external-runner "%s -ne" '
                    '"foo\\\\\\n\\\'\\\\\\"\\\\\\nbar/baz" --job-results-dir %s'
                    ' --sysinfo=off  --show-job-log' %
                    (AVOCADO, GNU_ECHO_BINARY, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %s:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'[stdout] foo', result.stdout, result)
        self.assertIn(b'[stdout] \'"', result.stdout, result)
        self.assertIn(b'[stdout] bar/baz', result.stdout, result)
        self.assertIn(b'PASS 1-foo\\\\n\\\'\\"\\\\nbar/baz',
                      result.stdout, result)
        # logdir name should escape special chars (/)
        test_dirs = glob.glob(os.path.join(self.tmpdir, 'latest',
                                           'test-results', '*'))
        self.assertEqual(len(test_dirs), 1, "There are multiple directories in"
                         " test-results dir, but only one test was executed: "
                         "%s" % (test_dirs))
        self.assertEqual(os.path.basename(test_dirs[0]),
                         "1-foo__n_'____nbar_baz")

    def test_replay_skip_skipped(self):
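        # replaying with --replay-test-status PASS should not re-run the
        # previously cancelled test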
        cmd = ("%s run --job-results-dir %s --json - "
               "cancelonsetup.py" % (AVOCADO, self.tmpdir))
        result = process.run(cmd)
        result = json.loads(result.stdout)
        jobid = str(result["job_id"])
        cmd = ("%s run --job-results-dir %s --replay %s "
               "--replay-test-status PASS" % (AVOCADO, self.tmpdir, jobid))
        process.run(cmd)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)


class RunnerSimpleTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        self.pass_script = script.TemporaryScript(
            'ʊʋʉʈɑ ʅʛʌ',
            "#!/bin/sh\ntrue",
            'avocado_simpletest_functional')
        self.pass_script.save()
        self.fail_script = script.TemporaryScript('avocado_fail.sh',
                                                  "#!/bin/sh\nfalse",
                                                  'avocado_simpletest_'
                                                  'functional')
        self.fail_script.save()
        os.chdir(basedir)

    def test_simpletest_pass(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' "%s"' % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_simpletest_fail(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' %s' % (AVOCADO, self.tmpdir, self.fail_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 2,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_runner_onehundred_fail_timing(self):
        """
        We can be pretty sure that a failtest should return immediately. Let's
        run 100 of them and ensure they do not take more than 30 seconds.

        Notice: on a current machine this takes about 0.12s, so 30 seconds is
        considered to be pretty safe here.
        """
        one_hundred = 'failtest.py ' * 100
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off %s'
                    % (AVOCADO, self.tmpdir, one_hundred))
        initial_time = time.time()
        result = process.run(cmd_line, ignore_status=True)
        actual_time = time.time() - initial_time
        self.assertLess(actual_time, 30.0)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_runner_sleep_fail_sleep_timing(self):
        """
        Sleeptest is supposed to take 1 second; let's make a sandwich of
        100 failtests and check the test runner timing.
        """
        sleep_fail_sleep = ('sleeptest.py ' + 'failtest.py ' * 100 +
                            'sleeptest.py')
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off %s'
                    % (AVOCADO, self.tmpdir, sleep_fail_sleep))
        initial_time = time.time()
        result = process.run(cmd_line, ignore_status=True)
        actual_time = time.time() - initial_time
        self.assertLess(actual_time, 33.0)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" % (expected_rc, result))

    def test_simplewarning(self):
        """
        simplewarning.sh uses the avocado-bash-utils
        """
        # simplewarning.sh calls "avocado" without specifying a path
        os.environ['PATH'] += ":" + os.path.join(basedir, 'scripts')
        # simplewarning.sh calls "avocado exec-path", which does not have
        # access to an installed location for the libexec scripts
        os.environ['PATH'] += ":" + os.path.join(basedir, 'libexec')
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    'examples/tests/simplewarning.sh --show-job-log'
                    % (AVOCADO, self.tmpdir))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %s:\n%s" %
                         (expected_rc, result))
        self.assertIn(b'DEBUG| Debug message', result.stdout, result)
        self.assertIn(b'INFO | Info message', result.stdout, result)
        self.assertIn(b'WARN | Warning message (should cause this test to '
                      b'finish with warning)', result.stdout, result)
        self.assertIn(b'ERROR| Error message (ordinary message not changing '
                      b'the results)', result.stdout, result)

    @unittest.skipIf(not GNU_ECHO_BINARY, "Uses echo as test")
    def test_fs_unfriendly_run(self):
        os.chdir(basedir)
        commands_path = os.path.join(self.tmpdir, "commands")
        script.make_script(commands_path, "echo '\"\\/|?*<>'")
        config_path = os.path.join(self.tmpdir, "config.conf")
        script.make_script(config_path,
                           "[sysinfo.collectibles]\ncommands = %s"
                           % commands_path)
        cmd_line = ("%s --show all --config %s run --job-results-dir %s "
                    "--sysinfo=on --external-runner %s -- \"'\\\"\\/|?*<>'\""
                    % (AVOCADO, config_path, self.tmpdir, GNU_ECHO_BINARY))
        result = process.run(cmd_line)
        self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "latest",
                                                    "test-results",
                                                    "1-\'________\'/")))
        self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "latest",
                                                    "sysinfo", "pre",
                                                    "echo \'________\'")))

        if html_capable():
            with open(os.path.join(self.tmpdir, "latest",
                                   "results.html")) as html_res:
                html_results = html_res.read()
            # test results should replace odd chars with "_"
            # HTML could contain either the literal char, or an entity reference
            test1_href = (os.path.join("test-results",
                                       "1-'________'") in html_results or
                          os.path.join("test-results",
                                       "1-&#x27;________&#x27;") in html_results)
            self.assertTrue(test1_href)
            # sysinfo replaces "_" with " "
            sysinfo = ("echo '________'" in html_results or
                       "echo &#x27;________&#x27;" in html_results)
            self.assertTrue(sysinfo)

    def test_non_absolute_path(self):
        avocado_path = os.path.join(basedir, 'scripts', 'avocado')
        test_base_dir = os.path.dirname(self.pass_script.path)
        os.chdir(test_base_dir)
        test_file_name = os.path.basename(self.pass_script.path)
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' "%s"' % (avocado_path, self.tmpdir, test_file_name))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    @unittest.skipIf(not SLEEP_BINARY, 'sleep binary not available')
    @unittest.skipIf(int(os.environ.get("AVOCADO_CHECK_LEVEL", 0)) < 1,
                     "Skipping tests that take a long time to run, are "
                     "resource intensive or time sensitive")
    def test_kill_stopped_sleep(self):
        proc = aexpect.Expect("%s run 60 --job-results-dir %s "
                              "--external-runner %s --sysinfo=off "
                              "--job-timeout 3"
                              % (AVOCADO, self.tmpdir, SLEEP_BINARY))
        proc.read_until_output_matches([r"\(1/1\)"], timeout=3,
                                       internal_timeout=0.01)
        # We need the pid of the avocado process, not the shell executing it
        avocado_shell = psutil.Process(proc.get_pid())
        avocado_proc = avocado_shell.children()[0]
        pid = avocado_proc.pid
        os.kill(pid, signal.SIGTSTP)   # This freezes the process
        deadline = time.time() + 9
        while time.time() < deadline:
            if not proc.is_alive():
                break
            time.sleep(0.1)
        else:
            proc.kill(signal.SIGKILL)
            self.fail("Avocado process still alive 9s after job-timeout:\n%s"
                      % proc.get_output())
        output = proc.get_output()
        self.assertIn("ctrl+z pressed, stopping test", output, "SIGTSTP "
                      "message not in the output, test was probably not "
                      "stopped.")
        self.assertIn("TIME", output, "TIME not in the output, avocado "
                      "probably died unexpectedly")
        self.assertEqual(proc.get_status(), 8, "Avocado did not finish with "
                         "status 8.")

        sleep_dir = astring.string_to_safe_path("1-60")
        debug_log_path = os.path.join(self.tmpdir, "latest", "test-results",
                                      sleep_dir, "debug.log")

        debug_log = genio.read_file(debug_log_path)
        self.assertIn("Runner error occurred: Timeout reached", debug_log,
                      "Runner error occurred: Timeout reached message not "
                      "in the test's debug.log:\n%s" % debug_log)
        self.assertNotIn("Traceback (most recent", debug_log, "Traceback "
                         "present in the test's debug.log file, but it was "
                         "supposed to be stopped and unable to produce it.\n"
                         "%s" % debug_log)

    def tearDown(self):
        self.pass_script.remove()
        self.fail_script.remove()
        shutil.rmtree(self.tmpdir)


class RunnerSimpleTestStatus(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)

        self.config_file = script.TemporaryScript('avocado.conf',
                                                  "[simpletests.status]\n"
                                                  "warn_regex = ^WARN$\n"
                                                  "skip_regex = ^SKIP$\n")
        self.config_file.save()
        os.chdir(basedir)

    def test_simpletest_status(self):
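        # warn_regex/skip_regex from the config above map a SIMPLE test's
        # output to a final status: a "WARN" line yields WARN, "SKIP" yields SKIP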
        warn_script = script.TemporaryScript('avocado_warn.sh',
                                             "#!/bin/sh\necho WARN",
                                             'avocado_simpletest_'
                                             'functional')
        warn_script.save()
        cmd_line = ('%s --config %s run --job-results-dir %s --sysinfo=off'
                    ' %s --json -' % (AVOCADO, self.config_file.path,
                                      self.tmpdir, warn_script.path))
        result = process.system_output(cmd_line, ignore_status=True)
        json_results = json.loads(result)
        self.assertEqual(json_results['tests'][0]['status'], 'WARN')
        warn_script.remove()

        skip_script = script.TemporaryScript('avocado_skip.sh',
                                             "#!/bin/sh\necho SKIP",
                                             'avocado_simpletest_'
                                             'functional')
        skip_script.save()
        cmd_line = ('%s --config %s run --job-results-dir %s --sysinfo=off'
                    ' %s --json -' % (AVOCADO, self.config_file.path,
                                      self.tmpdir, skip_script.path))
        result = process.system_output(cmd_line, ignore_status=True)
        json_results = json.loads(result)
        self.assertEqual(json_results['tests'][0]['status'], 'SKIP')
        skip_script.remove()

    def tearDown(self):
        self.config_file.remove()
        shutil.rmtree(self.tmpdir)


class ExternalRunnerTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        self.pass_script = script.TemporaryScript(
            'pass',
            "exit 0",
            'avocado_externalrunner_functional')
        self.pass_script.save()
        self.fail_script = script.TemporaryScript(
            'fail',
            "exit 1",
            'avocado_externalrunner_functional')
        self.fail_script.save()
        os.chdir(basedir)

    def test_externalrunner_pass(self):
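        # --external-runner executes each test reference as an argument to
        # the given binary; here /bin/sh runs the scripts created in setUp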
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh %s'
                    % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_externalrunner_fail(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh %s'
                    % (AVOCADO, self.tmpdir, self.fail_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_TESTS_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_externalrunner_chdir_no_testdir(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=/bin/sh --external-runner-chdir=test %s'
                    % (AVOCADO, self.tmpdir, self.pass_script.path))
        result = process.run(cmd_line, ignore_status=True)
        expected_output = (b'Option "--external-runner-chdir=test" requires '
                           b'"--external-runner-testdir" to be set')
        self.assertIn(expected_output, result.stderr)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def test_externalrunner_no_url(self):
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off '
                    '--external-runner=%s' % (AVOCADO, self.tmpdir, TRUE_CMD))
        result = process.run(cmd_line, ignore_status=True)
        expected_output = (b'No test references provided nor any other '
                           b'arguments resolved into tests')
        self.assertIn(expected_output, result.stderr)
        expected_rc = exit_codes.AVOCADO_JOB_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))

    def tearDown(self):
        self.pass_script.remove()
        self.fail_script.remove()
        shutil.rmtree(self.tmpdir)


class AbsPluginsTest(object):

    def setUp(self):
        self.base_outputdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        os.chdir(basedir)

    def tearDown(self):
        shutil.rmtree(self.base_outputdir)


class PluginsTest(AbsPluginsTest, unittest.TestCase):

    def test_sysinfo_plugin(self):
        cmd_line = '%s sysinfo %s' % (AVOCADO, self.base_outputdir)
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        sysinfo_files = os.listdir(self.base_outputdir)
        self.assertGreater(len(sysinfo_files), 0, "Empty sysinfo files dir")

    def test_list_plugin(self):
        cmd_line = '%s list' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'No tests were found on current tests dir',
                         result.stdout)

    def test_list_error_output(self):
        cmd_line = '%s list sbrubles' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_FAIL
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b"Unable to resolve reference", result.stderr)

    def test_list_no_file_loader(self):
        cmd_line = ("%s list --loaders external --verbose -- "
                    "this-wont-be-matched" % AVOCADO)
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK,
                         "Avocado did not return rc %d:\n%s"
                         % (exit_codes.AVOCADO_ALL_OK, result))
        exp = (b"Type    Test                 Tag(s)\n"
               b"MISSING this-wont-be-matched \n\n"
               b"TEST TYPES SUMMARY\n"
               b"==================\n"
               b"EXTERNAL: 0\n"
               b"MISSING: 1\n")
        self.assertEqual(exp, result.stdout, "Stdout mismatch:\n%s\n\n%s"
                         % (exp, result))

    def test_list_verbose_tags(self):
        """
        Runs list verbosely and checks for tag-related output
        """
        test = script.make_script(os.path.join(self.base_outputdir, 'test.py'),
                                  VALID_PYTHON_TEST_WITH_TAGS)
        cmd_line = ("%s list --loaders file --verbose %s" % (AVOCADO,
                                                             test))
        result = process.run(cmd_line, ignore_status=True)
        self.assertEqual(result.exit_status, exit_codes.AVOCADO_ALL_OK,
                         "Avocado did not return rc %d:\n%s"
                         % (exit_codes.AVOCADO_ALL_OK, result))
        stdout_lines = result.stdout_text.splitlines()
        self.assertIn("Tag(s)", stdout_lines[0])
        full_test_name = "%s:MyTest.test" % test
        self.assertEqual("INSTRUMENTED %s BIG_TAG_NAME" % full_test_name,
                         stdout_lines[1])
        self.assertIn("TEST TYPES SUMMARY", stdout_lines)
        self.assertIn("INSTRUMENTED: 1", stdout_lines)
        self.assertIn("TEST TAGS SUMMARY", stdout_lines)
        self.assertEqual("BIG_TAG_NAME: 1", stdout_lines[-1])

    def test_plugin_list(self):
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        if sys.version_info[:2] >= (2, 7):
            self.assertNotIn(b'Disabled', result.stdout)

    def test_config_plugin(self):
        cmd_line = '%s config --paginator off' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'Disabled', result.stdout)

    def test_config_plugin_datadir(self):
        cmd_line = '%s config --datadir --paginator off' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b'Disabled', result.stdout)

    def test_disable_plugin(self):
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertIn(b"Collect system information", result.stdout)

        config_content = "[plugins]\ndisable=['cli.cmd.sysinfo',]"
        config = script.TemporaryScript("disable_sysinfo_cmd.conf",
                                        config_content)
        with config:
            cmd_line = '%s --config %s plugins' % (AVOCADO, config)
            result = process.run(cmd_line, ignore_status=True)
            expected_rc = exit_codes.AVOCADO_ALL_OK
            self.assertEqual(result.exit_status, expected_rc,
                             "Avocado did not return rc %d:\n%s" %
                             (expected_rc, result))
            self.assertNotIn(b"Collect system information", result.stdout)

1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098
    def test_plugin_order(self):
        """
        Tests plugin order by configuration file

        First it checks if html, json, xunit and zip_archive plugins are enabled.
        Then it runs a test with zip_archive running first, which means the html,
        json and xunit output files do not make into the archive.

        Then it runs with zip_archive set to run last, which means the html,
        json and xunit output files *do* make into the archive.
        """
        def run_config(config_path):
1099
            cmd = ('%s --config %s run passtest.py --archive '
1100
                   '--job-results-dir %s --sysinfo=off'
1101
                   % (AVOCADO, config_path, self.base_outputdir))
1102 1103 1104 1105 1106 1107 1108 1109
            result = process.run(cmd, ignore_status=True)
            expected_rc = exit_codes.AVOCADO_ALL_OK
            self.assertEqual(result.exit_status, expected_rc,
                             "Avocado did not return rc %d:\n%s" %
                             (expected_rc, result))

        result_plugins = ["json", "xunit", "zip_archive"]
        result_outputs = ["results.json", "results.xml"]
1110
        if html_capable():
1111
            result_plugins.append("html")
1112
            result_outputs.append("results.html")
1113

1114
        cmd_line = '%s plugins' % AVOCADO
1115 1116 1117 1118 1119 1120
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        for result_plugin in result_plugins:
            self.assertIn(result_plugin, result.stdout_text)

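        # With zip_archive first in the result plugin order, the archive
        # is written before the other result files exist, so none of
        # them should end up inside it.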
        config_content_zip_first = "[plugins.result]\norder=['zip_archive']"
        config_zip_first = script.TemporaryScript("zip_first.conf",
                                                  config_content_zip_first)
        with config_zip_first:
            run_config(config_zip_first)
            archives = glob.glob(os.path.join(self.base_outputdir, '*.zip'))
            self.assertEqual(len(archives), 1, "ZIP Archive not generated")
            with zipfile.ZipFile(archives[0], 'r') as zip_file:
                zip_file_list = zip_file.namelist()
            for result_output in result_outputs:
                self.assertNotIn(result_output, zip_file_list)
            os.unlink(archives[0])

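        # With zip_archive last, the other result files are already
        # written and should all end up inside the archive.  Plugin
        # names that do not exist are silently ignored.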
        config_content_zip_last = ("[plugins.result]\norder=['html', 'json',"
                                   "'xunit', 'non_existing_plugin_is_ignored'"
                                   ",'zip_archive']")
        config_zip_last = script.TemporaryScript("zip_last.conf",
                                                 config_content_zip_last)
        with config_zip_last:
            run_config(config_zip_last)
            archives = glob.glob(os.path.join(self.base_outputdir, '*.zip'))
            self.assertEqual(len(archives), 1, "ZIP Archive not generated")
            with zipfile.ZipFile(archives[0], 'r') as zip_file:
                zip_file_list = zip_file.namelist()
            for result_output in result_outputs:
                self.assertIn(result_output, zip_file_list)

    def test_Namespace_object_has_no_attribute(self):
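        # Regression check: plugin loading must not leak a
        # "'Namespace' object has no attribute" error to stderr.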
        cmd_line = '%s plugins' % AVOCADO
        result = process.run(cmd_line, ignore_status=True)
        expected_rc = exit_codes.AVOCADO_ALL_OK
        self.assertEqual(result.exit_status, expected_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (expected_rc, result))
        self.assertNotIn(b"'Namespace' object has no attribute", result.stderr)


class ParseXMLError(Exception):
    pass


class PluginsXunitTest(AbsPluginsTest, unittest.TestCase):

    @unittest.skipUnless(SCHEMA_CAPABLE,
                         'Unable to validate schema due to missing lxml.etree library')
    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        junit_xsd = os.path.join(os.path.dirname(__file__),
                                 os.path.pardir, ".data", 'junit-4.xsd')
        self.junit = os.path.abspath(junit_xsd)
        super(PluginsXunitTest, self).setUp()

    def run_and_check(self, testname, e_rc, e_ntests, e_nerrors,
                      e_nnotfound, e_nfailures, e_nskip):
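        # Note: e_nnotfound is accepted by this signature but is not
        # currently checked against the xunit output.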
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off'
                    ' --xunit - %s' % (AVOCADO, self.tmpdir, testname))
        result = process.run(cmd_line, ignore_status=True)
        xml_output = result.stdout
        self.assertEqual(result.exit_status, e_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (e_rc, result))
        try:
            xunit_doc = xml.dom.minidom.parseString(xml_output)
        except Exception as detail:
            raise ParseXMLError("Failed to parse content: %s\n%s" %
                                (detail, xml_output))

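        # Validate the xunit output against the bundled junit-4.xsd
        # schema before inspecting individual attributes.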
        with open(self.junit, 'rb') as f:
            xmlschema = etree.XMLSchema(etree.parse(f))

        self.assertTrue(xmlschema.validate(etree.parse(BytesIO(xml_output))),
                        "Failed to validate against %s, message:\n%s" %
                        (self.junit,
                         xmlschema.error_log.filter_from_errors()))

        testsuite_list = xunit_doc.getElementsByTagName('testsuite')
        self.assertEqual(len(testsuite_list), 1, 'More than one testsuite tag')

        testsuite_tag = testsuite_list[0]
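        # Sanity-check the attribute count before reading the
        # individual counters below.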
        self.assertEqual(len(testsuite_tag.attributes), 7,
                         'The testsuite tag does not have 7 attributes. '
                         'XML:\n%s' % xml_output)

        n_tests = int(testsuite_tag.attributes['tests'].value)
        self.assertEqual(n_tests, e_ntests,
                         "Unexpected number of executed tests, "
                         "XML:\n%s" % xml_output)

        n_errors = int(testsuite_tag.attributes['errors'].value)
        self.assertEqual(n_errors, e_nerrors,
                         "Unexpected number of test errors, "
                         "XML:\n%s" % xml_output)

        n_failures = int(testsuite_tag.attributes['failures'].value)
        self.assertEqual(n_failures, e_nfailures,
                         "Unexpected number of test failures, "
                         "XML:\n%s" % xml_output)

        n_skip = int(testsuite_tag.attributes['skipped'].value)
        self.assertEqual(n_skip, e_nskip,
                         "Unexpected number of test skips, "
                         "XML:\n%s" % xml_output)

    def test_xunit_plugin_passtest(self):
        self.run_and_check('passtest.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 0)

    def test_xunit_plugin_failtest(self):
        self.run_and_check('failtest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 0, 0, 1, 0)

    def test_xunit_plugin_skiponsetuptest(self):
        self.run_and_check('cancelonsetup.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 1)

    def test_xunit_plugin_errortest(self):
        self.run_and_check('errortest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 1, 0, 0, 0)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)
        super(PluginsXunitTest, self).tearDown()


class ParseJSONError(Exception):
    pass


class PluginsJSONTest(AbsPluginsTest, unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
        super(PluginsJSONTest, self).setUp()

    def run_and_check(self, testname, e_rc, e_ntests, e_nerrors,
                      e_nfailures, e_nskip, e_ncancel=0, external_runner=None):
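        # Run the given test, parse the JSON emitted on stdout and
        # compare the result counters with the expected values.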
        cmd_line = ('%s run --job-results-dir %s --sysinfo=off --json - '
                    '--archive %s' % (AVOCADO, self.tmpdir, testname))
        if external_runner is not None:
            cmd_line += " --external-runner '%s'" % external_runner
        result = process.run(cmd_line, ignore_status=True)
        json_output = result.stdout
        self.assertEqual(result.exit_status, e_rc,
                         "Avocado did not return rc %d:\n%s" %
                         (e_rc, result))
        try:
            json_data = json.loads(json_output)
        except Exception as detail:
            raise ParseJSONError("Failed to parse content: %s\n%s" %
                                 (detail, json_output))
        self.assertTrue(json_data, "Empty JSON result:\n%s" % json_output)
        self.assertIsInstance(json_data['tests'], list,
                              "JSON result lacks 'tests' list")
        n_tests = len(json_data['tests'])
        self.assertEqual(n_tests, e_ntests,
                         "Different number of expected tests")
        n_errors = json_data['errors']
        self.assertEqual(n_errors, e_nerrors,
                         "Different number of expected errors")
        n_failures = json_data['failures']
        self.assertEqual(n_failures, e_nfailures,
                         "Different number of expected failures")
        n_skip = json_data['skip']
        self.assertEqual(n_skip, e_nskip,
                         "Different number of skipped tests")
        n_cancel = json_data['cancel']
        self.assertEqual(n_cancel, e_ncancel,
                         "Different number of cancelled tests")
        return json_data

    def test_json_plugin_passtest(self):
        self.run_and_check('passtest.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0)

    def test_json_plugin_failtest(self):
        self.run_and_check('failtest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 0, 1, 0)

    def test_json_plugin_skiponsetuptest(self):
        self.run_and_check('cancelonsetup.py', exit_codes.AVOCADO_ALL_OK,
                           1, 0, 0, 0, 1)

    def test_json_plugin_errortest(self):
        self.run_and_check('errortest.py', exit_codes.AVOCADO_TESTS_FAIL,
                           1, 1, 0, 0)

    @unittest.skipIf(not GNU_ECHO_BINARY, 'echo binary not available')
    def test_ugly_echo_cmd(self):
        data = self.run_and_check('"-ne foo\\\\\\n\\\'\\\\\\"\\\\\\'
                                  'nbar/baz"', exit_codes.AVOCADO_ALL_OK, 1, 0,
                                  0, 0, external_runner=GNU_ECHO_BINARY)
        # The test id should preserve the raw command
        self.assertEqual(data['tests'][0]['id'],
                         '1--ne foo\\\\n\\\'\\"\\\\nbar/baz')
        # logdir name should escape special chars (/)
        self.assertEqual(os.path.basename(data['tests'][0]['logdir']),
                         "1--ne foo__n_'____nbar_baz")

    def tearDown(self):
        shutil.rmtree(self.tmpdir)
        super(PluginsJSONTest, self).tearDown()


if __name__ == '__main__':
    unittest.main()