db_crashtest.py 14.0 KB
Newer Older
1
#! /usr/bin/env python
2 3 4
import os
import sys
import time
5
import random
6
import tempfile
7
import subprocess
I
Igor Canadi 已提交
8
import shutil
9
import argparse
10

11 12
# params overwrite priority:
#   for default:
#       default_params < {blackbox,whitebox}_default_params < args
#   for simple:
#       default_params < {blackbox,whitebox}_default_params <
#       simple_default_params <
#       {blackbox,whitebox}_simple_default_params < args
#   for enable_atomic_flush:
#       default_params < {blackbox,whitebox}_default_params <
#       atomic_flush_params < args

# Scratch file handed to db_stress via --expected_values_path (see
# default_params).  Kept open at module scope so the temp file survives
# for the whole life of this script and is shared across db_stress runs.
expected_values_file = tempfile.NamedTemporaryFile()

24
# Base configuration passed to every db_stress invocation.  Values are
# either plain constants or zero-argument callables; callables are invoked
# once per run by finalize_and_sanitize() so each run gets a fresh random
# choice.
default_params = {
    "acquire_snapshot_one_in": 10000,
    "block_size": 16384,
    "cache_size": 1048576,
    "checkpoint_one_in": 1000000,
    "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
    "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
    "clear_column_family_one_in": 0,
    "compact_files_one_in": 1000000,
    "compact_range_one_in": 1000000,
    "delpercent": 5,
    "destroy_db_initially": 0,
    "enable_pipelined_write": lambda: random.randint(0, 1),
    # Shared file that db_stress uses to track expected key values.
    "expected_values_path": expected_values_file.name,
    "flush_one_in": 1000000,
    "max_background_compactions": 20,
    "max_bytes_for_level_base": 10485760,
    "max_key": 100000000,
    "max_write_buffer_number": 3,
    "mmap_read": lambda: random.randint(0, 1),
    "nooverwritepercent": 1,
    "open_files": 500000,
    "prefixpercent": 5,
    "progress_reports": 0,
    "readpercent": 45,
    "reopen": 20,
    "snapshot_hold_ops": 100000,
    "subcompactions": lambda: random.randint(1, 4),
    "target_file_size_base": 2097152,
    "target_file_size_multiplier": 2,
    "use_direct_reads": lambda: random.randint(0, 1),
    "use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
    "use_full_merge_v1": lambda: random.randint(0, 1),
    "use_merge": lambda: random.randint(0, 1),
    "verify_checksum": 1,
    "write_buffer_size": 4 * 1024 * 1024,
    "writepercent": 35,
    "format_version": lambda: random.randint(2, 4),
    "index_block_restart_interval": lambda: random.choice(range(1, 16)),
}
64

65 66
# Env var that, when set to a non-empty value, names the parent directory
# under which per-test database directories are created (see get_dbname).
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'

67

68
def get_dbname(test_name):
    """Create and return an empty database directory for *test_name*.

    If the TEST_TMPDIR env var is unset or empty, a fresh mkdtemp()
    directory is used.  Otherwise a deterministic subdirectory of
    TEST_TMPDIR is wiped (best-effort) and recreated.
    """
    base_dir = os.environ.get(_TEST_DIR_ENV_VAR)
    if not base_dir:
        return tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name)
    dbname = base_dir + "/rocksdb_crashtest_" + test_name
    # ignore_errors=True: the directory may not exist on the first run.
    shutil.rmtree(dbname, True)
    os.mkdir(dbname)
    return dbname

78 79 80 81 82 83 84 85 86 87

def is_direct_io_supported(dbname):
    """Probe whether O_DIRECT opens work for files under *dbname*.

    Returns False when os.O_DIRECT does not exist on this platform
    (AttributeError, e.g. macOS) or the filesystem rejects the open
    (OSError/IOError); True otherwise.
    """
    with tempfile.NamedTemporaryFile(dir=dbname) as f:
        try:
            fd = os.open(f.name, os.O_DIRECT)
        except (AttributeError, OSError, IOError):
            # Narrowed from a bare `except:` which also swallowed
            # KeyboardInterrupt/SystemExit.
            return False
        # The original leaked this descriptor; close it explicitly.
        os.close(fd)
        return True


