# test_utils_process.py
1
import io
2
import logging
3
import os
4
import unittest
5 6 7 8 9 10

try:
    from unittest import mock
except ImportError:
    import mock

11

12
from avocado.utils import gdb
13
from avocado.utils import process
14
from avocado.utils import path
15

16 17 18
from six import string_types


19 20 21 22 23 24 25 26
def probe_binary(binary):
    """
    Look up a command by name

    :param binary: name of the command handed to path.find_command
    :returns: whatever path.find_command returns for it, or None when that
              lookup raises path.CmdNotFoundError
    """
    try:
        located = path.find_command(binary)
    except path.CmdNotFoundError:
        located = None
    return located


# Result of looking up the "true" command through probe_binary(), or None
# when it is not found.  Tests in this file are skipped (skipUnless) when it
# is None, and it is also the value returned by the mocked path.find_command.
TRUE_CMD = probe_binary('true')
27

28

29 30 31 32 33 34 35
class TestSubProcess(unittest.TestCase):
    """
    Argument validation performed when creating a process.SubProcess
    """

    def test_allow_output_check_parameter(self):
        # "invalid" is passed as the third positional argument and is
        # expected to be rejected with a ValueError
        with self.assertRaises(ValueError):
            process.SubProcess(TRUE_CMD, False, "invalid")


36 37 38
class TestGDBProcess(unittest.TestCase):
    """
    Checks how avocado.utils.process decides whether a command should be
    run under GDB, as driven by gdb.GDB_RUN_BINARY_NAMES_EXPR
    """

    def setUp(self):
        # Keep a copy of the module-level list, since the tests below both
        # rebind and mutate gdb.GDB_RUN_BINARY_NAMES_EXPR
        self.current_runtime_expr = gdb.GDB_RUN_BINARY_NAMES_EXPR[:]

    def tearDown(self):
        # This has to be named tearDown: unittest never calls a method
        # named cleanUp, so the saved value was not being restored and
        # the per-test changes leaked into whatever test ran next
        gdb.GDB_RUN_BINARY_NAMES_EXPR = self.current_runtime_expr

    def test_should_run_inside_gdb(self):
        # A bare binary name is expected to match with or without a
        # leading directory, but not a longer name sharing the prefix
        gdb.GDB_RUN_BINARY_NAMES_EXPR = ['foo']
        self.assertTrue(process.should_run_inside_gdb('foo'))
        self.assertTrue(process.should_run_inside_gdb('/usr/bin/foo'))
        self.assertFalse(process.should_run_inside_gdb('/usr/bin/fooz'))

        # Expression of the form "binary:breakpoint"
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('foo:main')
        self.assertTrue(process.should_run_inside_gdb('foo'))
        self.assertFalse(process.should_run_inside_gdb('bar'))

        # Expression of the form "binary:file:line"; command arguments
        # are expected not to prevent the match
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('bar:main.c:5')
        self.assertTrue(process.should_run_inside_gdb('bar'))
        self.assertFalse(process.should_run_inside_gdb('baz'))
        self.assertTrue(process.should_run_inside_gdb('bar 1 2 3'))
        self.assertTrue(process.should_run_inside_gdb('/usr/bin/bar 1 2 3'))

    def test_should_run_inside_gdb_malformed_command(self):
        # Command lines with unbalanced quotes and shell metacharacters
        # must still produce a plain True/False answer
        gdb.GDB_RUN_BINARY_NAMES_EXPR = ['/bin/virsh']
        cmd = """/bin/virsh node-memory-tune --shm-sleep-millisecs ~!@#$%^*()-=[]{}|_+":;'`,>?. """
        self.assertTrue(process.should_run_inside_gdb(cmd))
        self.assertFalse(process.should_run_inside_gdb("foo bar baz"))
        self.assertFalse(process.should_run_inside_gdb("foo ' "))

    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
    def test_get_sub_process_klass(self):
        # With no GDB expressions set, the regular SubProcess is expected
        gdb.GDB_RUN_BINARY_NAMES_EXPR = []
        self.assertIs(process.get_sub_process_klass(TRUE_CMD),
                      process.SubProcess)

        # Once a binary is listed, only commands for it get GDBSubProcess
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('/bin/false')
        self.assertIs(process.get_sub_process_klass('/bin/false'),
                      process.GDBSubProcess)
        self.assertIs(process.get_sub_process_klass('false'),
                      process.GDBSubProcess)
        self.assertIs(process.get_sub_process_klass('true'),
                      process.SubProcess)

    def test_split_gdb_expr(self):
        # The local is not called "breakpoint" so that it does not shadow
        # the builtin of the same name on newer Python versions
        binary, break_expr = process.split_gdb_expr('foo:debug_print')
        self.assertEqual(binary, 'foo')
        self.assertEqual(break_expr, 'debug_print')
        # Without an explicit breakpoint, 'main' is the expected default
        binary, break_expr = process.split_gdb_expr('bar')
        self.assertEqual(binary, 'bar')
        self.assertEqual(break_expr, 'main')
        # Only the first ':' separates the binary from the breakpoint
        binary, break_expr = process.split_gdb_expr('baz:main.c:57')
        self.assertEqual(binary, 'baz')
        self.assertEqual(break_expr, 'main.c:57')
        self.assertIsInstance(process.split_gdb_expr('foo'), tuple)
        self.assertIsInstance(process.split_gdb_expr('foo:debug_print'), tuple)

