test_utils_process.py 16.1 KB
Newer Older
1
import io
2
import logging
3
import os
4
import unittest
5 6 7 8 9 10

try:
    from unittest import mock
except ImportError:
    import mock

11

12
from avocado.utils import gdb
13
from avocado.utils import process
14
from avocado.utils import path
15

16 17 18
from six import string_types


19 20 21 22 23 24 25 26
def probe_binary(binary):
    try:
        return path.find_command(binary)
    except path.CmdNotFoundError:
        return None


TRUE_CMD = probe_binary('true')
27
ECHO_CMD = probe_binary('echo')
28

29

30 31 32 33 34 35 36
class TestSubProcess(unittest.TestCase):

    def test_allow_output_check_parameter(self):
        self.assertRaises(ValueError, process.SubProcess,
                          TRUE_CMD, False, "invalid")


37 38 39
class TestGDBProcess(unittest.TestCase):

    def setUp(self):
40
        self.current_runtime_expr = gdb.GDB_RUN_BINARY_NAMES_EXPR[:]
41 42

    def cleanUp(self):
43
        gdb.GDB_RUN_BINARY_NAMES_EXPR = self.current_runtime_expr
44 45

    def test_should_run_inside_gdb(self):
46
        gdb.GDB_RUN_BINARY_NAMES_EXPR = ['foo']
47 48 49 50
        self.assertTrue(process.should_run_inside_gdb('foo'))
        self.assertTrue(process.should_run_inside_gdb('/usr/bin/foo'))
        self.assertFalse(process.should_run_inside_gdb('/usr/bin/fooz'))

51
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('foo:main')
52 53 54
        self.assertTrue(process.should_run_inside_gdb('foo'))
        self.assertFalse(process.should_run_inside_gdb('bar'))

55
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('bar:main.c:5')
56 57 58 59 60
        self.assertTrue(process.should_run_inside_gdb('bar'))
        self.assertFalse(process.should_run_inside_gdb('baz'))
        self.assertTrue(process.should_run_inside_gdb('bar 1 2 3'))
        self.assertTrue(process.should_run_inside_gdb('/usr/bin/bar 1 2 3'))

61 62 63
    def test_should_run_inside_gdb_malformed_command(self):
        gdb.GDB_RUN_BINARY_NAMES_EXPR = ['/bin/virsh']
        cmd = """/bin/virsh node-memory-tune --shm-sleep-millisecs ~!@#$%^*()-=[]{}|_+":;'`,>?. """
64 65 66
        self.assertTrue(process.should_run_inside_gdb(cmd))
        self.assertFalse(process.should_run_inside_gdb("foo bar baz"))
        self.assertFalse(process.should_run_inside_gdb("foo ' "))
67

68 69
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
70
    def test_get_sub_process_klass(self):
71
        gdb.GDB_RUN_BINARY_NAMES_EXPR = []
72
        self.assertIs(process.get_sub_process_klass(TRUE_CMD),
73 74
                      process.SubProcess)

75
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('/bin/false')
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
        self.assertIs(process.get_sub_process_klass('/bin/false'),
                      process.GDBSubProcess)
        self.assertIs(process.get_sub_process_klass('false'),
                      process.GDBSubProcess)
        self.assertIs(process.get_sub_process_klass('true'),
                      process.SubProcess)

    def test_split_gdb_expr(self):
        binary, breakpoint = process.split_gdb_expr('foo:debug_print')
        self.assertEqual(binary, 'foo')
        self.assertEqual(breakpoint, 'debug_print')
        binary, breakpoint = process.split_gdb_expr('bar')
        self.assertEqual(binary, 'bar')
        self.assertEqual(breakpoint, 'main')
        binary, breakpoint = process.split_gdb_expr('baz:main.c:57')
        self.assertEqual(binary, 'baz')
        self.assertEqual(breakpoint, 'main.c:57')
        self.assertIsInstance(process.split_gdb_expr('foo'), tuple)
        self.assertIsInstance(process.split_gdb_expr('foo:debug_print'), tuple)

96 97 98 99 100 101 102 103 104

def mock_fail_find_cmd(cmd, default=None):
    path_paths = ["/usr/libexec", "/usr/local/sbin", "/usr/local/bin",
                  "/usr/sbin", "/usr/bin", "/sbin", "/bin"]
    raise path.CmdNotFoundError(cmd, path_paths)


class TestProcessRun(unittest.TestCase):

105 106
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
107
    @mock.patch.object(path, 'find_command',
108
                       mock.Mock(return_value=TRUE_CMD))
109
    @mock.patch.object(os, 'getuid',
110
                       mock.Mock(return_value=1000))
111 112 113 114 115
    def test_subprocess_nosudo(self):
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l')
        self.assertEqual(p.cmd, expected_command)