88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
# Overrides for the blackbox flavor, where this script (not db_stress
# itself) kills the child process at fixed intervals.
blackbox_default_params = {
    # total time for this script to test db_stress
    "duration": 6000,
    # time for one db_stress instance to run
    "interval": 120,
    # since we will be killing anyway, use large value for ops_per_thread
    "ops_per_thread": 100000000,
    "set_options_one_in": 10000,
    "test_batches_snapshots": 1,
}

# Overrides for the whitebox flavor; random_kill_odd seeds the
# kill_random_test option passed to db_stress (see whitebox_crash_main).
whitebox_default_params = {
    "duration": 10000,
    "log2_keys_per_lock": 10,
    "ops_per_thread": 200000,
    "random_kill_odd": 888887,
    "test_batches_snapshots": lambda: random.randint(0, 1),
}

# Overrides applied with --simple: a single column family, skip_list
# memtable, and scaled-down compaction/write-buffer settings.
simple_default_params = {
    "allow_concurrent_memtable_write": lambda: random.randint(0, 1),
    "column_families": 1,
    "max_background_compactions": 1,
    "max_bytes_for_level_base": 67108864,
    "memtablerep": "skip_list",
    "prefixpercent": 25,
    "readpercent": 25,
    "target_file_size_base": 16777216,
    "target_file_size_multiplier": 1,
    "test_batches_snapshots": 0,
    "write_buffer_size": 32 * 1024 * 1024,
}

# Further overrides for --simple blackbox runs.
blackbox_simple_default_params = {
    "open_files": -1,
    "set_options_one_in": 0,
}

126
# No extra overrides for --simple whitebox runs; kept for symmetry with
# the blackbox table.
whitebox_simple_default_params = {}
127

128 129 130 131 132 133 134 135 136
# Overrides applied with --enable_atomic_flush.
atomic_flush_params = {
    "atomic_flush": 1,
    "disable_wal": 1,
    "reopen": 0,
    # use small value for write_buffer_size so that RocksDB triggers flush
    # more frequently
    "write_buffer_size": 1024 * 1024,
}

137

138 139 140
def finalize_and_sanitize(src_params):
    """Resolve lazily-chosen params and coerce incompatible combinations.

    Callable values (the random lambdas in the parameter tables) are
    invoked here so every run gets one concrete choice; then option
    combinations the stress test cannot run with are forced to safe
    values.  Returns a new dict; *src_params* is not modified.
    """
    dest_params = {}
    for key, value in src_params.items():
        dest_params[key] = value() if callable(value) else value
    # zstd dictionary training is only meaningful with zstd compression
    # and a non-zero dictionary budget.
    if not (dest_params.get("compression_type") == "zstd"
            and dest_params.get("compression_max_dict_bytes") != 0):
        dest_params["compression_zstd_max_train_bytes"] = 0
    # Force skip_list whenever concurrent memtable writes are enabled
    # (the default when the key is absent).
    if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
        dest_params["memtablerep"] = "skip_list"
    # Direct I/O is disabled under mmap reads; the filesystem probe is
    # only consulted when mmap_read is off (short-circuit).
    if dest_params["mmap_read"] == 1 \
            or not is_direct_io_supported(dest_params["db"]):
        dest_params["use_direct_io_for_flush_and_compaction"] = 0
        dest_params["use_direct_reads"] = 0
    return dest_params


153 154 155
def gen_cmd_params(args):
    """Layer the parameter tables for the chosen test flavor.

    Priority, lowest to highest: default_params, the per-flavor table,
    the simple tables (with --simple), atomic_flush_params (with
    --enable_atomic_flush), and finally any explicitly-passed args.
    """
    flavor_overrides = {
        'blackbox': blackbox_default_params,
        'whitebox': whitebox_default_params,
    }
    simple_overrides = {
        'blackbox': blackbox_simple_default_params,
        'whitebox': whitebox_simple_default_params,
    }

    params = dict(default_params)
    params.update(flavor_overrides.get(args.test_type, {}))
    if args.simple:
        params.update(simple_default_params)
        params.update(simple_overrides.get(args.test_type, {}))
    if args.enable_atomic_flush:
        params.update(atomic_flush_params)

    # Explicit command-line values beat every table above.
    for name, value in vars(args).items():
        if value is not None:
            params[name] = value
    return params


