test_utils_process.py 11.4 KB
Newer Older
1
import logging
2
import os
3
import unittest
4 5 6 7 8 9

try:
    from unittest import mock
except ImportError:
    import mock

10

11
from avocado.utils import gdb
12
from avocado.utils import process
13
from avocado.utils import path
14

15 16
# Result of looking up the ``true`` command via path.find_command when this
# module is imported (presumably its full path -- confirm against
# avocado.utils.path). The tests below use it as a command argument and as
# the return value of the mocked path.find_command.
TRUE_CMD = path.find_command('true')

17

18 19 20 21 22 23 24
class TestSubProcess(unittest.TestCase):
    """Argument validation checks for ``process.SubProcess``."""

    def test_allow_output_check_parameter(self):
        """An invalid third positional argument must raise ValueError.

        NOTE(review): judging by the test name, the third positional
        parameter is ``allow_output_check`` -- confirm against SubProcess.
        """
        with self.assertRaises(ValueError):
            process.SubProcess(TRUE_CMD, False, "invalid")


25 26 27
class TestGDBProcess(unittest.TestCase):
    """Tests for the GDB-related helpers in ``avocado.utils.process``.

    Every test that configures GDB rebinds or mutates the module-level
    ``gdb.GDB_RUN_BINARY_NAMES_EXPR`` list, so the fixture snapshots it
    before each test and puts the snapshot back afterwards.
    """

    def setUp(self):
        # Store a copy rather than a reference, so in-place ``append`` calls
        # made by a test cannot alter the saved value.
        self.current_runtime_expr = gdb.GDB_RUN_BINARY_NAMES_EXPR[:]

    def tearDown(self):
        # This hook has to be called ``tearDown``: unittest does not invoke
        # arbitrarily named methods, and without the restore the modified
        # global would leak into whichever test runs next.
        gdb.GDB_RUN_BINARY_NAMES_EXPR = self.current_runtime_expr

    def test_should_run_inside_gdb(self):
        """Commands match configured binaries by name or by full path."""
        gdb.GDB_RUN_BINARY_NAMES_EXPR = ['foo']
        self.assertTrue(process.should_run_inside_gdb('foo'))
        self.assertTrue(process.should_run_inside_gdb('/usr/bin/foo'))
        self.assertFalse(process.should_run_inside_gdb('/usr/bin/fooz'))

        # An entry with a ':<breakpoint>' suffix still selects the binary.
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('foo:main')
        self.assertTrue(process.should_run_inside_gdb('foo'))
        self.assertFalse(process.should_run_inside_gdb('bar'))

        # Arguments following the binary do not prevent a match.
        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('bar:main.c:5')
        self.assertTrue(process.should_run_inside_gdb('bar'))
        self.assertFalse(process.should_run_inside_gdb('baz'))
        self.assertTrue(process.should_run_inside_gdb('bar 1 2 3'))
        self.assertTrue(process.should_run_inside_gdb('/usr/bin/bar 1 2 3'))

    def test_should_run_inside_gdb_malformed_command(self):
        """Command lines with unbalanced quotes are still classified."""
        gdb.GDB_RUN_BINARY_NAMES_EXPR = ['/bin/virsh']
        cmd = """/bin/virsh node-memory-tune --shm-sleep-millisecs ~!@#$%^*()-=[]{}|_+":;'`,>?. """
        self.assertTrue(process.should_run_inside_gdb(cmd))
        self.assertFalse(process.should_run_inside_gdb("foo bar baz"))
        self.assertFalse(process.should_run_inside_gdb("foo ' "))

    def test_get_sub_process_klass(self):
        """GDBSubProcess is selected only for binaries configured for GDB."""
        gdb.GDB_RUN_BINARY_NAMES_EXPR = []
        self.assertIs(process.get_sub_process_klass(TRUE_CMD),
                      process.SubProcess)

        gdb.GDB_RUN_BINARY_NAMES_EXPR.append('/bin/false')
        self.assertIs(process.get_sub_process_klass('/bin/false'),
                      process.GDBSubProcess)
        self.assertIs(process.get_sub_process_klass('false'),
                      process.GDBSubProcess)
        self.assertIs(process.get_sub_process_klass('true'),
                      process.SubProcess)

    def test_split_gdb_expr(self):
        """Expressions split into a (binary, breakpoint) tuple.

        The expected breakpoint is ``main`` when the expression names only
        a binary.
        """
        # The local is not called ``breakpoint`` so that it does not shadow
        # the builtin of that name on newer Python versions.
        binary, break_expr = process.split_gdb_expr('foo:debug_print')
        self.assertEqual(binary, 'foo')
        self.assertEqual(break_expr, 'debug_print')
        binary, break_expr = process.split_gdb_expr('bar')
        self.assertEqual(binary, 'bar')
        self.assertEqual(break_expr, 'main')
        binary, break_expr = process.split_gdb_expr('baz:main.c:57')
        self.assertEqual(binary, 'baz')
        self.assertEqual(break_expr, 'main.c:57')
        self.assertIsInstance(process.split_gdb_expr('foo'), tuple)
        self.assertIsInstance(process.split_gdb_expr('foo:debug_print'), tuple)

