# test_utils_process.py (13.9 KB)
1
import io
2
import logging
3
import os
4
import unittest
5 6 7 8 9 10

try:
    from unittest import mock
except ImportError:
    import mock

11

12
from avocado.utils import gdb
13
from avocado.utils import process
14
from avocado.utils import path
15

16 17 18
from six import string_types


19 20
# Result of looking up the "true" command (presumably its full path).  The
# tests below use it both as a command to hand to process.SubProcess and as
# the value returned by the mocked path.find_command.
TRUE_CMD = path.find_command('true')

21

22 23 24 25 26 27 28
class TestSubProcess(unittest.TestCase):
    """
    Argument validation performed when creating a ``process.SubProcess``.
    """

    def test_allow_output_check_parameter(self):
        # "invalid" is handed over as the third positional argument
        # (presumably ``allow_output_check``, going by the test name --
        # confirm against process.SubProcess) and must be refused
        with self.assertRaises(ValueError):
            process.SubProcess(TRUE_CMD, False, "invalid")


29 30 31
class TestGDBProcess(unittest.TestCase):
    """
    Checks for the GDB related helpers exposed by ``avocado.utils.process``.

    Most tests rebind and/or mutate ``gdb.GDB_RUN_BINARY_NAMES_EXPR``, so
    the value found before each test is saved in :meth:`setUp` and put back
    in :meth:`tearDown`.
    """

    def setUp(self):
        # Keep a copy, not a reference: tests append to the live list
        self.current_runtime_expr = gdb.GDB_RUN_BINARY_NAMES_EXPR[:]

    def tearDown(self):
        # This hook must be called "tearDown" for unittest to run it; under
        # any other name the saved value would never be restored.
        gdb.GDB_RUN_BINARY_NAMES_EXPR = self.current_runtime_expr

    def test_should_run_inside_gdb(self):
        # A bare binary name matches the name itself and a full path to it,
        # but not a different binary that merely shares the prefix
        gdb.GDB_RUN_BINARY_NAMES_EXPR = ['foo']
        self.assertTrue(process.should_run_inside_gdb('foo'))
        self.assertTrue(process.should_run_inside_gdb('/usr/bin/foo'))
        self.assertFalse(process.should_run_inside_gdb('/usr/bin/fooz'))

        # Expression in the "binary:breakpoint" form
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('foo:main')
        self.assertTrue(process.should_run_inside_gdb('foo'))
        self.assertFalse(process.should_run_inside_gdb('bar'))

        # Expression in the "binary:file:line" form; the commands checked
        # may also carry arguments
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('bar:main.c:5')
        self.assertTrue(process.should_run_inside_gdb('bar'))
        self.assertFalse(process.should_run_inside_gdb('baz'))
        self.assertTrue(process.should_run_inside_gdb('bar 1 2 3'))
        self.assertTrue(process.should_run_inside_gdb('/usr/bin/bar 1 2 3'))

    def test_should_run_inside_gdb_malformed_command(self):
        # Commands containing shell metacharacters and unbalanced quotes
        # must still produce a True/False answer
        gdb.GDB_RUN_BINARY_NAMES_EXPR = ['/bin/virsh']
        cmd = """/bin/virsh node-memory-tune --shm-sleep-millisecs ~!@#$%^*()-=[]{}|_+":;'`,>?. """
        self.assertTrue(process.should_run_inside_gdb(cmd))
        self.assertFalse(process.should_run_inside_gdb("foo bar baz"))
        self.assertFalse(process.should_run_inside_gdb("foo ' "))

    def test_get_sub_process_klass(self):
        # With no GDB expressions set, the regular class is picked
        gdb.GDB_RUN_BINARY_NAMES_EXPR = []
        self.assertIs(process.get_sub_process_klass(TRUE_CMD),
                      process.SubProcess)

        # Once '/bin/false' is listed, both the full path and the bare
        # name select the GDB flavour, while other commands do not
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('/bin/false')
        self.assertIs(process.get_sub_process_klass('/bin/false'),
                      process.GDBSubProcess)
        self.assertIs(process.get_sub_process_klass('false'),
                      process.GDBSubProcess)
        self.assertIs(process.get_sub_process_klass('true'),
                      process.SubProcess)

    def test_split_gdb_expr(self):
        # The local is not called "breakpoint", to avoid shadowing the
        # builtin of the same name available on newer Python versions
        binary, break_at = process.split_gdb_expr('foo:debug_print')
        self.assertEqual(binary, 'foo')
        self.assertEqual(break_at, 'debug_print')
        # No explicit breakpoint: 'main' is expected
        binary, break_at = process.split_gdb_expr('bar')
        self.assertEqual(binary, 'bar')
        self.assertEqual(break_at, 'main')
        # Only the first ':' separates the binary from the breakpoint
        binary, break_at = process.split_gdb_expr('baz:main.c:57')
        self.assertEqual(binary, 'baz')
        self.assertEqual(break_at, 'main.c:57')
        self.assertIsInstance(process.split_gdb_expr('foo'), tuple)
        self.assertIsInstance(process.split_gdb_expr('foo:debug_print'), tuple)