176
def gen_cmd(params, unknown_params):
    """Build the db_stress argv from a finalized parameter dict.

    Keys consumed by this script rather than db_stress are filtered out;
    None values are skipped; *unknown_params* are appended verbatim.
    """
    script_only_keys = set(['test_type', 'simple', 'duration', 'interval',
                            'random_kill_odd', 'enable_atomic_flush'])
    cmd = ['./db_stress']
    for key, value in finalize_and_sanitize(params).items():
        if key in script_only_keys or value is None:
            continue
        cmd.append('--{0}={1}'.format(key, value))
    return cmd + unknown_params


# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB.
def blackbox_crash_main(args, unknown_args):
    """Repeatedly run db_stress and kill it mid-run until 'duration' expires.

    Each iteration launches db_stress, lets it run for 'interval' seconds,
    kills it, then scans the child's stderr: any non-WARNING line is
    treated as a failure and exits this script with code 2.  The db
    directory is removed only if the whole loop completes cleanly.
    """
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('blackbox')
    exit_time = time.time() + cmd_params['duration']

    print("Running blackbox-crash-test with \n"
          + "interval_between_crash=" + str(cmd_params['interval']) + "\n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    while time.time() < exit_time:
        run_had_errors = False
        killtime = time.time() + cmd_params['interval']

        # NOTE(review): dict(a.items() + b.items()) relies on Python 2,
        # where items() returns lists; this raises TypeError on Python 3.
        cmd = gen_cmd(dict(
            cmd_params.items() +
            {'db': dbname}.items()), unknown_args)

        child = subprocess.Popen(cmd, stderr=subprocess.PIPE)
        print("Running db_stress with pid=%d: %s\n\n"
              % (child.pid, ' '.join(cmd)))

        # Poll once a second until the kill deadline; if the child dies on
        # its own before then, skip the kill step below.
        stop_early = False
        while time.time() < killtime:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
                stop_early = True
                break
            time.sleep(1)

        if not stop_early:
            # Re-check: the child may have exited between the last poll
            # and the deadline.
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
            else:
                child.kill()
                print("KILLED %d\n" % child.pid)
                time.sleep(1)  # time to stabilize after a kill

        # Drain stderr; readline() returns '' at EOF.  Lines that do not
        # start with 'WARNING' are treated as real errors.
        while True:
            line = child.stderr.readline().strip()
            if line == '':
                break
            elif not line.startswith('WARNING'):
                run_had_errors = True
                print('stderr has error message:')
                print('***' + line + '***')

        if run_had_errors:
            sys.exit(2)

        time.sleep(1)  # time to stabilize before the next run

    # we need to clean up after ourselves -- only do this on test success
    shutil.rmtree(dbname, True)
I
Igor Canadi 已提交
243

244 245 246

# This python script runs db_stress multiple times. Some runs with
# kill_random_test that causes rocksdb to crash at various points in code.
def whitebox_crash_main(args, unknown_args):
    """Cycle db_stress through kill-test and plain-run check modes.

    For the first half of 'duration' the loop stays in check_mode 0
    (db_stress self-kills via kill_random_test, rotating three kill
    modes); in the second half it advances through the other check modes
    (universal/FIFO/default compaction, no kill).  Each run's exit code
    and output are validated; any mismatch or 'error'/'fail' text exits
    this script with a non-zero code.

    NOTE(review): this function uses Python 2 print statements and
    integer division (e.g. kill_random_test / 10); it is not Python 3
    compatible as written.
    """
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('whitebox')

    cur_time = time.time()
    exit_time = cur_time + cmd_params['duration']
    half_time = cur_time + cmd_params['duration'] / 2

    print("Running whitebox-crash-test with \n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    total_check_mode = 4
    check_mode = 0
    kill_random_test = cmd_params['random_kill_odd']
    kill_mode = 0

    while time.time() < exit_time:
        if check_mode == 0:
            additional_opts = {
                # use large ops per thread since we will kill it anyway
                "ops_per_thread": 100 * cmd_params['ops_per_thread'],
            }
            # run with kill_random_test, with three modes.
            # Mode 0 covers all kill points. Mode 1 covers fewer kill points
            # but increases the chance of triggering them. Mode 2 covers even
            # less frequent kill points and further increases the trigger
            # chance.
            if kill_mode == 0:
                additional_opts.update({
                    "kill_random_test": kill_random_test,
                })
            elif kill_mode == 1:
                additional_opts.update({
                    "kill_random_test": (kill_random_test / 10 + 1),
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    + "WritableFileWriter::WriteBuffered",
                })
            elif kill_mode == 2:
                # TODO: May need to adjust random odds if kill_random_test
                # is too small.
                additional_opts.update({
                    "kill_random_test": (kill_random_test / 5000 + 1),
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    "WritableFileWriter::WriteBuffered,"
                    "PosixMmapFile::Allocate,WritableFileWriter::Flush",
                })
            # Run kill mode 0, 1 and 2 by turn.
            kill_mode = (kill_mode + 1) % 3
        elif check_mode == 1:
            # normal run with universal compaction mode
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
                "compaction_style": 1,
            }
        elif check_mode == 2:
            # normal run with FIFO compaction mode
            # ops_per_thread is divided by 5 because FIFO compaction
            # style is quite a bit slower on reads with lot of files
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'] / 5,
                "compaction_style": 2,
            }
        else:
            # normal run
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
            }

        # Python 2 only: items() returns lists, so + concatenates them.
        cmd = gen_cmd(dict(cmd_params.items() + additional_opts.items()
                           + {'db': dbname}.items()), unknown_args)

        print "Running:" + ' '.join(cmd) + "\n"  # noqa: E999 T25377293 Grandfathered in

        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT)
        stdoutdata, stderrdata = popen.communicate()
        retncode = popen.returncode
        msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
               check_mode, additional_opts['kill_random_test'], retncode))
        print msg
        print stdoutdata

        expected = False
        if additional_opts['kill_random_test'] is None and (retncode == 0):
            # we expect zero retncode if no kill option
            expected = True
        elif additional_opts['kill_random_test'] is not None and retncode < 0:
            # we expect negative retncode if kill option was given
            expected = True

        if not expected:
            print "TEST FAILED. See kill option and exit code above!!!\n"
            sys.exit(1)

        # Scan output case-insensitively; 'got errors 0 times' is a benign
        # db_stress summary line and must not count as an error.
        stdoutdata = stdoutdata.lower()
        errorcount = (stdoutdata.count('error') -
                      stdoutdata.count('got errors 0 times'))
        print "#times error occurred in output is " + str(errorcount) + "\n"

        if (errorcount > 0):
            print "TEST FAILED. Output has 'error'!!!\n"
            sys.exit(2)
        if (stdoutdata.find('fail') >= 0):
            print "TEST FAILED. Output has 'fail'!!!\n"
            sys.exit(2)

        # First half of the duration, keep doing kill test. For the next half,
        # try different modes.
        if time.time() > half_time:
            # we need to clean up after ourselves -- only do this on test
            # success
            shutil.rmtree(dbname, True)
            os.mkdir(dbname)
            cmd_params.pop('expected_values_path', None)
            check_mode = (check_mode + 1) % total_check_mode

        time.sleep(1)  # time to stabilize after a kill


