#! /usr/bin/env python
import os
import sys
import time
import random
import tempfile
import subprocess
import shutil
import argparse

# params overwrite priority:
#   for default:
#       default_params < {blackbox,whitebox}_default_params < args
#   for simple:
#       default_params < {blackbox,whitebox}_default_params <
#       simple_default_params <
#       {blackbox,whitebox}_simple_default_params < args
#   for enable_atomic_flush:
#       default_params < {blackbox,whitebox}_default_params <
#       atomic_flush_params < args
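#
# For example, with "blackbox --simple" the value of "open_files" resolves as:
# default_params sets 500000, blackbox_simple_default_params then overrides it
# to -1, and an explicit --open_files on the command line would beat both.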

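# Scratch file handed to db_stress via --expected_values_path (see
# default_params below); db_stress records its expected key/value state there
# so that, after a kill, the next run can verify the DB against it.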
expected_values_file = tempfile.NamedTemporaryFile()

default_params = {
    "acquire_snapshot_one_in": 10000,
    "block_size": 16384,
    "cache_size": 1048576,
    "checkpoint_one_in": 1000000,
    "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
    "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
    "clear_column_family_one_in": 0,
    "compact_files_one_in": 1000000,
    "compact_range_one_in": 1000000,
    "delpercent": 4,
    "delrangepercent": 1,
    "destroy_db_initially": 0,
    "enable_pipelined_write": lambda: random.randint(0, 1),
    "expected_values_path": expected_values_file.name,
    "flush_one_in": 1000000,
    "max_background_compactions": 20,
    "max_bytes_for_level_base": 10485760,
    "max_key": 100000000,
    "max_write_buffer_number": 3,
    "mmap_read": lambda: random.randint(0, 1),
    "nooverwritepercent": 1,
    "open_files": 500000,
    "prefixpercent": 5,
    "progress_reports": 0,
    "readpercent": 45,
    "reopen": 20,
    "snapshot_hold_ops": 100000,
    "subcompactions": lambda: random.randint(1, 4),
    "target_file_size_base": 2097152,
    "target_file_size_multiplier": 2,
    "use_direct_reads": lambda: random.randint(0, 1),
    "use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
    "use_full_merge_v1": lambda: random.randint(0, 1),
    "use_merge": lambda: random.randint(0, 1),
    "verify_checksum": 1,
    "write_buffer_size": 4 * 1024 * 1024,
    "writepercent": 35,
    "format_version": lambda: random.randint(2, 4),
    "index_block_restart_interval": lambda: random.choice(range(1, 16)),
}
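# Note: the lambda-valued params above stay unevaluated until
# finalize_and_sanitize() is called, so every db_stress invocation draws a
# fresh random sample.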

_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'


def get_dbname(test_name):
    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is None or test_tmpdir == "":
        dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name)
    else:
        dbname = test_tmpdir + "/rocksdb_crashtest_" + test_name
        shutil.rmtree(dbname, True)
        os.mkdir(dbname)
    return dbname


def is_direct_io_supported(dbname):
    # Probe O_DIRECT support by opening a scratch file in the target directory.
    with tempfile.NamedTemporaryFile(dir=dbname) as f:
        try:
            fd = os.open(f.name, os.O_DIRECT)
        except (OSError, AttributeError):
            # open() rejected O_DIRECT, or this platform does not define it
            return False
        os.close(fd)
        return True


blackbox_default_params = {
    # total time for this script to test db_stress
    "duration": 6000,
    # time for one db_stress instance to run
    "interval": 120,
    # since we will be killing anyway, use a large value for ops_per_thread
    "ops_per_thread": 100000000,
    "set_options_one_in": 10000,
    "test_batches_snapshots": 1,
}
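# With the defaults above, a 6000-second blackbox run kills db_stress roughly
# every 120 seconds, i.e. about 50 crash-and-recover cycles.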

whitebox_default_params = {
    "duration": 10000,
    "log2_keys_per_lock": 10,
    "ops_per_thread": 200000,
    "random_kill_odd": 888887,
    "test_batches_snapshots": lambda: random.randint(0, 1),
}

simple_default_params = {
    "allow_concurrent_memtable_write": lambda: random.randint(0, 1),
    "column_families": 1,
    "max_background_compactions": 1,
    "max_bytes_for_level_base": 67108864,
    "memtablerep": "skip_list",
    "prefixpercent": 25,
    "readpercent": 25,
    "target_file_size_base": 16777216,
    "target_file_size_multiplier": 1,
    "test_batches_snapshots": 0,
    "write_buffer_size": 32 * 1024 * 1024,
}

blackbox_simple_default_params = {
    "open_files": -1,
    "set_options_one_in": 0,
}

whitebox_simple_default_params = {}

atomic_flush_params = {
    "disable_wal": 1,
    "reopen": 0,
    "test_atomic_flush": 1,
    # use a small write_buffer_size (1 MB vs the 4 MB default above) so that
    # RocksDB triggers flushes more frequently
    "write_buffer_size": 1024 * 1024,
}


def finalize_and_sanitize(src_params):
    # Resolve lambda-valued params into concrete values for this run.
    dest_params = {k: v() if callable(v) else v
                   for k, v in src_params.items()}
    if dest_params.get("compression_type") != "zstd" or \
            dest_params.get("compression_max_dict_bytes") == 0:
        dest_params["compression_zstd_max_train_bytes"] = 0
    if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
        dest_params["memtablerep"] = "skip_list"
    if dest_params["mmap_read"] == 1 or not is_direct_io_supported(
            dest_params["db"]):
        dest_params["use_direct_io_for_flush_and_compaction"] = 0
        dest_params["use_direct_reads"] = 0
    if dest_params.get("test_batches_snapshots") == 1:
        dest_params["delpercent"] += dest_params["delrangepercent"]
        dest_params["delrangepercent"] = 0
    return dest_params


def gen_cmd_params(args):
    params = {}

    params.update(default_params)
    if args.test_type == 'blackbox':
        params.update(blackbox_default_params)
    if args.test_type == 'whitebox':
        params.update(whitebox_default_params)
    if args.simple:
        params.update(simple_default_params)
        if args.test_type == 'blackbox':
            params.update(blackbox_simple_default_params)
        if args.test_type == 'whitebox':
            params.update(whitebox_simple_default_params)
    if args.enable_atomic_flush:
        params.update(atomic_flush_params)

    for k, v in vars(args).items():
        if v is not None:
            params[k] = v
    return params


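# Build the db_stress argv: keys private to this script are filtered out and
# everything else becomes a --key=value flag, e.g. (illustrative values only)
#   ['./db_stress', '--block_size=16384', ..., '--db=/tmp/rocksdb_crashtest_x']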
def gen_cmd(params, unknown_params):
    cmd = ['./db_stress'] + [
        '--{0}={1}'.format(k, v)
        for k, v in finalize_and_sanitize(params).items()
        if k not in set(['test_type', 'simple', 'duration', 'interval',
                         'random_kill_odd', 'enable_atomic_flush'])
        and v is not None] + unknown_params
    return cmd


# This script runs and kills db_stress multiple times. It checks that the DB
# remains consistent in the face of unsafe crashes.
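# A minimal invocation (assuming db_stress is in the current directory):
#   python db_crashtest.py blackbox --simple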
def blackbox_crash_main(args, unknown_args):
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('blackbox')
    exit_time = time.time() + cmd_params['duration']

    print("Running blackbox-crash-test with \n"
          + "interval_between_crash=" + str(cmd_params['interval']) + "\n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    while time.time() < exit_time:
        run_had_errors = False
        killtime = time.time() + cmd_params['interval']

        cmd = gen_cmd(dict(cmd_params, db=dbname), unknown_args)

        child = subprocess.Popen(cmd, stderr=subprocess.PIPE)
        print("Running db_stress with pid=%d: %s\n\n"
              % (child.pid, ' '.join(cmd)))

        stop_early = False
        while time.time() < killtime:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
                stop_early = True
                break
            time.sleep(1)

        if not stop_early:
            if child.poll() is not None:
                print("WARNING: db_stress ended before kill: exitcode=%d\n"
                      % child.returncode)
            else:
                child.kill()
                print("KILLED %d\n" % child.pid)
                time.sleep(1)  # time to stabilize after a kill

        while True:
            line = child.stderr.readline().strip()
            if line == '':
                break
            elif not line.startswith('WARNING'):
                run_had_errors = True
                print('stderr has error message:')
                print('***' + line + '***')

        if run_had_errors:
            sys.exit(2)

        time.sleep(1)  # time to stabilize before the next run

    # we need to clean up after ourselves -- only do this on test success
    shutil.rmtree(dbname, True)


# This script runs db_stress multiple times. Some runs use kill_random_test,
# which causes RocksDB to crash at various points in the code.
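# Once half the duration has passed, the check mode rotates each iteration:
# mode 0 injects crashes (cycling kill modes 0/1/2), mode 1 is a clean run
# with universal compaction, mode 2 a clean run with FIFO compaction, and
# mode 3 a plain clean run.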
def whitebox_crash_main(args, unknown_args):
    cmd_params = gen_cmd_params(args)
    dbname = get_dbname('whitebox')

    cur_time = time.time()
    exit_time = cur_time + cmd_params['duration']
    half_time = cur_time + cmd_params['duration'] / 2

    print("Running whitebox-crash-test with \n"
          + "total-duration=" + str(cmd_params['duration']) + "\n")

    total_check_mode = 4
    check_mode = 0
    kill_random_test = cmd_params['random_kill_odd']
    kill_mode = 0

    while time.time() < exit_time:
        if check_mode == 0:
            additional_opts = {
                # use a large ops_per_thread since we will kill it anyway
                "ops_per_thread": 100 * cmd_params['ops_per_thread'],
            }
            # Run with kill_random_test in one of three modes.
            # Mode 0 covers all kill points. Mode 1 covers fewer kill points
            # but increases the chance of triggering them. Mode 2 covers even
            # fewer kill points and raises that chance further.
            if kill_mode == 0:
                additional_opts.update({
                    "kill_random_test": kill_random_test,
                })
            elif kill_mode == 1:
                additional_opts.update({
                    "kill_random_test": (kill_random_test / 10 + 1),
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    + "WritableFileWriter::WriteBuffered",
                })
            elif kill_mode == 2:
                # TODO: May need to adjust random odds if kill_random_test
                # is too small.
                additional_opts.update({
                    "kill_random_test": (kill_random_test / 5000 + 1),
                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
                    "WritableFileWriter::WriteBuffered,"
                    "PosixMmapFile::Allocate,WritableFileWriter::Flush",
                })
            # Cycle through kill modes 0, 1 and 2 in turn.
            kill_mode = (kill_mode + 1) % 3
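            # With the default random_kill_odd of 888887, integer division
            # gives kill_random_test = 88889 in mode 1 and 178 in mode 2; a
            # smaller value makes the remaining kill points fire more often.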
        elif check_mode == 1:
            # normal run with universal compaction mode
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
                "compaction_style": 1,
            }
        elif check_mode == 2:
            # normal run with FIFO compaction mode
            # ops_per_thread is divided by 5 because the FIFO compaction
            # style is quite a bit slower on reads with a lot of files
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'] / 5,
                "compaction_style": 2,
            }
        else:
            # normal run
            additional_opts = {
                "kill_random_test": None,
                "ops_per_thread": cmd_params['ops_per_thread'],
            }

        cmd = gen_cmd(dict(cmd_params, db=dbname, **additional_opts),
                      unknown_args)

        print "Running:" + ' '.join(cmd) + "\n"  # noqa: E999 T25377293 Grandfathered in

        popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT)
        stdoutdata, stderrdata = popen.communicate()
        retncode = popen.returncode
        msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
               check_mode, additional_opts['kill_random_test'], retncode))
        print(msg)
        print(stdoutdata)

        expected = False
        if additional_opts['kill_random_test'] is None and (retncode == 0):
            # we expect zero retncode if no kill option
            expected = True
        elif additional_opts['kill_random_test'] is not None and retncode < 0:
            # we expect negative retncode if kill option was given
            expected = True

        if not expected:
            print "TEST FAILED. See kill option and exit code above!!!\n"
            sys.exit(1)

        stdoutdata = stdoutdata.lower()
        errorcount = (stdoutdata.count('error') -
                      stdoutdata.count('got errors 0 times'))
        print "#times error occurred in output is " + str(errorcount) + "\n"

        if errorcount > 0:
            print("TEST FAILED. Output has 'error'!!!\n")
            sys.exit(2)
        if stdoutdata.find('fail') >= 0:
            print("TEST FAILED. Output has 'fail'!!!\n")
            sys.exit(2)

        # For the first half of the duration, keep doing the kill test;
        # for the second half, rotate through the other check modes.
        if time.time() > half_time:
            # we need to clean up after ourselves -- only do this on test
            # success
            shutil.rmtree(dbname, True)
            os.mkdir(dbname)
            cmd_params.pop('expected_values_path', None)
            check_mode = (check_mode + 1) % total_check_mode

        time.sleep(1)  # time to stabilize after a kill


def main():
    parser = argparse.ArgumentParser(
        description="This script runs and kills db_stress multiple times")
    parser.add_argument("test_type", choices=["blackbox", "whitebox"])
    parser.add_argument("--simple", action="store_true")
    parser.add_argument("--enable_atomic_flush", action='store_true')

    all_params = dict(default_params)
    for d in [blackbox_default_params, whitebox_default_params,
              simple_default_params, blackbox_simple_default_params,
              whitebox_simple_default_params]:
        all_params.update(d)

    for k, v in all_params.items():
        parser.add_argument("--" + k, type=type(v() if callable(v) else v))
    # unknown_args are passed directly to db_stress
    args, unknown_args = parser.parse_known_args()

    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
    if test_tmpdir is not None and not os.path.isdir(test_tmpdir):
        print('%s env var is set to a non-existent directory: %s' %
              (_TEST_DIR_ENV_VAR, test_tmpdir))
        sys.exit(1)

    if args.test_type == 'blackbox':
        blackbox_crash_main(args, unknown_args)
    if args.test_type == 'whitebox':
        whitebox_crash_main(args, unknown_args)

if __name__ == '__main__':
    main()