#!/usr/bin/env python2
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
import os
import sys
import time
6
import random
7
import tempfile
8
import subprocess
I
Igor Canadi 已提交
9
import shutil
10
import argparse
11

12 13
# params overwrite priority:
#   for default:
#       default_params < {blackbox,whitebox}_default_params < args
#   for simple:
#       default_params < {blackbox,whitebox}_default_params <
#       simple_default_params <
#       {blackbox,whitebox}_simple_default_params < args
#   for cf_consistency:
#       default_params < {blackbox,whitebox}_default_params <
#       cf_consistency_params < args
#   for txn:
#       default_params < {blackbox,whitebox}_default_params < txn_params < args
24

25 26
# Temporary file whose path is handed to db_stress as --expected_values_path.
# The module-level reference keeps the file object (and thus the file) alive
# while the script runs.
expected_values_file = tempfile.NamedTemporaryFile()

# Base db_stress parameters shared by every test flavour. A value that is a
# callable is invoked each time a command line is generated (see
# finalize_and_sanitize), so it yields a fresh random choice per db_stress
# run; a plain value is used as-is for the whole script run.
default_params = {
    "acquire_snapshot_one_in": 10000,
    "block_size": 16384,
    "bloom_bits": lambda: random.choice([random.randint(0, 19),
                                         random.lognormvariate(2.3, 1.3)]),
    "cache_index_and_filter_blocks": lambda: random.randint(0, 1),
    "cache_size": 1048576,
    "checkpoint_one_in": 1000000,
    "compression_type": lambda: random.choice(
        ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress", "zstd"]),
    "bottommost_compression_type": lambda:
        "disable" if random.randint(0, 1) == 0 else
        random.choice(
            ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress",
             "zstd"]),
    "checksum_type": lambda: random.choice(
        ["kCRC32c", "kxxHash", "kxxHash64"]),
    "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
    "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
    "clear_column_family_one_in": 0,
    "compact_files_one_in": 1000000,
    "compact_range_one_in": 1000000,
    "delpercent": 4,
    "delrangepercent": 1,
    "destroy_db_initially": 0,
    "enable_pipelined_write": lambda: random.randint(0, 1),
    "expected_values_path": expected_values_file.name,
    "flush_one_in": 1000000,
    "get_live_files_and_wal_files_one_in": 1000000,
    # Temporarily disable hash index
    "index_type": lambda: random.choice([0, 2]),
    "max_background_compactions": 20,
    "max_bytes_for_level_base": 10485760,
    "max_key": 100000000,
    "max_write_buffer_number": 3,
    "mmap_read": lambda: random.randint(0, 1),
    "nooverwritepercent": 1,
    "open_files": lambda: random.choice([-1, 500000]),
    "partition_filters": lambda: random.randint(0, 1),
    "pause_background_one_in": 1000000,
    "prefixpercent": 5,
    "progress_reports": 0,
    "readpercent": 45,
    "recycle_log_file_num": lambda: random.randint(0, 1),
    "reopen": 20,
    "snapshot_hold_ops": 100000,
    "long_running_snapshots": lambda: random.randint(0, 1),
    "subcompactions": lambda: random.randint(1, 4),
    "target_file_size_base": 2097152,
    "target_file_size_multiplier": 2,
    "use_direct_reads": lambda: random.randint(0, 1),
    "use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
    "use_full_merge_v1": lambda: random.randint(0, 1),
    "use_merge": lambda: random.randint(0, 1),
    "verify_checksum": 1,
    "write_buffer_size": 4 * 1024 * 1024,
    "writepercent": 35,
    "format_version": lambda: random.choice([2, 3, 4, 5, 5]),
    "index_block_restart_interval": lambda: random.choice(range(1, 16)),
    "use_multiget": lambda: random.randint(0, 1),
    "periodic_compaction_seconds":
        lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
    "compaction_ttl": lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
    # Test small max_manifest_file_size in a smaller chance, as most of the
    # time we want manifest history to be preserved to help debug
    "max_manifest_file_size": lambda: random.choice(
        [t * 16384 if t < 3 else 1024 * 1024 * 1024 for t in range(1, 30)]),
    # Sync mode might make test runs slower so running it in a smaller chance:
    # exactly one of the 20 candidates (t == 0) enables it. The range must
    # start at 0, otherwise no candidate ever matches t == 0.
    "sync": lambda: random.choice(
        [1 if t == 0 else 0 for t in range(0, 20)]),
    # Disable compaction_readahead_size because the test is not passing.
    # "compaction_readahead_size" : lambda : random.choice(
    #    [0, 0, 1024 * 1024]),
    "db_write_buffer_size": lambda: random.choice(
        [0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]),
    # The next two are not wrapped in a lambda, so each is drawn once when
    # this module is loaded and then stays fixed for every db_stress run.
    "avoid_unnecessary_blocking_io": random.randint(0, 1),
    "write_dbid_to_manifest": random.randint(0, 1),
    "max_write_batch_group_size_bytes": lambda: random.choice(
        [16, 64, 1024 * 1024, 16 * 1024 * 1024]),
    # Temporarily disabled because of assertion violations in
    # BlockBasedTable::ApproximateSize
    # "level_compaction_dynamic_level_bytes" : True,
    "verify_checksum_one_in": 1000000,
    "verify_db_one_in": 100000,
    "continuous_verification_interval": 0,
    "max_key_len": 3,
    "key_len_percent_dist": "1,30,69",
}
114