82 83 84 85 86 87 88 89 90

def mock_fail_find_cmd(cmd, default=None):
    """Replacement for ``path.find_command`` that always fails.

    Raises ``path.CmdNotFoundError`` for ``cmd`` unconditionally, passing a
    fixed list of directories as the second argument. ``default`` is
    accepted but never used.
    """
    searched_dirs = [
        "/usr/libexec",
        "/usr/local/sbin",
        "/usr/local/bin",
        "/usr/sbin",
        "/usr/bin",
        "/sbin",
        "/bin",
    ]
    raise path.CmdNotFoundError(cmd, searched_dirs)


class TestProcessRun(unittest.TestCase):
    """Checks the final command line built by SubProcess and process.run.

    Each test patches ``path.find_command`` (to return ``TRUE_CMD`` or to
    raise via ``mock_fail_find_cmd``) and ``os.getuid`` (1000 or 0), then
    compares the resulting command with the expected string.
    """

    # --- process.SubProcess -------------------------------------------

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_nosudo(self):
        """uid 1000, sudo not requested: command is unchanged."""
        proc = process.SubProcess(cmd='ls -l')
        self.assertEqual(proc.cmd, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_subprocess_nosudo_uid_0(self):
        """uid 0, sudo not requested: command is unchanged."""
        proc = process.SubProcess(cmd='ls -l')
        self.assertEqual(proc.cmd, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_sudo(self):
        """uid 1000 with sudo: the found command plus ``-n`` is prepended."""
        proc = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(proc.cmd, '%s -n ls -l' % TRUE_CMD)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_sudo_no_sudo_installed(self):
        """sudo requested but the lookup fails: command is unchanged."""
        proc = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(proc.cmd, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_subprocess_sudo_uid_0(self):
        """uid 0 with sudo requested: command is unchanged."""
        proc = process.SubProcess(cmd='ls -l', sudo=True)
        self.assertEqual(proc.cmd, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_sudo_shell(self):
        """uid 1000 with sudo and shell: ``-n -s`` follows the prefix."""
        proc = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(proc.cmd, '%s -n -s ls -l' % TRUE_CMD)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_subprocess_sudo_shell_no_sudo_installed(self):
        """sudo and shell requested, lookup fails: command is unchanged."""
        proc = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(proc.cmd, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_subprocess_sudo_shell_uid_0(self):
        """uid 0 with sudo and shell requested: command is unchanged."""
        proc = process.SubProcess(cmd='ls -l', sudo=True, shell=True)
        self.assertEqual(proc.cmd, 'ls -l')

    # --- process.run --------------------------------------------------

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_nosudo(self):
        """uid 1000, sudo not requested: result command is unchanged."""
        outcome = process.run(cmd='ls -l', ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_run_nosudo_uid_0(self):
        """uid 0, sudo not requested: result command is unchanged."""
        outcome = process.run(cmd='ls -l', ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_sudo(self):
        """uid 1000 with sudo: the found command plus ``-n`` is prepended."""
        outcome = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(outcome.command, '%s -n ls -l' % TRUE_CMD)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_sudo_no_sudo_installed(self):
        """sudo requested but the lookup fails: command is unchanged."""
        outcome = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_run_sudo_uid_0(self):
        """uid 0 with sudo requested: result command is unchanged."""
        outcome = process.run(cmd='ls -l', sudo=True, ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_sudo_shell(self):
        """uid 1000 with sudo and shell: ``-n -s`` follows the prefix."""
        outcome = process.run(cmd='ls -l', sudo=True, shell=True,
                              ignore_status=True)
        self.assertEqual(outcome.command, '%s -n -s ls -l' % TRUE_CMD)

    @mock.patch.object(path, 'find_command', mock_fail_find_cmd)
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=1000))
    def test_run_sudo_shell_no_sudo_installed(self):
        """sudo and shell requested, lookup fails: command is unchanged."""
        outcome = process.run(cmd='ls -l', sudo=True, shell=True,
                              ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

    @mock.patch.object(path, 'find_command', mock.Mock(return_value=TRUE_CMD))
    @mock.patch.object(os, 'getuid', mock.Mock(return_value=0))
    def test_run_sudo_shell_uid_0(self):
        """uid 0 with sudo and shell requested: command is unchanged."""
        outcome = process.run(cmd='ls -l', sudo=True, shell=True,
                              ignore_status=True)
        self.assertEqual(outcome.command, 'ls -l')

217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232

class MiscProcessTests(unittest.TestCase):
    """Tests for standalone helper functions of ``avocado.utils.process``."""

    def test_binary_from_shell(self):
        """``binary_from_shell_cmd`` returns the expected binary name.

        In every sample the expected value is the first token that is not
        a ``NAME=value`` assignment.
        """
        samples = (
            ("binary", "binary"),
            ("MY_VAR=foo myV4r=123 "
             "quote='a b c' QUOTE=\"1 2 && b\" "
             "QuOtE=\"1 2\"foo' 3 4' first_bin "
             "second_bin VAR=xyz",
             "first_bin"),
            ("VAR=VALUE 1st_binary var=value "
             "second_binary",
             "1st_binary"),
            ("FOO=bar ./bin var=value", "./bin"),
        )
        for shell_cmd, expected in samples:
            found = process.binary_from_shell_cmd(shell_cmd)
            self.assertEqual(expected, found)

L
Lukáš Doktor 已提交
233

234 235 236 237 238 239 240
class FDDrainerTests(unittest.TestCase):
    """Tests for ``process.FDDrainer`` fed from the read end of a pipe."""

    def test_drain_from_pipe_fd(self):
        """All bytes written to the pipe show up, in order, in ``data``."""
        pipe_out, pipe_in = os.pipe()
        cmd_result = process.CmdResult()
        drainer = process.FDDrainer(pipe_out, cmd_result, "test")
        drainer.start()
        for chunk in (b"foo", b"bar", b"baz", b"foo\nbar\nbaz\n\n"):
            os.write(pipe_in, chunk)
        os.write(pipe_in, b"finish")
        # The write end is closed before flush() is called.
        os.close(pipe_in)
        drainer.flush()
        collected = drainer.data.getvalue()
        self.assertEqual(collected, b"foobarbazfoo\nbar\nbaz\n\nfinish")

    def test_log(self):
        """With a logger and verbose=True, drained data reaches the logger."""

        class CatchHandler(logging.NullHandler):
            """
            Handler whose only job is to remember that a record arrived
            """

            def __init__(self, *args, **kwargs):
                super(CatchHandler, self).__init__(*args, **kwargs)
                self.caught_record = False

            def handle(self, record):
                self.caught_record = True

        pipe_out, pipe_in = os.pipe()
        cmd_result = process.CmdResult()

        # Dedicated logger with the catching handler attached.
        test_logger = logging.getLogger("FDDrainerTests.test_log")
        catcher = CatchHandler()
        test_logger.addHandler(catcher)
        test_logger.setLevel(logging.DEBUG)

        drainer = process.FDDrainer(pipe_out, cmd_result, "test",
                                    logger=test_logger, verbose=True)
        drainer.start()
        os.write(pipe_in, b"should go to the log\n")
        os.close(pipe_in)
        drainer.flush()

        collected = drainer.data.getvalue()
        self.assertEqual(collected, b"should go to the log\n")
        self.assertTrue(catcher.caught_record)


279 280
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()