86 87 88 89 90 91 92 93 94

def mock_fail_find_cmd(cmd, default=None):
    """
    Replacement for ``path.find_command`` that always fails the lookup.

    :param cmd: name of the command being looked up
    :param default: accepted but never used by this stand-in
    :raises: :class:`path.CmdNotFoundError`, unconditionally
    """
    searched_dirs = ["/usr/libexec",
                     "/usr/local/sbin", "/usr/local/bin",
                     "/usr/sbin", "/usr/bin",
                     "/sbin", "/bin"]
    lookup_failure = path.CmdNotFoundError(cmd, searched_dirs)
    raise lookup_failure


class TestProcessRun(unittest.TestCase):
    """
    Checks the command line that ``process.SubProcess`` and ``process.run``
    end up with, depending on the ``sudo`` and ``shell`` arguments.

    ``os.getuid`` is mocked to act as either a regular user (1000) or root
    (0).  ``path.find_command`` is mocked to either return ``TRUE_CMD`` or
    to fail through :func:`mock_fail_find_cmd`.
    """

    # --- process.SubProcess: the resulting command is read from ".cmd" ---

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_subprocess_nosudo(self):
        # sudo not requested: command is left untouched
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l')
        self.assertEqual(p.cmd, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=0))
    def test_subprocess_nosudo_uid_0(self):
        # sudo not requested, running as uid 0: command is left untouched
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l')
        self.assertEqual(p.cmd, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_subprocess_sudo(self):
        # sudo requested by a regular user: the command gets prefixed with
        # whatever find_command returned, plus "-n"
        expected_command = '%s -n ls -l' % TRUE_CMD
        p = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(p.cmd, expected_command)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_subprocess_sudo_no_sudo_installed(self):
        # sudo requested but the lookup fails: command is left untouched
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(p.cmd, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=0))
    def test_subprocess_sudo_uid_0(self):
        # sudo requested while already uid 0: command is left untouched
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(p.cmd, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_subprocess_sudo_shell(self):
        # sudo + shell requested by a regular user: "-n -s" is expected
        expected_command = '%s -n -s ls -l' % TRUE_CMD
        p = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(p.cmd, expected_command)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_subprocess_sudo_shell_no_sudo_installed(self):
        # sudo + shell requested but the lookup fails: command untouched
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(p.cmd, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=0))
    def test_subprocess_sudo_shell_uid_0(self):
        # sudo + shell requested while already uid 0: command untouched
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(p.cmd, expected_command)

    # --- process.run: the resulting command is read from ".command" of ---
    # --- the returned result; ignore_status=True is passed on every call ---

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_run_nosudo(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', ignore_status=True)
        self.assertEqual(p.command, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=0))
    def test_run_nosudo_uid_0(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', ignore_status=True)
        self.assertEqual(p.command, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_run_sudo(self):
        expected_command = '%s -n ls -l' % TRUE_CMD
        p = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_run_sudo_no_sudo_installed(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=0))
    def test_run_sudo_uid_0(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_run_sudo_shell(self):
        expected_command = '%s -n -s ls -l' % TRUE_CMD
        p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=1000))
    def test_run_sudo_shell_no_sudo_installed(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

    @mock.patch.object(path, 'find_command',
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid',
                       mock.Mock(return_value=0))
    def test_run_sudo_shell_uid_0(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236

class MiscProcessTests(unittest.TestCase):
    """
    Checks for ``process.binary_from_shell_cmd``.
    """

    def test_binary_from_shell(self):
        # (expected binary, shell command line) pairs, checked in order;
        # the command lines mix VAR=value assignments, quoted values and
        # more than one candidate word
        cases = (
            ("binary", "binary"),
            ("first_bin", ("MY_VAR=foo myV4r=123 "
                           "quote='a b c' QUOTE=\"1 2 && b\" "
                           "QuOtE=\"1 2\"foo' 3 4' first_bin "
                           "second_bin VAR=xyz")),
            ("1st_binary", ("VAR=VALUE 1st_binary var=value "
                            "second_binary")),
            ("./bin", "FOO=bar ./bin var=value"),
        )
        for expected_binary, shell_cmd in cases:
            found = process.binary_from_shell_cmd(shell_cmd)
            self.assertEqual(expected_binary, found)

# (web-viewer blame residue: committed by Lukáš Doktor)

238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
class CmdResultTests(unittest.TestCase):
    """
    Checks the ``stdout``/``stderr`` attributes of ``process.CmdResult`` and
    their ``stdout_text``/``stderr_text`` counterparts.
    """

    def test_cmd_result_stdout_stderr_bytes(self):
        # A freshly created result exposes bytes for both streams
        result = process.CmdResult()
        self.assertIsInstance(result.stdout, bytes)
        self.assertIsInstance(result.stderr, bytes)

    def test_cmd_result_stdout_stderr_text(self):
        # The "_text" variants expose a string type instead
        result = process.CmdResult()
        self.assertIsInstance(result.stdout_text, string_types)
        self.assertIsInstance(result.stderr_text, string_types)

    def test_cmd_result_stdout_stderr_already_text(self):
        # Content that is already text must come back unchanged
        result = process.CmdResult()
        result.stdout = "supposed command output, but not as bytes"
        result.stderr = "supposed command error, but not as bytes"
        self.assertEqual(result.stdout, result.stdout_text)
        self.assertEqual(result.stderr, result.stderr_text)

    def test_cmd_result_stdout_stderr_other_type(self):
        # Content that is neither bytes nor text must raise TypeError when
        # read as text.  The attribute is read through getattr() so the
        # TypeError can only come from the attribute access itself; a
        # callable taking a parameter would raise TypeError on its own when
        # assertRaises invokes it without arguments.
        result = process.CmdResult()
        result.stdout = None
        result.stderr = None
        self.assertRaises(TypeError, getattr, result, 'stdout_text')
        self.assertRaises(TypeError, getattr, result, 'stderr_text')


265 266 267 268 269 270 271
class FDDrainerTests(unittest.TestCase):
    """
    Checks for ``process.FDDrainer`` fed from the read end of an
    ``os.pipe()``.
    """

    def test_drain_from_pipe_fd(self):
        # Everything written to the pipe, in any chunking, must show up
        # concatenated in the drainer's data buffer
        read_fd, write_fd = os.pipe()
        result = process.CmdResult()
        fd_drainer = process.FDDrainer(read_fd, result, "test")
        fd_drainer.start()
        for content in (b"foo", b"bar", b"baz", b"foo\nbar\nbaz\n\n"):
            os.write(write_fd, content)
        os.write(write_fd, b"finish")
        # The write end is closed before flush() is called
        os.close(write_fd)
        fd_drainer.flush()
        self.assertEqual(fd_drainer.data.getvalue(),
                         b"foobarbazfoo\nbar\nbaz\n\nfinish")

    def test_log(self):
        class CatchHandler(logging.NullHandler):
            """
            Handler used just to confirm that a logging event happened
            """
            def __init__(self, *args, **kwargs):
                super(CatchHandler, self).__init__(*args, **kwargs)
                self.caught_record = False

            def handle(self, record):
                self.caught_record = True

        read_fd, write_fd = os.pipe()
        result = process.CmdResult()
        logger = logging.getLogger("FDDrainerTests.test_log")
        handler = CatchHandler()
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)

        fd_drainer = process.FDDrainer(read_fd, result, "test",
                                       logger=logger, verbose=True)
        fd_drainer.start()
        os.write(write_fd, b"should go to the log\n")
        os.close(write_fd)
        fd_drainer.flush()
        # The content must be both collected and reported to the logger
        self.assertEqual(fd_drainer.data.getvalue(),
                         b"should go to the log\n")
        self.assertTrue(handler.caught_record)

    def test_flush_on_closed_handler(self):
        handler = logging.StreamHandler(io.StringIO())
        log = logging.getLogger("test_flush_on_closed_handler")
        log.addHandler(handler)
        read_fd, write_fd = os.pipe()
        result = process.CmdResult()
        fd_drainer = process.FDDrainer(read_fd, result, name="test",
                                       stream_logger=log)
        fd_drainer.start()
        os.close(write_fd)
        self.assertIsNotNone(fd_drainer._stream_logger)
        one_stream_closed = False
        # A distinct loop name keeps the handler created above reachable
        for log_handler in fd_drainer._stream_logger.handlers:
            stream = getattr(log_handler, 'stream', None)
            if stream is not None and hasattr(stream, 'close'):
                # force closing the handler's stream to check if
                # flush will adapt to it
                stream.close()
                one_stream_closed = True
        self.assertTrue(one_stream_closed)
        # Must not raise even though the stream is already closed
        fd_drainer.flush()

    def test_flush_on_handler_with_no_fileno(self):
        # The handler writes to an io.StringIO stream; flush() must cope
        # with it
        handler = logging.StreamHandler(io.StringIO())
        log = logging.getLogger("test_flush_on_handler_with_no_fileno")
        log.addHandler(handler)
        read_fd, write_fd = os.pipe()
        result = process.CmdResult()
        fd_drainer = process.FDDrainer(read_fd, result, name="test",
                                       stream_logger=log)
        fd_drainer.start()
        os.close(write_fd)
        fd_drainer.flush()

344

345 346
# Run the tests in this module when the file is executed directly
if __name__ == "__main__":
    unittest.main()