115 116
# Name of the environment variable that, when set to a non-empty value, gives
# the parent directory in which the test database directory is created.
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'


def get_dbname(test_name):
    """Create an empty database directory for ``test_name`` and return its path.

    When $TEST_TMPDIR is unset or empty, a uniquely named directory is made
    with ``tempfile.mkdtemp`` (prefix ``rocksdb_crashtest_<test_name>``).
    Otherwise the fixed path ``$TEST_TMPDIR/rocksdb_crashtest_<test_name>`` is
    removed if present and recreated, so the caller always starts empty.
    """
    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if not test_tmpdir:
        return tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name)

    dbname = os.path.join(test_tmpdir, 'rocksdb_crashtest_' + test_name)
    # Second argument is ignore_errors: the directory may not exist yet.
    shutil.rmtree(dbname, True)
    os.mkdir(dbname)
    return dbname

128 129 130 131 132 133 134 135 136 137

def is_direct_io_supported(dbname):
    """Return True if a file inside ``dbname`` can be opened with O_DIRECT.

    A scratch file is created in ``dbname`` (and removed on exit from the
    ``with`` block) and reopened with the O_DIRECT flag. Any OSError from
    that open, or the flag being absent from the ``os`` module on this
    platform, is reported as "not supported".
    """
    with tempfile.NamedTemporaryFile(dir=dbname) as f:
        try:
            fd = os.open(f.name, os.O_DIRECT)
        except (OSError, AttributeError):
            # OSError: the filesystem rejected O_DIRECT.
            # AttributeError: os.O_DIRECT is not defined on this platform.
            return False
        # The probe only needs the open to succeed; release the descriptor
        # so repeated probes do not leak file descriptors.
        os.close(fd)
        return True


138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
# Overrides layered on top of default_params for the blackbox test type.
blackbox_default_params = dict(
    # total time for this script to test db_stress
    duration=6000,
    # time for one db_stress instance to run
    interval=120,
    # since we will be killing anyway, use large value for ops_per_thread
    ops_per_thread=100000000,
    set_options_one_in=10000,
    test_batches_snapshots=1,
)

# Overrides layered on top of default_params for the whitebox test type.
whitebox_default_params = {
    # total time for this script to test db_stress
    "duration": 10000,
    "log2_keys_per_lock": 10,
    "ops_per_thread": 200000,
    # base value from which whitebox_crash_main derives --kill_random_test
    "random_kill_odd": 888887,
    "test_batches_snapshots": lambda: random.randint(0, 1),
}

