#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
from __future__ import absolute_import, division, print_function, unicode_literals

import os
import sys
import time
import random
import re
import tempfile
import subprocess
import shutil
import argparse

# params overwrite priority:
#   for default:
#       default_params < {blackbox,whitebox}_default_params < args
#   for simple:
#       default_params < {blackbox,whitebox}_default_params <
#       simple_default_params <
#       {blackbox,whitebox}_simple_default_params < args
#   for cf_consistency:
#       default_params < {blackbox,whitebox}_default_params <
#       cf_consistency_params < args
#   for txn:
#       default_params < {blackbox,whitebox}_default_params < txn_params < args

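# Example invocations (the test type is a required positional argument; any of
# the parameters below can also be overridden via command-line flags):
#   python3 db_crashtest.py blackbox --simple
#   python3 db_crashtest.py whitebox --duration=3600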

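# Parameters shared by all test modes. A value may be a constant or a
# zero-argument callable; callables are re-evaluated for every db_stress run
# in finalize_and_sanitize(), so each run gets a fresh random choice.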
default_params = {
    "acquire_snapshot_one_in": 10000,
    "backup_max_size": 100 * 1024 * 1024,
    # Consider a larger number when backups are considered more stable
    "backup_one_in": 100000,
    "batch_protection_bytes_per_key": lambda: random.choice([0, 8]),
    "block_size": 16384,
    "bloom_bits": lambda: random.choice([random.randint(0,19),
                                         random.lognormvariate(2.3, 1.3)]),
    "cache_index_and_filter_blocks": lambda: random.randint(0, 1),
    "cache_size": 1048576,
    "checkpoint_one_in": 1000000,
    "compression_type": lambda: random.choice(
        ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress", "zstd"]),
    "bottommost_compression_type": lambda:
        "disable" if random.randint(0, 1) == 0 else
        random.choice(
            ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress",
             "zstd"]),
    "checksum_type" : lambda: random.choice(["kCRC32c", "kxxHash", "kxxHash64"]),
    "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
    "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
    # Disabled compression_parallel_threads as the feature is not stable
    # lambda: random.choice([1] * 9 + [4])
    "compression_parallel_threads": 1,
    "compression_max_dict_buffer_bytes": lambda: (1 << random.randint(0, 40)) - 1,
    "clear_column_family_one_in": 0,
    "compact_files_one_in": 1000000,
    "compact_range_one_in": 1000000,
    "delpercent": 4,
    "delrangepercent": 1,
    "destroy_db_initially": 0,
    "enable_pipelined_write": lambda: random.randint(0, 1),
    "enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
    "expected_values_path": lambda: setup_expected_values_file(),
    "fail_if_options_file_error": lambda: random.randint(0, 1),
    "flush_one_in": 1000000,
    "file_checksum_impl": lambda: random.choice(["none", "crc32c", "xxh64", "big"]),
    "get_live_files_one_in": 1000000,
    # Note: the following two are intentionally disabled as the corresponding
    # APIs are not guaranteed to succeed.
    "get_sorted_wal_files_one_in": 0,
    "get_current_wal_file_one_in": 0,
    # Temporarily disable hash index
    "index_type": lambda: random.choice([0, 0, 0, 2, 2, 3]),
    "iterpercent": 10,
    "mark_for_compaction_one_file_in": lambda: 10 * random.randint(0, 1),
    "max_background_compactions": 20,
    "max_bytes_for_level_base": 10485760,
    "max_key": 100000000,
    "max_write_buffer_number": 3,
    "mmap_read": lambda: random.randint(0, 1),
    "nooverwritepercent": 1,
    "open_files": lambda : random.choice([-1, -1, 100, 500000]),
    "optimize_filters_for_memory": lambda: random.randint(0, 1),
    "partition_filters": lambda: random.randint(0, 1),
    "partition_pinning": lambda: random.randint(0, 3),
    "pause_background_one_in": 1000000,
    "prefixpercent": 5,
    "progress_reports": 0,
    "readpercent": 45,
    "recycle_log_file_num": lambda: random.randint(0, 1),
    "reopen": 20,
    "snapshot_hold_ops": 100000,
    "sst_file_manager_bytes_per_sec": lambda: random.choice([0, 104857600]),
    "sst_file_manager_bytes_per_truncate": lambda: random.choice([0, 1048576]),
    "long_running_snapshots": lambda: random.randint(0, 1),
    "subcompactions": lambda: random.randint(1, 4),
    "target_file_size_base": 2097152,
    "target_file_size_multiplier": 2,
    "top_level_index_pinning": lambda: random.randint(0, 3),
    "unpartitioned_pinning": lambda: random.randint(0, 3),
    "use_direct_reads": lambda: random.randint(0, 1),
    "use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
    "mock_direct_io": False,
    "use_clock_cache": 0, # currently broken
    "use_full_merge_v1": lambda: random.randint(0, 1),
    "use_merge": lambda: random.randint(0, 1),
    "use_ribbon_filter": lambda: random.randint(0, 1),
    "verify_checksum": 1,
    "write_buffer_size": 4 * 1024 * 1024,
    "writepercent": 35,
    "format_version": lambda: random.choice([2, 3, 4, 5, 5]),
    "index_block_restart_interval": lambda: random.choice(range(1, 16)),
    "use_multiget" : lambda: random.randint(0, 1),
    "periodic_compaction_seconds" :
        lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
    "compaction_ttl" : lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
    # Test a small max_manifest_file_size with a small probability, as most of
    # the time we want manifest history to be preserved to help debug
    "max_manifest_file_size" : lambda : random.choice(
        [t * 16384 if t < 3 else 1024 * 1024 * 1024 for t in range(1, 30)]),
    # Sync mode might make test runs slower, so enable it only with a small
    # probability
    "sync" : lambda : random.choice(
        [1 if t == 0 else 0 for t in range(0, 20)]),
    # Disable compaction_readahead_size because the test is not passing.
    #"compaction_readahead_size" : lambda : random.choice(
    #    [0, 0, 1024 * 1024]),
    "db_write_buffer_size" : lambda: random.choice(
        [0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]),
    "avoid_unnecessary_blocking_io" : random.randint(0, 1),
    "write_dbid_to_manifest" : random.randint(0, 1),
    "avoid_flush_during_recovery" : random.choice(
        [1 if t == 0 else 0 for t in range(0, 8)]),
    "max_write_batch_group_size_bytes" : lambda: random.choice(
        [16, 64, 1024 * 1024, 16 * 1024 * 1024]),
    "level_compaction_dynamic_level_bytes" : True,
    "verify_checksum_one_in": 1000000,
    "verify_db_one_in": 100000,
    "continuous_verification_interval" : 0,
    "max_key_len": 3,
    "key_len_percent_dist": "1,30,69",
    "read_fault_one_in": lambda: random.choice([0, 1000]),
    "open_metadata_write_fault_one_in": lambda: random.choice([0, 0, 8]),
    "open_write_fault_one_in": lambda: random.choice([0, 0, 16]),
    "open_read_fault_one_in": lambda: random.choice([0, 0, 32]),
    "sync_fault_injection": False,
    "get_property_one_in": 1000000,
    "paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
    "max_write_buffer_size_to_maintain": lambda: random.choice(
        [0, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024]),
    "user_timestamp_size": 0,
}

_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
_DEBUG_LEVEL_ENV_VAR = 'DEBUG_LEVEL'

stress_cmd = "./db_stress"

def is_release_mode():
    return os.environ.get(_DEBUG_LEVEL_ENV_VAR) == "0"


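# Compute the DB directory for this test: a fresh directory under TEST_TMPDIR
# if that environment variable is set, otherwise a newly created temp dir.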
def get_dbname(test_name):
    test_dir_name = "rocksdb_crashtest_" + test_name
    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is None or test_tmpdir == "":
        dbname = tempfile.mkdtemp(prefix=test_dir_name)
    else:
        dbname = test_tmpdir + "/" + test_dir_name
        shutil.rmtree(dbname, True)
        os.mkdir(dbname)
    return dbname

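# Lazily create (and cache) the file db_stress uses to persist the expected
# key-value state across crashes and restarts.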
expected_values_file = None
def setup_expected_values_file():
    global expected_values_file
    if expected_values_file is not None:
        return expected_values_file
    expected_file_name = "rocksdb_crashtest_" + "expected"
    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is None or test_tmpdir == "":
        expected_values_file = tempfile.NamedTemporaryFile(
            prefix=expected_file_name, delete=False).name
    else:
        # if tmpdir is specified, store the expected_values_file in the same dir
        expected_values_file = test_tmpdir + "/" + expected_file_name
        if os.path.exists(expected_values_file):
            os.remove(expected_values_file)
        open(expected_values_file, 'a').close()
    return expected_values_file


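# Check whether the filesystem backing dbname supports direct I/O by trying to
# open a temporary file with O_DIRECT.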
def is_direct_io_supported(dbname):
    with tempfile.NamedTemporaryFile(dir=dbname) as f:
        try:
            os.open(f.name, os.O_DIRECT)
        except BaseException:
            return False
        return True


blackbox_default_params = {
    # total time for this script to test db_stress
    "duration": 6000,
    # time for one db_stress instance to run
    "interval": 120,
    # since we will be killing anyway, use a large value for ops_per_thread
    "ops_per_thread": 100000000,
    "set_options_one_in": 10000,
    "test_batches_snapshots": 1,
}

whitebox_default_params = {
    "duration": 10000,
    "log2_keys_per_lock": 10,
    "ops_per_thread": 200000,
    "random_kill_odd": 888887,
    "test_batches_snapshots": lambda: random.randint(0, 1),
}

simple_default_params = {
    "allow_concurrent_memtable_write": lambda: random.randint(0, 1),
    "column_families": 1,
    "experimental_allow_mempurge": 0,
    "max_background_compactions": 1,
    "max_bytes_for_level_base": 67108864,
    "memtablerep": "skip_list",
    "prefixpercent": 0,
    "readpercent": 50,
    "prefix_size" : -1,
    "target_file_size_base": 16777216,
    "target_file_size_multiplier": 1,
    "test_batches_snapshots": 0,
    "write_buffer_size": 32 * 1024 * 1024,
    "level_compaction_dynamic_level_bytes": False,
    "paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
}

blackbox_simple_default_params = {
    "open_files": -1,
    "set_options_one_in": 0,
}

whitebox_simple_default_params = {}

cf_consistency_params = {
    "disable_wal": lambda: random.randint(0, 1),
    "reopen": 0,
    "test_cf_consistency": 1,
    # use small value for write_buffer_size so that RocksDB triggers flush
    # more frequently
    "write_buffer_size": 1024 * 1024,
    "enable_pipelined_write": lambda: random.randint(0, 1),
    # Snapshots are used heavily in this test mode, while they are incompatible
    # with compaction filter.
    "enable_compaction_filter": 0,
}

txn_params = {
    "use_txn" : 1,
    # Avoid a lambda so that it is set once for the entire test
    "txn_write_policy": random.randint(0, 2),
    "unordered_write": random.randint(0, 1),
    "disable_wal": 0,
    # OpenReadOnly after checkpoint is not currently compatible with WritePrepared txns
    "checkpoint_one_in": 0,
    # pipelined write is not currently compatible with WritePrepared txns
    "enable_pipelined_write": 0,
}

best_efforts_recovery_params = {
    "best_efforts_recovery": True,
    "skip_verifydb": True,
    "verify_db_one_in": 0,
    "continuous_verification_interval": 0,
}

blob_params = {
    "allow_setting_blob_options_dynamically": 1,
    # Enable blob files and GC with a 75% chance initially; note that they might still be
    # enabled/disabled during the test via SetOptions
    "enable_blob_files": lambda: random.choice([0] + [1] * 3),
    "min_blob_size": lambda: random.choice([0, 8, 16]),
    "blob_file_size": lambda: random.choice([1048576, 16777216, 268435456, 1073741824]),
    "blob_compression_type": lambda: random.choice(["none", "snappy", "lz4", "zstd"]),
    "enable_blob_garbage_collection": lambda: random.choice([0] + [1] * 3),
    "blob_garbage_collection_age_cutoff": lambda: random.choice([0.0, 0.25, 0.5, 0.75, 1.0]),
}

ts_params = {
    "test_cf_consistency": 0,
    "test_batches_snapshots": 0,
    "user_timestamp_size": 8,
    "use_merge": 0,
    "use_full_merge_v1": 0,
    # In order to disable SingleDelete
    "nooverwritepercent": 0,
    "use_txn": 0,
    "read_only": 0,
    "secondary_catch_up_one_in": 0,
    "continuous_verification_interval": 0,
    "checkpoint_one_in": 0,
    "enable_blob_files": 0,
    "use_blob_db": 0,
    "enable_compaction_filter": 0,
    "ingest_external_file_one_in": 0,
}

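# Resolve all callable parameter values, then adjust combinations that are
# known to be incompatible so the final db_stress invocation is valid.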
def finalize_and_sanitize(src_params):
    dest_params = dict([(k,  v() if callable(v) else v)
                        for (k, v) in src_params.items()])
    if dest_params.get("compression_max_dict_bytes") == 0:
        dest_params["compression_zstd_max_train_bytes"] = 0
        dest_params["compression_max_dict_buffer_bytes"] = 0
    if dest_params.get("compression_type") != "zstd":
        dest_params["compression_zstd_max_train_bytes"] = 0
    if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
        dest_params["memtablerep"] = "skip_list"
    if dest_params["mmap_read"] == 1:
        dest_params["use_direct_io_for_flush_and_compaction"] = 0
        dest_params["use_direct_reads"] = 0
    if (dest_params["use_direct_io_for_flush_and_compaction"] == 1
            or dest_params["use_direct_reads"] == 1) and \
            not is_direct_io_supported(dest_params["db"]):
        if is_release_mode():
            print("{} does not support direct IO. Disabling use_direct_reads and "
                    "use_direct_io_for_flush_and_compaction.\n".format(
                        dest_params["db"]))
            dest_params["use_direct_reads"] = 0
            dest_params["use_direct_io_for_flush_and_compaction"] = 0
        else:
            dest_params["mock_direct_io"] = True

    # DeleteRange is not currently compatible with txns and user-defined timestamps
    if (dest_params.get("test_batches_snapshots") == 1 or
        dest_params.get("use_txn") == 1 or
        dest_params.get("user_timestamp_size") > 0):
        dest_params["delpercent"] += dest_params["delrangepercent"]
        dest_params["delrangepercent"] = 0
    # Only under WritePrepared txns does unordered_write provide the same guarantees as vanilla RocksDB
    if dest_params.get("unordered_write", 0) == 1:
        dest_params["txn_write_policy"] = 1
        dest_params["allow_concurrent_memtable_write"] = 1
    if dest_params.get("disable_wal", 0) == 1:
        dest_params["atomic_flush"] = 1
        dest_params["sync"] = 0
        dest_params["write_fault_one_in"] = 0
    if dest_params.get("open_files", 1) != -1:
        # Compaction TTL and periodic compactions are only compatible
        # with open_files = -1
        dest_params["compaction_ttl"] = 0
        dest_params["periodic_compaction_seconds"] = 0
    if dest_params.get("compaction_style", 0) == 2:
        # Disable compaction TTL in FIFO compaction, because right
        # now assertion failures are triggered.
        dest_params["compaction_ttl"] = 0
        dest_params["periodic_compaction_seconds"] = 0
    if dest_params["partition_filters"] == 1:
        if dest_params["index_type"] != 2:
            dest_params["partition_filters"] = 0
        else:
            dest_params["use_block_based_filter"] = 0
    if dest_params.get("atomic_flush", 0) == 1:
        # disable pipelined write when atomic flush is used.
        dest_params["enable_pipelined_write"] = 0
    if dest_params.get("sst_file_manager_bytes_per_sec", 0) == 0:
        dest_params["sst_file_manager_bytes_per_truncate"] = 0
    if dest_params.get("enable_compaction_filter", 0) == 1:
        # Compaction filter is incompatible with snapshots. Need to avoid taking
        # snapshots, as well as avoid operations that use snapshots for
        # verification.
        dest_params["acquire_snapshot_one_in"] = 0
        dest_params["compact_range_one_in"] = 0
        # Give the iterator ops away to reads.
        dest_params["readpercent"] += dest_params.get("iterpercent", 10)
        dest_params["iterpercent"] = 0
        dest_params["test_batches_snapshots"] = 0
    if dest_params.get("test_batches_snapshots") == 0:
        dest_params["batch_protection_bytes_per_key"] = 0
    return dest_params

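# Assemble the parameter dictionary for this invocation by layering the
# defaults, mode-specific overrides, and command-line arguments according to
# the priority rules documented at the top of this file.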
def gen_cmd_params(args):
    params = {}

    params.update(default_params)
    if args.test_type == 'blackbox':
        params.update(blackbox_default_params)
    if args.test_type == 'whitebox':
        params.update(whitebox_default_params)
    if args.simple:
        params.update(simple_default_params)
        if args.test_type == 'blackbox':
            params.update(blackbox_simple_default_params)
        if args.test_type == 'whitebox':
            params.update(whitebox_simple_default_params)
    if args.cf_consistency:
        params.update(cf_consistency_params)
    if args.txn:
        params.update(txn_params)
    if args.test_best_efforts_recovery:
        params.update(best_efforts_recovery_params)
    if args.enable_ts:
        params.update(ts_params)

    # Best-effort recovery and BlobDB are currently incompatible. Test BE recovery
    # if specified on the command line; otherwise, apply BlobDB related overrides
    # with a 10% chance.
    if (not args.test_best_efforts_recovery and
        not args.enable_ts and
        random.choice([0] * 9 + [1]) == 1):
        params.update(blob_params)

    for k, v in vars(args).items():
        if v is not None:
            params[k] = v
    return params


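# Convert the parameter dictionary into a db_stress command line, skipping
# keys that only control this script and appending unrecognized arguments
# verbatim.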
def gen_cmd(params, unknown_params):
    finalized_params = finalize_and_sanitize(params)
    cmd = [stress_cmd] + [
        '--{0}={1}'.format(k, v)
        for k, v in [(k, finalized_params[k]) for k in sorted(finalized_params)]
        if k not in set(['test_type', 'simple', 'duration', 'interval',
                         'random_kill_odd', 'cf_consistency', 'txn',
                         'test_best_efforts_recovery', 'enable_ts', 'stress_cmd'])
        and v is not None] + unknown_params
    return cmd


# Inject inconsistency to db directory.
def inject_inconsistencies_to_db_dir(dir_path):
    files = os.listdir(dir_path)
    file_num_rgx = re.compile(r'(?P<number>[0-9]{6})')
    largest_fnum = 0
    for f in files:
        m = file_num_rgx.search(f)
        if m and not f.startswith('LOG'):
            largest_fnum = max(largest_fnum, int(m.group('number')))

    candidates = [
        f for f in files if re.search(r'[0-9]+\.sst', f)
    ]
    deleted = 0
    corrupted = 0
    for f in candidates:
        rnd = random.randint(0, 99)
        f_path = os.path.join(dir_path, f)
        if rnd < 10:
            os.unlink(f_path)
            deleted = deleted + 1
        elif 10 <= rnd and rnd < 30:
            with open(f_path, "a") as fd:
                fd.write('12345678')
            corrupted = corrupted + 1
    print('Removed %d table files' % deleted)
    print('Corrupted %d table files' % corrupted)

    # Add corrupted MANIFEST and SST
    for num in range(largest_fnum + 1, largest_fnum + 10):
        rnd = random.randint(0, 1)
        fname = ("MANIFEST-%06d" % num) if rnd == 0 else ("%06d.sst" % num)
        print('Write %s' % fname)
        with open(os.path.join(dir_path, fname), "w") as fd:
            fd.write("garbage")

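# Run db_stress with a timeout. Returns (hit_timeout, return code, stdout,
# stderr); if the timeout expires, the child process is killed first.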
def execute_cmd(cmd, timeout):
    child = subprocess.Popen(cmd, stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE)
    print("Running db_stress with pid=%d: %s\n\n"
          % (child.pid, ' '.join(cmd)))

    try:
        outs, errs = child.communicate(timeout=timeout)
        hit_timeout = False
        print("WARNING: db_stress ended before kill: exitcode=%d\n"
              % child.returncode)
    except subprocess.TimeoutExpired:
        hit_timeout = True
        child.kill()
        print("KILLED %d\n" % child.pid)
        outs, errs = child.communicate()

    return hit_timeout, child.returncode, outs.decode('utf-8'), errs.decode('utf-8')


# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB.
def blackbox_crash_main(args, unknown_args):
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('blackbox')
    exit_time = time.time() + cmd_params['duration']

    print("Running blackbox-crash-test with \n"
          + "interval_between_crash=" + str(cmd_params['interval']) + "\n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    while time.time() < exit_time:
        cmd = gen_cmd(dict(
            list(cmd_params.items())
            + list({'db': dbname}.items())), unknown_args)

        hit_timeout, retcode, outs, errs = execute_cmd(cmd, cmd_params['interval'])

        if not hit_timeout:
            print('Exit Before Killing')
            print('stdout:')
            print(outs)
            print('stderr:')
            print(errs)
            sys.exit(2)

        for line in errs.split('\n'):
            if line != '' and not line.startswith('WARNING'):
                run_had_errors = True
                print('stderr has error message:')
                print('***' + line + '***')

        time.sleep(1)  # time to stabilize before the next run

        if args.test_best_efforts_recovery:
            inject_inconsistencies_to_db_dir(dbname)

        time.sleep(1)  # time to stabilize before the next run

    # we need to clean up after ourselves -- only do this on test success
    shutil.rmtree(dbname, True)


# This Python script runs db_stress multiple times. Some runs use
# kill_random_test, which causes RocksDB to crash at various points in code.
def whitebox_crash_main(args, unknown_args):
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('whitebox')

    cur_time = time.time()
    exit_time = cur_time + cmd_params['duration']
    half_time = cur_time + cmd_params['duration'] // 2

    print("Running whitebox-crash-test with \n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    total_check_mode = 4
    check_mode = 0
    kill_random_test = cmd_params['random_kill_odd']
    kill_mode = 0

    while time.time() < exit_time:
        if check_mode == 0:
            additional_opts = {
                # use large ops per thread since we will kill it anyway
                "ops_per_thread": 100 * cmd_params['ops_per_thread'],
            }
            # run with kill_random_test, with three modes.
            # Mode 0 covers all kill points. Mode 1 covers fewer kill points but
            # increases the chance of triggering them. Mode 2 covers even less
            # frequent kill points and further increases the triggering chance.
            if kill_mode == 0:
                additional_opts.update({
                    "kill_random_test": kill_random_test,
                })
            elif kill_mode == 1:
                if cmd_params.get('disable_wal', 0) == 1:
                    my_kill_odd = kill_random_test // 50 + 1
                else:
                    my_kill_odd = kill_random_test // 10 + 1
                additional_opts.update({
                    "kill_random_test": my_kill_odd,
                    "kill_exclude_prefixes": "WritableFileWriter::Append,"
                    + "WritableFileWriter::WriteBuffered",
                })
            elif kill_mode == 2:
                # TODO: May need to adjust random odds if kill_random_test
                # is too small.
                additional_opts.update({
                    "kill_random_test": (kill_random_test // 5000 + 1),
                    "kill_exclude_prefixes": "WritableFileWriter::Append,"
                    "WritableFileWriter::WriteBuffered,"
                    "PosixMmapFile::Allocate,WritableFileWriter::Flush",
                })
            # Run kill mode 0, 1 and 2 by turn.
            kill_mode = (kill_mode + 1) % 3
        elif check_mode == 1:
            # normal run with universal compaction mode
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
                "compaction_style": 1,
            }
            # Single level universal has a lot of special logic. Ensure we cover
            # it sometimes.
            if random.randint(0, 1) == 1:
                additional_opts.update({
                    "num_levels": 1,
                })
        elif check_mode == 2:
            # normal run with FIFO compaction mode
            # ops_per_thread is divided by 5 because FIFO compaction
            # style is quite a bit slower on reads with a lot of files
            additional_opts = {
                "kill_random_test": None,
601
                "ops_per_thread": cmd_params['ops_per_thread'] // 5,
602 603 604 605
                "compaction_style": 2,
            }
        else:
            # normal run
606
            additional_opts = {
607 608 609 610
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
            }

        cmd = gen_cmd(dict(list(cmd_params.items())
            + list(additional_opts.items())
            + list({'db': dbname}.items())), unknown_args)

        print("Running:" + ' '.join(cmd) + "\n")  # noqa: E999 T25377293 Grandfathered in

        # If the running time is 15 minutes over the intended run time, explicitly
        # kill the process and exit even if the white box kill didn't hit. This is
        # to guarantee the run time limit, as if it runs as a job, running too
        # long will create problems for job scheduling or execution.
        # TODO detect a hanging condition. The job might run too long as RocksDB
        # hits a hanging bug.
        hit_timeout, retncode, stdoutdata, stderrdata = execute_cmd(
            cmd, exit_time - time.time() + 900)
        msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
               check_mode, additional_opts['kill_random_test'], retncode))

        print(msg)
        print(stdoutdata)
        print(stderrdata)

        if hit_timeout:
            print("Killing the run for running too long")
            break

        expected = False
        if additional_opts['kill_random_test'] is None and (retncode == 0):
            # we expect zero retncode if no kill option
            expected = True
        elif additional_opts['kill_random_test'] is not None and retncode <= 0:
            # When kill option is given, the test MIGHT kill itself.
            # If it does, negative retncode is expected. Otherwise 0.
            expected = True

        if not expected:
            print("TEST FAILED. See kill option and exit code above!!!\n")
            sys.exit(1)

        stderrdata = stderrdata.lower()
        errorcount = (stderrdata.count('error') -
                      stderrdata.count('got errors 0 times'))
        print("#times error occurred in output is " + str(errorcount) +
                "\n")

        if (errorcount > 0):
            print("TEST FAILED. Output has 'error'!!!\n")
            sys.exit(2)
        if (stderrdata.find('fail') >= 0):
            print("TEST FAILED. Output has 'fail'!!!\n")
            sys.exit(2)

        # For the first half of the duration, keep doing the kill test. For the
        # second half, try different modes.
        if time.time() > half_time:
            # we need to clean up after ourselves -- only do this on test
            # success
            shutil.rmtree(dbname, True)
            os.mkdir(dbname)
            cmd_params.pop('expected_values_path', None)
            check_mode = (check_mode + 1) % total_check_mode

        time.sleep(1)  # time to stabilize after a kill


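# Parse command-line arguments and dispatch to the blackbox or whitebox driver.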
def main():
    global stress_cmd

    parser = argparse.ArgumentParser(description="This script runs and kills \
        db_stress multiple times")
    parser.add_argument("test_type", choices=["blackbox", "whitebox"])
    parser.add_argument("--simple", action="store_true")
    parser.add_argument("--cf_consistency", action='store_true')
    parser.add_argument("--txn", action='store_true')
    parser.add_argument("--test_best_efforts_recovery", action='store_true')
    parser.add_argument("--enable_ts", action='store_true')
    parser.add_argument("--stress_cmd")

    all_params = dict(list(default_params.items())
                      + list(blackbox_default_params.items())
                      + list(whitebox_default_params.items())
                      + list(simple_default_params.items())
                      + list(blackbox_simple_default_params.items())
                      + list(whitebox_simple_default_params.items())
                      + list(blob_params.items())
                      + list(ts_params.items()))

    for k, v in all_params.items():
        parser.add_argument("--" + k, type=type(v() if callable(v) else v))
    # unknown_args are passed directly to db_stress
    args, unknown_args = parser.parse_known_args()

    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is not None and not os.path.isdir(test_tmpdir):
        print('%s env var is set to a non-existent directory: %s' %
                (_TEST_DIR_ENV_VAR, test_tmpdir))
        sys.exit(1)

    if args.stress_cmd:
        stress_cmd = args.stress_cmd
    if args.test_type == 'blackbox':
        blackbox_crash_main(args, unknown_args)
    if args.test_type == 'whitebox':
        whitebox_crash_main(args, unknown_args)
    # Only delete the `expected_values_file` if the test passes
    if os.path.exists(expected_values_file):
        os.remove(expected_values_file)


if __name__ == '__main__':
    main()