116 117
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
118
    @mock.patch.object(path, 'find_command',
119 120
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
121 122 123 124 125
    def test_subprocess_nosudo_uid_0(self):
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l')
        self.assertEqual(p.cmd, expected_command)

126 127
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
128
    @mock.patch.object(path, 'find_command',
129
                       mock.Mock(return_value=TRUE_CMD))
130
    @mock.patch.object(os, 'getuid',
131
                       mock.Mock(return_value=1000))
132
    def test_subprocess_sudo(self):
133
        expected_command = '%s -n ls -l' % TRUE_CMD
134 135 136
        p = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(p.cmd, expected_command)

137
    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
138
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
139 140 141 142 143
    def test_subprocess_sudo_no_sudo_installed(self):
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(p.cmd, expected_command)

144 145
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
146
    @mock.patch.object(path, 'find_command',
147 148
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
149 150 151 152 153
    def test_subprocess_sudo_uid_0(self):
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(p.cmd, expected_command)

154 155
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
156
    @mock.patch.object(path, 'find_command',
157 158
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
159
    def test_subprocess_sudo_shell(self):
160
        expected_command = '%s -n -s ls -l' % TRUE_CMD
161 162 163
        p = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(p.cmd, expected_command)

164
    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
165
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
166 167 168 169 170
    def test_subprocess_sudo_shell_no_sudo_installed(self):
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(p.cmd, expected_command)

171 172
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
173
    @mock.patch.object(path, 'find_command',
174 175
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
176 177 178 179 180
    def test_subprocess_sudo_shell_uid_0(self):
        expected_command = 'ls -l'
        p = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(p.cmd, expected_command)

181 182
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
183
    @mock.patch.object(path, 'find_command',
184 185
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
186 187 188 189 190
    def test_run_nosudo(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', ignore_status=True)
        self.assertEqual(p.command, expected_command)

191 192
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
193
    @mock.patch.object(path, 'find_command',
194 195
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
196 197 198 199 200
    def test_run_nosudo_uid_0(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', ignore_status=True)
        self.assertEqual(p.command, expected_command)

201 202
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
203
    @mock.patch.object(path, 'find_command',
204 205
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
206
    def test_run_sudo(self):
207
        expected_command = '%s -n ls -l' % TRUE_CMD
208 209 210
        p = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

211
    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
212
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
213 214 215 216 217
    def test_run_sudo_no_sudo_installed(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

218 219
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
220
    @mock.patch.object(path, 'find_command',
221 222
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
223 224 225 226 227
    def test_run_sudo_uid_0(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

228 229
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
230
    @mock.patch.object(path, 'find_command',
231 232
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
233
    def test_run_sudo_shell(self):
234
        expected_command = '%s -n -s ls -l' % TRUE_CMD
235 236 237
        p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

238
    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
239
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
240 241 242 243 244
    def test_run_sudo_shell_no_sudo_installed(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

245 246
    @unittest.skipUnless(TRUE_CMD,
                         '"true" binary not available')
247
    @mock.patch.object(path, 'find_command',
248 249
                       mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
250 251 252 253 254
    def test_run_sudo_shell_uid_0(self):
        expected_command = 'ls -l'
        p = process.run(cmd='ls -l', sudo=True, shell=True, ignore_status=True)
        self.assertEqual(p.command, expected_command)

255 256 257 258 259 260
    @unittest.skipUnless(ECHO_CMD, "Echo command not available in system")
    def test_run_unicode_output(self):
        # Using encoded string as shlex does not support decoding
        # but the behavior is exactly the same as if shell binary
        # produced unicode
        text = u"Avok\xe1do"
261 262 263 264 265 266 267 268 269 270
        # Even though code point used is "LATIN SMALL LETTER A WITH ACUTE"
        # (http://unicode.scarfboy.com/?s=u%2B00e1) when encoded to proper
        # utf-8, it becomes two bytes because it is >= 0x80
        # See https://en.wikipedia.org/wiki/UTF-8
        encoded_text = b'Avok\xc3\xa1do'
        self.assertEqual(text.encode('utf-8'), encoded_text)
        self.assertEqual(encoded_text.decode('utf-8'), text)
        result = process.run("%s -n %s" % (ECHO_CMD, text), encoding='utf-8')
        self.assertEqual(result.stdout, encoded_text)
        self.assertEqual(result.stdout_text, text)
271

272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287

class MiscProcessTests(unittest.TestCase):

    def test_binary_from_shell(self):
        self.assertEqual("binary", process.binary_from_shell_cmd("binary"))
        res = process.binary_from_shell_cmd("MY_VAR=foo myV4r=123 "
                                            "quote='a b c' QUOTE=\"1 2 && b\" "
                                            "QuOtE=\"1 2\"foo' 3 4' first_bin "
                                            "second_bin VAR=xyz")
        self.assertEqual("first_bin", res)
        res = process.binary_from_shell_cmd("VAR=VALUE 1st_binary var=value "
                                            "second_binary")
        self.assertEqual("1st_binary", res)
        res = process.binary_from_shell_cmd("FOO=bar ./bin var=value")
        self.assertEqual("./bin", res)

L
Lukáš Doktor 已提交
288

289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
class CmdResultTests(unittest.TestCase):

    def test_cmd_result_stdout_stderr_bytes(self):
        result = process.CmdResult()
        self.assertTrue(isinstance(result.stdout, bytes))
        self.assertTrue(isinstance(result.stderr, bytes))

    def test_cmd_result_stdout_stderr_text(self):
        result = process.CmdResult()
        self.assertTrue(isinstance(result.stdout_text, string_types))
        self.assertTrue(isinstance(result.stderr_text, string_types))

    def test_cmd_result_stdout_stderr_already_text(self):
        result = process.CmdResult()
        result.stdout = "supposed command output, but not as bytes"
        result.stderr = "supposed command error, but not as bytes"
        self.assertEqual(result.stdout, result.stdout_text)
        self.assertEqual(result.stderr, result.stderr_text)

    def test_cmd_result_stdout_stderr_other_type(self):
        result = process.CmdResult()
        result.stdout = None
        result.stderr = None
        self.assertRaises(TypeError, lambda x: result.stdout_text)
        self.assertRaises(TypeError, lambda x: result.stderr_text)


316 317 318 319 320 321 322
class FDDrainerTests(unittest.TestCase):

    def test_drain_from_pipe_fd(self):
        read_fd, write_fd = os.pipe()
        result = process.CmdResult()
        fd_drainer = process.FDDrainer(read_fd, result, "test")
        fd_drainer.start()
323
        for content in (b"foo", b"bar", b"baz", b"foo\nbar\nbaz\n\n"):
324
            os.write(write_fd, content)
325
        os.write(write_fd, b"finish")
326
        os.close(write_fd)
327
        fd_drainer.flush()
328
        self.assertEqual(fd_drainer.data.getvalue(),
329
                         b"foobarbazfoo\nbar\nbaz\n\nfinish")
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352

    def test_log(self):
        class CatchHandler(logging.NullHandler):
            """
            Handler used just to confirm that a logging event happened
            """
            def __init__(self, *args, **kwargs):
                super(CatchHandler, self).__init__(*args, **kwargs)
                self.caught_record = False

            def handle(self, record):
                self.caught_record = True

        read_fd, write_fd = os.pipe()
        result = process.CmdResult()
        logger = logging.getLogger("FDDrainerTests.test_log")
        handler = CatchHandler()
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)

        fd_drainer = process.FDDrainer(read_fd, result, "test",
                                       logger=logger, verbose=True)
        fd_drainer.start()
353
        os.write(write_fd, b"should go to the log\n")
354
        os.close(write_fd)
355
        fd_drainer.flush()
356
        self.assertEqual(fd_drainer.data.getvalue(),
357
                         b"should go to the log\n")
358 359
        self.assertTrue(handler.caught_record)

360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
    def test_flush_on_closed_handler(self):
        handler = logging.StreamHandler(io.StringIO())
        log = logging.getLogger("test_flush_on_closed_handler")
        log.addHandler(handler)
        read_fd, write_fd = os.pipe()
        result = process.CmdResult()
        fd_drainer = process.FDDrainer(read_fd, result, name="test",
                                       stream_logger=log)
        fd_drainer.start()
        os.close(write_fd)
        self.assertIsNotNone(fd_drainer._stream_logger)
        one_stream_closed = False
        for handler in fd_drainer._stream_logger.handlers:
            stream = getattr(handler, 'stream', None)
            if stream is not None:
                if hasattr(stream, 'close'):
                    # force closing the handler's stream to check if
                    # flush will adapt to it
                    stream.close()
                    one_stream_closed = True
        self.assertTrue(one_stream_closed)
        fd_drainer.flush()

    def test_flush_on_handler_with_no_fileno(self):
        handler = logging.StreamHandler(io.StringIO())
        log = logging.getLogger("test_flush_on_handler_with_no_fileno")
        log.addHandler(handler)
        read_fd, write_fd = os.pipe()
        result = process.CmdResult()
        fd_drainer = process.FDDrainer(read_fd, result, name="test",
                                       stream_logger=log)
        fd_drainer.start()
        os.close(write_fd)
        fd_drainer.flush()

395

396 397
if __name__ == "__main__":
    unittest.main()