test_utils.py 5.4 KB
Newer Older
1
import multiprocessing
2
import os
3 4
import random
import shutil
5
import stat
6
import sys
7
import tempfile
8 9 10 11 12
import time

from avocado.utils.filelock import FileLock
from avocado.utils.stacktrace import prepare_exc_info

13

14 15 16 17 18
if sys.version_info[:2] == (2, 6):
    import unittest2 as unittest
else:
    import unittest

19 20
from avocado.utils import process

21 22 23 24 25 26

# What is commonly known as "0775" or "u=rwx,g=rwx,o=rx"
DEFAULT_MODE = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
                stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP |
                stat.S_IROTH | stat.S_IXOTH)

27
FAKE_VMSTAT_CONTENTS = """#!/usr/bin/env python
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
import time
import random
import signal
import sys

class FakeVMStat(object):
    def __init__(self, interval):
        self.interval = interval
        self._sysrand = random.SystemRandom()
        def interrupt_handler(signum, frame):
            sys.exit(0)
        signal.signal(signal.SIGINT, interrupt_handler)
        signal.signal(signal.SIGTERM, interrupt_handler)

    def get_r(self):
        return self._sysrand.randint(0, 2)

    def get_b(self):
        return 0

    def get_swpd(self):
        return 0

    def get_free(self):
        return self._sysrand.randint(1500000, 1600000)

    def get_buff(self):
        return self._sysrand.randint(290000, 300000)

    def get_cache(self):
        return self._sysrand.randint(2900000, 3000000)

    def get_si(self):
        return 0

    def get_so(self):
        return 0

    def get_bi(self):
        return self._sysrand.randint(0, 50)

    def get_bo(self):
        return self._sysrand.randint(0, 500)

    def get_in(self):
        return self._sysrand.randint(200, 3000)

    def get_cs(self):
        return self._sysrand.randint(1000, 4000)

    def get_us(self):
        return self._sysrand.randint(0, 40)

    def get_sy(self):
        return self._sysrand.randint(1, 5)

    def get_id(self):
        return self._sysrand.randint(50, 100)

    def get_wa(self):
        return 0

    def get_st(self):
        return 0

    def start(self):
94 95
        print("procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----")
        print(" r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st")
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
        while True:
            r = self.get_r()
            b = self.get_b()
            swpd = self.get_swpd()
            free = self.get_free()
            buff = self.get_buff()
            cache = self.get_cache()
            si = self.get_si()
            so = self.get_so()
            bi = self.get_bi()
            bo = self.get_bo()
            m_in = self.get_in()
            cs = self.get_cs()
            us = self.get_us()
            sy = self.get_sy()
            m_id = self.get_id()
            wa = self.get_wa()
            st = self.get_st()
114 115 116
            print("%2d %2d  %2d   %7d %6d %7d    %1d    %1d    %2d  %3d %4d %2d %2d %1d  %3d  %1d  %1d" %
                  (r, b, swpd, free, buff, cache, si, so, bi, bo, m_in, cs,
                   us, sy, m_id, wa, st))
117 118 119 120 121 122 123
            time.sleep(self.interval)

if __name__ == '__main__':
    vmstat = FakeVMStat(interval=float(sys.argv[1]))
    vmstat.start()
"""

124
FAKE_UPTIME_CONTENTS = """#!/usr/bin/env python
125
if __name__ == '__main__':
126
    print("17:56:34 up  8:06,  7 users,  load average: 0.26, 0.20, 0.21")
127 128 129 130 131 132 133

"""


class ProcessTest(unittest.TestCase):

    def setUp(self):
134
        self.base_logdir = tempfile.mkdtemp(prefix='avocado_' + __name__)
135 136 137
        self.fake_vmstat = os.path.join(self.base_logdir, 'vmstat')
        with open(self.fake_vmstat, 'w') as fake_vmstat_obj:
            fake_vmstat_obj.write(FAKE_VMSTAT_CONTENTS)
138
        os.chmod(self.fake_vmstat, DEFAULT_MODE)
139 140 141 142

        self.fake_uptime = os.path.join(self.base_logdir, 'uptime')
        with open(self.fake_uptime, 'w') as fake_uptime_obj:
            fake_uptime_obj.write(FAKE_UPTIME_CONTENTS)
143
        os.chmod(self.fake_uptime, DEFAULT_MODE)
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160

    def test_process_start(self):
        proc = process.SubProcess('%s 1' % self.fake_vmstat)
        proc.start()
        time.sleep(3)
        proc.terminate()
        proc.wait()
        stdout = proc.get_stdout()
        self.assertIn('memory', stdout, 'result: %s' % stdout)
        self.assertRegexpMatches(stdout, '[0-9]+')

    def test_process_run(self):
        proc = process.SubProcess(self.fake_uptime)
        result = proc.run()
        self.assertEqual(result.exit_status, 0, 'result: %s' % result)
        self.assertIn('load average', result.stdout)

161 162
    def tearDown(self):
        shutil.rmtree(self.base_logdir)
163

164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195

def file_lock_action(args):
    path, players = args
    max_individual_timeout = 0.021
    max_timeout = max_individual_timeout * players
    with FileLock(path, max_timeout):
        sleeptime = random.random() / 100
        time.sleep(sleeptime)


class FileLockTest(unittest.TestCase):

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp(prefix='avocado_' + __name__)

    @unittest.skipIf(os.environ.get("AVOCADO_CHECK_LONG") != "1",
                     "Skipping test that takes a long time to run")
    def test_filelock(self):
        players = 1000
        pool = multiprocessing.Pool(players)
        args = [(self.tmpdir, players)] * players
        try:
            pool.map(file_lock_action, args)
        except:
            msg = 'Failed to run FileLock with %s players:\n%s'
            msg %= (players, prepare_exc_info(sys.exc_info()))
            self.fail(msg)

    def tearDown(self):
        shutil.rmtree(self.tmpdir)


196 197
if __name__ == '__main__':
    unittest.main()