# Overrides applied when --simple is given (after the per-test-type defaults).
simple_default_params = {
    "allow_concurrent_memtable_write": lambda: random.randint(0, 1),
    "column_families": 1,
    "max_background_compactions": 1,
    "max_bytes_for_level_base": 67108864,
    "memtablerep": "skip_list",
    "prefixpercent": 0,
    "readpercent": 50,
    "prefix_size": -1,
    "target_file_size_base": 16777216,
    "target_file_size_multiplier": 1,
    "test_batches_snapshots": 0,
    "write_buffer_size": 32 * 1024 * 1024,
    "level_compaction_dynamic_level_bytes": False,
}

# Extra overrides for the combination of the blackbox test type and --simple.
blackbox_simple_default_params = dict(
    open_files=-1,
    set_options_one_in=0,
)

178
# No extra overrides for the whitebox test type with --simple.
whitebox_simple_default_params = dict()
179

180 181
# Overrides applied when --cf_consistency is given.
cf_consistency_params = {
    "disable_wal": lambda: random.randint(0, 1),
    "reopen": 0,
    "test_cf_consistency": 1,
    # use small value for write_buffer_size so that RocksDB triggers flush
    # more frequently
    "write_buffer_size": 1024 * 1024,
    "enable_pipelined_write": lambda: random.randint(0, 1),
}

190 191
# Overrides applied when --txn is given.
txn_params = {
    "use_txn": 1,
    # Avoid lambda to set it once for the entire test
    "txn_write_policy": random.randint(0, 2),
    "unordered_write": random.randint(0, 1),
    "disable_wal": 0,
    # OpenReadOnly after checkpoint is not currently compatible with
    # WritePrepared txns
    "checkpoint_one_in": 0,
    # pipeline write is not currently compatible with WritePrepared txns
    "enable_pipelined_write": 0,
}
201

202 203 204
def finalize_and_sanitize(src_params):
    """Resolve randomized parameters and switch off incompatible combinations.

    Every callable value in ``src_params`` is called once to obtain its
    concrete value; other values are copied unchanged. The resulting dict is
    then adjusted in a fixed order (later rules see the effect of earlier
    ones) and returned. ``src_params`` itself is not modified.

    ``src_params`` must contain "mmap_read", "db" and "partition_filters".
    It must also contain "index_type" when partition_filters resolves to 1,
    and "delpercent"/"delrangepercent" when test_batches_snapshots or use_txn
    is 1; those keys are indexed directly.
    """
    dest_params = {k: (v() if callable(v) else v)
                   for k, v in src_params.items()}

    # zstd dictionary training only applies with zstd and a non-zero
    # dictionary size.
    if dest_params.get("compression_type") != "zstd" or \
            dest_params.get("compression_max_dict_bytes") == 0:
        dest_params["compression_zstd_max_train_bytes"] = 0
    # Concurrent memtable writes (assumed on when the key is absent) force
    # the skip_list memtable.
    if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
        dest_params["memtablerep"] = "skip_list"
    # Direct I/O is turned off together with mmap reads, or when the db
    # directory fails the O_DIRECT probe. The probe is only run when
    # mmap_read is not 1 (short-circuit).
    if dest_params["mmap_read"] == 1 or not is_direct_io_supported(
            dest_params["db"]):
        dest_params["use_direct_io_for_flush_and_compaction"] = 0
        dest_params["use_direct_reads"] = 0
    # DeleteRange is not currently compatible with Txns; fold its share of
    # the operation mix into plain deletes.
    if dest_params.get("test_batches_snapshots") == 1 or \
            dest_params.get("use_txn") == 1:
        dest_params["delpercent"] += dest_params["delrangepercent"]
        dest_params["delrangepercent"] = 0
    # Only under WritePrepared txns, unordered_write would provide the same
    # guarantees as vanilla rocksdb
    if dest_params.get("unordered_write", 0) == 1:
        dest_params["txn_write_policy"] = 1
        dest_params["allow_concurrent_memtable_write"] = 1
    if dest_params.get("disable_wal", 0) == 1:
        dest_params["atomic_flush"] = 1
        dest_params["sync"] = 0
    if dest_params.get("open_files", 1) != -1:
        # Compaction TTL and periodic compactions are only compatible
        # with open_files = -1
        dest_params["compaction_ttl"] = 0
        dest_params["periodic_compaction_seconds"] = 0
    if dest_params.get("compaction_style", 0) == 2:
        # Disable compaction TTL in FIFO compaction, because right
        # now assertion failures are triggered.
        dest_params["compaction_ttl"] = 0
        dest_params["periodic_compaction_seconds"] = 0
    if dest_params["partition_filters"] == 1:
        # Partitioned filters are kept only with index_type 2, and then the
        # block-based filter is switched off.
        if dest_params["index_type"] != 2:
            dest_params["partition_filters"] = 0
        else:
            dest_params["use_block_based_filter"] = 0
    if dest_params.get("atomic_flush", 0) == 1:
        # disable pipelined write when atomic flush is used.
        dest_params["enable_pipelined_write"] = 0
    return dest_params

