# test_interrupt.py
import os
import sys
import tempfile
import time
import shutil

import aexpect
import psutil

if sys.version_info[:2] == (2, 6):
    import unittest2 as unittest
else:
    import unittest

from avocado.utils import wait
from avocado.utils import script
from avocado.utils import data_factory

# Absolute path two directory levels above the directory holding this file.
# The tests chdir here before invoking './scripts/avocado'.
basedir = os.path.abspath(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..'))


# Source of a standalone script that ignores SIGINT, SIGTERM and SIGQUIT and
# then sleeps in 0.1 second steps forever. test_badly_behaved saves it to a
# temporary file and passes that file to 'avocado run'.
BAD_TEST = """#!/usr/bin/env python
import signal
import time

if __name__ == "__main__":
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    signal.signal(signal.SIGQUIT, signal.SIG_IGN)
    while True:
        time.sleep(0.1)
"""

# Source of an avocado test module whose single test sleeps for 600 seconds
# and installs no signal handlers of its own. test_well_behaved saves it to a
# temporary file and passes that file to 'avocado run'.
GOOD_TEST = """#!/usr/bin/env python
import time
from avocado import Test
from avocado import main

class GoodTest(Test):
    def test(self):
        time.sleep(600)

if __name__ == "__main__":
    main()
"""


class InterruptTest(unittest.TestCase):

    """
    Checks how 'avocado run' reacts to Ctrl+C while tests are running.

    Each test writes a temporary script, starts an avocado job on it through
    aexpect, sends the Ctrl+C character to the job and then inspects the
    job output and the system process table.
    """

    def setUp(self):
        # The tests chdir into basedir; remember the starting directory so
        # tearDown can restore it and later tests are not affected.
        self.orig_cwd = os.getcwd()
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)

    @staticmethod
    def _no_process_running(script_path):
        """
        Tell whether the process table is free of processes running a script.

        :param script_path: text looked for in the space-joined command line
                            of every process.
        :return: True when no inspected process has `script_path` in its
                 command line, False as soon as one does.
        """
        try:
            pids = psutil.pids()
        except AttributeError:
            # Older psutil releases expose the PID list under this name
            pids = psutil.get_pid_list()

        for pid in pids:
            try:
                cmdline = psutil.Process(pid).cmdline
                # cmdline is a plain attribute on older psutil releases and
                # a method on newer ones
                if callable(cmdline):
                    cmdline = cmdline()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process exited during the scan or cannot be
                # inspected; either way it is not counted as a match.
                continue
            if script_path in " ".join(cmdline):
                return False
        return True

    def _start_job(self, test_path):
        """
        Start an avocado job that runs `test_path` three times.

        Changes the working directory to basedir, spawns the job and blocks
        until the base name of `test_path` shows up in the job output.

        :param test_path: path of the test script handed to 'avocado run'.
        :return: the aexpect.Expect object controlling the job.
        """
        os.chdir(basedir)
        cmd_line = ('./scripts/avocado run --sysinfo=off --job-results-dir %s '
                    '%s %s %s' % (self.tmpdir,
                                  test_path,
                                  test_path,
                                  test_path))
        proc = aexpect.Expect(command=cmd_line, linesep='')
        proc.read_until_last_line_matches(os.path.basename(test_path))
        return proc

    @unittest.skip("Temporary plugin infrastructure removal")
    def test_badly_behaved(self):
        """
        Make sure avocado can cleanly get out of a loop of badly behaved tests.
        """
        bad_test_basename = ('wontquit-%s' %
                             data_factory.generate_random_string(5))
        # 0o755 is the octal spelling accepted by both Python 2.6+ and 3
        bad_test = script.TemporaryScript(bad_test_basename, BAD_TEST,
                                          'avocado_interrupt_test',
                                          mode=0o755)
        bad_test.save()

        proc = self._start_job(bad_test.path)
        proc.sendline('\x03')
        proc.read_until_last_line_matches('Interrupt requested. Waiting 2 '
                                          'seconds for test to finish '
                                          '(ignoring new Ctrl+C until then)')
        # We have to actually wait 2 seconds until the ignore window is over
        time.sleep(2)
        proc.sendline('\x03')
        # NOTE(review): the '%d' below is never interpolated, so it reaches
        # aexpect as literal text -- confirm this pattern is what is intended.
        proc.read_until_last_line_matches('TIME       : %d s')
        wait.wait_for(lambda: not proc.is_alive(), timeout=1)

        # Make sure the bad test will be really gone from the process table
        wait.wait_for(lambda: self._no_process_running(bad_test.path),
                      timeout=2)
        # Make sure the Killing test subprocess message did appear
        self.assertIn('Killing test subprocess', proc.get_output())

    @unittest.skip("Temporary plugin infrastructure removal")
    def test_well_behaved(self):
        """
        Make sure avocado can cleanly get out of a loop of well behaved tests.
        """
        good_test_basename = ('goodtest-%s.py' %
                              data_factory.generate_random_string(5))
        # 0o755 is the octal spelling accepted by both Python 2.6+ and 3
        good_test = script.TemporaryScript(good_test_basename, GOOD_TEST,
                                           'avocado_interrupt_test',
                                           mode=0o755)
        good_test.save()

        proc = self._start_job(good_test.path)
        proc.sendline('\x03')
        # NOTE(review): the '%d' below is never interpolated, so it reaches
        # aexpect as literal text -- confirm this pattern is what is intended.
        proc.read_until_last_line_matches('TIME       : %d s')
        wait.wait_for(lambda: not proc.is_alive(), timeout=1)

        # Make sure the good test will be really gone from the process table
        wait.wait_for(lambda: self._no_process_running(good_test.path),
                      timeout=2)
        # Make sure the Killing test subprocess message is not there
        self.assertNotIn('Killing test subprocess', proc.get_output())
        # Make sure the Interrupted requested sentence is there
        self.assertIn('Interrupt requested. Waiting 2 seconds for test to '
                      'finish (ignoring new Ctrl+C until then)',
                      proc.get_output())

    def tearDown(self):
        # Leave the working directory as setUp found it before cleaning up
        os.chdir(self.orig_cwd)
        shutil.rmtree(self.tmpdir)

# Run the tests in this module through unittest when it is executed directly.
if __name__ == '__main__':
    unittest.main()