95 96 97 98 99 100 101 102 103

def mock_fail_find_cmd(cmd, default=None):
    """
    Replacement for path.find_command that never finds anything

    :param cmd: name of the command being looked up
    :param default: accepted but not used
    :raises: path.CmdNotFoundError, always, built from cmd and a fixed
             list of directories
    """
    searched_dirs = [
        "/usr/libexec",
        "/usr/local/sbin",
        "/usr/local/bin",
        "/usr/sbin",
        "/usr/bin",
        "/sbin",
        "/bin",
    ]
    raise path.CmdNotFoundError(cmd, searched_dirs)


class TestProcessRun(unittest.TestCase):
    """
    Checks the command line that process.SubProcess and process.run end up
    with, depending on the sudo/shell options, on the (mocked) user id and
    on whether the (mocked) path.find_command lookup succeeds.

    When the lookup is mocked to succeed it returns TRUE_CMD, so that value
    is what shows up as the prefix of the expected commands.
    """

    # process.SubProcess: checks on the "cmd" attribute

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_nosudo(self):
        proc = process.SubProcess(cmd='ls -l')
        self.assertEqual(proc.cmd, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_subprocess_nosudo_uid_0(self):
        proc = process.SubProcess(cmd='ls -l')
        self.assertEqual(proc.cmd, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_sudo(self):
        proc = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(proc.cmd, '%s -n ls -l' % TRUE_CMD)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_sudo_no_sudo_installed(self):
        proc = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(proc.cmd, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_subprocess_sudo_uid_0(self):
        proc = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(proc.cmd, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_sudo_shell(self):
        proc = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(proc.cmd, '%s -n -s ls -l' % TRUE_CMD)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_sudo_shell_no_sudo_installed(self):
        proc = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(proc.cmd, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_subprocess_sudo_shell_uid_0(self):
        proc = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(proc.cmd, 'ls -l')

    # process.run: checks on the "command" attribute of the result

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_nosudo(self):
        outcome = process.run(cmd='ls -l', ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_run_nosudo_uid_0(self):
        outcome = process.run(cmd='ls -l', ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_sudo(self):
        outcome = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(outcome.command, '%s -n ls -l' % TRUE_CMD)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_sudo_no_sudo_installed(self):
        outcome = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_run_sudo_uid_0(self):
        outcome = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_sudo_shell(self):
        outcome = process.run(cmd='ls -l', sudo=True, shell=True,
                              ignore_status=True)
        self.assertEqual(outcome.command, '%s -n -s ls -l' % TRUE_CMD)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_sudo_shell_no_sudo_installed(self):
        outcome = process.run(cmd='ls -l', sudo=True, shell=True,
                              ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @unittest.skipUnless(TRUE_CMD, '"true" binary not available')
    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_run_sudo_shell_uid_0(self):
        outcome = process.run(cmd='ls -l', sudo=True, shell=True,
                              ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269

class MiscProcessTests(unittest.TestCase):
    """
    Tests for stand-alone helpers of avocado.utils.process
    """

    def test_binary_from_shell(self):
        # Pairs of (expected binary, shell command line); the command lines
        # mix variable assignments, quoting and trailing arguments
        cases = (
            ("binary",
             "binary"),
            ("first_bin",
             "MY_VAR=foo myV4r=123 "
             "quote='a b c' QUOTE=\"1 2 && b\" "
             "QuOtE=\"1 2\"foo' 3 4' first_bin "
             "second_bin VAR=xyz"),
            ("1st_binary",
             "VAR=VALUE 1st_binary var=value "
             "second_binary"),
            ("./bin",
             "FOO=bar ./bin var=value"),
        )
        for expected, shell_cmd in cases:
            found = process.binary_from_shell_cmd(shell_cmd)
            self.assertEqual(expected, found)

# Committed by Lukáš Doktor

271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
class CmdResultTests(unittest.TestCase):

    def test_cmd_result_stdout_stderr_bytes(self):
        result = process.CmdResult()
        self.assertTrue(isinstance(result.stdout, bytes))
        self.assertTrue(isinstance(result.stderr, bytes))

    def test_cmd_result_stdout_stderr_text(self):
        result = process.CmdResult()
        self.assertTrue(isinstance(result.stdout_text, string_types))
        self.assertTrue(isinstance(result.stderr_text, string_types))

    def test_cmd_result_stdout_stderr_already_text(self):
        result = process.CmdResult()
        result.stdout = "supposed command output, but not as bytes"
        result.stderr = "supposed command error, but not as bytes"
        self.assertEqual(result.stdout, result.stdout_text)
        self.assertEqual(result.stderr, result.stderr_text)

    def test_cmd_result_stdout_stderr_other_type(self):
        result = process.CmdResult()
        result.stdout = None
        result.stderr = None
        self.assertRaises(TypeError, lambda x: result.stdout_text)
        self.assertRaises(TypeError, lambda x: result.stderr_text)


298 299 300 301 302 303 304
class FDDrainerTests(unittest.TestCase):
    """
    Tests for process.FDDrainer, fed through an os.pipe() pair
    """

    def test_drain_from_pipe_fd(self):
        # Everything written to the pipe, in order, is expected to be
        # available from the drainer's data buffer after flush()
        pipe_out, pipe_in = os.pipe()
        cmd_result = process.CmdResult()
        drainer = process.FDDrainer(pipe_out, cmd_result, "test")
        drainer.start()
        chunks = (b"foo", b"bar", b"baz", b"foo\nbar\nbaz\n\n", b"finish")
        for chunk in chunks:
            os.write(pipe_in, chunk)
        os.close(pipe_in)
        drainer.flush()
        self.assertEqual(drainer.data.getvalue(),
                         b"foobarbazfoo\nbar\nbaz\n\nfinish")

    def test_log(self):
        class CatchHandler(logging.NullHandler):
            """
            Logging handler that only remembers whether it was given
            a record to handle
            """
            def __init__(self, *args, **kwargs):
                super(CatchHandler, self).__init__(*args, **kwargs)
                self.caught_record = False

            def handle(self, record):
                self.caught_record = True

        pipe_out, pipe_in = os.pipe()
        cmd_result = process.CmdResult()
        test_logger = logging.getLogger("FDDrainerTests.test_log")
        catcher = CatchHandler()
        test_logger.addHandler(catcher)
        test_logger.setLevel(logging.DEBUG)

        drainer = process.FDDrainer(pipe_out, cmd_result, "test",
                                    logger=test_logger, verbose=True)
        drainer.start()
        os.write(pipe_in, b"should go to the log\n")
        os.close(pipe_in)
        drainer.flush()
        self.assertEqual(drainer.data.getvalue(),
                         b"should go to the log\n")
        # Besides being stored, the line must have reached the logger
        self.assertTrue(catcher.caught_record)

    def test_flush_on_closed_handler(self):
        # flush() is expected to cope with a stream logger whose handler
        # has had its stream closed
        stream_handler = logging.StreamHandler(io.StringIO())
        stream_log = logging.getLogger("test_flush_on_closed_handler")
        stream_log.addHandler(stream_handler)
        pipe_out, pipe_in = os.pipe()
        cmd_result = process.CmdResult()
        drainer = process.FDDrainer(pipe_out, cmd_result, name="test",
                                    stream_logger=stream_log)
        drainer.start()
        os.close(pipe_in)
        self.assertIsNotNone(drainer._stream_logger)
        closed_any = False
        for attached in drainer._stream_logger.handlers:
            stream = getattr(attached, 'stream', None)
            if stream is None or not hasattr(stream, 'close'):
                continue
            # close the handler's stream on purpose, to check that
            # flush will adapt to it
            stream.close()
            closed_any = True
        self.assertTrue(closed_any)
        drainer.flush()

    def test_flush_on_handler_with_no_fileno(self):
        # The handler writes to an io.StringIO; flush() is expected to
        # complete with such a handler attached
        stream_handler = logging.StreamHandler(io.StringIO())
        stream_log = logging.getLogger("test_flush_on_handler_with_no_fileno")
        stream_log.addHandler(stream_handler)
        pipe_out, pipe_in = os.pipe()
        cmd_result = process.CmdResult()
        drainer = process.FDDrainer(pipe_out, cmd_result, name="test",
                                    stream_logger=stream_log)
        drainer.start()
        os.close(pipe_in)
        drainer.flush()

377

378 379
# Run the tests of this module when the file is executed as a script
if __name__ == "__main__":
    unittest.main()