246 247 248
def gen_cmd_params(args):
    """Build the parameter dict for this run from the defaults and ``args``.

    Layers are applied in order, each overriding the previous one:
    default_params, the blackbox/whitebox defaults, then (if requested) the
    --simple, --cf_consistency and --txn layers, and finally every attribute
    of ``args`` whose value is not None. Values may still be callables; they
    are resolved later by finalize_and_sanitize.
    """
    is_blackbox = args.test_type == 'blackbox'
    is_whitebox = args.test_type == 'whitebox'

    layers = [default_params]
    if is_blackbox:
        layers.append(blackbox_default_params)
    if is_whitebox:
        layers.append(whitebox_default_params)
    if args.simple:
        layers.append(simple_default_params)
        if is_blackbox:
            layers.append(blackbox_simple_default_params)
        if is_whitebox:
            layers.append(whitebox_simple_default_params)
    if args.cf_consistency:
        layers.append(cf_consistency_params)
    if args.txn:
        layers.append(txn_params)

    params = {}
    for layer in layers:
        params.update(layer)

    # Explicit command-line values win over every default layer. Options the
    # user did not pass are None and must not mask the defaults.
    for k, v in vars(args).items():
        if v is not None:
            params[k] = v
    return params


271
def gen_cmd(params, unknown_params):
    """Return the db_stress argv list for ``params``.

    ``params`` is first passed through finalize_and_sanitize. Each remaining
    key becomes a ``--key=value`` flag, in sorted key order, except for keys
    that only steer this script and keys whose value is None.
    ``unknown_params`` (a list of strings) is appended unchanged.
    """
    finalized_params = finalize_and_sanitize(params)
    # Keys consumed by this script itself rather than passed to db_stress.
    # Built once instead of once per key.
    script_only_keys = frozenset(['test_type', 'simple', 'duration',
                                  'interval', 'random_kill_odd',
                                  'cf_consistency', 'txn'])
    flags = ['--{0}={1}'.format(k, finalized_params[k])
             for k in sorted(finalized_params)
             if k not in script_only_keys
             and finalized_params[k] is not None]
    return ['./db_stress'] + flags + unknown_params


# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB.
284
def blackbox_crash_main(args, unknown_args):
    """Run db_stress repeatedly, killing each instance after a fixed interval.

    Until cmd_params['duration'] seconds have elapsed, a db_stress process is
    started on the same database directory, left to run for up to
    cmd_params['interval'] seconds, and then killed if it is still alive.
    After each run its stderr is scanned: any non-blank line that does not
    start with 'WARNING' is printed and makes the script exit with status 2.
    The database directory is removed only if the loop finishes without such
    an exit.

    ``unknown_args`` is passed through to every db_stress command line.
    """
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('blackbox')
    exit_time = time.time() + cmd_params['duration']

    print("Running blackbox-crash-test with \n"
          + "interval_between_crash=" + str(cmd_params['interval']) + "\n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    while time.time() < exit_time:
        run_had_errors = False
        killtime = time.time() + cmd_params['interval']

        # Copy cmd_params and add the db path. (Concatenating .items()
        # results only works where items() returns a list.)
        cmd = gen_cmd(dict(cmd_params, db=dbname), unknown_args)

        # universal_newlines=True makes stderr a text stream, so the string
        # comparisons below behave the same on Python 2 and Python 3.
        child = subprocess.Popen(cmd, stderr=subprocess.PIPE,
                                 universal_newlines=True)
        print("Running db_stress with pid=%d: %s\n\n"
              % (child.pid, ' '.join(cmd)))

        # Poll once a second until the interval is over or the child exits.
        stop_early = False
        while time.time() < killtime:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
                stop_early = True
                break
            time.sleep(1)

        if not stop_early:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
            else:
                child.kill()
                print("KILLED %d\n" % child.pid)
                time.sleep(1)  # time to stabilize after a kill

        # Scan all of stderr. readline() returns an empty string only at
        # EOF; a blank line comes back as "\n", so it must be skipped rather
        # than treated as the end of the output.
        while True:
            raw_line = child.stderr.readline()
            if not raw_line:
                break
            line = raw_line.strip()
            if not line:
                continue
            if not line.startswith('WARNING'):
                run_had_errors = True
                print('stderr has error message:')
                print('***' + line + '***')

        if run_had_errors:
            sys.exit(2)

        time.sleep(1)  # time to stabilize before the next run

    # we need to clean up after ourselves -- only do this on test success
    shutil.rmtree(dbname, True)
I
Igor Canadi 已提交
339

340 341 342

# This python script runs db_stress multiple times. Some runs with
# kill_random_test that causes rocksdb to crash at various points in code.
343
def whitebox_crash_main(args, unknown_args):
    """Run db_stress repeatedly, cycling kill modes and compaction styles.

    Until cmd_params['duration'] seconds have elapsed, db_stress is run to
    completion with options chosen by ``check_mode``:

      0: run with --kill_random_test, rotating through three kill modes
      1: normal run with compaction_style=1 (universal)
      2: normal run with compaction_style=2 (FIFO)
      3: normal run

    check_mode stays at 0 for the first half of the duration; afterwards the
    database directory is recreated after every run and check_mode advances.

    The script exits with status 1 if the exit code is unexpected (non-zero
    without a kill option, positive with one) and with status 2 if the
    lower-cased output contains 'error' (other than 'got errors 0 times') or
    'fail'.

    ``unknown_args`` is passed through to every db_stress command line.
    """
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('whitebox')

    cur_time = time.time()
    exit_time = cur_time + cmd_params['duration']
    # '//' keeps the integer division the script has under Python 2.
    half_time = cur_time + cmd_params['duration'] // 2

    print("Running whitebox-crash-test with \n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    total_check_mode = 4
    check_mode = 0
    kill_random_test = cmd_params['random_kill_odd']
    kill_mode = 0

    while time.time() < exit_time:
        if check_mode == 0:
            additional_opts = {
                # use large ops per thread since we will kill it anyway
                "ops_per_thread": 100 * cmd_params['ops_per_thread'],
            }
            # run with kill_random_test, with three modes.
            # Mode 0 covers all kill points. Mode 1 covers less kill points but
            # increases chance of triggering them. Mode 2 covers even less
            # frequent kill points and further increases triggering chance.
            if kill_mode == 0:
                additional_opts.update({
                    "kill_random_test": kill_random_test,
                })
            elif kill_mode == 1:
                # The odds must stay integers: they end up in a
                # --kill_random_test=N flag.
                if cmd_params.get('disable_wal', 0) == 1:
                    my_kill_odd = kill_random_test // 50 + 1
                else:
                    my_kill_odd = kill_random_test // 10 + 1
                additional_opts.update({
                    "kill_random_test": my_kill_odd,
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    + "WritableFileWriter::WriteBuffered",
                })
            elif kill_mode == 2:
                # TODO: May need to adjust random odds if kill_random_test
                # is too small.
                additional_opts.update({
                    "kill_random_test": (kill_random_test // 5000 + 1),
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    "WritableFileWriter::WriteBuffered,"
                    "PosixMmapFile::Allocate,WritableFileWriter::Flush",
                })
            # Run kill mode 0, 1 and 2 by turn.
            kill_mode = (kill_mode + 1) % 3
        elif check_mode == 1:
            # normal run with universal compaction mode
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
                "compaction_style": 1,
            }
        elif check_mode == 2:
            # normal run with FIFO compaction mode
            # ops_per_thread is divided by 5 because FIFO compaction
            # style is quite a bit slower on reads with lot of files
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'] // 5,
                "compaction_style": 2,
            }
        else:
            # normal run
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
            }

        # Later layers override earlier ones: base params, then the options
        # for this check mode, then the db path.
        run_params = dict(cmd_params)
        run_params.update(additional_opts)
        run_params['db'] = dbname
        cmd = gen_cmd(run_params, unknown_args)

        print("Running:" + ' '.join(cmd) + "\n")

        # stderr is merged into stdout; universal_newlines=True yields text
        # so the substring checks below work on Python 2 and Python 3 alike.
        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 universal_newlines=True)
        stdoutdata, _ = popen.communicate()
        retncode = popen.returncode
        msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
               check_mode, additional_opts['kill_random_test'], retncode))
        print(msg)
        print(stdoutdata)

        expected = False
        if additional_opts['kill_random_test'] is None and (retncode == 0):
            # we expect zero retncode if no kill option
            expected = True
        elif additional_opts['kill_random_test'] is not None and retncode <= 0:
            # When kill option is given, the test MIGHT kill itself.
            # If it does, negative retncode is expected. Otherwise 0.
            expected = True

        if not expected:
            print("TEST FAILED. See kill option and exit code above!!!\n")
            sys.exit(1)

        stdoutdata = stdoutdata.lower()
        # 'got errors 0 times' contains 'error' but reports success, so its
        # occurrences are subtracted from the count.
        errorcount = (stdoutdata.count('error') -
                      stdoutdata.count('got errors 0 times'))
        print("#times error occurred in output is " + str(errorcount) + "\n")

        if (errorcount > 0):
            print("TEST FAILED. Output has 'error'!!!\n")
            sys.exit(2)
        if (stdoutdata.find('fail') >= 0):
            print("TEST FAILED. Output has 'fail'!!!\n")
            sys.exit(2)

        # First half of the duration, keep doing kill test. For the next half,
        # try different modes.
        if time.time() > half_time:
            # we need to clean up after ourselves -- only do this on test
            # success
            shutil.rmtree(dbname, True)
            os.mkdir(dbname)
            # The db directory was just wiped, so stop passing the
            # expected-values file to later runs.
            cmd_params.pop('expected_values_path', None)
            check_mode = (check_mode + 1) % total_check_mode

        time.sleep(1)  # time to stabilize after a kill