def main():
    """Parse arguments and dispatch to the blackbox or whitebox driver.

    Every key in the parameter tables is exposed as an optional --flag
    whose argparse type is inferred from the default value (callables are
    invoked once to sample a representative value).  Unrecognized flags
    are forwarded to db_stress untouched.
    """
    parser = argparse.ArgumentParser(description="This script runs and kills \
        db_stress multiple times")
    parser.add_argument("test_type", choices=["blackbox", "whitebox"])
    parser.add_argument("--simple", action="store_true")
    parser.add_argument("--enable_atomic_flush", action='store_true')

    # NOTE(review): Python 2 only — items() returns lists here; on
    # Python 3 this concatenation raises TypeError.
    all_params = dict(default_params.items()
                      + blackbox_default_params.items()
                      + whitebox_default_params.items()
                      + simple_default_params.items()
                      + blackbox_simple_default_params.items()
                      + whitebox_simple_default_params.items())

    for k, v in all_params.items():
        parser.add_argument("--" + k, type=type(v() if callable(v) else v))
    # unknown_args are passed directly to db_stress
    args, unknown_args = parser.parse_known_args()

    # Fail fast if TEST_TMPDIR points somewhere that does not exist.
    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is not None and not os.path.isdir(test_tmpdir):
        print('%s env var is set to a non-existent directory: %s' %
                (_TEST_DIR_ENV_VAR, test_tmpdir))
        sys.exit(1)

    if args.test_type == 'blackbox':
        blackbox_crash_main(args, unknown_args)
    if args.test_type == 'whitebox':
        whitebox_crash_main(args, unknown_args)


if __name__ == '__main__':
    main()