def main():
    """Parse the command line and run the selected crash test.

    The positional ``test_type`` selects blackbox or whitebox mode. Every key
    of the default parameter dicts is also accepted as an optional
    ``--<key>`` argument. Arguments the parser does not recognise are passed
    on to db_stress unchanged. Exits with status 1 if $TEST_TMPDIR is set
    but is not an existing directory.
    """
    parser = argparse.ArgumentParser(description="This script runs and kills \
        db_stress multiple times")
    parser.add_argument("test_type", choices=["blackbox", "whitebox"])
    parser.add_argument("--simple", action="store_true")
    parser.add_argument("--cf_consistency", action='store_true')
    parser.add_argument("--txn", action='store_true')

    # Union of all keys that may be overridden from the command line. Built
    # with update() because .items() results can only be concatenated where
    # items() returns a list.
    all_params = {}
    for layer in (default_params,
                  blackbox_default_params,
                  whitebox_default_params,
                  simple_default_params,
                  blackbox_simple_default_params,
                  whitebox_simple_default_params):
        all_params.update(layer)

    for k, v in all_params.items():
        # The argument type is taken from the default value (callables are
        # sampled once).
        # NOTE(review): for a bool default this gives type=bool, and
        # bool("False") is True -- confirm whether that is intended.
        parser.add_argument("--" + k, type=type(v() if callable(v) else v))
    # unknown_args are passed directly to db_stress
    args, unknown_args = parser.parse_known_args()

    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is not None and not os.path.isdir(test_tmpdir):
        print('%s env var is set to a non-existent directory: %s' %
              (_TEST_DIR_ENV_VAR, test_tmpdir))
        sys.exit(1)

    if args.test_type == 'blackbox':
        blackbox_crash_main(args, unknown_args)
    if args.test_type == 'whitebox':
        whitebox_crash_main(args, unknown_args)


if __name__ == '__main__':
    main()