//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#ifdef GFLAGS
#ifdef NUMA
#include <numa.h>
#include <numaif.h>
#endif
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <fcntl.h>
#include <inttypes.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>

#include "db/db_impl/db_impl.h"
#include "db/malloc_stats.h"
#include "db/version_set.h"
#include "hdfs/env_hdfs.h"
#include "monitoring/histogram.h"
#include "monitoring/statistics.h"
#include "options/cf_options.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/sim_cache.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/write_batch.h"
#include "test_util/testutil.h"
#include "test_util/transaction_test_util.h"
#include "util/cast_util.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stderr_logger.h"
#include "util/string_util.h"
#include "util/xxhash.h"
#include "utilities/blob_db/blob_db.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/bytesxor.h"
#include "utilities/persistent_cache/block_cache_tier.h"

#ifdef OS_WIN
#include <io.h>  // open/close
#endif

using GFLAGS_NAMESPACE::ParseCommandLineFlags;
using GFLAGS_NAMESPACE::RegisterFlagValidator;
using GFLAGS_NAMESPACE::SetUsageMessage;

DEFINE_string(
    benchmarks,
    "fillseq,"
    "fillseqdeterministic,"
    "fillsync,"
    "fillrandom,"
    "filluniquerandomdeterministic,"
    "overwrite,"
    "readrandom,"
    "newiterator,"
    "newiteratorwhilewriting,"
    "seekrandom,"
    "seekrandomwhilewriting,"
    "seekrandomwhilemerging,"
    "readseq,"
    "readreverse,"
    "compact,"
    "compactall,"
    "multireadrandom,"
    "mixgraph,"
    "readseq,"
    "readtocache,"
    "readreverse,"
    "readwhilewriting,"
    "readwhilemerging,"
    "readwhilescanning,"
    "readrandomwriterandom,"
    "updaterandom,"
    "xorupdaterandom,"
    "randomwithverify,"
    "fill100K,"
    "crc32c,"
    "xxhash,"
    "compress,"
    "uncompress,"
    "acquireload,"
    "fillseekseq,"
    "randomtransaction,"
    "randomreplacekeys,"
    "timeseries",

    "Comma-separated list of operations to run in the specified"
    " order. Available benchmarks:\n"
    "\tfillseq       -- write N values in sequential key"
    " order in async mode\n"
    "\tfillseqdeterministic       -- write N values in the specified"
    " key order and keep the shape of the LSM tree\n"
    "\tfillrandom    -- write N values in random key order in async"
    " mode\n"
    "\tfilluniquerandomdeterministic       -- write N values in a random"
    " key order and keep the shape of the LSM tree\n"
    "\toverwrite     -- overwrite N values in random key order in"
    " async mode\n"
    "\tfillsync      -- write N/100 values in random key order in "
    "sync mode\n"
    "\tfill100K      -- write N/1000 100K values in random order in"
    " async mode\n"
    "\tdeleteseq     -- delete N keys in sequential order\n"
    "\tdeleterandom  -- delete N keys in random order\n"
    "\treadseq       -- read N times sequentially\n"
    "\treadtocache   -- 1 thread reading database sequentially\n"
    "\treadreverse   -- read N times in reverse order\n"
    "\treadrandom    -- read N times in random order\n"
    "\treadmissing   -- read N missing keys in random order\n"
    "\treadwhilewriting      -- 1 writer, N threads doing random "
    "reads\n"
    "\treadwhilemerging      -- 1 merger, N threads doing random "
    "reads\n"
    "\treadwhilescanning     -- 1 thread doing full table scan, "
    "N threads doing random reads\n"
    "\treadrandomwriterandom -- N threads doing random-read, "
    "random-write\n"
    "\tupdaterandom  -- N threads doing read-modify-write for random "
    "keys\n"
    "\txorupdaterandom  -- N threads doing read-XOR-write for "
    "random keys\n"
    "\tappendrandom  -- N threads doing read-modify-write with "
    "growing values\n"
    "\tmergerandom   -- same as updaterandom/appendrandom using merge"
    " operator. "
    "Must be used with merge_operator\n"
    "\treadrandommergerandom -- perform N random read-or-merge "
    "operations. Must be used with merge_operator\n"
    "\tnewiterator   -- repeated iterator creation\n"
    "\tseekrandom    -- N random seeks, call Next seek_nexts times "
    "per seek\n"
    "\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
    "overwrite\n"
    "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
    "merge\n"
    "\tcrc32c        -- repeated crc32c of 4K of data\n"
    "\txxhash        -- repeated xxHash of 4K of data\n"
    "\tacquireload   -- load N*1000 times\n"
    "\tfillseekseq   -- write N values in sequential key, then read "
    "them by seeking to each key\n"
    "\trandomtransaction     -- execute N random transactions and "
    "verify correctness\n"
    "\trandomreplacekeys     -- randomly replaces N keys by deleting "
    "the old version and putting the new version\n\n"
    "\ttimeseries            -- 1 writer generates time series data "
    "and multiple readers doing random reads on id\n\n"
    "Meta operations:\n"
    "\tcompact     -- Compact the entire DB; If multiple, randomly choose one\n"
    "\tcompactall  -- Compact the entire DB\n"
    "\tstats       -- Print DB stats\n"
    "\tresetstats  -- Reset DB stats\n"
    "\tlevelstats  -- Print the number of files and bytes per level\n"
    "\tsstables    -- Print sstable info\n"
    "\theapprofile -- Dump a heap profile (if supported by this port)\n"
    "\treplay      -- replay the trace file specified with trace_file\n");

DEFINE_int64(num, 1000000, "Number of key/values to place in database");
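// A minimal, illustrative invocation combining the flags above (path and
// counts are hypothetical, not defaults):
//   ./db_bench --benchmarks=fillseq,readrandom --num=1000000 \
//              --db=/tmp/dbbench --value_size=100
// The benchmarks listed in --benchmarks run in the given order.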

DEFINE_int64(numdistinct, 1000,
             "Number of distinct keys to use. Used in RandomWithVerify to "
             "read/write on fewer keys so that gets are more likely to find the"
             " key and puts are more likely to update the same key");

DEFINE_int64(merge_keys, -1,
             "Number of distinct keys to use for MergeRandom and "
             "ReadRandomMergeRandom. "
             "If negative, there will be FLAGS_num keys.");
DEFINE_int32(num_column_families, 1, "Number of Column Families to use.");

DEFINE_int32(
    num_hot_column_families, 0,
    "Number of Hot Column Families. If more than 0, only write to this "
    "number of column families. After finishing all the writes to them, "
    "create new set of column families and insert to them. Only used "
    "when num_column_families > 1.");

DEFINE_string(column_family_distribution, "",
              "Comma-separated list of percentages, where the ith element "
              "indicates the probability of an op using the ith column family. "
              "The number of elements must be `num_hot_column_families` if "
              "specified; otherwise, it must be `num_column_families`. The "
              "sum of elements must be 100. E.g., if `num_column_families=4`, "
              "and `num_hot_column_families=0`, a valid list could be "
              "\"10,20,30,40\".");

DEFINE_int64(reads, -1, "Number of read operations to do.  "
             "If negative, do FLAGS_num reads.");

DEFINE_int64(deletes, -1, "Number of delete operations to do.  "
             "If negative, do FLAGS_num deletions.");

DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality");

DEFINE_int64(seed, 0, "Seed base for random number generators. "
             "When 0 it is deterministic.");

DEFINE_int32(threads, 1, "Number of concurrent threads to run.");

DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
             " When 0 then num & reads determine the test duration");

DEFINE_int32(value_size, 100, "Size of each value");

DEFINE_int32(seek_nexts, 0,
             "How many times to call Next() after Seek() in "
             "fillseekseq, seekrandom, seekrandomwhilewriting and "
             "seekrandomwhilemerging");

DEFINE_bool(reverse_iterator, false,
            "When true use Prev rather than Next for iterators that do "
            "Seek and then Next");

DEFINE_int64(max_scan_distance, 0,
             "Used to define iterate_upper_bound (or iterate_lower_bound "
             "if FLAGS_reverse_iterator is set to true) when value is nonzero");

DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
DEFINE_int64(batch_size, 1, "Batch size");

static bool ValidateKeySize(const char* /*flagname*/, int32_t /*value*/) {
  return true;
}
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    fprintf(stderr, "Invalid value for --%s: %lu, overflow\n", flagname,
            (unsigned long)value);
    return false;
  }
  return true;
}

DEFINE_int32(key_size, 16, "size of each key");
DEFINE_int32(num_multi_db, 0,
             "Number of DBs used in the benchmark. 0 means single DB.");

DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
              " to this fraction of their original size after compression");
DEFINE_double(read_random_exp_range, 0.0,
              "Read random's key will be generated using distribution of "
              "num * exp(-r) where r is uniform number from 0 to this value. "
              "The larger the number is, the more skewed the reads are. "
              "Only used in readrandom and multireadrandom benchmarks.");

DEFINE_bool(histogram, false, "Print histogram of operation timings");
DEFINE_bool(enable_numa, false,
            "Make operations aware of NUMA architecture and bind memory "
            "and cpus corresponding to nodes together. In NUMA, memory "
            "in same node as CPUs are closer when compared to memory in "
            "other nodes. Reads can be faster when the process is bound to "
            "CPU and memory of same node. Use \"$numactl --hardware\" command "
            "to see NUMA memory architecture.");

DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
             "Number of bytes to buffer in all memtables before compacting");

DEFINE_bool(cost_write_buffer_to_cache, false,
            "The usage of memtable is costed to the block cache");

DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
             "Number of bytes to buffer in memtable before compacting");
DEFINE_int32(max_write_buffer_number,
             rocksdb::Options().max_write_buffer_number,
             "The number of in-memory memtables. Each memtable is of size"
             " write_buffer_size bytes.");
DEFINE_int32(min_write_buffer_number_to_merge,
             rocksdb::Options().min_write_buffer_number_to_merge,
             "The minimum number of write buffers that will be merged together "
             "before writing to storage. This is cheap because it is an "
             "in-memory merge. If this feature is not enabled, then all these "
             "write buffers are flushed to L0 as separate files and this "
             "increases read amplification because a get request has to check "
             "in all of these files. Also, an in-memory merge may result in "
             "writing less data to storage if there are duplicate records "
             "in each of these individual write buffers.");
DEFINE_int32(max_write_buffer_number_to_maintain,
             rocksdb::Options().max_write_buffer_number_to_maintain,
             "The total maximum number of write buffers to maintain in memory "
             "including copies of buffers that have already been flushed. "
             "Unlike max_write_buffer_number, this parameter does not affect "
             "flushing. This controls the minimum amount of write history "
             "that will be available in memory for conflict checking when "
             "Transactions are used. If this value is too low, some "
             "transactions may fail at commit time due to not being able to "
             "determine whether there were any write conflicts. Setting this "
             "value to 0 will cause write buffers to be freed immediately "
             "after they are flushed.  If this value is set to -1, "
             "'max_write_buffer_number' will be used.");

DEFINE_int32(max_background_jobs,
             rocksdb::Options().max_background_jobs,
             "The maximum number of concurrent background jobs that can occur "
             "in parallel.");

DEFINE_int32(num_bottom_pri_threads, 0,
             "The number of threads in the bottom-priority thread pool (used "
             "by universal compaction only).");

DEFINE_int32(num_high_pri_threads, 0,
             "The maximum number of threads in the high-priority thread pool"
             " (used mainly for flushes).");

DEFINE_int32(num_low_pri_threads, 0,
             "The maximum number of threads in the low-priority thread pool"
             " (used mainly for compactions).");

DEFINE_int32(max_background_compactions,
             rocksdb::Options().max_background_compactions,
             "The maximum number of concurrent background compactions"
             " that can occur in parallel.");
DEFINE_int32(base_background_compactions, -1, "DEPRECATED");
DEFINE_uint64(subcompactions, 1,
              "Maximum number of subcompactions to divide L0-L1 compactions "
              "into.");
static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);

DEFINE_int32(max_background_flushes,
             rocksdb::Options().max_background_flushes,
             "The maximum number of concurrent background flushes"
             " that can occur in parallel.");

static rocksdb::CompactionStyle FLAGS_compaction_style_e;
DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
             "style of compaction: level-based, universal and fifo");
static rocksdb::CompactionPri FLAGS_compaction_pri_e;
DEFINE_int32(compaction_pri, (int32_t)rocksdb::Options().compaction_pri,
             "priority of files to compaction: by size or by data age");

DEFINE_int32(universal_size_ratio, 0,
             "Percentage flexibility while comparing file size"
             " (for universal compaction only).");
DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files in a"
             " single compaction run (for universal compaction only).");
DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
             " in universal style compaction");
DEFINE_int32(universal_max_size_amplification_percent, 0,
             "The max size amplification for universal style compaction");
DEFINE_int32(universal_compression_size_percent, -1,
             "The percentage of the database to compress for universal "
             "compaction. -1 means compress everything.");

DEFINE_bool(universal_allow_trivial_move, false,
            "Allow trivial move in universal compaction.");
DEFINE_int64(cache_size, 8 << 20,  // 8MB
             "Number of bytes to use as a cache of uncompressed data");

DEFINE_int32(cache_numshardbits, 6,
             "Number of shards for the block cache"
             " is 2 ** cache_numshardbits. Negative means use default settings."
             " This is applied only if FLAGS_cache_size is non-negative.");

DEFINE_double(cache_high_pri_pool_ratio, 0.0,
              "Ratio of block cache reserve for high pri blocks. "
              "If > 0.0, we also enable "
              "cache_index_and_filter_blocks_with_high_priority.");

DEFINE_bool(use_clock_cache, false,
            "Replace default LRU block cache with clock cache.");

DEFINE_int64(simcache_size, -1,
             "Number of bytes to use as a simcache of "
             "uncompressed data. Negative value disables simcache.");
DEFINE_bool(cache_index_and_filter_blocks, false,
            "Cache index/filter blocks in block cache.");

DEFINE_bool(partition_index_and_filters, false,
            "Partition index and filter blocks.");

DEFINE_bool(partition_index, false, "Partition index blocks");

DEFINE_int64(metadata_block_size,
             rocksdb::BlockBasedTableOptions().metadata_block_size,
             "Max partition size when partitioning index/filters");

// The default reduces the overhead of reading time with flash. With HDD, which
// offers much less throughput, however, this number better to be set to 1.
DEFINE_int32(ops_between_duration_checks, 1000,
             "Check duration limit every x ops");

DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false,
            "Pin index/filter blocks of L0 files in block cache.");

DEFINE_bool(
    pin_top_level_index_and_filter, false,
    "Pin top-level index of partitioned index/filter blocks in block cache.");

DEFINE_int32(block_size,
             static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
             "Number of bytes in a block.");
DEFINE_int32(
    format_version,
    static_cast<int32_t>(rocksdb::BlockBasedTableOptions().format_version),
    "Format version of SST files.");

DEFINE_int32(block_restart_interval,
             rocksdb::BlockBasedTableOptions().block_restart_interval,
             "Number of keys between restart points "
             "for delta encoding of keys in data block.");

DEFINE_int32(index_block_restart_interval,
             rocksdb::BlockBasedTableOptions().index_block_restart_interval,
             "Number of keys between restart points "
             "for delta encoding of keys in index block.");
DEFINE_int32(read_amp_bytes_per_bit,
             rocksdb::BlockBasedTableOptions().read_amp_bytes_per_bit,
             "Number of bytes per bit to be used in block read-amp bitmap");

DEFINE_bool(enable_index_compression,
            rocksdb::BlockBasedTableOptions().enable_index_compression,
            "Compress the index block");

DEFINE_bool(block_align, rocksdb::BlockBasedTableOptions().block_align,
            "Align data blocks on page size");

DEFINE_bool(use_data_block_hash_index, false,
            "if true, use kDataBlockBinaryAndHash "
            "instead of kDataBlockBinarySearch. "
            "This is only valid if we use BlockTable");

DEFINE_double(data_block_hash_table_util_ratio, 0.75,
              "util ratio for data block hash index table. "
              "This is only valid if use_data_block_hash_index is "
              "set to true");

DEFINE_int64(compressed_cache_size, -1,
             "Number of bytes to use as a cache of compressed data.");

DEFINE_int64(row_cache_size, 0,
             "Number of bytes to use as a cache of individual rows"
             " (0 = disabled).");

DEFINE_int32(open_files, rocksdb::Options().max_open_files,
             "Maximum number of files to keep open at the same time"
             " (use default if == 0)");
DEFINE_int32(file_opening_threads, rocksdb::Options().max_file_opening_threads,
             "If open_files is set to -1, this option set the number of "
             "threads that will be used to open files during DB::Open()");

DEFINE_bool(new_table_reader_for_compaction_inputs, true,
             "If true, uses a separate file handle for compaction inputs");

DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");

DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
             "Maximum windows randomaccess buffer size");
DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024,
             "Maximum write buffer for Writable File");
DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
             " use default settings.");
DEFINE_double(memtable_bloom_size_ratio, 0,
              "Ratio of memtable size used for bloom filter. 0 means no bloom "
              "filter.");
DEFINE_bool(memtable_whole_key_filtering, false,
            "Try to use whole key bloom filter in memtables.");
DEFINE_bool(memtable_use_huge_page, false,
            "Try to use huge page in memtables.");
DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
            " database.  If you set this flag and also specify a benchmark that"
            " wants a fresh database, that benchmark will fail.");
DEFINE_bool(use_existing_keys, false,
            "If true, uses existing keys in the DB, "
            "rather than generating new ones. This involves some startup "
            "latency to load all keys into memory. It is supported for the "
            "same read/overwrite benchmarks as `-use_existing_db=true`, which "
            "must also be set for this flag to be enabled. When this flag is "
            "set, the value for `-num` will be ignored.");

DEFINE_bool(show_table_properties, false,
            "If true, then per-level table"
            " properties will be printed on every stats-interval when"
            " stats_interval is set and stats_per_interval is on.");

DEFINE_string(db, "", "Use the db with the following name.");
// Read cache flags

DEFINE_string(read_cache_path, "",
              "If not empty string, a read cache will be used in this path");

DEFINE_int64(read_cache_size, 4LL * 1024 * 1024 * 1024,
             "Maximum size of the read cache");

DEFINE_bool(read_cache_direct_write, true,
            "Whether to use Direct IO for writing to the read cache");

DEFINE_bool(read_cache_direct_read, true,
            "Whether to use Direct IO for reading from read cache");

DEFINE_bool(use_keep_filter, false, "Whether to use a noop compaction filter");
static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
  if (value >= 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n",
            flagname, value);
    return false;
  }
  return true;
}
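// For example, the default --cache_numshardbits=6 above gives 2^6 = 64 block
// cache shards; the validator here only requires the value to stay below 20.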
DEFINE_bool(verify_checksum, true,
            "Verify checksum for every block read"
            " from storage");
DEFINE_bool(statistics, false, "Database statistics");
DEFINE_int32(stats_level, rocksdb::StatsLevel::kExceptDetailedTimers,
             "stats level for statistics");
DEFINE_string(statistics_string, "", "Serialized statistics string");
static class std::shared_ptr<rocksdb::Statistics> dbstats;
DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
             " --num writes.");
DEFINE_bool(finish_after_writes, false, "Write thread terminates after all writes are finished");

DEFINE_bool(sync, false, "Sync all writes to disk");
DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");

DEFINE_string(truth_db, "/dev/shm/truth_db/dbbench",
              "Truth key/values used when using verify");

DEFINE_int32(num_levels, 7, "The total number of levels");
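// Rough sizing illustration for the level flags below (numbers are examples,
// not defaults): with --max_bytes_for_level_base=268435456 (256MB) and
// --max_bytes_for_level_multiplier=10, target level sizes grow roughly as
// 256MB for L1, 2.5GB for L2, 25GB for L3, and so on.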
DEFINE_int64(target_file_size_base, rocksdb::Options().target_file_size_base,
             "Target file size at level-1");
DEFINE_int32(target_file_size_multiplier,
             rocksdb::Options().target_file_size_multiplier,
             "A multiplier to compute target level-N file size (N >= 2)");
DEFINE_uint64(max_bytes_for_level_base,
              rocksdb::Options().max_bytes_for_level_base,
              "Max bytes for level-1");
DEFINE_bool(level_compaction_dynamic_level_bytes, false,
            "Whether level size base is dynamic");

DEFINE_double(max_bytes_for_level_multiplier, 10,
              "A multiplier to compute max bytes for level-N (N >= 2)");
static std::vector<int> FLAGS_max_bytes_for_level_multiplier_additional_v;
DEFINE_string(max_bytes_for_level_multiplier_additional, "",
              "A vector that specifies additional fanout per level");
DEFINE_int32(level0_stop_writes_trigger,
             rocksdb::Options().level0_stop_writes_trigger,
             "Number of files in level-0"
             " that will trigger put stop.");
DEFINE_int32(level0_slowdown_writes_trigger,
             rocksdb::Options().level0_slowdown_writes_trigger,
             "Number of files in level-0"
             " that will slow down writes.");
DEFINE_int32(level0_file_num_compaction_trigger,
             rocksdb::Options().level0_file_num_compaction_trigger,
             "Number of files in level-0"
             " when compactions start");
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value <= 0 || value>=100) {
    fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
             " as percentage) for the ReadRandomWriteRandom workload. The "
             "default value 90 means 90% operations out of all reads and writes"
             " operations are reads. In other words, 9 gets for every 1 put.");

DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
             " as percentage) for the ReadRandomMergeRandom workload. The"
             " default value 70 means 70% out of all read and merge operations"
             " are merges. In other words, 7 merges for every 3 gets.");

DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
             "deletes (used in RandomWithVerify only). RandomWithVerify "
             "calculates writepercent as (100 - FLAGS_readwritepercent - "
             "deletepercent), so deletepercent must be smaller than (100 - "
             "FLAGS_readwritepercent)");

DEFINE_bool(optimize_filters_for_hits, false,
            "Optimizes bloom filters for workloads where most lookups return "
            "a value. For now this means not creating bloom filters for the "
            "last level of the LSM, to reduce metadata that should fit in RAM. ");

DEFINE_uint64(delete_obsolete_files_period_micros, 0,
              "Ignored. Left here for backward compatibility");
DEFINE_int64(writes_before_delete_range, 0,
             "Number of writes before DeleteRange is called regularly.");

DEFINE_int64(writes_per_range_tombstone, 0,
             "Number of writes between range tombstones");

DEFINE_int64(range_tombstone_width, 100, "Number of keys in tombstone's range");

DEFINE_int64(max_num_range_tombstones, 0,
             "Maximum number of range tombstones "
             "to insert.");

DEFINE_bool(expand_range_tombstones, false,
            "Expand range tombstone into sequential regular tombstones.");

#ifndef ROCKSDB_LITE
// Transactions Options
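// Illustrative combination for the flags defined below (values are examples
// only): --transaction_db --benchmarks=randomtransaction
//        --transaction_sets=4 --transaction_set_snapshot
// Per the flag descriptions, randomtransaction requires either
// --transaction_db or --optimistic_transaction_db.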
DEFINE_bool(optimistic_transaction_db, false,
            "Open an OptimisticTransactionDB instance. "
            "Required for randomtransaction benchmark.");

DEFINE_bool(transaction_db, false,
            "Open a TransactionDB instance. "
            "Required for randomtransaction benchmark.");

DEFINE_uint64(transaction_sets, 2,
              "Number of keys each transaction will "
              "modify (use in RandomTransaction only).  Max: 9999");

DEFINE_bool(transaction_set_snapshot, false,
            "Setting to true will have each transaction call SetSnapshot()"
            " upon creation.");

DEFINE_int32(transaction_sleep, 0,
             "Max microseconds to sleep in between "
             "reading and writing a value (used in RandomTransaction only). ");

DEFINE_uint64(transaction_lock_timeout, 100,
              "If using a transaction_db, specifies the lock wait timeout in"
              " milliseconds before failing a transaction waiting on a lock");
DEFINE_string(
    options_file, "",
    "The path to a RocksDB options file.  If specified, then db_bench will "
    "run with the RocksDB options in the default column family of the "
    "specified options file. "
    "Note that with this setting, db_bench will ONLY accept the following "
    "RocksDB options related command-line arguments, all other arguments "
    "that are related to RocksDB options will be ignored:\n"
    "\t--use_existing_db\n"
    "\t--use_existing_keys\n"
    "\t--statistics\n"
    "\t--row_cache_size\n"
    "\t--row_cache_numshardbits\n"
    "\t--enable_io_prio\n"
    "\t--dump_malloc_stats\n"
    "\t--num_multi_db\n");
// FIFO Compaction Options
DEFINE_uint64(fifo_compaction_max_table_files_size_mb, 0,
              "The limit of total table file sizes to trigger FIFO compaction");
DEFINE_bool(fifo_compaction_allow_compaction, true,
            "Allow compaction in FIFO compaction.");
DEFINE_uint64(fifo_compaction_ttl, 0, "TTL for the SST Files in seconds.");

// Blob DB Options
DEFINE_bool(use_blob_db, false,
            "Open a BlobDB instance. "
            "Required for large value benchmark.");

DEFINE_bool(blob_db_enable_gc, false, "Enable BlobDB garbage collection.");

DEFINE_bool(blob_db_is_fifo, false, "Enable FIFO eviction strategy in BlobDB.");

DEFINE_uint64(blob_db_max_db_size, 0,
              "Max size limit of the directory where blob files are stored.");

DEFINE_uint64(blob_db_max_ttl_range, 86400,
              "TTL range to generate BlobDB data (in seconds).");

DEFINE_uint64(blob_db_ttl_range_secs, 3600,
              "TTL bucket size to use when creating blob files.");

DEFINE_uint64(blob_db_min_blob_size, 0,
              "Smallest blob to store in a file. Blobs smaller than this "
              "will be inlined with the key in the LSM tree.");

DEFINE_uint64(blob_db_bytes_per_sync, 0, "Bytes to sync blob file at.");

DEFINE_uint64(blob_db_file_size, 256 * 1024 * 1024,
              "Target size of each blob file.");
// Secondary DB instance Options
DEFINE_bool(use_secondary_db, false,
            "Open a RocksDB secondary instance. A primary instance can be "
            "running in another db_bench process.");

DEFINE_string(secondary_path, "",
              "Path to a directory used by the secondary instance to store "
              "private files, e.g. info log.");

DEFINE_int32(secondary_update_interval, 5,
             "Secondary instance attempts to catch up with the primary every "
             "secondary_update_interval seconds.");

#endif  // ROCKSDB_LITE
DEFINE_bool(report_bg_io_stats, false,
            "Measure time spent on I/O while in compactions. ");

DEFINE_bool(use_stderr_info_logger, false,
            "Write info logs to stderr instead of to LOG file. ");

DEFINE_string(trace_file, "", "Trace workload to a file. ");

DEFINE_int32(trace_replay_fast_forward, 1,
             "Fast forward trace replay, must >= 1. ");

static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "none"))
    return rocksdb::kNoCompression;
  else if (!strcasecmp(ctype, "snappy"))
    return rocksdb::kSnappyCompression;
  else if (!strcasecmp(ctype, "zlib"))
    return rocksdb::kZlibCompression;
  else if (!strcasecmp(ctype, "bzip2"))
    return rocksdb::kBZip2Compression;
  else if (!strcasecmp(ctype, "lz4"))
    return rocksdb::kLZ4Compression;
  else if (!strcasecmp(ctype, "lz4hc"))
    return rocksdb::kLZ4HCCompression;
  else if (!strcasecmp(ctype, "xpress"))
    return rocksdb::kXpressCompression;
  else if (!strcasecmp(ctype, "zstd"))
    return rocksdb::kZSTD;

  fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
  return rocksdb::kSnappyCompression;  // default value
}
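// Sketch of typical use later in this file (not shown in this excerpt): the
// string flag is converted once at startup, e.g.
//   FLAGS_compression_type_e =
//       StringToCompressionType(FLAGS_compression_type.c_str());
// Unrecognized names fall back to kSnappyCompression, as implemented above.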
static std::string ColumnFamilyName(size_t i) {
  if (i == 0) {
    return rocksdb::kDefaultColumnFamilyName;
  } else {
    char name[100];
    snprintf(name, sizeof(name), "column_family_name_%06zu", i);
    return std::string(name);
  }
}
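// For example, ColumnFamilyName(0) yields rocksdb::kDefaultColumnFamilyName
// ("default"), while ColumnFamilyName(1) yields "column_family_name_000001".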
DEFINE_string(compression_type, "snappy",
              "Algorithm to use to compress the database");
static enum rocksdb::CompressionType FLAGS_compression_type_e =
    rocksdb::kSnappyCompression;
DEFINE_int64(sample_for_compression, 0, "Sample every N block for compression");

DEFINE_int32(compression_level, rocksdb::CompressionOptions().level,
             "Compression level. The meaning of this value is library-"
             "dependent. If unset, we try to use the default for the library "
             "specified in `--compression_type`");
DEFINE_int32(compression_max_dict_bytes,
             rocksdb::CompressionOptions().max_dict_bytes,
             "Maximum size of dictionary used to prime the compression "
             "library.");

DEFINE_int32(compression_zstd_max_train_bytes,
             rocksdb::CompressionOptions().zstd_max_train_bytes,
             "Maximum size of training data passed to zstd's dictionary "
             "trainer.");

DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
             " from this level. Levels with number < min_level_to_compress are"
             " not compressed. Otherwise, apply compression_type to "
             "all levels.");

static bool ValidateTableCacheNumshardbits(const char* flagname,
                                           int32_t value) {
  if (0 >= value || value > 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be  0 < val <= 20\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(table_cache_numshardbits, 4, "");
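// As with the block cache, the table cache is sharded into
// 2^table_cache_numshardbits shards; the validator above restricts the value
// to the range 0 < val <= 20.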
#ifndef ROCKSDB_LITE
DEFINE_string(env_uri, "", "URI for registry Env lookup. Mutually exclusive"
              " with --hdfs.");
#endif  // ROCKSDB_LITE
DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with"
              " --env_uri.");
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
             "this is greater than zero. When 0 the interval grows over time.");
DEFINE_int64(stats_interval_seconds, 0, "Report stats every N seconds. This "
             "overrides stats_interval when both are > 0.");

DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
             " this is greater than 0.");
DEFINE_int64(report_interval_seconds, 0,
             "If greater than zero, it will write simple stats in CVS format "
             "to --report_file every N seconds");

DEFINE_string(report_file, "report.csv",
              "Filename where some simple stats are reported to (if "
              "--report_interval_seconds is bigger than 0)");

DEFINE_int32(thread_status_per_interval, 0,
             "Takes and report a snapshot of the current status of each thread"
             " when this is greater than 0.");

DEFINE_int32(perf_level, rocksdb::PerfLevel::kDisable, "Level of perf collection");
static bool ValidateRateLimit(const char* flagname, double value) {
  const double EPSILON = 1e-10;
  if ( value < -EPSILON ) {
    fprintf(stderr, "Invalid value for --%s: %12.6f, must be >= 0.0\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_double(soft_rate_limit, 0.0, "DEPRECATED");
DEFINE_double(hard_rate_limit, 0.0, "DEPRECATED");

DEFINE_uint64(soft_pending_compaction_bytes_limit, 64ull * 1024 * 1024 * 1024,
              "Slowdown writes if pending compaction bytes exceed this number");

DEFINE_uint64(hard_pending_compaction_bytes_limit, 128ull * 1024 * 1024 * 1024,
              "Stop writes if pending compaction bytes exceed this number");
DEFINE_uint64(delayed_write_rate, 8388608u,
              "Limited bytes allowed to DB when soft_rate_limit or "
              "level0_slowdown_writes_trigger triggers");

DEFINE_bool(enable_pipelined_write, true,
            "Allow WAL and memtable writes to be pipelined");

DEFINE_bool(unordered_write, false,
            "Enable unordered writes, trading the immutability guarantee of "
            "snapshots for higher write throughput");

DEFINE_bool(allow_concurrent_memtable_write, true,
            "Allow multi-writers to update mem tables in parallel.");

DEFINE_bool(inplace_update_support, rocksdb::Options().inplace_update_support,
            "Support in-place memtable update for smaller or same-size values");

DEFINE_uint64(inplace_update_num_locks,
              rocksdb::Options().inplace_update_num_locks,
              "Number of RW locks to protect in-place memtable updates");

DEFINE_bool(enable_write_thread_adaptive_yield, true,
            "Use a yielding spin loop for brief writer thread waits.");

DEFINE_uint64(
    write_thread_max_yield_usec, 100,
    "Maximum microseconds for enable_write_thread_adaptive_yield operation.");

DEFINE_uint64(write_thread_slow_yield_usec, 3,
              "The threshold at which a slow yield is considered a signal that "
              "other processes or threads want the core.");

DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
             "When hard_rate_limit is set then this is the max time a put will"
             " be stalled.");
DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");

DEFINE_bool(rate_limiter_auto_tuned, false,
            "Enable dynamic adjustment of rate limit according to demand for "
            "background I/O");


DEFINE_bool(sine_write_rate, false,
            "Use a sine wave write_rate_limit");

DEFINE_uint64(sine_write_rate_interval_milliseconds, 10000,
              "Interval of which the sine wave write_rate_limit is recalculated");

DEFINE_double(sine_a, 1,
             "A in f(x) = A sin(bx + c) + d");

DEFINE_double(sine_b, 1,
             "B in f(x) = A sin(bx + c) + d");

DEFINE_double(sine_c, 0,
             "C in f(x) = A sin(bx + c) + d");

DEFINE_double(sine_d, 1,
             "D in f(x) = A sin(bx + c) + d");

DEFINE_bool(rate_limit_bg_reads, false,
            "Use options.rate_limiter on compaction reads");

DEFINE_uint64(
    benchmark_write_rate_limit, 0,
    "If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
    "is the global rate in bytes/second.");
// the parameters of mix_graph
DEFINE_double(key_dist_a, 0.0,
              "The parameter 'a' of key access distribution model "
              "f(x)=a*x^b");
DEFINE_double(key_dist_b, 0.0,
              "The parameter 'b' of key access distribution model "
              "f(x)=a*x^b");
DEFINE_double(value_theta, 0.0,
              "The parameter 'theta' of Generized Pareto Distribution "
              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(value_k, 0.0,
              "The parameter 'k' of Generized Pareto Distribution "
              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(value_sigma, 0.0,
              "The parameter 'theta' of Generized Pareto Distribution "
              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_theta, 0.0,
              "The parameter 'theta' of Generized Pareto Distribution "
              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_k, 0.0,
              "The parameter 'k' of Generized Pareto Distribution "
              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(iter_sigma, 0.0,
              "The parameter 'sigma' of Generized Pareto Distribution "
              "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)");
DEFINE_double(mix_get_ratio, 1.0,
              "The ratio of Get queries of mix_graph workload");
DEFINE_double(mix_put_ratio, 0.0,
              "The ratio of Put queries of mix_graph workload");
DEFINE_double(mix_seek_ratio, 0.0,
              "The ratio of Seek queries of mix_graph workload");
DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator");
DEFINE_int64(mix_ave_kv_size, 512,
             "The average key-value size of this workload");
DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload");
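// Illustrative mixgraph invocation combining the knobs in this block (all
// values are examples only):
//   --benchmarks=mixgraph --mix_get_ratio=0.8 --mix_put_ratio=0.15
//   --mix_seek_ratio=0.05 --mix_accesses=1000000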
DEFINE_double(
    sine_mix_rate_noise, 0.0,
    "Add the noise ratio to the sine rate, it is between 0.0 and 1.0");
DEFINE_bool(sine_mix_rate, false,
            "Enable the sine QPS control on the mix workload");
DEFINE_uint64(
    sine_mix_rate_interval_milliseconds, 10000,
    "Interval of which the sine wave read_rate_limit is recalculated");
DEFINE_int64(mix_accesses, -1,
             "The total query accesses of mix_graph workload");

DEFINE_uint64(
    benchmark_read_rate_limit, 0,
    "If non-zero, db_bench will rate-limit the reads from RocksDB. This "
    "is the global rate in ops/second.");

DEFINE_uint64(max_compaction_bytes, rocksdb::Options().max_compaction_bytes,
              "Max bytes allowed in one compaction");
#ifndef ROCKSDB_LITE
DEFINE_bool(readonly, false, "Run read only benchmarks.");

DEFINE_bool(print_malloc_stats, false,
            "Print malloc stats to stdout after benchmarks finish.");
#endif  // ROCKSDB_LITE
DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");
DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
              " in MB.");
DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");
DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
            "Allow reads to occur via mmap-ing files");
DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
            "Allow writes to occur via mmap-ing files");
DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
            "Use O_DIRECT for reading data");

DEFINE_bool(use_direct_io_for_flush_and_compaction,
            rocksdb::Options().use_direct_io_for_flush_and_compaction,
            "Use O_DIRECT for background flush and compaction writes");
DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
            "Advise random access on table file open");
DEFINE_string(compaction_fadvice, "NORMAL",
              "Access pattern advice when a file is compacted");
static auto FLAGS_compaction_fadvice_e =
  rocksdb::Options().access_hint_on_compaction_start;
DEFINE_bool(use_tailing_iterator, false,
            "Use tailing iterator to access a series of keys instead of get");

DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
            "Use adaptive mutex");

DEFINE_uint64(bytes_per_sync,  rocksdb::Options().bytes_per_sync,
              "Allows OS to incrementally sync SST files to disk while they are"
              " being written, in the background. Issue one request for every"
              " bytes_per_sync written. 0 turns it off.");

DEFINE_uint64(wal_bytes_per_sync,  rocksdb::Options().wal_bytes_per_sync,
              "Allows OS to incrementally sync WAL files to disk while they are"
              " being written, in the background. Issue one request for every"
              " wal_bytes_per_sync written. 0 turns it off.");

DEFINE_bool(use_single_deletes, true,
            "Use single deletes (used in RandomReplaceKeys only).");

DEFINE_double(stddev, 2000.0,
              "Standard deviation of normal distribution used for picking keys"
              " (used in RandomReplaceKeys only).");

DEFINE_int32(key_id_range, 100000,
             "Range of possible value of key id (used in TimeSeries only).");

DEFINE_string(expire_style, "none",
              "Style to remove expired time entries. Can be one of the options "
              "below: none (do not expired data), compaction_filter (use a "
              "compaction filter to remove expired data), delete (seek IDs and "
              "remove expired data) (used in TimeSeries only).");

DEFINE_uint64(
    time_range, 100000,
    "Range of timestamp that store in the database (used in TimeSeries"
    " only).");

DEFINE_int32(num_deletion_threads, 1,
             "Number of threads to do deletion (used in TimeSeries and delete "
             "expire_style only).");

DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
             " operations on a key in the memtable");

static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  if (value < 0 || value>=2000000000) {
    fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
             "plain table");
DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
             "per prefix, 0 means no special handling of the prefix, "
             "i.e. use the prefix that comes with the generated random number.");
DEFINE_bool(total_order_seek, false,
            "Enable total order seek regardless of index format.");
DEFINE_bool(prefix_same_as_start, false,
            "Enforce iterator to return keys with prefix same as seek key.");
DEFINE_bool(
    seek_missing_prefix, false,
    "Iterator seek to keys with non-exist prefixes. Require prefix_size > 8");

DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
             "If non-zero, enable "
             "memtable insert with hint with the given prefix size.");
DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
            "threads' IO priority");
DEFINE_bool(enable_cpu_prio, false, "Lower the background flush/compaction "
            "threads' CPU priority");
DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
            "table becomes an identity function. This is only valid when key "
            "is 8 bytes");
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
DEFINE_uint64(stats_dump_period_sec, rocksdb::Options().stats_dump_period_sec,
              "Gap between printing stats to log in seconds");
DEFINE_uint64(stats_persist_period_sec,
              rocksdb::Options().stats_persist_period_sec,
              "Gap between persisting stats in seconds");
DEFINE_uint64(stats_history_buffer_size,
              rocksdb::Options().stats_history_buffer_size,
              "Max number of stats snapshots to keep in memory");
DEFINE_int64(multiread_stride, 0,
             "Stride length for the keys in a MultiGet batch");
DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");
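// Illustrative use, assuming (as an illustration only) that --batch_size above
// also sizes the MultiGet batches here:
//   --benchmarks=multireadrandom --multiread_batched --batch_size=16
//   --multiread_stride=8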

enum RepFactory {
  kSkipList,
  kPrefixHash,
  kVectorRep,
  kHashLinkedList,
};
static enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "skip_list"))
    return kSkipList;
  else if (!strcasecmp(ctype, "prefix_hash"))
    return kPrefixHash;
  else if (!strcasecmp(ctype, "vector"))
    return kVectorRep;
  else if (!strcasecmp(ctype, "hash_linkedlist"))
    return kHashLinkedList;

  fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
  return kSkipList;
}
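// Sketch of the mapping above: --memtablerep accepts "skip_list",
// "prefix_hash", "vector" or "hash_linkedlist"; anything else falls back to
// kSkipList with a warning. The hash-based reps are normally paired with a
// nonzero --prefix_size so that a prefix extractor is configured.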
static enum RepFactory FLAGS_rep_factory;
DEFINE_string(memtablerep, "skip_list", "");
DEFINE_int64(hash_bucket_count, 1024 * 1024, "hash bucket count");
DEFINE_bool(use_plain_table, false, "if true, use plain table "
            "instead of block-based table format");
DEFINE_bool(use_cuckoo_table, false, "if use cuckoo table format");
DEFINE_double(cuckoo_hash_ratio, 0.9, "Hash ratio for Cuckoo SST table.");
DEFINE_bool(use_hash_search, false, "if true, use kHashSearch "
            "instead of kBinarySearch. "
            "This is only valid if we use BlockTable");
DEFINE_bool(use_block_based_filter, false, "if true, use kBlockBasedFilter "
            "instead of kFullFilter for filter block. "
            "This is only valid if we use BlockTable");
DEFINE_string(merge_operator, "", "The merge operator to use with the database. "
              "If a new merge operator is specified, be sure to use a fresh"
              " database. The possible merge operators are defined in"
              " utilities/merge_operators.h");
DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try "
             "linear search first for this many steps from the previous "
             "position");
DEFINE_bool(report_file_operations, false, "if true, report the number of "
            "file operations");
DEFINE_int32(readahead_size, 0, "Iterator readahead size");

T
Tamir Duberstein 已提交
1196
static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) =
1197
    RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
K
kailiu 已提交
1198

T
Tamir Duberstein 已提交
1199
static const bool FLAGS_hard_rate_limit_dummy __attribute__((__unused__)) =
1200
    RegisterFlagValidator(&FLAGS_hard_rate_limit, &ValidateRateLimit);
K
kailiu 已提交
1201

T
Tamir Duberstein 已提交
1202
static const bool FLAGS_prefix_size_dummy __attribute__((__unused__)) =
1203
    RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
K
kailiu 已提交
1204

T
Tamir Duberstein 已提交
1205
static const bool FLAGS_key_size_dummy __attribute__((__unused__)) =
1206
    RegisterFlagValidator(&FLAGS_key_size, &ValidateKeySize);
K
kailiu 已提交
1207

T
Tamir Duberstein 已提交
1208
static const bool FLAGS_cache_numshardbits_dummy __attribute__((__unused__)) =
1209 1210
    RegisterFlagValidator(&FLAGS_cache_numshardbits,
                          &ValidateCacheNumshardbits);
K
kailiu 已提交
1211

T
Tamir Duberstein 已提交
1212
static const bool FLAGS_readwritepercent_dummy __attribute__((__unused__)) =
1213
    RegisterFlagValidator(&FLAGS_readwritepercent, &ValidateInt32Percent);

DEFINE_int32(disable_seek_compaction, false,
             "Not used, left here for backwards compatibility");

static const bool FLAGS_deletepercent_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_deletepercent, &ValidateInt32Percent);
static const bool FLAGS_table_cache_numshardbits_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
                          &ValidateTableCacheNumshardbits);

namespace rocksdb {

namespace {
struct ReportFileOpCounters {
  std::atomic<int> open_counter_;
  std::atomic<int> read_counter_;
  std::atomic<int> append_counter_;
  std::atomic<uint64_t> bytes_read_;
  std::atomic<uint64_t> bytes_written_;
};

// A special Env to record and report file operations in db_bench
class ReportFileOpEnv : public EnvWrapper {
 public:
  explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); }

  void reset() {
    counters_.open_counter_ = 0;
    counters_.read_counter_ = 0;
    counters_.append_counter_ = 0;
    counters_.bytes_read_ = 0;
    counters_.bytes_written_ = 0;
  }

  Status NewSequentialFile(const std::string& f,
                           std::unique_ptr<SequentialFile>* r,
                           const EnvOptions& soptions) override {
    class CountingFile : public SequentialFile {
     private:
      std::unique_ptr<SequentialFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(std::unique_ptr<SequentialFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}

      Status Read(size_t n, Slice* result, char* scratch) override {
        counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Read(n, result, scratch);
        counters_->bytes_read_.fetch_add(result->size(),
                                         std::memory_order_relaxed);
        return rv;
      }

      Status Skip(uint64_t n) override { return target_->Skip(n); }
    };

    Status s = target()->NewSequentialFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  Status NewRandomAccessFile(const std::string& f,
                             std::unique_ptr<RandomAccessFile>* r,
                             const EnvOptions& soptions) override {
    class CountingFile : public RandomAccessFile {
     private:
      std::unique_ptr<RandomAccessFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(std::unique_ptr<RandomAccessFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}
      Status Read(uint64_t offset, size_t n, Slice* result,
                  char* scratch) const override {
        counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Read(offset, n, result, scratch);
        counters_->bytes_read_.fetch_add(result->size(),
                                         std::memory_order_relaxed);
        return rv;
      }
    };

    Status s = target()->NewRandomAccessFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
                         const EnvOptions& soptions) override {
    class CountingFile : public WritableFile {
     private:
      std::unique_ptr<WritableFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(std::unique_ptr<WritableFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}

      Status Append(const Slice& data) override {
        counters_->append_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Append(data);
        counters_->bytes_written_.fetch_add(data.size(),
                                            std::memory_order_relaxed);
        return rv;
      }

      Status Truncate(uint64_t size) override { return target_->Truncate(size); }
      Status Close() override { return target_->Close(); }
      Status Flush() override { return target_->Flush(); }
      Status Sync() override { return target_->Sync(); }
    };

    Status s = target()->NewWritableFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  // getter
  ReportFileOpCounters* counters() { return &counters_; }

 private:
  ReportFileOpCounters counters_;
};

}  // namespace
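
// How ReportFileOpEnv is typically wired in (see the Benchmark constructor
// and Stats::Report() further below): when --report_file_operations is set,
// FLAGS_env is replaced with a wrapper around the default Env and the
// counters are printed at the end of the run, roughly:
//
//   FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
//   ...
//   ReportFileOpCounters* counters =
//       static_cast<ReportFileOpEnv*>(FLAGS_env)->counters();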

// Helper for quickly generating random data.
class RandomGenerator {
 private:
  std::string data_;
  unsigned int pos_;

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
    while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

  Slice Generate(unsigned int len) {
    assert(len <= data_.size());
    if (pos_ + len > data_.size()) {
      pos_ = 0;
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }

  Slice GenerateWithTTL(unsigned int len) {
    assert(len <= data_.size());
    if (pos_ + len > data_.size()) {
      pos_ = 0;
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
};

static void AppendWithSpace(std::string* str, Slice msg) {
  if (msg.empty()) return;
  if (!str->empty()) {
    str->push_back(' ');
  }
  str->append(msg.data(), msg.size());
}

struct DBWithColumnFamilies {
  std::vector<ColumnFamilyHandle*> cfh;
  DB* db;
#ifndef ROCKSDB_LITE
  OptimisticTransactionDB* opt_txn_db;
#endif  // ROCKSDB_LITE
  std::atomic<size_t> num_created;  // Need to be updated after all the
                                    // new entries in cfh are set.
  size_t num_hot;  // Number of column families to be queried at each moment.
                   // After each CreateNewCf(), another num_hot new column
                   // families are created and become the ones being queried.
  port::Mutex create_cf_mutex;  // Only one thread can execute CreateNewCf()
  std::vector<int> cfh_idx_to_prob;  // ith index holds probability of operating
                                     // on cfh[i].

  DBWithColumnFamilies()
      : db(nullptr)
#ifndef ROCKSDB_LITE
        , opt_txn_db(nullptr)
#endif  // ROCKSDB_LITE
  {
    cfh.clear();
    num_created = 0;
    num_hot = 0;
  }

  DBWithColumnFamilies(const DBWithColumnFamilies& other)
      : cfh(other.cfh),
        db(other.db),
#ifndef ROCKSDB_LITE
        opt_txn_db(other.opt_txn_db),
#endif  // ROCKSDB_LITE
        num_created(other.num_created.load()),
        num_hot(other.num_hot),
        cfh_idx_to_prob(other.cfh_idx_to_prob) {
  }

  void DeleteDBs() {
    std::for_each(cfh.begin(), cfh.end(),
                  [](ColumnFamilyHandle* cfhi) { delete cfhi; });
    cfh.clear();
#ifndef ROCKSDB_LITE
    if (opt_txn_db) {
      delete opt_txn_db;
      opt_txn_db = nullptr;
    } else {
      delete db;
      db = nullptr;
    }
#else
    delete db;
    db = nullptr;
#endif  // ROCKSDB_LITE
  }

  ColumnFamilyHandle* GetCfh(int64_t rand_num) {
    assert(num_hot > 0);
    size_t rand_offset = 0;
    if (!cfh_idx_to_prob.empty()) {
      assert(cfh_idx_to_prob.size() == num_hot);
      int sum = 0;
      while (sum + cfh_idx_to_prob[rand_offset] < rand_num % 100) {
        sum += cfh_idx_to_prob[rand_offset];
        ++rand_offset;
      }
      assert(rand_offset < cfh_idx_to_prob.size());
    } else {
      rand_offset = rand_num % num_hot;
    }
    return cfh[num_created.load(std::memory_order_acquire) - num_hot +
               rand_offset];
  }
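
  // Example of the weighted pick above (numbers are illustrative only): with
  // num_hot = 3 and cfh_idx_to_prob = {50, 30, 20}, a rand_num whose value
  // modulo 100 is 60 walks past the first bucket (sum becomes 50) and stops
  // at the second one, so the middle hot column family is returned roughly
  // 30% of the time.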

  // stage: assume CFs 0 .. stage * num_hot - 1 have already been created.
  //        Need to create CFs stage * num_hot .. (stage + 1) * num_hot - 1.
  void CreateNewCf(ColumnFamilyOptions options, int64_t stage) {
    MutexLock l(&create_cf_mutex);
    if ((stage + 1) * num_hot <= num_created) {
      // Already created.
      return;
    }
    auto new_num_created = num_created + num_hot;
    assert(new_num_created <= cfh.size());
    for (size_t i = num_created; i < new_num_created; i++) {
      Status s =
          db->CreateColumnFamily(options, ColumnFamilyName(i), &(cfh[i]));
      if (!s.ok()) {
        fprintf(stderr, "create column family error: %s\n",
                s.ToString().c_str());
        abort();
      }
    }
    num_created.store(new_num_created, std::memory_order_release);
  }
};

// A class that reports stats to a CSV file
class ReporterAgent {
 public:
  ReporterAgent(Env* env, const std::string& fname,
                uint64_t report_interval_secs)
      : env_(env),
        total_ops_done_(0),
        last_report_(0),
        report_interval_secs_(report_interval_secs),
        stop_(false) {
    auto s = env_->NewWritableFile(fname, &report_file_, EnvOptions());
    if (s.ok()) {
      s = report_file_->Append(Header() + "\n");
    }
    if (s.ok()) {
      s = report_file_->Flush();
    }
    if (!s.ok()) {
      fprintf(stderr, "Can't open %s: %s\n", fname.c_str(),
              s.ToString().c_str());
      abort();
    }

    reporting_thread_ = port::Thread([&]() { SleepAndReport(); });
  }

  ~ReporterAgent() {
    {
      std::unique_lock<std::mutex> lk(mutex_);
      stop_ = true;
      stop_cv_.notify_all();
    }
    reporting_thread_.join();
  }

  // thread safe
  void ReportFinishedOps(int64_t num_ops) {
    total_ops_done_.fetch_add(num_ops);
  }

 private:
  std::string Header() const { return "secs_elapsed,interval_qps"; }
  void SleepAndReport() {
    uint64_t kMicrosInSecond = 1000 * 1000;
    auto time_started = env_->NowMicros();
    while (true) {
      {
        std::unique_lock<std::mutex> lk(mutex_);
        if (stop_ ||
            stop_cv_.wait_for(lk, std::chrono::seconds(report_interval_secs_),
                              [&]() { return stop_; })) {
          // stopping
          break;
        }
        // else -> timeout, which means time for a report!
      }
      auto total_ops_done_snapshot = total_ops_done_.load();
      // round the seconds elapsed
      auto secs_elapsed =
          (env_->NowMicros() - time_started + kMicrosInSecond / 2) /
          kMicrosInSecond;
      std::string report = ToString(secs_elapsed) + "," +
                           ToString(total_ops_done_snapshot - last_report_) +
                           "\n";
      auto s = report_file_->Append(report);
      if (s.ok()) {
        s = report_file_->Flush();
      }
      if (!s.ok()) {
        fprintf(stderr,
                "Can't write to report file (%s), stopping the reporting\n",
                s.ToString().c_str());
        break;
      }
      last_report_ = total_ops_done_snapshot;
    }
  }

  Env* env_;
  std::unique_ptr<WritableFile> report_file_;
  std::atomic<int64_t> total_ops_done_;
  int64_t last_report_;
  const uint64_t report_interval_secs_;
  rocksdb::port::Thread reporting_thread_;
  std::mutex mutex_;
  // will notify on stop
  std::condition_variable stop_cv_;
  bool stop_;
};
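
// The CSV written by ReporterAgent has one header row followed by one row per
// reporting interval, e.g. (values are illustrative):
//
//   secs_elapsed,interval_qps
//   10,1234567
//   20,1150321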

enum OperationType : unsigned char {
  kRead = 0,
  kWrite,
  kDelete,
  kSeek,
  kMerge,
  kUpdate,
  kCompress,
  kUncompress,
  kCrc,
  kHash,
  kOthers
};

static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
                          OperationTypeString = {
  {kRead, "read"},
  {kWrite, "write"},
  {kDelete, "delete"},
  {kSeek, "seek"},
  {kMerge, "merge"},
  {kUpdate, "update"},
  {kCompress, "compress"},
  {kUncompress, "uncompress"},
  {kCrc, "crc"},
  {kHash, "hash"},
  {kOthers, "op"}
};

class CombinedStats;
class Stats {
 private:
  int id_;
  uint64_t start_;
  uint64_t sine_interval_;
  uint64_t finish_;
  double seconds_;
  uint64_t done_;
  uint64_t last_report_done_;
  uint64_t next_report_;
  uint64_t bytes_;
  uint64_t last_op_finish_;
  uint64_t last_report_finish_;
  std::unordered_map<OperationType, std::shared_ptr<HistogramImpl>,
                     std::hash<unsigned char>> hist_;
  std::string message_;
  bool exclude_from_merge_;
  ReporterAgent* reporter_agent_;  // does not own
  friend class CombinedStats;

 public:
  Stats() { Start(-1); }

  void SetReporterAgent(ReporterAgent* reporter_agent) {
    reporter_agent_ = reporter_agent;
  }

  void Start(int id) {
    id_ = id;
    next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
    last_op_finish_ = start_;
    hist_.clear();
    done_ = 0;
    last_report_done_ = 0;
    bytes_ = 0;
    seconds_ = 0;
    start_ = FLAGS_env->NowMicros();
    sine_interval_ = FLAGS_env->NowMicros();
    finish_ = start_;
    last_report_finish_ = start_;
    message_.clear();
    // When set, stats from this thread won't be merged with others.
    exclude_from_merge_ = false;
  }

  void Merge(const Stats& other) {
    if (other.exclude_from_merge_)
      return;

    for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
      auto this_it = hist_.find(it->first);
      if (this_it != hist_.end()) {
        this_it->second->Merge(*(other.hist_.at(it->first)));
      } else {
        hist_.insert({ it->first, it->second });
      }
    }

    done_ += other.done_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.empty()) message_ = other.message_;
  }

  void Stop() {
    finish_ = FLAGS_env->NowMicros();
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void AddMessage(Slice msg) {
    AppendWithSpace(&message_, msg);
  }

  void SetId(int id) { id_ = id; }
  void SetExcludeFromMerge() { exclude_from_merge_ = true; }

  void PrintThreadStatus() {
    std::vector<ThreadStatus> thread_list;
    FLAGS_env->GetThreadList(&thread_list);

    fprintf(stderr, "\n%18s %10s %12s %20s %13s %45s %12s %s\n",
        "ThreadID", "ThreadType", "cfName", "Operation",
        "ElapsedTime", "Stage", "State", "OperationProperties");

    int64_t current_time = 0;
    Env::Default()->GetCurrentTime(&current_time);
    for (auto ts : thread_list) {
      fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
          ts.thread_id,
          ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(),
          ts.cf_name.c_str(),
          ThreadStatus::GetOperationName(ts.operation_type).c_str(),
          ThreadStatus::MicrosToString(ts.op_elapsed_micros).c_str(),
          ThreadStatus::GetOperationStageName(ts.operation_stage).c_str(),
          ThreadStatus::GetStateName(ts.state_type).c_str());

      auto op_properties = ThreadStatus::InterpretOperationProperties(
          ts.operation_type, ts.op_properties);
      for (const auto& op_prop : op_properties) {
        fprintf(stderr, " %s %" PRIu64" |",
            op_prop.first.c_str(), op_prop.second);
      }
      fprintf(stderr, "\n");
    }
  }

  void ResetSineInterval() {
    sine_interval_ = FLAGS_env->NowMicros();
  }

  uint64_t GetSineInterval() {
    return sine_interval_;
  }

  uint64_t GetStart() {
    return start_;
  }

  void ResetLastOpTime() {
    // Set to now to avoid latency from calls to SleepForMicroseconds
    last_op_finish_ = FLAGS_env->NowMicros();
  }

  void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
                   enum OperationType op_type = kOthers) {
    if (reporter_agent_) {
      reporter_agent_->ReportFinishedOps(num_ops);
    }
    if (FLAGS_histogram) {
      uint64_t now = FLAGS_env->NowMicros();
      uint64_t micros = now - last_op_finish_;

      if (hist_.find(op_type) == hist_.end())
      {
        auto hist_temp = std::make_shared<HistogramImpl>();
        hist_.insert({op_type, std::move(hist_temp)});
      }
      hist_[op_type]->Add(micros);

      if (micros > 20000 && !FLAGS_stats_interval) {
        fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, "");
        fflush(stderr);
      }
      last_op_finish_ = now;
    }

    done_ += num_ops;
    if (done_ >= next_report_) {
      if (!FLAGS_stats_interval) {
        if      (next_report_ < 1000)   next_report_ += 100;
        else if (next_report_ < 5000)   next_report_ += 500;
        else if (next_report_ < 10000)  next_report_ += 1000;
        else if (next_report_ < 50000)  next_report_ += 5000;
        else if (next_report_ < 100000) next_report_ += 10000;
        else if (next_report_ < 500000) next_report_ += 50000;
        else                            next_report_ += 100000;
        fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, "");
      } else {
        uint64_t now = FLAGS_env->NowMicros();
        int64_t usecs_since_last = now - last_report_finish_;

        // Determine whether to print status where interval is either
        // each N operations or each N seconds.

        if (FLAGS_stats_interval_seconds &&
            usecs_since_last < (FLAGS_stats_interval_seconds * 1000000)) {
          // Don't check again for this many operations
          next_report_ += FLAGS_stats_interval;

        } else {

          fprintf(stderr,
                  "%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and "
                  "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
                  FLAGS_env->TimeToString(now/1000000).c_str(),
                  id_,
                  done_ - last_report_done_, done_,
                  (done_ - last_report_done_) /
                  (usecs_since_last / 1000000.0),
                  done_ / ((now - start_) / 1000000.0),
                  (now - last_report_finish_) / 1000000.0,
                  (now - start_) / 1000000.0);

          if (id_ == 0 && FLAGS_stats_per_interval) {
            std::string stats;

            if (db_with_cfh && db_with_cfh->num_created.load()) {
              for (size_t i = 0; i < db_with_cfh->num_created.load(); ++i) {
                if (db->GetProperty(db_with_cfh->cfh[i], "rocksdb.cfstats",
                                    &stats))
                  fprintf(stderr, "%s\n", stats.c_str());
                if (FLAGS_show_table_properties) {
                  for (int level = 0; level < FLAGS_num_levels; ++level) {
                    if (db->GetProperty(
                            db_with_cfh->cfh[i],
                            "rocksdb.aggregated-table-properties-at-level" +
                                ToString(level),
                            &stats)) {
                      if (stats.find("# entries=0") == std::string::npos) {
                        fprintf(stderr, "Level[%d]: %s\n", level,
                                stats.c_str());
                      }
                    }
                  }
                }
              }
            } else if (db) {
              if (db->GetProperty("rocksdb.stats", &stats)) {
                fprintf(stderr, "%s\n", stats.c_str());
              }
              if (FLAGS_show_table_properties) {
                for (int level = 0; level < FLAGS_num_levels; ++level) {
                  if (db->GetProperty(
                          "rocksdb.aggregated-table-properties-at-level" +
                              ToString(level),
                          &stats)) {
                    if (stats.find("# entries=0") == std::string::npos) {
                      fprintf(stderr, "Level[%d]: %s\n", level, stats.c_str());
                    }
                  }
                }
              }
            }
          }

          next_report_ += FLAGS_stats_interval;
          last_report_finish_ = now;
          last_report_done_ = done_;
        }
      }
      if (id_ == 0 && FLAGS_thread_status_per_interval) {
        PrintThreadStatus();
      }
      fflush(stderr);
    }
  }

  void AddBytes(int64_t n) {
    bytes_ += n;
  }

  void Report(const Slice& name) {
    // Pretend at least one op was done in case we are running a benchmark
    // that does not call FinishedOps().
    if (done_ < 1) done_ = 1;

    std::string extra;
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-6;
      char rate[100];
      snprintf(rate, sizeof(rate), "%6.1f MB/s",
               (bytes_ / 1048576.0) / elapsed);
      extra = rate;
    }
    AppendWithSpace(&extra, message_);
    double elapsed = (finish_ - start_) * 1e-6;
    double throughput = (double)done_/elapsed;

    fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
            name.ToString().c_str(),
            seconds_ * 1e6 / done_,
            (long)throughput,
            (extra.empty() ? "" : " "),
            extra.c_str());
    if (FLAGS_histogram) {
      for (auto it = hist_.begin(); it != hist_.end(); ++it) {
        fprintf(stdout, "Microseconds per %s:\n%s\n",
                OperationTypeString[it->first].c_str(),
                it->second->ToString().c_str());
      }
    }
    if (FLAGS_report_file_operations) {
      ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env);
      ReportFileOpCounters* counters = env->counters();
      fprintf(stdout, "Num files opened: %d\n",
              counters->open_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num Read(): %d\n",
              counters->read_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num Append(): %d\n",
              counters->append_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num bytes read: %" PRIu64 "\n",
              counters->bytes_read_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num bytes written: %" PRIu64 "\n",
              counters->bytes_written_.load(std::memory_order_relaxed));
      env->reset();
    }
    fflush(stdout);
  }
};
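
// A per-benchmark summary line from Stats::Report() looks like the following
// (numbers are illustrative; the MB/s figure is printed only when AddBytes()
// was called):
//
//   fillseq      :       2.854 micros/op 350321 ops/sec;   38.8 MB/s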

class CombinedStats {
 public:
  void AddStats(const Stats& stat) {
    uint64_t total_ops = stat.done_;
    uint64_t total_bytes_ = stat.bytes_;
    double elapsed;

    if (total_ops < 1) {
      total_ops = 1;
    }

    elapsed = (stat.finish_ - stat.start_) * 1e-6;
    throughput_ops_.emplace_back(total_ops / elapsed);

    if (total_bytes_ > 0) {
      double mbs = (total_bytes_ / 1048576.0);
      throughput_mbs_.emplace_back(mbs / elapsed);
    }
  }

  void Report(const std::string& bench_name) {
    const char* name = bench_name.c_str();
    int num_runs = static_cast<int>(throughput_ops_.size());

    if (throughput_mbs_.size() == throughput_ops_.size()) {
      fprintf(stdout,
              "%s [AVG    %d runs] : %d ops/sec; %6.1f MB/sec\n"
              "%s [MEDIAN %d runs] : %d ops/sec; %6.1f MB/sec\n",
              name, num_runs, static_cast<int>(CalcAvg(throughput_ops_)),
              CalcAvg(throughput_mbs_), name, num_runs,
              static_cast<int>(CalcMedian(throughput_ops_)),
              CalcMedian(throughput_mbs_));
    } else {
      fprintf(stdout,
              "%s [AVG    %d runs] : %d ops/sec\n"
              "%s [MEDIAN %d runs] : %d ops/sec\n",
              name, num_runs, static_cast<int>(CalcAvg(throughput_ops_)), name,
              num_runs, static_cast<int>(CalcMedian(throughput_ops_)));
    }
  }

 private:
  double CalcAvg(std::vector<double> data) {
    double avg = 0;
    for (double x : data) {
      avg += x;
    }
    avg = avg / data.size();
    return avg;
  }

  double CalcMedian(std::vector<double> data) {
    assert(data.size() > 0);
    std::sort(data.begin(), data.end());

    size_t mid = data.size() / 2;
    if (data.size() % 2 == 1) {
      // Odd number of entries
      return data[mid];
    } else {
      // Even number of entries
      return (data[mid] + data[mid - 1]) / 2;
    }
  }

  std::vector<double> throughput_ops_;
  std::vector<double> throughput_mbs_;
};
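
// When a benchmark is repeated (see the "X"/"W" suffix handling in Run()
// below), CombinedStats reports the average and the median across runs. For
// an even number of runs the median averages the two middle values, e.g.
// 100, 120, 180, 200 Kops/sec -> median (120 + 180) / 2 = 150 Kops/sec
// (illustrative numbers).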

class TimestampEmulator {
 private:
  std::atomic<uint64_t> timestamp_;

 public:
  TimestampEmulator() : timestamp_(0) {}
  uint64_t Get() const { return timestamp_.load(); }
  void Inc() { timestamp_++; }
};

// State shared by all concurrent executions of the same benchmark.
struct SharedState {
  port::Mutex mu;
  port::CondVar cv;
  int total;
  int perf_level;
  std::shared_ptr<RateLimiter> write_rate_limiter;
  std::shared_ptr<RateLimiter> read_rate_limiter;

  // Each thread goes through the following states:
  //    (1) initializing
  //    (2) waiting for others to be initialized
  //    (3) running
  //    (4) done

  long num_initialized;
  long num_done;
  bool start;

  SharedState() : cv(&mu), perf_level(FLAGS_perf_level) { }
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;             // 0..n-1 when running in n threads
  Random64 rand;         // Has different seeds for different threads
  Stats stats;
  SharedState* shared;

  /* implicit */ ThreadState(int index)
      : tid(index),
        rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {
  }
};

class Duration {
 public:
  Duration(uint64_t max_seconds, int64_t max_ops, int64_t ops_per_stage = 0) {
    max_seconds_ = max_seconds;
    max_ops_= max_ops;
    ops_per_stage_ = (ops_per_stage > 0) ? ops_per_stage : max_ops;
    ops_ = 0;
    start_at_ = FLAGS_env->NowMicros();
  }

  int64_t GetStage() { return std::min(ops_, max_ops_ - 1) / ops_per_stage_; }

  bool Done(int64_t increment) {
    if (increment <= 0) increment = 1;    // avoid Done(0) and infinite loops
    ops_ += increment;

    if (max_seconds_) {
      // Recheck roughly every 1000 ops (exact iff increment divides 1000)
      auto granularity = FLAGS_ops_between_duration_checks;
      if ((ops_ / granularity) != ((ops_ - increment) / granularity)) {
        uint64_t now = FLAGS_env->NowMicros();
        return ((now - start_at_) / 1000000) >= max_seconds_;
      } else {
        return false;
      }
    } else {
      return ops_ > max_ops_;
    }
  }

 private:
  uint64_t max_seconds_;
  int64_t max_ops_;
  int64_t ops_per_stage_;
  int64_t ops_;
  uint64_t start_at_;
};
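
// Sketch of how Duration is typically used by the benchmark loops further
// below (exact flags and increments vary per benchmark):
//
//   Duration duration(FLAGS_duration, reads_);
//   while (!duration.Done(1)) {
//     // perform one operation, then record it via thread->stats.FinishedOps()
//   }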

class Benchmark {
 private:
  std::shared_ptr<Cache> cache_;
  std::shared_ptr<Cache> compressed_cache_;
  std::shared_ptr<const FilterPolicy> filter_policy_;
  const SliceTransform* prefix_extractor_;
  DBWithColumnFamilies db_;
  std::vector<DBWithColumnFamilies> multi_dbs_;
  int64_t num_;
  int value_size_;
  int key_size_;
  int prefix_size_;
  int64_t keys_per_prefix_;
  int64_t entries_per_batch_;
  int64_t writes_before_delete_range_;
  int64_t writes_per_range_tombstone_;
  int64_t range_tombstone_width_;
  int64_t max_num_range_tombstones_;
  WriteOptions write_options_;
  Options open_options_;  // keep options around to properly destroy db later
#ifndef ROCKSDB_LITE
  TraceOptions trace_options_;
#endif
  int64_t reads_;
  int64_t deletes_;
  double read_random_exp_range_;
  int64_t writes_;
  int64_t readwrites_;
  int64_t merge_keys_;
  bool report_file_operations_;
  bool use_blob_db_;
  std::vector<std::string> keys_;

  class ErrorHandlerListener : public EventListener {
   public:
#ifndef ROCKSDB_LITE
    ErrorHandlerListener()
        : mutex_(),
          cv_(&mutex_),
          no_auto_recovery_(false),
          recovery_complete_(false) {}

    ~ErrorHandlerListener() override {}

    void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
                              Status /*bg_error*/,
                              bool* auto_recovery) override {
      if (*auto_recovery && no_auto_recovery_) {
        *auto_recovery = false;
      }
    }

    void OnErrorRecoveryCompleted(Status /*old_bg_error*/) override {
      InstrumentedMutexLock l(&mutex_);
      recovery_complete_ = true;
      cv_.SignalAll();
    }

    bool WaitForRecovery(uint64_t abs_time_us) {
      InstrumentedMutexLock l(&mutex_);
      if (!recovery_complete_) {
        cv_.TimedWait(abs_time_us);
      }
      if (recovery_complete_) {
        recovery_complete_ = false;
        return true;
      }
      return false;
    }

    void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }

   private:
    InstrumentedMutex mutex_;
    InstrumentedCondVar cv_;
    bool no_auto_recovery_;
    bool recovery_complete_;
#else   // ROCKSDB_LITE
    bool WaitForRecovery(uint64_t /*abs_time_us*/) { return true; }
    void EnableAutoRecovery(bool /*enable*/) {}
#endif  // ROCKSDB_LITE
  };

  std::shared_ptr<ErrorHandlerListener> listener_;

  bool SanityCheck() {
    if (FLAGS_compression_ratio > 1) {
      fprintf(stderr, "compression_ratio should be between 0 and 1\n");
      return false;
    }
    return true;
  }

  inline bool CompressSlice(const CompressionInfo& compression_info,
                            const Slice& input, std::string* compressed) {
    bool ok = true;
    switch (FLAGS_compression_type_e) {
      case rocksdb::kSnappyCompression:
        ok = Snappy_Compress(compression_info, input.data(), input.size(),
                             compressed);
        break;
      case rocksdb::kZlibCompression:
        ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
                           compressed);
        break;
      case rocksdb::kBZip2Compression:
        ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
                            compressed);
        break;
      case rocksdb::kLZ4Compression:
        ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
                          compressed);
        break;
      case rocksdb::kLZ4HCCompression:
        ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
                            compressed);
        break;
      case rocksdb::kXpressCompression:
        ok = XPRESS_Compress(input.data(),
          input.size(), compressed);
        break;
      case rocksdb::kZSTD:
        ok = ZSTD_Compress(compression_info, input.data(), input.size(),
                           compressed);
        break;
      default:
        ok = false;
    }
    return ok;
  }

  void PrintHeader() {
    PrintEnvironment();
    fprintf(stdout, "Keys:       %d bytes each\n", FLAGS_key_size);
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    fprintf(stdout, "Entries:    %" PRIu64 "\n", num_);
    fprintf(stdout, "Prefix:    %d bytes\n", FLAGS_prefix_size);
    fprintf(stdout, "Keys per prefix:    %" PRIu64 "\n", keys_per_prefix_);
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
            ((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
             / 1048576.0));
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
            (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
              * num_)
             / 1048576.0));
    fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
            FLAGS_benchmark_write_rate_limit);
    fprintf(stdout, "Read rate: %" PRIu64 " ops/second\n",
            FLAGS_benchmark_read_rate_limit);
    if (FLAGS_enable_numa) {
      fprintf(stderr, "Running in NUMA enabled mode.\n");
#ifndef NUMA
      fprintf(stderr, "NUMA is not defined in the system.\n");
      exit(1);
#else
      if (numa_available() == -1) {
        fprintf(stderr, "NUMA is not supported by the system.\n");
        exit(1);
      }
#endif
    }

    auto compression = CompressionTypeToString(FLAGS_compression_type_e);
    fprintf(stdout, "Compression: %s\n", compression.c_str());
    fprintf(stdout, "Compression sampling rate: %" PRId64 "\n",
            FLAGS_sample_for_compression);

    switch (FLAGS_rep_factory) {
      case kPrefixHash:
        fprintf(stdout, "Memtablerep: prefix_hash\n");
        break;
      case kSkipList:
        fprintf(stdout, "Memtablerep: skip_list\n");
        break;
      case kVectorRep:
        fprintf(stdout, "Memtablerep: vector\n");
        break;
      case kHashLinkedList:
        fprintf(stdout, "Memtablerep: hash_linkedlist\n");
        break;
    }
    fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);

    PrintWarnings(compression.c_str());
    fprintf(stdout, "------------------------------------------------\n");
  }

  void PrintWarnings(const char* compression) {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
    if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
      // The test string should not be too small.
      const int len = FLAGS_block_size;
      std::string input_str(len, 'y');
      std::string compressed;
      CompressionOptions opts;
      CompressionContext context(FLAGS_compression_type_e);
      CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
                           FLAGS_compression_type_e,
                           FLAGS_sample_for_compression);
      bool result = CompressSlice(info, Slice(input_str), &compressed);

      if (!result) {
        fprintf(stdout, "WARNING: %s compression is not enabled\n",
                compression);
      } else if (compressed.size() >= input_str.size()) {
        fprintf(stdout, "WARNING: %s compression is not effective\n",
                compression);
      }
    }
  }

// Currently the following isn't equivalent to OS_LINUX.
#if defined(__linux)
  static Slice TrimSpace(Slice s) {
    unsigned int start = 0;
    while (start < s.size() && isspace(s[start])) {
      start++;
    }
    unsigned int limit = static_cast<unsigned int>(s.size());
    while (limit > start && isspace(s[limit-1])) {
      limit--;
    }
    return Slice(s.data() + start, limit - start);
  }
#endif

  void PrintEnvironment() {
    fprintf(stderr, "RocksDB:    version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
    time_t now = time(nullptr);
    char buf[52];
    // Lint complains about ctime() usage, so replace it with ctime_r(). The
    // requirement is to provide a buffer which is at least 26 bytes.
    fprintf(stderr, "Date:       %s",
            ctime_r(&now, buf));  // ctime_r() adds newline

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    if (cpuinfo != nullptr) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
        const char* sep = strchr(line, ':');
        if (sep == nullptr) {
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }

  static bool KeyExpired(const TimestampEmulator* timestamp_emulator,
                         const Slice& key) {
    const char* pos = key.data();
    pos += 8;
    uint64_t timestamp = 0;
    if (port::kLittleEndian) {
      int bytes_to_fill = 8;
      for (int i = 0; i < bytes_to_fill; ++i) {
        timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
                      << ((bytes_to_fill - i - 1) << 3));
      }
    } else {
      memcpy(&timestamp, pos, sizeof(timestamp));
    }
    return timestamp_emulator->Get() - timestamp > FLAGS_time_range;
  }
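
  // Note: KeyExpired() reads an 8-byte timestamp stored immediately after the
  // first 8 bytes of the key (most-significant byte first on little-endian
  // hosts) and treats the key as expired once its age relative to the
  // TimestampEmulator exceeds FLAGS_time_range.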

  class ExpiredTimeFilter : public CompactionFilter {
   public:
    explicit ExpiredTimeFilter(
        const std::shared_ptr<TimestampEmulator>& timestamp_emulator)
        : timestamp_emulator_(timestamp_emulator) {}
    bool Filter(int /*level*/, const Slice& key,
                const Slice& /*existing_value*/, std::string* /*new_value*/,
                bool* /*value_changed*/) const override {
      return KeyExpired(timestamp_emulator_.get(), key);
    }
    const char* Name() const override { return "ExpiredTimeFilter"; }

   private:
    std::shared_ptr<TimestampEmulator> timestamp_emulator_;
  };

  class KeepFilter : public CompactionFilter {
   public:
    bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
                std::string* /*new_value*/,
                bool* /*value_changed*/) const override {
      return false;
    }

    const char* Name() const override { return "KeepFilter"; }
  };

  std::shared_ptr<Cache> NewCache(int64_t capacity) {
    if (capacity <= 0) {
      return nullptr;
    }
    if (FLAGS_use_clock_cache) {
      auto cache = NewClockCache((size_t)capacity, FLAGS_cache_numshardbits);
      if (!cache) {
        fprintf(stderr, "Clock cache not supported.");
        exit(1);
      }
      return cache;
    } else {
      return NewLRUCache((size_t)capacity, FLAGS_cache_numshardbits,
                         false /*strict_capacity_limit*/,
                         FLAGS_cache_high_pri_pool_ratio);
    }
  }

 public:
  Benchmark()
      : cache_(NewCache(FLAGS_cache_size)),
        compressed_cache_(NewCache(FLAGS_compressed_cache_size)),
        filter_policy_(FLAGS_bloom_bits >= 0
                           ? NewBloomFilterPolicy(FLAGS_bloom_bits,
                                                  FLAGS_use_block_based_filter)
                           : nullptr),
        prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
        num_(FLAGS_num),
        value_size_(FLAGS_value_size),
        key_size_(FLAGS_key_size),
        prefix_size_(FLAGS_prefix_size),
        keys_per_prefix_(FLAGS_keys_per_prefix),
        entries_per_batch_(1),
        reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
        read_random_exp_range_(0.0),
        writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
        readwrites_(
            (FLAGS_writes < 0 && FLAGS_reads < 0)
                ? FLAGS_num
                : ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)),
        merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
        report_file_operations_(FLAGS_report_file_operations),
#ifndef ROCKSDB_LITE
        use_blob_db_(FLAGS_use_blob_db)
#else
        use_blob_db_(false)
#endif  // !ROCKSDB_LITE
  {
    // use simcache instead of cache
    if (FLAGS_simcache_size >= 0) {
      if (FLAGS_cache_numshardbits >= 1) {
        cache_ =
            NewSimCache(cache_, FLAGS_simcache_size, FLAGS_cache_numshardbits);
      } else {
        cache_ = NewSimCache(cache_, FLAGS_simcache_size, 0);
      }
    }

    if (report_file_operations_) {
      if (!FLAGS_hdfs.empty()) {
        fprintf(stderr,
                "--hdfs and --report_file_operations cannot be enabled "
                "at the same time");
        exit(1);
      }
      FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
    }

    if (FLAGS_prefix_size > FLAGS_key_size) {
      fprintf(stderr, "prefix size is larger than key size");
      exit(1);
    }

    std::vector<std::string> files;
    FLAGS_env->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      Options options;
      if (!FLAGS_wal_dir.empty()) {
        options.wal_dir = FLAGS_wal_dir;
      }
#ifndef ROCKSDB_LITE
      if (use_blob_db_) {
        blob_db::DestroyBlobDB(FLAGS_db, options, blob_db::BlobDBOptions());
      }
#endif  // !ROCKSDB_LITE
      DestroyDB(FLAGS_db, options);
      if (!FLAGS_wal_dir.empty()) {
        FLAGS_env->DeleteDir(FLAGS_wal_dir);
      }

      if (FLAGS_num_multi_db > 1) {
        FLAGS_env->CreateDir(FLAGS_db);
        if (!FLAGS_wal_dir.empty()) {
          FLAGS_env->CreateDir(FLAGS_wal_dir);
        }
      }
    }

    listener_.reset(new ErrorHandlerListener());
  }

  ~Benchmark() {
    db_.DeleteDBs();
    delete prefix_extractor_;
    if (cache_.get() != nullptr) {
      // this will leak, but we're shutting down so nobody cares
      cache_->DisownData();
    }
  }

  Slice AllocateKey(std::unique_ptr<const char[]>* key_guard) {
    char* data = new char[key_size_];
    const char* const_data = data;
    key_guard->reset(const_data);
    return Slice(key_guard->get(), key_size_);
  }

  // Generate key according to the given specification and random number.
  // The resulting key will have the following format if keys_per_prefix_
  // is positive; extra trailing bytes are either cut off or padded with '0'.
  // The prefix value is derived from key value.
  //   ----------------------------
  //   | prefix 00000 | key 00000 |
  //   ----------------------------
  // If keys_per_prefix_ is 0, the key is simply a binary representation of
  // random number followed by trailing '0's
  //   ----------------------------
  //   |        key 00000         |
  //   ----------------------------
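  //
  // Worked example (a sketch, assuming keys_per_prefix_ == 0 and
  // key_size_ == 16): v == 5 is encoded as the 8 bytes
  // 00 00 00 00 00 00 00 05 (most-significant byte first), followed by
  // eight '0' padding characters.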
  void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
    if (!keys_.empty()) {
      assert(FLAGS_use_existing_keys);
      assert(keys_.size() == static_cast<size_t>(num_keys));
      assert(v < static_cast<uint64_t>(num_keys));
      *key = keys_[v];
      return;
    }
    char* start = const_cast<char*>(key->data());
    char* pos = start;
    if (keys_per_prefix_ > 0) {
      int64_t num_prefix = num_keys / keys_per_prefix_;
      int64_t prefix = v % num_prefix;
      int bytes_to_fill = std::min(prefix_size_, 8);
      if (port::kLittleEndian) {
        for (int i = 0; i < bytes_to_fill; ++i) {
          pos[i] = (prefix >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
        }
      } else {
        memcpy(pos, static_cast<void*>(&prefix), bytes_to_fill);
      }
      if (prefix_size_ > 8) {
        // fill the rest with 0s
        memset(pos + 8, '0', prefix_size_ - 8);
      }
      pos += prefix_size_;
    }

    int bytes_to_fill = std::min(key_size_ - static_cast<int>(pos - start), 8);
    if (port::kLittleEndian) {
      for (int i = 0; i < bytes_to_fill; ++i) {
        pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
      }
    } else {
      memcpy(pos, static_cast<void*>(&v), bytes_to_fill);
    }
    pos += bytes_to_fill;
    if (key_size_ > pos - start) {
      memset(pos, '0', key_size_ - (pos - start));
    }
  }

  void GenerateKeyFromIntForSeek(uint64_t v, int64_t num_keys, Slice* key) {
    GenerateKeyFromInt(v, num_keys, key);
    if (FLAGS_seek_missing_prefix) {
      assert(prefix_size_ > 8);
      char* key_ptr = const_cast<char*>(key->data());
      // This relies on GenerateKeyFromInt filling the padding with '0's.
      // Putting a '1' will create a non-existing prefix.
      key_ptr[8] = '1';
    }
  }

  std::string GetPathForMultiple(std::string base_name, size_t id) {
    if (!base_name.empty()) {
#ifndef OS_WIN
      if (base_name.back() != '/') {
        base_name += '/';
      }
#else
      if (base_name.back() != '\\') {
        base_name += '\\';
      }
#endif
    }
    return base_name + ToString(id);
  }

  void VerifyDBFromDB(std::string& truth_db_name) {
    DBWithColumnFamilies truth_db;
    auto s = DB::OpenForReadOnly(open_options_, truth_db_name, &truth_db.db);
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
    ReadOptions ro;
    ro.total_order_seek = true;
    std::unique_ptr<Iterator> truth_iter(truth_db.db->NewIterator(ro));
    std::unique_ptr<Iterator> db_iter(db_.db->NewIterator(ro));
    // Verify that all the key/values in truth_db are retrievable in db with
    // ::Get
    fprintf(stderr, "Verifying db >= truth_db with ::Get...\n");
    for (truth_iter->SeekToFirst(); truth_iter->Valid(); truth_iter->Next()) {
      std::string value;
      s = db_.db->Get(ro, truth_iter->key(), &value);
      assert(s.ok());
      // TODO(myabandeh): provide debugging hints
      assert(Slice(value) == truth_iter->value());
    }
    // Verify that the db iterator does not give any extra key/value
    fprintf(stderr, "Verifying db == truth_db...\n");
    for (db_iter->SeekToFirst(), truth_iter->SeekToFirst(); db_iter->Valid();
         db_iter->Next(), truth_iter->Next()) {
      assert(truth_iter->Valid());
      assert(truth_iter->value() == db_iter->value());
    }
    // No more key should be left unchecked in truth_db
    assert(!truth_iter->Valid());
    fprintf(stderr, "...Verified\n");
  }

  void Run() {
    if (!SanityCheck()) {
      exit(1);
    }
    Open(&open_options_);
    PrintHeader();
    std::stringstream benchmark_stream(FLAGS_benchmarks);
    std::string name;
    std::unique_ptr<ExpiredTimeFilter> filter;
    while (std::getline(benchmark_stream, name, ',')) {
      // Sanitize parameters
      num_ = FLAGS_num;
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
      writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
      deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
      value_size_ = FLAGS_value_size;
      key_size_ = FLAGS_key_size;
      entries_per_batch_ = FLAGS_batch_size;
      writes_before_delete_range_ = FLAGS_writes_before_delete_range;
      writes_per_range_tombstone_ = FLAGS_writes_per_range_tombstone;
      range_tombstone_width_ = FLAGS_range_tombstone_width;
      max_num_range_tombstones_ = FLAGS_max_num_range_tombstones;
      write_options_ = WriteOptions();
      read_random_exp_range_ = FLAGS_read_random_exp_range;
      if (FLAGS_sync) {
        write_options_.sync = true;
      }
      write_options_.disableWAL = FLAGS_disable_wal;

      void (Benchmark::*method)(ThreadState*) = nullptr;
      void (Benchmark::*post_process_method)() = nullptr;

      bool fresh_db = false;
      int num_threads = FLAGS_threads;

      int num_repeat = 1;
      int num_warmup = 0;
      if (!name.empty() && *name.rbegin() == ']') {
        auto it = name.find('[');
        if (it == std::string::npos) {
          fprintf(stderr, "unknown benchmark arguments '%s'\n", name.c_str());
          exit(1);
        }
        std::string args = name.substr(it + 1);
        args.resize(args.size() - 1);
        name.resize(it);

        std::string bench_arg;
        std::stringstream args_stream(args);
        while (std::getline(args_stream, bench_arg, '-')) {
          if (bench_arg.empty()) {
            continue;
          }
          if (bench_arg[0] == 'X') {
            // Repeat the benchmark n times
            std::string num_str = bench_arg.substr(1);
            num_repeat = std::stoi(num_str);
          } else if (bench_arg[0] == 'W') {
            // Warm up the benchmark for n times
            std::string num_str = bench_arg.substr(1);
            num_warmup = std::stoi(num_str);
          }
        }
      }
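
      // For example, a benchmark entry such as "readrandom[X5-W2]"
      // (illustrative) requests 5 repetitions and 2 warm-up runs of
      // readrandom; the repeated runs are then summarized via CombinedStats.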

      // Both fillseqdeterministic and filluniquerandomdeterministic
      // fill the levels except the max level with UNIQUE_RANDOM
      // and fill the max level with fillseq and filluniquerandom, respectively
      if (name == "fillseqdeterministic" ||
          name == "filluniquerandomdeterministic") {
        if (!FLAGS_disable_auto_compactions) {
          fprintf(stderr,
                  "Please disable_auto_compactions in FillDeterministic "
                  "benchmark\n");
          exit(1);
        }
        if (num_threads > 1) {
          fprintf(stderr,
                  "filldeterministic multithreaded not supported"
                  ", use 1 thread\n");
          num_threads = 1;
        }
        fresh_db = true;
        if (name == "fillseqdeterministic") {
          method = &Benchmark::WriteSeqDeterministic;
        } else {
          method = &Benchmark::WriteUniqueRandomDeterministic;
        }
      } else if (name == "fillseq") {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == "fillbatch") {
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
      } else if (name == "fillrandom") {
        fresh_db = true;
        method = &Benchmark::WriteRandom;
      } else if (name == "filluniquerandom") {
        fresh_db = true;
        if (num_threads > 1) {
          fprintf(stderr,
                  "filluniquerandom multithreaded not supported"
                  ", use 1 thread");
          num_threads = 1;
        }
        method = &Benchmark::WriteUniqueRandom;
      } else if (name == "overwrite") {
        method = &Benchmark::WriteRandom;
      } else if (name == "fillsync") {
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
      } else if (name == "fill100K") {
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
      } else if (name == "readseq") {
        method = &Benchmark::ReadSequential;
      } else if (name == "readtocache") {
        method = &Benchmark::ReadSequential;
        num_threads = 1;
        reads_ = num_;
      } else if (name == "readreverse") {
        method = &Benchmark::ReadReverse;
      } else if (name == "readrandom") {
        if (FLAGS_multiread_stride) {
          fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
                  entries_per_batch_);
        }
        method = &Benchmark::ReadRandom;
      } else if (name == "readrandomfast") {
        method = &Benchmark::ReadRandomFast;
      } else if (name == "multireadrandom") {
        fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
                entries_per_batch_);
        method = &Benchmark::MultiReadRandom;
      } else if (name == "mixgraph") {
        method = &Benchmark::MixGraph;
      } else if (name == "readmissing") {
        ++key_size_;
        method = &Benchmark::ReadRandom;
      } else if (name == "newiterator") {
        method = &Benchmark::IteratorCreation;
      } else if (name == "newiteratorwhilewriting") {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::IteratorCreationWhileWriting;
      } else if (name == "seekrandom") {
        method = &Benchmark::SeekRandom;
      } else if (name == "seekrandomwhilewriting") {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::SeekRandomWhileWriting;
      } else if (name == "seekrandomwhilemerging") {
        num_threads++;  // Add extra thread for merging
        method = &Benchmark::SeekRandomWhileMerging;
      } else if (name == "readrandomsmall") {
        reads_ /= 1000;
        method = &Benchmark::ReadRandom;
      } else if (name == "deleteseq") {
        method = &Benchmark::DeleteSeq;
      } else if (name == "deleterandom") {
        method = &Benchmark::DeleteRandom;
      } else if (name == "readwhilewriting") {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
      } else if (name == "readwhilemerging") {
        num_threads++;  // Add extra thread for merging
        method = &Benchmark::ReadWhileMerging;
      } else if (name == "readwhilescanning") {
        num_threads++;  // Add extra thread for scanning
        method = &Benchmark::ReadWhileScanning;
      } else if (name == "readrandomwriterandom") {
        method = &Benchmark::ReadRandomWriteRandom;
      } else if (name == "readrandommergerandom") {
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
                  name.c_str());
          exit(1);
        }
        method = &Benchmark::ReadRandomMergeRandom;
      } else if (name == "updaterandom") {
        method = &Benchmark::UpdateRandom;
      } else if (name == "xorupdaterandom") {
        method = &Benchmark::XORUpdateRandom;
      } else if (name == "appendrandom") {
        method = &Benchmark::AppendRandom;
      } else if (name == "mergerandom") {
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
                  name.c_str());
          exit(1);
        }
        method = &Benchmark::MergeRandom;
      } else if (name == "randomwithverify") {
        method = &Benchmark::RandomWithVerify;
      } else if (name == "fillseekseq") {
        method = &Benchmark::WriteSeqSeekSeq;
      } else if (name == "compact") {
        method = &Benchmark::Compact;
      } else if (name == "compactall") {
        CompactAll();
      } else if (name == "crc32c") {
        method = &Benchmark::Crc32c;
      } else if (name == "xxhash") {
        method = &Benchmark::xxHash;
      } else if (name == "acquireload") {
        method = &Benchmark::AcquireLoad;
      } else if (name == "compress") {
        method = &Benchmark::Compress;
      } else if (name == "uncompress") {
        method = &Benchmark::Uncompress;
#ifndef ROCKSDB_LITE
      } else if (name == "randomtransaction") {
        method = &Benchmark::RandomTransaction;
        post_process_method = &Benchmark::RandomTransactionVerify;
#endif  // ROCKSDB_LITE
      } else if (name == "randomreplacekeys") {
        fresh_db = true;
        method = &Benchmark::RandomReplaceKeys;
      } else if (name == "timeseries") {
        timestamp_emulator_.reset(new TimestampEmulator());
        if (FLAGS_expire_style == "compaction_filter") {
          filter.reset(new ExpiredTimeFilter(timestamp_emulator_));
          fprintf(stdout, "Compaction filter is used to remove expired data");
          open_options_.compaction_filter = filter.get();
        }
        fresh_db = true;
        method = &Benchmark::TimeSeries;
      } else if (name == "stats") {
        PrintStats("rocksdb.stats");
      } else if (name == "resetstats") {
        ResetStats();
      } else if (name == "verify") {
        VerifyDBFromDB(FLAGS_truth_db);
      } else if (name == "levelstats") {
        PrintStats("rocksdb.levelstats");
      } else if (name == "sstables") {
        PrintStats("rocksdb.sstables");
      } else if (name == "replay") {
        if (num_threads > 1) {
          fprintf(stderr, "Multi-threaded replay is not yet supported\n");
          exit(1);
        }
        if (FLAGS_trace_file == "") {
          fprintf(stderr, "Please set --trace_file to be replayed from\n");
          exit(1);
        }
        method = &Benchmark::Replay;
      } else if (!name.empty()) {  // No error message for empty name
        fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
        exit(1);
      }

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                  name.c_str());
          method = nullptr;
        } else {
          if (db_.db != nullptr) {
            db_.DeleteDBs();
            DestroyDB(FLAGS_db, open_options_);
          }
          Options options = open_options_;
          for (size_t i = 0; i < multi_dbs_.size(); i++) {
            delete multi_dbs_[i].db;
            if (!open_options_.wal_dir.empty()) {
              options.wal_dir = GetPathForMultiple(open_options_.wal_dir, i);
            }
            DestroyDB(GetPathForMultiple(FLAGS_db, i), options);
          }
          multi_dbs_.clear();
        }
        Open(&open_options_);  // use open_options for the last accessed
      }

      if (method != nullptr) {
        fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());

#ifndef ROCKSDB_LITE
        // A trace_file option can be provided both for trace and replay
        // operations. But db_bench does not support tracing and replaying at
        // the same time, for now. So, start tracing only when it is not a
        // replay.
        if (FLAGS_trace_file != "" && name != "replay") {
          std::unique_ptr<TraceWriter> trace_writer;
          Status s = NewFileTraceWriter(FLAGS_env, EnvOptions(),
                                        FLAGS_trace_file, &trace_writer);
          if (!s.ok()) {
            fprintf(stderr, "Encountered an error starting a trace, %s\n",
                    s.ToString().c_str());
            exit(1);
          }
          s = db_.db->StartTrace(trace_options_, std::move(trace_writer));
          if (!s.ok()) {
            fprintf(stderr, "Encountered an error starting a trace, %s\n",
                    s.ToString().c_str());
            exit(1);
          }
          fprintf(stdout, "Tracing the workload to: [%s]\n",
                  FLAGS_trace_file.c_str());
        }
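        // The trace captured here can later be fed back by running db_bench
        // with --benchmarks=replay and the same --trace_file, which is why
        // tracing is skipped for the replay benchmark above.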
#endif  // ROCKSDB_LITE

        if (num_warmup > 0) {
          printf("Warming up benchmark by running %d times\n", num_warmup);
        }

        for (int i = 0; i < num_warmup; i++) {
          RunBenchmark(num_threads, name, method);
        }

        if (num_repeat > 1) {
          printf("Running benchmark for %d times\n", num_repeat);
        }

        CombinedStats combined_stats;
        for (int i = 0; i < num_repeat; i++) {
          Stats stats = RunBenchmark(num_threads, name, method);
          combined_stats.AddStats(stats);
        }
        if (num_repeat > 1) {
          combined_stats.Report(name);
        }
      }
      if (post_process_method != nullptr) {
        (this->*post_process_method)();
      }
    }

    if (secondary_update_thread_) {
      secondary_update_stopped_.store(1, std::memory_order_relaxed);
      secondary_update_thread_->join();
      secondary_update_thread_.reset();
    }

#ifndef ROCKSDB_LITE
    if (name != "replay" && FLAGS_trace_file != "") {
      Status s = db_.db->EndTrace();
      if (!s.ok()) {
        fprintf(stderr, "Encountered an error ending the trace, %s\n",
                s.ToString().c_str());
      }
    }
#endif  // ROCKSDB_LITE

    if (FLAGS_statistics) {
      fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
    }
    if (FLAGS_simcache_size >= 0) {
      fprintf(stdout, "SIMULATOR CACHE STATISTICS:\n%s\n",
              static_cast_with_check<SimCache, Cache>(cache_.get())
                  ->ToString()
                  .c_str());
    }

#ifndef ROCKSDB_LITE
    if (FLAGS_use_secondary_db) {
      fprintf(stdout, "Secondary instance updated  %" PRIu64 " times.\n",
              secondary_db_updates_);
    }
#endif  // ROCKSDB_LITE
  }

 private:
  std::shared_ptr<TimestampEmulator> timestamp_emulator_;
  std::unique_ptr<port::Thread> secondary_update_thread_;
  std::atomic<int> secondary_update_stopped_{0};
  uint64_t secondary_db_updates_ = 0;

  struct ThreadArg {
    Benchmark* bm;
    SharedState* shared;
    ThreadState* thread;
    void (Benchmark::*method)(ThreadState*);
  };

  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

    SetPerfLevel(static_cast<PerfLevel>(shared->perf_level));
    perf_context.EnablePerLevelPerfContext();
    thread->stats.Start(thread->tid);
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }
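  // RunBenchmark() below launches n copies of `method`, each on its own
  // ThreadState, and uses SharedState's mutex/condvar as a start barrier:
  // threads register in num_initialized, wait for `start`, and the launcher
  // only flips `start` once all of them are ready, so the measured intervals
  // line up across threads.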

  Stats RunBenchmark(int n, Slice name,
                     void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      shared.write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }
    if (FLAGS_benchmark_read_rate_limit > 0) {
      shared.read_rate_limiter.reset(NewGenericRateLimiter(
          FLAGS_benchmark_read_rate_limit, 100000 /* refill_period_us */,
          10 /* fairness */, RateLimiter::Mode::kReadsOnly));
    }

    std::unique_ptr<ReporterAgent> reporter_agent;
    if (FLAGS_report_interval_seconds > 0) {
      reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file,
                                             FLAGS_report_interval_seconds));
    }

    ThreadArg* arg = new ThreadArg[n];

    for (int i = 0; i < n; i++) {
#ifdef NUMA
      if (FLAGS_enable_numa) {
        // Performs a local allocation of memory to threads in numa node.
        int n_nodes = numa_num_task_nodes();  // Number of nodes in NUMA.
        numa_exit_on_error = 1;
        int numa_node = i % n_nodes;
        bitmask* nodes = numa_allocate_nodemask();
        numa_bitmask_clearall(nodes);
        numa_bitmask_setbit(nodes, numa_node);
        // numa_bind() call binds the process to the node and these
        // properties are passed on to the thread that is created in
        // StartThread method called later in the loop.
        numa_bind(nodes);
        numa_set_strict(1);
        numa_free_nodemask(nodes);
      }
#endif
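      // With --enable_numa, thread i is pinned to NUMA node (i % n_nodes),
      // e.g. on a two-node host even-numbered threads allocate from node 0
      // and odd-numbered threads from node 1.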
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      arg[i].thread = new ThreadState(i);
      arg[i].thread->stats.SetReporterAgent(reporter_agent.get());
      arg[i].thread->shared = &shared;
      FLAGS_env->StartThread(ThreadBody, &arg[i]);
    }

    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    // Stats for some threads can be excluded.
    Stats merge_stats;
    for (int i = 0; i < n; i++) {
      merge_stats.Merge(arg[i].thread->stats);
    }
    merge_stats.Report(name);

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;

    return merge_stats;
  }

  void Crc32c(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = FLAGS_block_size; // use --block_size option for db_bench
    std::string labels = "(" + ToString(FLAGS_block_size) + " per op)";
    const char* label = labels.c_str();

    std::string data(size, 'x');
    int64_t bytes = 0;
    uint32_t crc = 0;
    while (bytes < 500 * 1048576) {
      crc = crc32c::Value(data.data(), size);
      thread->stats.FinishedOps(nullptr, nullptr, 1, kCrc);
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

  void xxHash(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    unsigned int xxh32 = 0;
    while (bytes < 500 * 1048576) {
      xxh32 = XXH32(data.data(), size, 0);
      thread->stats.FinishedOps(nullptr, nullptr, 1, kHash);
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... xxh32=0x%x\r", static_cast<unsigned int>(xxh32));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

  void AcquireLoad(ThreadState* thread) {
    int dummy;
    std::atomic<void*> ap(&dummy);
    int count = 0;
    void *ptr = nullptr;
    thread->stats.AddMessage("(each op is 1000 loads)");
    while (count < 100000) {
      for (int i = 0; i < 1000; i++) {
        ptr = ap.load(std::memory_order_acquire);
      }
      count++;
      thread->stats.FinishedOps(nullptr, nullptr, 1, kOthers);
    }
    if (ptr == nullptr) exit(1);  // Disable unused variable warning.
  }

  void Compress(ThreadState *thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(FLAGS_block_size);
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;
    CompressionOptions opts;
    CompressionContext context(FLAGS_compression_type_e);
    CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
                         FLAGS_compression_type_e,
                         FLAGS_sample_for_compression);
    // Compress 1G
    while (ok && bytes < int64_t(1) << 30) {
      compressed.clear();
      ok = CompressSlice(info, input, &compressed);
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
    }

    if (!ok) {
      thread->stats.AddMessage("(compression failure)");
    } else {
      char buf[340];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    }
  }

  void Uncompress(ThreadState *thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(FLAGS_block_size);
    std::string compressed;

    CompressionContext compression_ctx(FLAGS_compression_type_e);
    CompressionOptions compression_opts;
    CompressionInfo compression_info(
        compression_opts, compression_ctx, CompressionDict::GetEmptyDict(),
        FLAGS_compression_type_e, FLAGS_sample_for_compression);
    UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
    UncompressionInfo uncompression_info(uncompression_ctx,
                                         UncompressionDict::GetEmptyDict(),
                                         FLAGS_compression_type_e);

    bool ok = CompressSlice(compression_info, input, &compressed);
    int64_t bytes = 0;
    int decompress_size;
    while (ok && bytes < 1024 * 1048576) {
      CacheAllocationPtr uncompressed;
      switch (FLAGS_compression_type_e) {
        case rocksdb::kSnappyCompression: {
          // get size and allocate here to make comparison fair
          size_t ulength = 0;
          if (!Snappy_GetUncompressedLength(compressed.data(),
                                            compressed.size(), &ulength)) {
            ok = false;
            break;
          }
          uncompressed = AllocateBlock(ulength, nullptr);
          ok = Snappy_Uncompress(compressed.data(), compressed.size(),
                                 uncompressed.get());
          break;
        }
      case rocksdb::kZlibCompression:
        uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(),
                                       compressed.size(), &decompress_size, 2);
        ok = uncompressed.get() != nullptr;
        break;
      case rocksdb::kBZip2Compression:
        uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
                                        &decompress_size, 2);
        ok = uncompressed.get() != nullptr;
        break;
      case rocksdb::kLZ4Compression:
        uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
                                      compressed.size(), &decompress_size, 2);
        ok = uncompressed.get() != nullptr;
        break;
      case rocksdb::kLZ4HCCompression:
        uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
                                      compressed.size(), &decompress_size, 2);
        ok = uncompressed.get() != nullptr;
        break;
      case rocksdb::kXpressCompression:
        uncompressed.reset(XPRESS_Uncompress(
            compressed.data(), compressed.size(), &decompress_size));
        ok = uncompressed.get() != nullptr;
        break;
      case rocksdb::kZSTD:
        uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
                                       compressed.size(), &decompress_size);
        ok = uncompressed.get() != nullptr;
        break;
      default:
        ok = false;
      }
      bytes += input.size();
      thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
    }

    if (!ok) {
      thread->stats.AddMessage("(compression failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }

  // Returns true if the options is initialized from the specified
  // options file.
  bool InitializeOptionsFromFile(Options* opts) {
#ifndef ROCKSDB_LITE
    printf("Initializing RocksDB Options from the specified file\n");
    DBOptions db_opts;
    std::vector<ColumnFamilyDescriptor> cf_descs;
    if (FLAGS_options_file != "") {
      auto s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), &db_opts,
                                   &cf_descs);
      if (s.ok()) {
        *opts = Options(db_opts, cf_descs[0].options);
        return true;
      }
      fprintf(stderr, "Unable to load options file %s --- %s\n",
              FLAGS_options_file.c_str(), s.ToString().c_str());
      exit(1);
    }
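    // Example (hypothetical path): passing
    //   --options_file=/path/to/rocksdb/OPTIONS-000005
    // makes db_bench reuse that file's DBOptions and the first column
    // family's options instead of building them from individual flags.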
#else
    (void)opts;
3294 3295 3296 3297 3298 3299
#endif
    return false;
  }

  void InitializeOptionsFromFlags(Options* opts) {
    printf("Initializing RocksDB Options from command-line flags\n");
3300 3301
    Options& options = *opts;

3302
    assert(db_.db == nullptr);
3303

3304
    options.max_open_files = FLAGS_open_files;
3305 3306 3307 3308
    if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
      options.write_buffer_manager.reset(
          new WriteBufferManager(FLAGS_db_write_buffer_size, cache_));
    }
3309
    options.write_buffer_size = FLAGS_write_buffer_size;
3310
    options.max_write_buffer_number = FLAGS_max_write_buffer_number;
3311 3312
    options.min_write_buffer_number_to_merge =
      FLAGS_min_write_buffer_number_to_merge;
3313 3314
    options.max_write_buffer_number_to_maintain =
        FLAGS_max_write_buffer_number_to_maintain;
3315
    options.max_background_jobs = FLAGS_max_background_jobs;
3316
    options.max_background_compactions = FLAGS_max_background_compactions;
3317
    options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
3318
    options.max_background_flushes = FLAGS_max_background_flushes;
3319
    options.compaction_style = FLAGS_compaction_style_e;
3320
    options.compaction_pri = FLAGS_compaction_pri_e;
3321 3322 3323
    options.allow_mmap_reads = FLAGS_mmap_read;
    options.allow_mmap_writes = FLAGS_mmap_write;
    options.use_direct_reads = FLAGS_use_direct_reads;
3324 3325
    options.use_direct_io_for_flush_and_compaction =
        FLAGS_use_direct_io_for_flush_and_compaction;
3326
#ifndef ROCKSDB_LITE
3327
    options.ttl = FLAGS_fifo_compaction_ttl;
3328
    options.compaction_options_fifo = CompactionOptionsFIFO(
3329
        FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024,
3330
        FLAGS_fifo_compaction_allow_compaction);
3331
#endif  // ROCKSDB_LITE
3332
    if (FLAGS_prefix_size != 0) {
3333 3334 3335
      options.prefix_extractor.reset(
          NewFixedPrefixTransform(FLAGS_prefix_size));
    }
3336 3337 3338 3339 3340 3341 3342
    if (FLAGS_use_uint64_comparator) {
      options.comparator = test::Uint64Comparator();
      if (FLAGS_key_size != 8) {
        fprintf(stderr, "Using Uint64 comparator but key size is not 8.\n");
        exit(1);
      }
    }
3343 3344 3345
    if (FLAGS_use_stderr_info_logger) {
      options.info_log.reset(new StderrLogger());
    }
3346
    options.memtable_huge_page_size = FLAGS_memtable_use_huge_page ? 2048 : 0;
3347
    options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
3348
    options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
3349 3350 3351 3352 3353
    if (FLAGS_memtable_insert_with_hint_prefix_size > 0) {
      options.memtable_insert_with_hint_prefix_extractor.reset(
          NewCappedPrefixTransform(
              FLAGS_memtable_insert_with_hint_prefix_size));
    }
L
Lei Jin 已提交
3354
    options.bloom_locality = FLAGS_bloom_locality;
3355
    options.max_file_opening_threads = FLAGS_file_opening_threads;
3356 3357 3358
    options.new_table_reader_for_compaction_inputs =
        FLAGS_new_table_reader_for_compaction_inputs;
    options.compaction_readahead_size = FLAGS_compaction_readahead_size;
3359
    options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
3360
    options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
3361
    options.use_fsync = FLAGS_use_fsync;
3362
    options.num_levels = FLAGS_num_levels;
H
heyongqiang 已提交
3363 3364 3365
    options.target_file_size_base = FLAGS_target_file_size_base;
    options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
3366 3367
    options.level_compaction_dynamic_level_bytes =
        FLAGS_level_compaction_dynamic_level_bytes;
H
heyongqiang 已提交
3368 3369
    options.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
3370 3371 3372 3373
    if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash ||
                                     FLAGS_rep_factory == kHashLinkedList)) {
      fprintf(stderr, "prefix_size should be non-zero if PrefixHash or "
                      "HashLinkedList memtablerep is used\n");
J
Jim Paton 已提交
3374 3375 3376 3377
      exit(1);
    }
    switch (FLAGS_rep_factory) {
      case kSkipList:
T
Tomislav Novak 已提交
3378 3379
        options.memtable_factory.reset(new SkipListFactory(
            FLAGS_skip_list_lookahead));
J
Jim Paton 已提交
3380
        break;
S
sdong 已提交
3381 3382 3383 3384 3385
#ifndef ROCKSDB_LITE
      case kPrefixHash:
        options.memtable_factory.reset(
            NewHashSkipListRepFactory(FLAGS_hash_bucket_count));
        break;
3386 3387 3388 3389
      case kHashLinkedList:
        options.memtable_factory.reset(NewHashLinkListRepFactory(
            FLAGS_hash_bucket_count));
        break;
J
Jim Paton 已提交
3390 3391 3392 3393 3394
      case kVectorRep:
        options.memtable_factory.reset(
          new VectorRepFactory
        );
        break;
S
sdong 已提交
3395 3396 3397 3398 3399
#else
      default:
        fprintf(stderr, "Only skip list is supported in lite mode\n");
        exit(1);
#endif  // ROCKSDB_LITE
J
Jim Paton 已提交
3400
    }
L
Lei Jin 已提交
3401
    if (FLAGS_use_plain_table) {
S
sdong 已提交
3402
#ifndef ROCKSDB_LITE
3403 3404
      if (FLAGS_rep_factory != kPrefixHash &&
          FLAGS_rep_factory != kHashLinkedList) {
L
Lei Jin 已提交
3405 3406 3407 3408 3409 3410 3411
        fprintf(stderr, "Warning: plain table is used with skipList\n");
      }

      int bloom_bits_per_key = FLAGS_bloom_bits;
      if (bloom_bits_per_key < 0) {
        bloom_bits_per_key = 0;
      }
S
Stanislau Hlebik 已提交
3412 3413 3414 3415 3416 3417 3418

      PlainTableOptions plain_table_options;
      plain_table_options.user_key_len = FLAGS_key_size;
      plain_table_options.bloom_bits_per_key = bloom_bits_per_key;
      plain_table_options.hash_table_ratio = 0.75;
      options.table_factory = std::shared_ptr<TableFactory>(
          NewPlainTableFactory(plain_table_options));
S
sdong 已提交
3419 3420 3421 3422
#else
      fprintf(stderr, "Plain table is not supported in lite mode\n");
      exit(1);
#endif  // ROCKSDB_LITE
3423
    } else if (FLAGS_use_cuckoo_table) {
S
sdong 已提交
3424
#ifndef ROCKSDB_LITE
3425 3426 3427 3428
      if (FLAGS_cuckoo_hash_ratio > 1 || FLAGS_cuckoo_hash_ratio < 0) {
        fprintf(stderr, "Invalid cuckoo_hash_ratio\n");
        exit(1);
      }
3429 3430 3431 3432 3433 3434

      if (!FLAGS_mmap_read) {
        fprintf(stderr, "cuckoo table format requires mmap read to operate\n");
        exit(1);
      }

3435 3436 3437
      rocksdb::CuckooTableOptions table_options;
      table_options.hash_table_ratio = FLAGS_cuckoo_hash_ratio;
      table_options.identity_as_first_hash = FLAGS_identity_as_first_hash;
3438
      options.table_factory = std::shared_ptr<TableFactory>(
3439
          NewCuckooTableFactory(table_options));
S
sdong 已提交
3440 3441 3442 3443
#else
      fprintf(stderr, "Cuckoo table is not supported in lite mode\n");
      exit(1);
#endif  // ROCKSDB_LITE
3444 3445 3446
    } else {
      BlockBasedTableOptions block_based_options;
      if (FLAGS_use_hash_search) {
3447 3448 3449 3450 3451
        if (FLAGS_prefix_size == 0) {
          fprintf(stderr,
              "prefix_size not assigned when enable use_hash_search \n");
          exit(1);
        }
3452 3453 3454 3455
        block_based_options.index_type = BlockBasedTableOptions::kHashSearch;
      } else {
        block_based_options.index_type = BlockBasedTableOptions::kBinarySearch;
      }
3456
      if (FLAGS_partition_index_and_filters || FLAGS_partition_index) {
3457 3458 3459
        if (FLAGS_use_hash_search) {
          fprintf(stderr,
                  "use_hash_search is incompatible with "
3460
                  "partition index and is ignored");
3461 3462 3463 3464
        }
        block_based_options.index_type =
            BlockBasedTableOptions::kTwoLevelIndexSearch;
        block_based_options.metadata_block_size = FLAGS_metadata_block_size;
3465 3466 3467
        if (FLAGS_partition_index_and_filters) {
          block_based_options.partition_filters = true;
        }
3468
      }
3469 3470 3471
      if (cache_ == nullptr) {
        block_based_options.no_block_cache = true;
      }
3472 3473
      block_based_options.cache_index_and_filter_blocks =
          FLAGS_cache_index_and_filter_blocks;
3474 3475
      block_based_options.pin_l0_filter_and_index_blocks_in_cache =
          FLAGS_pin_l0_filter_and_index_blocks_in_cache;
3476 3477
      block_based_options.pin_top_level_index_and_filter =
          FLAGS_pin_top_level_index_and_filter;
3478 3479 3480 3481
      if (FLAGS_cache_high_pri_pool_ratio > 1e-6) {  // > 0.0 + eps
        block_based_options.cache_index_and_filter_blocks_with_high_priority =
            true;
      }
3482 3483 3484 3485
      block_based_options.block_cache = cache_;
      block_based_options.block_cache_compressed = compressed_cache_;
      block_based_options.block_size = FLAGS_block_size;
      block_based_options.block_restart_interval = FLAGS_block_restart_interval;
3486 3487
      block_based_options.index_block_restart_interval =
          FLAGS_index_block_restart_interval;
3488
      block_based_options.filter_policy = filter_policy_;
3489 3490
      block_based_options.format_version =
          static_cast<uint32_t>(FLAGS_format_version);
3491
      block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
3492 3493
      block_based_options.enable_index_compression =
          FLAGS_enable_index_compression;
3494
      block_based_options.block_align = FLAGS_block_align;
3495 3496 3497 3498 3499 3500 3501 3502 3503
      if (FLAGS_use_data_block_hash_index) {
        block_based_options.data_block_index_type =
            rocksdb::BlockBasedTableOptions::kDataBlockBinaryAndHash;
      } else {
        block_based_options.data_block_index_type =
            rocksdb::BlockBasedTableOptions::kDataBlockBinarySearch;
      }
      block_based_options.data_block_hash_table_util_ratio =
          FLAGS_data_block_hash_table_util_ratio;
3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542
      if (FLAGS_read_cache_path != "") {
#ifndef ROCKSDB_LITE
        Status rc_status;

        // The read cache needs to be provided with a Logger; we will put all
        // read cache logs in the read cache path in a file named rc_LOG
        rc_status = FLAGS_env->CreateDirIfMissing(FLAGS_read_cache_path);
        std::shared_ptr<Logger> read_cache_logger;
        if (rc_status.ok()) {
          rc_status = FLAGS_env->NewLogger(FLAGS_read_cache_path + "/rc_LOG",
                                           &read_cache_logger);
        }

        if (rc_status.ok()) {
          PersistentCacheConfig rc_cfg(FLAGS_env, FLAGS_read_cache_path,
                                       FLAGS_read_cache_size,
                                       read_cache_logger);

          rc_cfg.enable_direct_reads = FLAGS_read_cache_direct_read;
          rc_cfg.enable_direct_writes = FLAGS_read_cache_direct_write;
          rc_cfg.writer_qdepth = 4;
          rc_cfg.writer_dispatch_size = 4 * 1024;

          auto pcache = std::make_shared<BlockCacheTier>(rc_cfg);
          block_based_options.persistent_cache = pcache;
          rc_status = pcache->Open();
        }

        if (!rc_status.ok()) {
          fprintf(stderr, "Error initializing read cache, %s\n",
                  rc_status.ToString().c_str());
          exit(1);
        }
#else
        fprintf(stderr, "Read cache is not supported in LITE\n");
        exit(1);

#endif
      }
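      // Example (paths are illustrative): setting --read_cache_path=/mnt/pcache
      // together with --read_cache_size enables this BlockCacheTier-backed
      // persistent read cache; the direct read/write toggles above come from
      // --read_cache_direct_read and --read_cache_direct_write.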
3543 3544
      options.table_factory.reset(
          NewBlockBasedTableFactory(block_based_options));
L
Lei Jin 已提交
3545
    }
3546 3547
    if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
      if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
3548 3549
          (unsigned int)FLAGS_num_levels) {
        fprintf(stderr, "Insufficient number of fanouts specified %d\n",
3550
                (int)FLAGS_max_bytes_for_level_multiplier_additional_v.size());
3551 3552 3553
        exit(1);
      }
      options.max_bytes_for_level_multiplier_additional =
3554
        FLAGS_max_bytes_for_level_multiplier_additional_v;
3555
    }
H
heyongqiang 已提交
3556
    options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
M
Mark Callaghan 已提交
3557
    options.level0_file_num_compaction_trigger =
3558
        FLAGS_level0_file_num_compaction_trigger;
H
heyongqiang 已提交
3559 3560
    options.level0_slowdown_writes_trigger =
      FLAGS_level0_slowdown_writes_trigger;
3561
    options.compression = FLAGS_compression_type_e;
3562
    options.sample_for_compression = FLAGS_sample_for_compression;
3563 3564
    options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
    options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
3565 3566
    options.max_total_wal_size = FLAGS_max_total_wal_size;

3567 3568
    if (FLAGS_min_level_to_compress >= 0) {
      assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
3569
      options.compression_per_level.resize(FLAGS_num_levels);
3570
      for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
3571 3572
        options.compression_per_level[i] = kNoCompression;
      }
3573
      for (int i = FLAGS_min_level_to_compress;
3574
           i < FLAGS_num_levels; i++) {
3575
        options.compression_per_level[i] = FLAGS_compression_type_e;
3576 3577
      }
    }
J
Jim Paton 已提交
3578 3579
    options.soft_rate_limit = FLAGS_soft_rate_limit;
    options.hard_rate_limit = FLAGS_hard_rate_limit;
3580 3581
    options.soft_pending_compaction_bytes_limit =
        FLAGS_soft_pending_compaction_bytes_limit;
3582 3583
    options.hard_pending_compaction_bytes_limit =
        FLAGS_hard_pending_compaction_bytes_limit;
S
sdong 已提交
3584
    options.delayed_write_rate = FLAGS_delayed_write_rate;
3585 3586
    options.allow_concurrent_memtable_write =
        FLAGS_allow_concurrent_memtable_write;
3587 3588
    options.inplace_update_support = FLAGS_inplace_update_support;
    options.inplace_update_num_locks = FLAGS_inplace_update_num_locks;
3589 3590
    options.enable_write_thread_adaptive_yield =
        FLAGS_enable_write_thread_adaptive_yield;
3591
    options.enable_pipelined_write = FLAGS_enable_pipelined_write;
M
Maysam Yabandeh 已提交
3592
    options.unordered_write = FLAGS_unordered_write;
3593 3594
    options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
    options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
J
Jim Paton 已提交
3595 3596
    options.rate_limit_delay_max_milliseconds =
      FLAGS_rate_limit_delay_max_milliseconds;
3597
    options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
3598
    options.max_compaction_bytes = FLAGS_max_compaction_bytes;
3599
    options.disable_auto_compactions = FLAGS_disable_auto_compactions;
3600
    options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;
3601 3602

    // fill storage options
3603
    options.advise_random_on_open = FLAGS_advise_random_on_open;
3604
    options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;
H
Haobo Xu 已提交
3605
    options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
H
Haobo Xu 已提交
3606
    options.bytes_per_sync = FLAGS_bytes_per_sync;
3607
    options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;
H
Haobo Xu 已提交
3608

D
Deon Nicholas 已提交
3609
    // merge operator options
3610 3611 3612
    options.merge_operator = MergeOperators::CreateFromStringId(
        FLAGS_merge_operator);
    if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
D
Deon Nicholas 已提交
3613 3614 3615 3616
      fprintf(stderr, "invalid merge operator: %s\n",
              FLAGS_merge_operator.c_str());
      exit(1);
    }
3617
    options.max_successive_merges = FLAGS_max_successive_merges;
3618
    options.report_bg_io_stats = FLAGS_report_bg_io_stats;
D
Deon Nicholas 已提交
3619

3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635 3636
    // set universal style compaction configurations, if applicable
    if (FLAGS_universal_size_ratio != 0) {
      options.compaction_options_universal.size_ratio =
        FLAGS_universal_size_ratio;
    }
    if (FLAGS_universal_min_merge_width != 0) {
      options.compaction_options_universal.min_merge_width =
        FLAGS_universal_min_merge_width;
    }
    if (FLAGS_universal_max_merge_width != 0) {
      options.compaction_options_universal.max_merge_width =
        FLAGS_universal_max_merge_width;
    }
    if (FLAGS_universal_max_size_amplification_percent != 0) {
      options.compaction_options_universal.max_size_amplification_percent =
        FLAGS_universal_max_size_amplification_percent;
    }
3637 3638 3639 3640
    if (FLAGS_universal_compression_size_percent != -1) {
      options.compaction_options_universal.compression_size_percent =
        FLAGS_universal_compression_size_percent;
    }
3641 3642
    options.compaction_options_universal.allow_trivial_move =
        FLAGS_universal_allow_trivial_move;
3643 3644 3645
    if (FLAGS_thread_status_per_interval > 0) {
      options.enable_thread_tracking = true;
    }
3646

3647
#ifndef ROCKSDB_LITE
A
agiardullo 已提交
3648 3649 3650 3651
    if (FLAGS_readonly && FLAGS_transaction_db) {
      fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
      exit(1);
    }
3652 3653 3654 3655 3656
    if (FLAGS_use_secondary_db &&
        (FLAGS_transaction_db || FLAGS_optimistic_transaction_db)) {
      fprintf(stderr, "Cannot use use_secondary_db flag with transaction_db\n");
      exit(1);
    }
3657
#endif  // ROCKSDB_LITE
A
agiardullo 已提交
3658

3659 3660 3661 3662 3663
  }

  void InitializeOptionsGeneral(Options* opts) {
    Options& options = *opts;

3664
    options.create_missing_column_families = FLAGS_num_column_families > 1;
3665 3666 3667
    options.statistics = dbstats;
    options.wal_dir = FLAGS_wal_dir;
    options.create_if_missing = !FLAGS_use_existing_db;
3668
    options.dump_malloc_stats = FLAGS_dump_malloc_stats;
3669 3670
    options.stats_dump_period_sec =
        static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
3671 3672 3673 3674
    options.stats_persist_period_sec =
        static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
    options.stats_history_buffer_size =
        static_cast<size_t>(FLAGS_stats_history_buffer_size);
3675

A
Andrew Kryczka 已提交
3676 3677 3678 3679
    options.compression_opts.level = FLAGS_compression_level;
    options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
    options.compression_opts.zstd_max_train_bytes =
        FLAGS_compression_zstd_max_train_bytes;
3680 3681 3682 3683 3684 3685 3686
    // If this is a block based table, set some related options
    if (options.table_factory->Name() == BlockBasedTableFactory::kName &&
        options.table_factory->GetOptions() != nullptr) {
      BlockBasedTableOptions* table_options =
          reinterpret_cast<BlockBasedTableOptions*>(
              options.table_factory->GetOptions());
      if (FLAGS_cache_size) {
3687 3688
        table_options->block_cache = cache_;
      }
3689 3690 3691 3692
      if (FLAGS_bloom_bits >= 0) {
        table_options->filter_policy.reset(NewBloomFilterPolicy(
            FLAGS_bloom_bits, FLAGS_use_block_based_filter));
      }
3693
    }
3694 3695 3696 3697 3698 3699 3700 3701 3702 3703 3704 3705
    if (FLAGS_row_cache_size) {
      if (FLAGS_cache_numshardbits >= 1) {
        options.row_cache =
            NewLRUCache(FLAGS_row_cache_size, FLAGS_cache_numshardbits);
      } else {
        options.row_cache = NewLRUCache(FLAGS_row_cache_size);
      }
    }
    if (FLAGS_enable_io_prio) {
      FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
      FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
    }
3706 3707 3708 3709
    if (FLAGS_enable_cpu_prio) {
      FLAGS_env->LowerThreadPoolCPUPriority(Env::LOW);
      FLAGS_env->LowerThreadPoolCPUPriority(Env::HIGH);
    }
I
Igor Canadi 已提交
3710
    options.env = FLAGS_env;
3711 3712 3713
    if (FLAGS_sine_write_rate) {
      FLAGS_benchmark_write_rate_limit = static_cast<uint64_t>(SineRate(0));
    }
3714

3715 3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726
    if (FLAGS_rate_limiter_bytes_per_sec > 0) {
      if (FLAGS_rate_limit_bg_reads &&
          !FLAGS_new_table_reader_for_compaction_inputs) {
        fprintf(stderr,
                "rate limit compaction reads must have "
                "new_table_reader_for_compaction_inputs set\n");
        exit(1);
      }
      options.rate_limiter.reset(NewGenericRateLimiter(
          FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */,
          10 /* fairness */,
          FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
A
Andrew Kryczka 已提交
3727 3728
                                    : RateLimiter::Mode::kWritesOnly,
          FLAGS_rate_limiter_auto_tuned));
3729 3730
    }

3731
    options.listeners.emplace_back(listener_);
3732 3733 3734 3735
    if (FLAGS_num_multi_db <= 1) {
      OpenDb(options, FLAGS_db, &db_);
    } else {
      multi_dbs_.clear();
3736
      multi_dbs_.resize(FLAGS_num_multi_db);
3737
      auto wal_dir = options.wal_dir;
3738
      for (int i = 0; i < FLAGS_num_multi_db; i++) {
3739 3740 3741 3742
        if (!wal_dir.empty()) {
          options.wal_dir = GetPathForMultiple(wal_dir, i);
        }
        OpenDb(options, GetPathForMultiple(FLAGS_db, i), &multi_dbs_[i]);
3743
      }
3744
      options.wal_dir = wal_dir;
3745
    }
3746 3747 3748 3749 3750 3751

    // KeepFilter is a noop filter; it can be used to test the compaction
    // filter code path
    if (FLAGS_use_keep_filter) {
      options.compaction_filter = new KeepFilter();
      fprintf(stdout, "A noop compaction filter is used\n");
    }
3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 3764

    if (FLAGS_use_existing_keys) {
      // Only work on single database
      assert(db_.db != nullptr);
      ReadOptions read_opts;
      read_opts.total_order_seek = true;
      Iterator* iter = db_.db->NewIterator(read_opts);
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
        keys_.emplace_back(iter->key().ToString());
      }
      delete iter;
      FLAGS_num = keys_.size();
    }
3765 3766 3767 3768 3769
  }

  void Open(Options* opts) {
    if (!InitializeOptionsFromFile(opts)) {
      InitializeOptionsFromFlags(opts);
3770
    }
3771

3772
    InitializeOptionsGeneral(opts);
3773 3774
  }

Y
Yi Wu 已提交
3775
  void OpenDb(Options options, const std::string& db_name,
3776
      DBWithColumnFamilies* db) {
H
heyongqiang 已提交
3777
    Status s;
3778 3779
    // Open with column families if necessary.
    if (FLAGS_num_column_families > 1) {
3780 3781 3782 3783 3784 3785 3786
      size_t num_hot = FLAGS_num_column_families;
      if (FLAGS_num_hot_column_families > 0 &&
          FLAGS_num_hot_column_families < FLAGS_num_column_families) {
        num_hot = FLAGS_num_hot_column_families;
      } else {
        FLAGS_num_hot_column_families = FLAGS_num_column_families;
      }
3787
      std::vector<ColumnFamilyDescriptor> column_families;
3788
      for (size_t i = 0; i < num_hot; i++) {
3789 3790 3791
        column_families.push_back(ColumnFamilyDescriptor(
              ColumnFamilyName(i), ColumnFamilyOptions(options)));
      }
3792 3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811 3812 3813
      std::vector<int> cfh_idx_to_prob;
      if (!FLAGS_column_family_distribution.empty()) {
        std::stringstream cf_prob_stream(FLAGS_column_family_distribution);
        std::string cf_prob;
        int sum = 0;
        while (std::getline(cf_prob_stream, cf_prob, ',')) {
          cfh_idx_to_prob.push_back(std::stoi(cf_prob));
          sum += cfh_idx_to_prob.back();
        }
        if (sum != 100) {
          fprintf(stderr, "column_family_distribution items must sum to 100\n");
          exit(1);
        }
        if (cfh_idx_to_prob.size() != num_hot) {
          fprintf(stderr,
                  "got %" ROCKSDB_PRIszt
                  " column_family_distribution items; expected "
                  "%" ROCKSDB_PRIszt "\n",
                  cfh_idx_to_prob.size(), num_hot);
          exit(1);
        }
      }
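      // Example: --num_column_families=4 --num_hot_column_families=2
      // --column_family_distribution=75,25 gives two "hot" column families
      // that receive roughly 75% and 25% of operations; the weights must sum
      // to 100 and there must be one weight per hot column family.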
3814
#ifndef ROCKSDB_LITE
3815 3816 3817
      if (FLAGS_readonly) {
        s = DB::OpenForReadOnly(options, db_name, column_families,
            &db->cfh, &db->db);
A
agiardullo 已提交
3818
      } else if (FLAGS_optimistic_transaction_db) {
A
agiardullo 已提交
3819
        s = OptimisticTransactionDB::Open(options, db_name, column_families,
A
agiardullo 已提交
3820 3821 3822 3823 3824 3825 3826
                                          &db->cfh, &db->opt_txn_db);
        if (s.ok()) {
          db->db = db->opt_txn_db->GetBaseDB();
        }
      } else if (FLAGS_transaction_db) {
        TransactionDB* ptr;
        TransactionDBOptions txn_db_options;
3827 3828 3829 3830 3831
        if (options.unordered_write) {
          options.two_write_queues = true;
          txn_db_options.skip_concurrency_control = true;
          txn_db_options.write_policy = WRITE_PREPARED;
        }
A
agiardullo 已提交
3832 3833
        s = TransactionDB::Open(options, txn_db_options, db_name,
                                column_families, &db->cfh, &ptr);
A
agiardullo 已提交
3834
        if (s.ok()) {
A
agiardullo 已提交
3835
          db->db = ptr;
A
agiardullo 已提交
3836
        }
3837 3838 3839
      } else {
        s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
      }
3840 3841 3842
#else
      s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
#endif  // ROCKSDB_LITE
3843 3844 3845
      db->cfh.resize(FLAGS_num_column_families);
      db->num_created = num_hot;
      db->num_hot = num_hot;
3846
      db->cfh_idx_to_prob = std::move(cfh_idx_to_prob);
3847
#ifndef ROCKSDB_LITE
3848 3849
    } else if (FLAGS_readonly) {
      s = DB::OpenForReadOnly(options, db_name, &db->db);
A
agiardullo 已提交
3850 3851 3852 3853 3854
    } else if (FLAGS_optimistic_transaction_db) {
      s = OptimisticTransactionDB::Open(options, db_name, &db->opt_txn_db);
      if (s.ok()) {
        db->db = db->opt_txn_db->GetBaseDB();
      }
A
agiardullo 已提交
3855
    } else if (FLAGS_transaction_db) {
3856
      TransactionDB* ptr = nullptr;
A
agiardullo 已提交
3857
      TransactionDBOptions txn_db_options;
3858 3859 3860 3861 3862
      if (options.unordered_write) {
        options.two_write_queues = true;
        txn_db_options.skip_concurrency_control = true;
        txn_db_options.write_policy = WRITE_PREPARED;
      }
Y
Yi Wu 已提交
3863 3864 3865 3866
      s = CreateLoggerFromOptions(db_name, options, &options.info_log);
      if (s.ok()) {
        s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
      }
A
agiardullo 已提交
3867
      if (s.ok()) {
A
agiardullo 已提交
3868
        db->db = ptr;
A
agiardullo 已提交
3869
      }
3870
    } else if (FLAGS_use_blob_db) {
A
Anirban Rahut 已提交
3871
      blob_db::BlobDBOptions blob_db_options;
3872
      blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc;
S
Sagar Vemuri 已提交
3873
      blob_db_options.is_fifo = FLAGS_blob_db_is_fifo;
Y
Yi Wu 已提交
3874
      blob_db_options.max_db_size = FLAGS_blob_db_max_db_size;
S
Sagar Vemuri 已提交
3875 3876 3877 3878
      blob_db_options.ttl_range_secs = FLAGS_blob_db_ttl_range_secs;
      blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
      blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
      blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
3879
      blob_db::BlobDB* ptr = nullptr;
Y
Yi Wu 已提交
3880
      s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
A
Anirban Rahut 已提交
3881 3882 3883
      if (s.ok()) {
        db->db = ptr;
      }
3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897 3898 3899 3900 3901 3902 3903 3904 3905 3906 3907 3908 3909
    } else if (FLAGS_use_secondary_db) {
      if (FLAGS_secondary_path.empty()) {
        std::string default_secondary_path;
        FLAGS_env->GetTestDirectory(&default_secondary_path);
        default_secondary_path += "/dbbench_secondary";
        FLAGS_secondary_path = default_secondary_path;
      }
      s = DB::OpenAsSecondary(options, db_name, FLAGS_secondary_path, &db->db);
      if (s.ok() && FLAGS_secondary_update_interval > 0) {
        secondary_update_thread_.reset(new port::Thread(
            [this](int interval, DBWithColumnFamilies* _db) {
              while (0 == secondary_update_stopped_.load(
                              std::memory_order_relaxed)) {
                Status secondary_update_status =
                    _db->db->TryCatchUpWithPrimary();
                if (!secondary_update_status.ok()) {
                  fprintf(stderr, "Failed to catch up with primary: %s\n",
                          secondary_update_status.ToString().c_str());
                  break;
                }
                ++secondary_db_updates_;
                FLAGS_env->SleepForMicroseconds(interval * 1000000);
              }
            },
            FLAGS_secondary_update_interval, db));
      }
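      // The background thread above keeps the secondary instance fresh by
      // calling TryCatchUpWithPrimary() every --secondary_update_interval
      // seconds until the benchmark loop sets secondary_update_stopped_.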
A
Anirban Rahut 已提交
3910
#endif  // ROCKSDB_LITE
H
heyongqiang 已提交
3911
    } else {
3912
      s = DB::Open(options, db_name, &db->db);
H
heyongqiang 已提交
3913
    }
3914 3915 3916 3917 3918 3919
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

  enum WriteMode {
    RANDOM, SEQUENTIAL, UNIQUE_RANDOM
  };

  void WriteSeqDeterministic(ThreadState* thread) {
    DoDeterministicCompact(thread, open_options_.compaction_style, SEQUENTIAL);
  }

  void WriteUniqueRandomDeterministic(ThreadState* thread) {
    DoDeterministicCompact(thread, open_options_.compaction_style,
                           UNIQUE_RANDOM);
  }

  void WriteSeq(ThreadState* thread) {
    DoWrite(thread, SEQUENTIAL);
  }

  void WriteRandom(ThreadState* thread) {
    DoWrite(thread, RANDOM);
  }

  void WriteUniqueRandom(ThreadState* thread) {
    DoWrite(thread, UNIQUE_RANDOM);
  }

  class KeyGenerator {
   public:
    KeyGenerator(Random64* rand, WriteMode mode, uint64_t num,
                 uint64_t /*num_per_set*/ = 64 * 1024)
        : rand_(rand), mode_(mode), num_(num), next_(0) {
      if (mode_ == UNIQUE_RANDOM) {
        // NOTE: if memory consumption of this approach becomes a concern,
        // we can either break it into pieces and only randomly shuffle a
        // section each time. Alternatively, use a bitmap implementation
        // (https://reviews.facebook.net/differential/diff/54627/)
        values_.resize(num_);
        for (uint64_t i = 0; i < num_; ++i) {
          values_[i] = i;
        }
        std::shuffle(
            values_.begin(), values_.end(),
            std::default_random_engine(static_cast<unsigned int>(FLAGS_seed)));
      }
    }

    uint64_t Next() {
      switch (mode_) {
        case SEQUENTIAL:
          return next_++;
        case RANDOM:
          return rand_->Next() % num_;
        case UNIQUE_RANDOM:
          assert(next_ < num_);
          return values_[next_++];
      }
      assert(false);
      return std::numeric_limits<uint64_t>::max();
    }

   private:
    Random64* rand_;
    WriteMode mode_;
    const uint64_t num_;
    uint64_t next_;
    std::vector<uint64_t> values_;
  };

  DB* SelectDB(ThreadState* thread) {
    return SelectDBWithCfh(thread)->db;
  }

  DBWithColumnFamilies* SelectDBWithCfh(ThreadState* thread) {
    return SelectDBWithCfh(thread->rand.Next());
  }

  DBWithColumnFamilies* SelectDBWithCfh(uint64_t rand_int) {
    if (db_.db != nullptr) {
      return &db_;
    } else {
      return &multi_dbs_[rand_int % multi_dbs_.size()];
    }
  }

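  // Target rate r(x) = FLAGS_sine_a * sin(FLAGS_sine_b * x + FLAGS_sine_c) +
  // FLAGS_sine_d, with x in seconds since the benchmark started. Used by
  // -sine_write_rate and by MixGraph to modulate the rate limiters over time.
  // For example (illustrative values only), a=1000, b=0.001, c=0, d=2000
  // oscillates between 1000 and 3000 ops/sec.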
  double SineRate(double x) {
    return FLAGS_sine_a*sin((FLAGS_sine_b*x) + FLAGS_sine_c) + FLAGS_sine_d;
  }

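  // Core fill loop: issues num_ops keys per key generator in batches of
  // entries_per_batch_. Depending on the flags, each entry goes through BlobDB
  // PutWithTTL, a single-column-family WriteBatch, or a per-column-family Put,
  // and a range tombstone is injected every writes_per_range_tombstone_ writes
  // once writes_before_delete_range_ writes have been done.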
  void DoWrite(ThreadState* thread, WriteMode write_mode) {
    const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
    const int64_t num_ops = writes_ == 0 ? num_ : writes_;

    size_t num_key_gens = 1;
    if (db_.db == nullptr) {
      num_key_gens = multi_dbs_.size();
    }
    std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
    int64_t max_ops = num_ops * num_key_gens;
    int64_t ops_per_stage = max_ops;
    if (FLAGS_num_column_families > 1 && FLAGS_num_hot_column_families > 0) {
      ops_per_stage = (max_ops - 1) / (FLAGS_num_column_families /
                                       FLAGS_num_hot_column_families) +
                      1;
    }

    Duration duration(test_duration, max_ops, ops_per_stage);
    for (size_t i = 0; i < num_key_gens; i++) {
      key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode,
                                         num_ + max_num_range_tombstones_,
                                         ops_per_stage));
    }

    if (num_ != FLAGS_num) {
      char msg[100];
      snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::unique_ptr<const char[]> begin_key_guard;
    Slice begin_key = AllocateKey(&begin_key_guard);
    std::unique_ptr<const char[]> end_key_guard;
    Slice end_key = AllocateKey(&end_key_guard);
    std::vector<std::unique_ptr<const char[]>> expanded_key_guards;
    std::vector<Slice> expanded_keys;
    if (FLAGS_expand_range_tombstones) {
      expanded_key_guards.resize(range_tombstone_width_);
      for (auto& expanded_key_guard : expanded_key_guards) {
        expanded_keys.emplace_back(AllocateKey(&expanded_key_guard));
      }
    }

    int64_t stage = 0;
    int64_t num_written = 0;
    while (!duration.Done(entries_per_batch_)) {
      if (duration.GetStage() != stage) {
        stage = duration.GetStage();
        if (db_.db != nullptr) {
          db_.CreateNewCf(open_options_, stage);
        } else {
          for (auto& db : multi_dbs_) {
            db.CreateNewCf(open_options_, stage);
          }
        }
      }

      size_t id = thread->rand.Next() % num_key_gens;
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
      batch.Clear();

      if (thread->shared->write_rate_limiter.get() != nullptr) {
        thread->shared->write_rate_limiter->Request(
            entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
            nullptr /* stats */, RateLimiter::OpType::kWrite);
        // Set time at which last op finished to Now() to hide latency and
        // sleep from rate limiter. Also, do the check once per batch, not
        // once per write.
        thread->stats.ResetLastOpTime();
      }

      for (int64_t j = 0; j < entries_per_batch_; j++) {
        int64_t rand_num = key_gens[id]->Next();
        GenerateKeyFromInt(rand_num, FLAGS_num, &key);
        if (use_blob_db_) {
#ifndef ROCKSDB_LITE
          Slice val = gen.Generate(value_size_);
          int ttl = rand() % FLAGS_blob_db_max_ttl_range;
          blob_db::BlobDB* blobdb =
              static_cast<blob_db::BlobDB*>(db_with_cfh->db);
          s = blobdb->PutWithTTL(write_options_, key, val, ttl);
#endif  //  ROCKSDB_LITE
        } else if (FLAGS_num_column_families <= 1) {
          batch.Put(key, gen.Generate(value_size_));
        } else {
          // We use same rand_num as seed for key and column family so that we
          // can deterministically find the cfh corresponding to a particular
          // key while reading the key.
          batch.Put(db_with_cfh->GetCfh(rand_num), key,
                    gen.Generate(value_size_));
        }
        bytes += value_size_ + key_size_;
        ++num_written;
        if (writes_per_range_tombstone_ > 0 &&
            num_written > writes_before_delete_range_ &&
            (num_written - writes_before_delete_range_) /
                    writes_per_range_tombstone_ <=
                max_num_range_tombstones_ &&
            (num_written - writes_before_delete_range_) %
                    writes_per_range_tombstone_ ==
                0) {
          int64_t begin_num = key_gens[id]->Next();
          if (FLAGS_expand_range_tombstones) {
            for (int64_t offset = 0; offset < range_tombstone_width_;
                 ++offset) {
              GenerateKeyFromInt(begin_num + offset, FLAGS_num,
                                 &expanded_keys[offset]);
              if (use_blob_db_) {
#ifndef ROCKSDB_LITE
                s = db_with_cfh->db->Delete(write_options_,
                                            expanded_keys[offset]);
#endif  //  ROCKSDB_LITE
              } else if (FLAGS_num_column_families <= 1) {
                batch.Delete(expanded_keys[offset]);
              } else {
                batch.Delete(db_with_cfh->GetCfh(rand_num),
                             expanded_keys[offset]);
              }
            }
          } else {
            GenerateKeyFromInt(begin_num, FLAGS_num, &begin_key);
            GenerateKeyFromInt(begin_num + range_tombstone_width_, FLAGS_num,
                               &end_key);
            if (use_blob_db_) {
#ifndef ROCKSDB_LITE
              s = db_with_cfh->db->DeleteRange(
                  write_options_, db_with_cfh->db->DefaultColumnFamily(),
                  begin_key, end_key);
#endif  //  ROCKSDB_LITE
            } else if (FLAGS_num_column_families <= 1) {
              batch.DeleteRange(begin_key, end_key);
            } else {
              batch.DeleteRange(db_with_cfh->GetCfh(rand_num), begin_key,
                                end_key);
            }
          }
        }
      }
      if (!use_blob_db_) {
        s = db_with_cfh->db->Write(write_options_, &batch);
      }
      thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
                                entries_per_batch_, kWrite);
      if (FLAGS_sine_write_rate) {
        uint64_t now = FLAGS_env->NowMicros();

        uint64_t usecs_since_last;
        if (now > thread->stats.GetSineInterval()) {
          usecs_since_last = now - thread->stats.GetSineInterval();
        } else {
          usecs_since_last = 0;
        }

        if (usecs_since_last >
            (FLAGS_sine_write_rate_interval_milliseconds * uint64_t{1000})) {
          double usecs_since_start =
                  static_cast<double>(now - thread->stats.GetStart());
          thread->stats.ResetSineInterval();
          uint64_t write_rate =
                  static_cast<uint64_t>(SineRate(usecs_since_start / 1000000.0));
          thread->shared->write_rate_limiter.reset(
                  NewGenericRateLimiter(write_rate));
        }
      }
      if (!s.ok()) {
        s = listener_->WaitForRecovery(600000000) ? Status::OK() : s;
      }

      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
    thread->stats.AddBytes(bytes);
  }

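  // Builds a deterministic LSM shape: repeatedly writes and flushes sorted
  // runs, then compacts them manually with CompactFiles (CompactRange for
  // FIFO) so the resulting level layout does not depend on background
  // compaction timing. Auto compactions are disabled up front and the original
  // options are restored before returning.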
  Status DoDeterministicCompact(ThreadState* thread,
                                CompactionStyle compaction_style,
                                WriteMode write_mode) {
#ifndef ROCKSDB_LITE
    ColumnFamilyMetaData meta;
    std::vector<DB*> db_list;
    if (db_.db != nullptr) {
      db_list.push_back(db_.db);
    } else {
      for (auto& db : multi_dbs_) {
        db_list.push_back(db.db);
      }
    }
    std::vector<Options> options_list;
    for (auto db : db_list) {
      options_list.push_back(db->GetOptions());
      if (compaction_style != kCompactionStyleFIFO) {
        db->SetOptions({{"disable_auto_compactions", "1"},
                        {"level0_slowdown_writes_trigger", "400000000"},
                        {"level0_stop_writes_trigger", "400000000"}});
      } else {
        db->SetOptions({{"disable_auto_compactions", "1"}});
      }
    }

    assert(!db_list.empty());
    auto num_db = db_list.size();
    size_t num_levels = static_cast<size_t>(open_options_.num_levels);
    size_t output_level = open_options_.num_levels - 1;
    std::vector<std::vector<std::vector<SstFileMetaData>>> sorted_runs(num_db);
    std::vector<size_t> num_files_at_level0(num_db, 0);
    if (compaction_style == kCompactionStyleLevel) {
      if (num_levels == 0) {
        return Status::InvalidArgument("num_levels should be larger than 1");
      }
      bool should_stop = false;
      while (!should_stop) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        for (size_t i = 0; i < num_db; i++) {
          auto db = db_list[i];
          db->Flush(FlushOptions());
          db->GetColumnFamilyMetaData(&meta);
          if (num_files_at_level0[i] == meta.levels[0].files.size() ||
              writes_ == 0) {
            should_stop = true;
            continue;
          }
          sorted_runs[i].emplace_back(
              meta.levels[0].files.begin(),
              meta.levels[0].files.end() - num_files_at_level0[i]);
          num_files_at_level0[i] = meta.levels[0].files.size();
          if (sorted_runs[i].back().size() == 1) {
            should_stop = true;
            continue;
          }
          if (sorted_runs[i].size() == output_level) {
            auto& L1 = sorted_runs[i].back();
            L1.erase(L1.begin(), L1.begin() + L1.size() / 3);
            should_stop = true;
            continue;
          }
        }
        writes_ /=
            static_cast<int64_t>(open_options_.max_bytes_for_level_multiplier);
      }
      for (size_t i = 0; i < num_db; i++) {
        if (sorted_runs[i].size() < num_levels - 1) {
          fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt " levels\n", num_levels);
          exit(1);
        }
      }
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        auto compactionOptions = CompactionOptions();
        compactionOptions.compression = FLAGS_compression_type_e;
        auto options = db->GetOptions();
        MutableCFOptions mutable_cf_options(options);
        for (size_t j = 0; j < sorted_runs[i].size(); j++) {
          compactionOptions.output_file_size_limit =
              MaxFileSizeForLevel(mutable_cf_options,
                  static_cast<int>(output_level), compaction_style);
          std::cout << sorted_runs[i][j].size() << std::endl;
          db->CompactFiles(compactionOptions, {sorted_runs[i][j].back().name,
                                               sorted_runs[i][j].front().name},
                           static_cast<int>(output_level - j) /*level*/);
        }
      }
    } else if (compaction_style == kCompactionStyleUniversal) {
      auto ratio = open_options_.compaction_options_universal.size_ratio;
      bool should_stop = false;
      while (!should_stop) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        for (size_t i = 0; i < num_db; i++) {
          auto db = db_list[i];
          db->Flush(FlushOptions());
          db->GetColumnFamilyMetaData(&meta);
          if (num_files_at_level0[i] == meta.levels[0].files.size() ||
              writes_ == 0) {
            should_stop = true;
            continue;
          }
          sorted_runs[i].emplace_back(
              meta.levels[0].files.begin(),
              meta.levels[0].files.end() - num_files_at_level0[i]);
          num_files_at_level0[i] = meta.levels[0].files.size();
          if (sorted_runs[i].back().size() == 1) {
            should_stop = true;
            continue;
          }
          num_files_at_level0[i] = meta.levels[0].files.size();
        }
        writes_ = static_cast<int64_t>(writes_ * static_cast<double>(100) /
                                       (ratio + 200));
      }
      for (size_t i = 0; i < num_db; i++) {
        if (sorted_runs[i].size() < num_levels) {
          fprintf(stderr, "n is too small to fill %" ROCKSDB_PRIszt  " levels\n", num_levels);
          exit(1);
        }
      }
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        auto compactionOptions = CompactionOptions();
        compactionOptions.compression = FLAGS_compression_type_e;
        auto options = db->GetOptions();
        MutableCFOptions mutable_cf_options(options);
        for (size_t j = 0; j < sorted_runs[i].size(); j++) {
          compactionOptions.output_file_size_limit =
              MaxFileSizeForLevel(mutable_cf_options,
                  static_cast<int>(output_level), compaction_style);
          db->CompactFiles(
              compactionOptions,
              {sorted_runs[i][j].back().name, sorted_runs[i][j].front().name},
              (output_level > j ? static_cast<int>(output_level - j)
                                : 0) /*level*/);
        }
      }
    } else if (compaction_style == kCompactionStyleFIFO) {
      if (num_levels != 1) {
        return Status::InvalidArgument(
          "num_levels should be 1 for FIFO compaction");
      }
      if (FLAGS_num_multi_db != 0) {
        return Status::InvalidArgument("Doesn't support multiDB");
      }
      auto db = db_list[0];
      std::vector<std::string> file_names;
      while (true) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        db->Flush(FlushOptions());
        db->GetColumnFamilyMetaData(&meta);
        auto total_size = meta.levels[0].size;
        if (total_size >=
          db->GetOptions().compaction_options_fifo.max_table_files_size) {
          for (auto file_meta : meta.levels[0].files) {
            file_names.emplace_back(file_meta.name);
          }
          break;
        }
      }
      // TODO(shuzhang1989): Investigate why CompactFiles not working
      // auto compactionOptions = CompactionOptions();
      // db->CompactFiles(compactionOptions, file_names, 0);
      auto compactionOptions = CompactRangeOptions();
      db->CompactRange(compactionOptions, nullptr, nullptr);
    } else {
      fprintf(stdout,
              "%-12s : skipped (-compaction_stype=kCompactionStyleNone)\n",
              "filldeterministic");
      return Status::InvalidArgument("None compaction is not supported");
    }

// Verify seqno and key range
// Note: the seqno gets changed at the max level by an implementation
// optimization, so skip the check at the max level.
#ifndef NDEBUG
    for (size_t k = 0; k < num_db; k++) {
      auto db = db_list[k];
      db->GetColumnFamilyMetaData(&meta);
      // verify the number of sorted runs
      if (compaction_style == kCompactionStyleLevel) {
        assert(num_levels - 1 == sorted_runs[k].size());
      } else if (compaction_style == kCompactionStyleUniversal) {
        assert(meta.levels[0].files.size() + num_levels - 1 ==
               sorted_runs[k].size());
      } else if (compaction_style == kCompactionStyleFIFO) {
        // TODO(gzh): FIFO compaction
        db->GetColumnFamilyMetaData(&meta);
        auto total_size = meta.levels[0].size;
        assert(total_size <=
               db->GetOptions().compaction_options_fifo.max_table_files_size);
        break;
      }

      // verify smallest/largest seqno and key range of each sorted run
      auto max_level = num_levels - 1;
      int level;
      for (size_t i = 0; i < sorted_runs[k].size(); i++) {
        level = static_cast<int>(max_level - i);
        SequenceNumber sorted_run_smallest_seqno = kMaxSequenceNumber;
        SequenceNumber sorted_run_largest_seqno = 0;
        std::string sorted_run_smallest_key, sorted_run_largest_key;
        bool first_key = true;
        for (auto fileMeta : sorted_runs[k][i]) {
          sorted_run_smallest_seqno =
              std::min(sorted_run_smallest_seqno, fileMeta.smallest_seqno);
          sorted_run_largest_seqno =
              std::max(sorted_run_largest_seqno, fileMeta.largest_seqno);
          if (first_key ||
              db->DefaultColumnFamily()->GetComparator()->Compare(
                  fileMeta.smallestkey, sorted_run_smallest_key) < 0) {
            sorted_run_smallest_key = fileMeta.smallestkey;
          }
          if (first_key ||
              db->DefaultColumnFamily()->GetComparator()->Compare(
                  fileMeta.largestkey, sorted_run_largest_key) > 0) {
            sorted_run_largest_key = fileMeta.largestkey;
          }
          first_key = false;
        }
        if (compaction_style == kCompactionStyleLevel ||
            (compaction_style == kCompactionStyleUniversal && level > 0)) {
          SequenceNumber level_smallest_seqno = kMaxSequenceNumber;
          SequenceNumber level_largest_seqno = 0;
          for (auto fileMeta : meta.levels[level].files) {
            level_smallest_seqno =
                std::min(level_smallest_seqno, fileMeta.smallest_seqno);
            level_largest_seqno =
                std::max(level_largest_seqno, fileMeta.largest_seqno);
          }
          assert(sorted_run_smallest_key ==
                 meta.levels[level].files.front().smallestkey);
          assert(sorted_run_largest_key ==
                 meta.levels[level].files.back().largestkey);
          if (level != static_cast<int>(max_level)) {
            // compaction at max_level would change sequence number
            assert(sorted_run_smallest_seqno == level_smallest_seqno);
            assert(sorted_run_largest_seqno == level_largest_seqno);
          }
        } else if (compaction_style == kCompactionStyleUniversal) {
          // level <= 0 means sorted runs on level 0
          auto level0_file =
              meta.levels[0].files[sorted_runs[k].size() - 1 - i];
          assert(sorted_run_smallest_key == level0_file.smallestkey);
          assert(sorted_run_largest_key == level0_file.largestkey);
          if (level != static_cast<int>(max_level)) {
            assert(sorted_run_smallest_seqno == level0_file.smallest_seqno);
            assert(sorted_run_largest_seqno == level0_file.largest_seqno);
          }
        }
      }
    }
#endif
    // print the size of each sorted_run
    for (size_t k = 0; k < num_db; k++) {
      auto db = db_list[k];
      fprintf(stdout,
              "---------------------- DB %" ROCKSDB_PRIszt " LSM ---------------------\n", k);
      db->GetColumnFamilyMetaData(&meta);
      for (auto& levelMeta : meta.levels) {
        if (levelMeta.files.empty()) {
          continue;
        }
        if (levelMeta.level == 0) {
          for (auto& fileMeta : levelMeta.files) {
            fprintf(stdout, "Level[%d]: %s(size: %" ROCKSDB_PRIszt " bytes)\n",
                    levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
          }
        } else {
          fprintf(stdout, "Level[%d]: %s - %s(total size: %" PRIi64 " bytes)\n",
                  levelMeta.level, levelMeta.files.front().name.c_str(),
                  levelMeta.files.back().name.c_str(), levelMeta.size);
        }
      }
    }
    for (size_t i = 0; i < num_db; i++) {
      db_list[i]->SetOptions(
          {{"disable_auto_compactions",
            std::to_string(options_list[i].disable_auto_compactions)},
           {"level0_slowdown_writes_trigger",
            std::to_string(options_list[i].level0_slowdown_writes_trigger)},
           {"level0_stop_writes_trigger",
            std::to_string(options_list[i].level0_stop_writes_trigger)}});
    }
    return Status::OK();
#else
    (void)thread;
    (void)compaction_style;
    (void)write_mode;
    fprintf(stderr, "Rocksdb Lite doesn't support filldeterministic\n");
    return Status::NotSupported(
        "Rocksdb Lite doesn't support filldeterministic");
#endif  // ROCKSDB_LITE
  }

  void ReadSequential(ThreadState* thread) {
    if (db_.db != nullptr) {
      ReadSequential(thread, db_.db);
    } else {
      for (const auto& db_with_cfh : multi_dbs_) {
        ReadSequential(thread, db_with_cfh.db);
      }
    }
  }

  void ReadSequential(ThreadState* thread, DB* db) {
    ReadOptions options(FLAGS_verify_checksum, true);
    options.tailing = FLAGS_use_tailing_iterator;

    Iterator* iter = db->NewIterator(options);
    int64_t i = 0;
    int64_t bytes = 0;
    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedOps(nullptr, db, 1, kRead);
      ++i;

      if (thread->shared->read_rate_limiter.get() != nullptr &&
          i % 1024 == 1023) {
        thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH,
                                                   nullptr /* stats */,
                                                   RateLimiter::OpType::kRead);
      }
    }

    delete iter;
    thread->stats.AddBytes(bytes);
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                               get_perf_context()->ToString());
    }
  }

  void ReadReverse(ThreadState* thread) {
    if (db_.db != nullptr) {
      ReadReverse(thread, db_.db);
    } else {
      for (const auto& db_with_cfh : multi_dbs_) {
        ReadReverse(thread, db_with_cfh.db);
      }
    }
  }

  void ReadReverse(ThreadState* thread, DB* db) {
    Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
    int64_t i = 0;
    int64_t bytes = 0;
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedOps(nullptr, db, 1, kRead);
      ++i;
      if (thread->shared->read_rate_limiter.get() != nullptr &&
          i % 1024 == 1023) {
        thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH,
                                                   nullptr /* stats */,
                                                   RateLimiter::OpType::kRead);
      }
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  void ReadRandomFast(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
    int64_t nonexist = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::string value;
    DB* db = SelectDBWithCfh(thread)->db;

    int64_t pot = 1;
    while (pot < FLAGS_num) {
      pot <<= 1;
    }

    Duration duration(FLAGS_duration, reads_);
    do {
      for (int i = 0; i < 100; ++i) {
        int64_t key_rand = thread->rand.Next() & (pot - 1);
        GenerateKeyFromInt(key_rand, FLAGS_num, &key);
        ++read;
        auto status = db->Get(options, key, &value);
        if (status.ok()) {
          ++found;
        } else if (!status.IsNotFound()) {
          fprintf(stderr, "Get returned an error: %s\n",
                  status.ToString().c_str());
          abort();
        }
        if (key_rand >= FLAGS_num) {
          ++nonexist;
        }
      }
      if (thread->shared->read_rate_limiter.get() != nullptr) {
        thread->shared->read_rate_limiter->Request(
            100, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
      }

      thread->stats.FinishedOps(nullptr, db, 100, kRead);
    } while (!duration.Done(100));

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found, "
             "issued %" PRIu64 " non-exist keys)\n",
             found, read, nonexist);

    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                               get_perf_context()->ToString());
    }
  }

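  // Picks a key for the random-read benchmarks. With read_random_exp_range_ ==
  // 0 the key is uniform over [0, FLAGS_num); otherwise an exponential weight
  // is applied and the result is scrambled with a multiplicative constant so
  // that frequently read keys are not physically clustered.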
  int64_t GetRandomKey(Random64* rand) {
    uint64_t rand_int = rand->Next();
    int64_t key_rand;
    if (read_random_exp_range_ == 0) {
      key_rand = rand_int % FLAGS_num;
    } else {
      const uint64_t kBigInt = static_cast<uint64_t>(1U) << 62;
      long double order = -static_cast<long double>(rand_int % kBigInt) /
                          static_cast<long double>(kBigInt) *
                          read_random_exp_range_;
      long double exp_ran = std::exp(order);
      uint64_t rand_num =
          static_cast<int64_t>(exp_ran * static_cast<long double>(FLAGS_num));
      // Map to a different number to avoid locality.
      const uint64_t kBigPrime = 0x5bd1e995;
      // Overflow is like %(2^64). Will have little impact of results.
      key_rand = static_cast<int64_t>((rand_num * kBigPrime) % FLAGS_num);
    }
    return key_rand;
  }

  void ReadRandom(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
    int64_t bytes = 0;
    int num_keys = 0;
    int64_t key_rand = GetRandomKey(&thread->rand);
    ReadOptions options(FLAGS_verify_checksum, true);
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    PinnableSlice pinnable_val;

    Duration duration(FLAGS_duration, reads_);
    while (!duration.Done(1)) {
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
      // We use same key_rand as seed for key and column family so that we can
      // deterministically find the cfh corresponding to a particular key, as it
      // is done in DoWrite method.
      GenerateKeyFromInt(key_rand, FLAGS_num, &key);
      if (entries_per_batch_ > 1 && FLAGS_multiread_stride) {
        if (++num_keys == entries_per_batch_) {
          num_keys = 0;
          key_rand = GetRandomKey(&thread->rand);
          if ((key_rand + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
              FLAGS_num) {
            key_rand = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
          }
        } else {
          key_rand += FLAGS_multiread_stride;
        }
      } else {
        key_rand = GetRandomKey(&thread->rand);
      }
      read++;
      Status s;
      if (FLAGS_num_column_families > 1) {
        s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
                                 &pinnable_val);
      } else {
        pinnable_val.Reset();
        s = db_with_cfh->db->Get(options,
                                 db_with_cfh->db->DefaultColumnFamily(), key,
                                 &pinnable_val);
      }
      if (s.ok()) {
        found++;
        bytes += key.size() + pinnable_val.size();
      } else if (!s.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
        abort();
      }

      if (thread->shared->read_rate_limiter.get() != nullptr &&
          read % 256 == 255) {
        thread->shared->read_rate_limiter->Request(
            256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
      }

      thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
             found, read);

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                               get_perf_context()->ToString());
    }
  }

  // Calls MultiGet over a list of keys drawn from a random distribution.
  // The number of keys found is reported in the thread's stats message.
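  // With -multiread_batched the PinnableSlice-based MultiGet overload is used;
  // otherwise the std::vector<std::string> overload is called. With
  // -multiread_stride the keys of a batch are spaced a fixed stride apart
  // instead of being drawn independently.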
  void MultiReadRandom(ThreadState* thread) {
    int64_t read = 0;
    int64_t num_multireads = 0;
    int64_t found = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    std::vector<Slice> keys;
    std::vector<std::unique_ptr<const char[]> > key_guards;
    std::vector<std::string> values(entries_per_batch_);
    PinnableSlice* pin_values = new PinnableSlice[entries_per_batch_];
    std::unique_ptr<PinnableSlice[]> pin_values_guard(pin_values);
    std::vector<Status> stat_list(entries_per_batch_);
    while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
      key_guards.push_back(std::unique_ptr<const char[]>());
      keys.push_back(AllocateKey(&key_guards.back()));
    }

    Duration duration(FLAGS_duration, reads_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      if (FLAGS_multiread_stride) {
        int64_t key = GetRandomKey(&thread->rand);
        if ((key + (entries_per_batch_ - 1) * FLAGS_multiread_stride) >=
            (int64_t)FLAGS_num) {
          key = FLAGS_num - entries_per_batch_ * FLAGS_multiread_stride;
        }
        for (int64_t i = 0; i < entries_per_batch_; ++i) {
          GenerateKeyFromInt(key, FLAGS_num, &keys[i]);
          key += FLAGS_multiread_stride;
        }
      } else {
        for (int64_t i = 0; i < entries_per_batch_; ++i) {
          GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
        }
      }
      if (!FLAGS_multiread_batched) {
        std::vector<Status> statuses = db->MultiGet(options, keys, &values);
        assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);

        read += entries_per_batch_;
        num_multireads++;
        for (int64_t i = 0; i < entries_per_batch_; ++i) {
          if (statuses[i].ok()) {
            ++found;
          } else if (!statuses[i].IsNotFound()) {
            fprintf(stderr, "MultiGet returned an error: %s\n",
                    statuses[i].ToString().c_str());
            abort();
          }
        }
      } else {
        db->MultiGet(options, db->DefaultColumnFamily(), keys.size(),
                     keys.data(), pin_values, stat_list.data());

        read += entries_per_batch_;
        num_multireads++;
        for (int64_t i = 0; i < entries_per_batch_; ++i) {
          if (stat_list[i].ok()) {
            ++found;
          } else if (!stat_list[i].IsNotFound()) {
            fprintf(stderr, "MultiGet returned an error: %s\n",
                    stat_list[i].ToString().c_str());
            abort();
          }
          stat_list[i] = Status::OK();
          pin_values[i].Reset();
        }
      }
      if (thread->shared->read_rate_limiter.get() != nullptr &&
          num_multireads % 256 == 255) {
        thread->shared->read_rate_limiter->Request(
            256 * entries_per_batch_, Env::IO_HIGH, nullptr /* stats */,
            RateLimiter::OpType::kRead);
      }
      thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
             found, read);
    thread->stats.AddMessage(msg);
  }

  // The inverse function of the Pareto CDF
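  //   F^-1(u) = theta + sigma * (u^(-k) - 1) / k, or theta - sigma * ln(u)
  //   when k == 0, for a uniform u in (0, 1].
  // MixGraph uses this to draw value sizes and scan lengths; PowerCdfInversion
  // below solves y = a * x^b for x, i.e. x = (y / a)^(1 / b).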
  int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) {
    double ret;
    if (k == 0.0) {
      ret = theta - sigma * std::log(u);
    } else {
      ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k;
    }
    return static_cast<int64_t>(ceil(ret));
  }
  // inversion of y=ax^b
  int64_t PowerCdfInversion(double u, double a, double b) {
    double ret;
    ret = std::pow((u / a), (1 / b));
    return static_cast<int64_t>(ceil(ret));
  }

  // Add noise to the QPS
  double AddNoise(double origin, double noise_ratio) {
    if (noise_ratio < 0.0 || noise_ratio > 1.0) {
      return origin;
    }
    int band_int = static_cast<int>(FLAGS_sine_a);
    double delta = (rand() % band_int - band_int / 2) * noise_ratio;
    if (origin + delta < 0) {
      return origin;
    } else {
      return (origin + delta);
    }
  }

  // decide the query type
  // 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge
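  // For example (illustrative ratios only), Initiate({0.8, 0.15, 0.05}) splits
  // the range [0, 1000) into [0, 800) -> Get, [800, 950) -> Put and
  // [950, 1000) -> Seek, and GetType() maps a random number into one of those
  // buckets.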
  class QueryDecider {
   public:
    std::vector<int> type_;
    std::vector<double> ratio_;
    int range_;

    QueryDecider() {}
    ~QueryDecider() {}

    Status Initiate(std::vector<double> ratio_input) {
      int range_max = 1000;
      double sum = 0.0;
      for (auto& ratio : ratio_input) {
        sum += ratio;
      }
      range_ = 0;
      for (auto& ratio : ratio_input) {
        range_ += static_cast<int>(ceil(range_max * (ratio / sum)));
        type_.push_back(range_);
        ratio_.push_back(ratio / sum);
      }
      return Status::OK();
    }

    int GetType(int64_t rand_num) {
      if (rand_num < 0) {
        rand_num = rand_num * (-1);
      }
      assert(range_ != 0);
      int pos = static_cast<int>(rand_num % range_);
      for (int i = 0; i < static_cast<int>(type_.size()); i++) {
        if (pos < type_[i]) {
          return i;
        }
      }
      return 0;
    }
  };

  // The graph workload mixed with Get, Put and Iterator queries
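  // Key popularity is drawn via PowerCdfInversion, value sizes and scan
  // lengths via ParetoCdfInversion, the Get/Put/Seek split via QueryDecider,
  // and the overall QPS is modulated over time by SineRate() plus AddNoise().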
  void MixGraph(ThreadState* thread) {
    int64_t read = 0;  // including single gets and Next of iterators
    int64_t gets = 0;
    int64_t puts = 0;
    int64_t found = 0;
    int64_t seek = 0;
    int64_t seek_found = 0;
    int64_t bytes = 0;
    const int64_t default_value_max = 1 * 1024 * 1024;
    int64_t value_max = default_value_max;
    int64_t scan_len_max = FLAGS_mix_max_scan_len;
    double write_rate = 1000000.0;
    double read_rate = 1000000.0;
    std::vector<double> ratio{FLAGS_mix_get_ratio, FLAGS_mix_put_ratio,
                              FLAGS_mix_seek_ratio};
    char value_buffer[default_value_max];
    QueryDecider query;
    RandomGenerator gen;
    Status s;
    if (value_max > FLAGS_mix_max_value_size) {
      value_max = FLAGS_mix_max_value_size;
    }

    ReadOptions options(FLAGS_verify_checksum, true);
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    PinnableSlice pinnable_val;
    query.Initiate(ratio);

    // the limit of qps initiation
    if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
      thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
          static_cast<int64_t>(read_rate), 100000 /* refill_period_us */,
          10 /* fairness */,
          RateLimiter::Mode::kReadsOnly));
      thread->shared->write_rate_limiter.reset(
          NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
    }

    Duration duration(FLAGS_duration, reads_);
    while (!duration.Done(1)) {
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
      int64_t rand_v, key_rand, key_seed;
      rand_v = GetRandomKey(&thread->rand) % FLAGS_num;
      double u = static_cast<double>(rand_v) / FLAGS_num;
      key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b);
      Random64 rand(key_seed);
      key_rand = static_cast<int64_t>(rand.Next()) % FLAGS_num;
      GenerateKeyFromInt(key_rand, FLAGS_num, &key);
      int query_type = query.GetType(rand_v);

      // change the qps
      uint64_t now = FLAGS_env->NowMicros();
      uint64_t usecs_since_last;
      if (now > thread->stats.GetSineInterval()) {
        usecs_since_last = now - thread->stats.GetSineInterval();
      } else {
        usecs_since_last = 0;
      }

      if (usecs_since_last >
          (FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
        double usecs_since_start =
            static_cast<double>(now - thread->stats.GetStart());
        thread->stats.ResetSineInterval();
        double mix_rate_with_noise = AddNoise(
            SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
        read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
        write_rate =
            mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;

        thread->shared->write_rate_limiter.reset(
            NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
        thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
            static_cast<int64_t>(read_rate),
            FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
            RateLimiter::Mode::kReadsOnly));
      }
      // Start the query
      if (query_type == 0) {
        // the Get query
        gets++;
        read++;
        if (FLAGS_num_column_families > 1) {
          s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
                                   &pinnable_val);
        } else {
          pinnable_val.Reset();
          s = db_with_cfh->db->Get(options,
                                   db_with_cfh->db->DefaultColumnFamily(), key,
                                   &pinnable_val);
        }

        if (s.ok()) {
          found++;
          bytes += key.size() + pinnable_val.size();
        } else if (!s.IsNotFound()) {
          fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
          abort();
        }

        if (thread->shared->read_rate_limiter.get() != nullptr &&
            read % 256 == 255) {
          thread->shared->read_rate_limiter->Request(
              256, Env::IO_HIGH, nullptr /* stats */,
              RateLimiter::OpType::kRead);
        }
        thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
      } else if (query_type == 1) {
        // the Put query
        puts++;
        int64_t value_size = ParetoCdfInversion(
            u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
        if (value_size < 0) {
          value_size = 10;
        } else if (value_size > value_max) {
          value_size = value_size % value_max;
        }
        s = db_with_cfh->db->Put(
            write_options_, key,
            gen.Generate(static_cast<unsigned int>(value_size)));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }

        if (thread->shared->write_rate_limiter) {
          thread->shared->write_rate_limiter->Request(
              key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
              RateLimiter::OpType::kWrite);
        }
        thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
      } else if (query_type == 2) {
        // Seek query
        if (db_with_cfh->db != nullptr) {
          Iterator* single_iter = nullptr;
          single_iter = db_with_cfh->db->NewIterator(options);
          if (single_iter != nullptr) {
            single_iter->Seek(key);
            seek++;
            read++;
            if (single_iter->Valid() && single_iter->key().compare(key) == 0) {
              seek_found++;
            }
            int64_t scan_length =
                ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k,
                                   FLAGS_iter_sigma) %
                scan_len_max;
            for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) {
              Slice value = single_iter->value();
              memcpy(value_buffer, value.data(),
                     std::min(value.size(), sizeof(value_buffer)));
              bytes += single_iter->key().size() + single_iter->value().size();
              single_iter->Next();
              assert(single_iter->status().ok());
            }
          }
          delete single_iter;
        }
        thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kSeek);
      }
    }
    char msg[256];
    snprintf(msg, sizeof(msg),
             "( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64 " of %" PRIu64
             " in %" PRIu64 " found)\n",
             gets, puts, seek, found, read);

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                               get_perf_context()->ToString());
    }
  }

  void IteratorCreation(ThreadState* thread) {
    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      Iterator* iter = db->NewIterator(options);
      delete iter;
      thread->stats.FinishedOps(nullptr, db, 1, kOthers);
    }
  }

  void IteratorCreationWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      IteratorCreation(thread);
    } else {
      BGWriter(thread, kWrite);
    }
  }

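  // Seeks to a random key and then iterates -seek_nexts entries forward (or
  // backward with -reverse_iterator). With -max_scan_distance an iterate
  // upper/lower bound is set relative to the seek key; iterators are
  // re-created for every seek unless -use_tailing_iterator is set.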
  void SeekRandom(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
    int64_t bytes = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    options.total_order_seek = FLAGS_total_order_seek;
    options.prefix_same_as_start = FLAGS_prefix_same_as_start;
    options.tailing = FLAGS_use_tailing_iterator;
    options.readahead_size = FLAGS_readahead_size;

    Iterator* single_iter = nullptr;
    std::vector<Iterator*> multi_iters;
    if (db_.db != nullptr) {
      single_iter = db_.db->NewIterator(options);
    } else {
      for (const auto& db_with_cfh : multi_dbs_) {
        multi_iters.push_back(db_with_cfh.db->NewIterator(options));
      }
    }

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    std::unique_ptr<const char[]> upper_bound_key_guard;
    Slice upper_bound = AllocateKey(&upper_bound_key_guard);
    std::unique_ptr<const char[]> lower_bound_key_guard;
    Slice lower_bound = AllocateKey(&lower_bound_key_guard);

    Duration duration(FLAGS_duration, reads_);
    char value_buffer[256];
    while (!duration.Done(1)) {
      int64_t seek_pos = thread->rand.Next() % FLAGS_num;
      GenerateKeyFromIntForSeek(static_cast<uint64_t>(seek_pos), FLAGS_num,
                                &key);
      if (FLAGS_max_scan_distance != 0) {
        if (FLAGS_reverse_iterator) {
          GenerateKeyFromInt(
              static_cast<uint64_t>(std::max(
                  static_cast<int64_t>(0), seek_pos - FLAGS_max_scan_distance)),
              FLAGS_num, &lower_bound);
          options.iterate_lower_bound = &lower_bound;
        } else {
          GenerateKeyFromInt(
              (uint64_t)std::min(FLAGS_num, seek_pos + FLAGS_max_scan_distance),
              FLAGS_num, &upper_bound);
          options.iterate_upper_bound = &upper_bound;
        }
      }

      if (!FLAGS_use_tailing_iterator) {
        if (db_.db != nullptr) {
          delete single_iter;
          single_iter = db_.db->NewIterator(options);
        } else {
          for (auto iter : multi_iters) {
            delete iter;
          }
          multi_iters.clear();
          for (const auto& db_with_cfh : multi_dbs_) {
            multi_iters.push_back(db_with_cfh.db->NewIterator(options));
          }
        }
      }
      // Pick an Iterator to use
      Iterator* iter_to_use = single_iter;
      if (single_iter == nullptr) {
        iter_to_use = multi_iters[thread->rand.Next() % multi_iters.size()];
      }

      iter_to_use->Seek(key);
      read++;
      if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
        found++;
      }

      for (int j = 0; j < FLAGS_seek_nexts && iter_to_use->Valid(); ++j) {
        // Copy out iterator's value to make sure we read them.
        Slice value = iter_to_use->value();
        memcpy(value_buffer, value.data(),
               std::min(value.size(), sizeof(value_buffer)));
        bytes += iter_to_use->key().size() + iter_to_use->value().size();

        if (!FLAGS_reverse_iterator) {
          iter_to_use->Next();
        } else {
          iter_to_use->Prev();
        }
        assert(iter_to_use->status().ok());
      }

      if (thread->shared->read_rate_limiter.get() != nullptr &&
          read % 256 == 255) {
        thread->shared->read_rate_limiter->Request(
            256, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
      }

      thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
    }
    delete single_iter;
    for (auto iter : multi_iters) {
      delete iter;
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
             found, read);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                               get_perf_context()->ToString());
    }
  }

  void SeekRandomWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      SeekRandom(thread);
    } else {
      BGWriter(thread, kWrite);
    }
  }

  void SeekRandomWhileMerging(ThreadState* thread) {
    if (thread->tid > 0) {
      SeekRandom(thread);
    } else {
      BGWriter(thread, kMerge);
    }
  }

  void DoDelete(ThreadState* thread, bool seq) {
    WriteBatch batch;
    Duration duration(seq ? 0 : FLAGS_duration, deletes_);
    int64_t i = 0;
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    while (!duration.Done(entries_per_batch_)) {
      DB* db = SelectDB(thread);
      batch.Clear();
      for (int64_t j = 0; j < entries_per_batch_; ++j) {
        const int64_t k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
        GenerateKeyFromInt(k, FLAGS_num, &key);
        batch.Delete(key);
      }
      auto s = db->Write(write_options_, &batch);
      thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete);
      if (!s.ok()) {
        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        exit(1);
      }
      i += entries_per_batch_;
    }
  }

  void DeleteSeq(ThreadState* thread) {
    DoDelete(thread, true);
  }

  void DeleteRandom(ThreadState* thread) {
    DoDelete(thread, false);
  }

  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      BGWriter(thread, kWrite);
    }
  }

  void ReadWhileMerging(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      BGWriter(thread, kMerge);
    }
  }

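  // Dedicated background writer used by the readwhilewriting-style and
  // seekrandomwhilewriting-style benchmarks: one thread keeps issuing Puts (or
  // Merges) while the other threads run the read workload, optionally
  // throttled by -benchmark_write_rate_limit; its stats are excluded from the
  // merged reader statistics.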
  void BGWriter(ThreadState* thread, enum OperationType write_merge) {
    // Special thread that keeps writing until other threads are done.
    RandomGenerator gen;
    int64_t bytes = 0;

    std::unique_ptr<RateLimiter> write_rate_limiter;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }

    // Don't merge stats from this thread with the readers.
    thread->stats.SetExcludeFromMerge();

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    uint32_t written = 0;
    bool hint_printed = false;

    while (true) {
      DB* db = SelectDB(thread);
      {
        MutexLock l(&thread->shared->mu);
        if (FLAGS_finish_after_writes && written == writes_) {
          fprintf(stderr, "Exiting the writer after %u writes...\n", written);
          break;
        }
        if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
          // Other threads have finished
          if (FLAGS_finish_after_writes) {
            // Wait for the writes to be finished
            if (!hint_printed) {
              fprintf(stderr, "Reads are finished. Have %d more writes to do\n",
                      (int)writes_ - written);
              hint_printed = true;
            }
          } else {
            // Finish the write immediately
            break;
          }
        }
      }

      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
      Status s;

5287
      if (write_merge == kWrite) {
5288
        s = db->Put(write_options_, key, gen.Generate(value_size_));
M
Mark Callaghan 已提交
5289
      } else {
5290
        s = db->Merge(write_options_, key, gen.Generate(value_size_));
M
Mark Callaghan 已提交
5291
      }
5292
      written++;
M
Mark Callaghan 已提交
5293

5294
      if (!s.ok()) {
M
Mark Callaghan 已提交
5295
        fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
5296 5297
        exit(1);
      }
5298
      bytes += key.size() + value_size_;
5299
      thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
5300

5301 5302
      if (FLAGS_benchmark_write_rate_limit > 0) {
        write_rate_limiter->Request(
5303
            entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
5304
            nullptr /* stats */, RateLimiter::OpType::kWrite);
5305 5306
      }
    }
5307
    thread->stats.AddBytes(bytes);
5308 5309
  }

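  // A hypothetical invocation of the read-while-scanning workload
  // (flag values illustrative only):
  //   ./db_bench --benchmarks=readwhilescanning --threads=4 --num=1000000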
  void ReadWhileScanning(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      BGScan(thread);
    }
  }

  void BGScan(ThreadState* thread) {
    if (FLAGS_num_multi_db > 0) {
      fprintf(stderr, "Not supporting multiple DBs.\n");
      abort();
    }
    assert(db_.db != nullptr);
    ReadOptions read_options;
    Iterator* iter = db_.db->NewIterator(read_options);

    fprintf(stderr, "num reads to do %" PRIu64 "\n", reads_);
    Duration duration(FLAGS_duration, reads_);
    uint64_t num_seek_to_first = 0;
    uint64_t num_next = 0;
    while (!duration.Done(1)) {
      if (!iter->Valid()) {
        iter->SeekToFirst();
        num_seek_to_first++;
      } else if (!iter->status().ok()) {
        fprintf(stderr, "Iterator error: %s\n",
                iter->status().ToString().c_str());
        abort();
      } else {
        iter->Next();
        num_next++;
      }

      thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
    }
    delete iter;
  }

  // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
  // in DB atomically, i.e. in a single batch. Also refer to GetMany.
  Status PutMany(DB* db, const WriteOptions& writeoptions, const Slice& key,
                 const Slice& value) {
    std::string suffixes[3] = {"2", "1", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Put(keys[i], value);
    }

    s = db->Write(writeoptions, &batch);
    return s;
  }


  // Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V)
  // in DB atomically, i.e. in a single batch. Also refer to GetMany.
  Status DeleteMany(DB* db, const WriteOptions& writeoptions,
                    const Slice& key) {
    std::string suffixes[3] = {"1", "2", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Delete(keys[i]);
    }

    s = db->Write(writeoptions, &batch);
    return s;
  }

  // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
  // in the same snapshot, and verifies that all the values are identical.
  // ASSUMES that PutMany was used to put (K, V) into the DB.
  Status GetMany(DB* db, const ReadOptions& readoptions, const Slice& key,
                 std::string* value) {
    std::string suffixes[3] = {"0", "1", "2"};
    std::string keys[3];
    Slice key_slices[3];
    std::string values[3];
    ReadOptions readoptionscopy = readoptions;
    readoptionscopy.snapshot = db->GetSnapshot();
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      key_slices[i] = keys[i];
      s = db->Get(readoptionscopy, key_slices[i], value);
      if (!s.ok() && !s.IsNotFound()) {
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
        values[i] = "";
        // we continue after error rather than exiting so that we can
        // find more errors if any
      } else if (s.IsNotFound()) {
        values[i] = "";
      } else {
        values[i] = *value;
      }
    }
    db->ReleaseSnapshot(readoptionscopy.snapshot);

    if ((values[0] != values[1]) || (values[1] != values[2])) {
      fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n",
              key.ToString().c_str(), values[0].c_str(), values[1].c_str(),
              values[2].c_str());
      // we continue after error rather than exiting so that we can
      // find more errors if any
    }

    return s;
  }

  // Differs from readrandomwriterandom in the following ways:
  // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
  // (b) Does deletes as well (per FLAGS_deletepercent)
  // (c) In order to achieve high % of 'found' during lookups, and to do
  //     multiple writes (including puts and deletes) it uses up to
  //     FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
  // (d) Does not have a MultiGet option.
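  //
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=randomwithverify --num=100000 \
  //              --readwritepercent=60 --deletepercent=2 --numdistinct=1000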
  void RandomWithVerify(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int get_weight = 0;
    int put_weight = 0;
    int delete_weight = 0;
    int64_t gets_done = 0;
    int64_t puts_done = 0;
    int64_t deletes_done = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    // the number of iterations is the larger of read_ or write_
    for (int64_t i = 0; i < readwrites_; i++) {
      DB* db = SelectDB(thread);
      if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
        // one batch completed, reinitialize for next batch
        get_weight = FLAGS_readwritepercent;
        delete_weight = FLAGS_deletepercent;
        put_weight = 100 - get_weight - delete_weight;
      }
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_numdistinct,
          FLAGS_numdistinct, &key);
      if (get_weight > 0) {
        // do all the gets first
        Status s = GetMany(db, options, key, &value);
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
        get_weight--;
        gets_done++;
        thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
      } else if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
        Status s = PutMany(db, write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        puts_done++;
        thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
      } else if (delete_weight > 0) {
        Status s = DeleteMany(db, write_options_, key);
        if (!s.ok()) {
          fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
          exit(1);
        }
        delete_weight--;
        deletes_done++;
        thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
      }
    }
    char msg[128];
    snprintf(msg, sizeof(msg),
             "( get:%" PRIu64 " put:%" PRIu64 " del:%" PRIu64 " total:%" \
             PRIu64 " found:%" PRIu64 ")",
             gets_done, puts_done, deletes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  // This is different from ReadWhileWriting because it does not use
  // an extra thread.
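  //
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=readrandomwriterandom --readwritepercent=90 \
  //              --num=1000000 --duration=60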
  void ReadRandomWriteRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int get_weight = 0;
    int put_weight = 0;
    int64_t reads_done = 0;
    int64_t writes_done = 0;
    Duration duration(FLAGS_duration, readwrites_);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
      if (get_weight == 0 && put_weight == 0) {
        // one batch completed, reinitialize for next batch
        get_weight = FLAGS_readwritepercent;
        put_weight = 100 - get_weight;
      }
      if (get_weight > 0) {
        // do all the gets first
        Status s = db->Get(options, key, &value);
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
        get_weight--;
        reads_done++;
        thread->stats.FinishedOps(nullptr, db, 1, kRead);
      } else  if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
        Status s = db->Put(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        writes_done++;
        thread->stats.FinishedOps(nullptr, db, 1, kWrite);
      }
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
             " total:%" PRIu64 " found:%" PRIu64 ")",
             reads_done, writes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  //
  // Read-modify-write for random keys
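  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=updaterandom --num=1000000 --duration=60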
  void UpdateRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int64_t bytes = 0;
    Duration duration(FLAGS_duration, readwrites_);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

      auto status = db->Get(options, key, &value);
      if (status.ok()) {
        ++found;
        bytes += key.size() + value.size();
      } else if (!status.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n",
                status.ToString().c_str());
        abort();
      }

      if (thread->shared->write_rate_limiter) {
        thread->shared->write_rate_limiter->Request(
            key.size() + value_size_, Env::IO_HIGH, nullptr /*stats*/,
            RateLimiter::OpType::kWrite);
      }

      Status s = db->Put(write_options_, key, gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }

  // Read-XOR-write for random keys. Xors the existing value with a randomly
  // generated value, and stores the result. Assuming A is the array of bytes
  // representing the existing value, we generate an array B of the same size,
  // then compute C = A^B as C[i]=A[i]^B[i], and store C
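  //
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=xorupdaterandom --num=100000 --value_size=100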
  void XORUpdateRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string existing_value;
    int64_t found = 0;
    Duration duration(FLAGS_duration, readwrites_);

    BytesXOROperator xor_operator;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

      auto status = db->Get(options, key, &existing_value);
      if (status.ok()) {
        ++found;
      } else if (!status.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n",
                status.ToString().c_str());
        exit(1);
      }

      Slice value = gen.Generate(value_size_);
      std::string new_value;

      if (status.ok()) {
        Slice existing_value_slice = Slice(existing_value);
        xor_operator.XOR(&existing_value_slice, value, &new_value);
      } else {
        xor_operator.XOR(nullptr, value, &new_value);
      }

      Status s = db->Put(write_options_, key, Slice(new_value));
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      thread->stats.FinishedOps(nullptr, db, 1);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  // Read-modify-write for random keys.
  // Each operation causes the key to grow by value_size (simulating an append).
  // Generally used for benchmarking against merges of similar type
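  //
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=appendrandom --num=100000 --value_size=32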
  void AppendRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int64_t bytes = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

      auto status = db->Get(options, key, &value);
      if (status.ok()) {
        ++found;
        bytes += key.size() + value.size();
      } else if (!status.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n",
                status.ToString().c_str());
        abort();
      } else {
        // If not existing, then just assume an empty string of data
        value.clear();
      }

      // Update the value (by appending data)
      Slice operand = gen.Generate(value_size_);
      if (value.size() > 0) {
        // Use a delimiter to match the semantics for StringAppendOperator
        value.append(1,',');
      }
      value.append(operand.data(), operand.size());

      // Write back to the database
      Status s = db->Put(write_options_, key, value);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value.size();
      thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")",
            readwrites_, found);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }

  // Read-modify-write for random keys (using MergeOperator)
  // The merge operator to use should be defined by FLAGS_merge_operator
  // Adjust FLAGS_value_size so that the keys are reasonable for this operator
  // Assumes that the merge operator is non-null (i.e.: is well-defined)
  //
  // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
  // to simulate random additions over 64-bit integers using merge.
  //
  // The number of merges on the same key can be controlled by adjusting
  // FLAGS_merge_keys.
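  //
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=mergerandom --merge_operator=uint64add \
  //              --value_size=8 --merge_keys=1000 --num=1000000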
  void MergeRandom(ThreadState* thread) {
    RandomGenerator gen;
    int64_t bytes = 0;
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
      int64_t key_rand = thread->rand.Next() % merge_keys_;
      GenerateKeyFromInt(key_rand, merge_keys_, &key);

      Status s;
      if (FLAGS_num_column_families > 1) {
        s = db_with_cfh->db->Merge(write_options_,
                                   db_with_cfh->GetCfh(key_rand), key,
                                   gen.Generate(value_size_));
      } else {
        s = db_with_cfh->db->Merge(write_options_,
                                   db_with_cfh->db->DefaultColumnFamily(), key,
                                   gen.Generate(value_size_));
      }

      if (!s.ok()) {
        fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(nullptr, db_with_cfh->db, 1, kMerge);
    }

    // Print some statistics
    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%" PRIu64 ")", readwrites_);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }

  // Read and merge random keys. The number of reads and merges is controlled
  // by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
  // keys (and thus also the number of reads and merges on the same key) can be
  // adjusted with FLAGS_merge_keys.
  //
  // As with MergeRandom, the merge operator to use should be defined by
  // FLAGS_merge_operator.
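  //
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=readrandommergerandom --merge_operator=uint64add \
  //              --mergereadpercent=70 --num=1000000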
  void ReadRandomMergeRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t num_hits = 0;
    int64_t num_gets = 0;
    int64_t num_merges = 0;
    size_t max_length = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // the number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);

      bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;

      if (do_merge) {
        Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
          exit(1);
        }
        num_merges++;
        thread->stats.FinishedOps(nullptr, db, 1, kMerge);
      } else {
        Status s = db->Get(options, key, &value);
        if (value.length() > max_length)
          max_length = value.length();

        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          num_hits++;
        }
        num_gets++;
        thread->stats.FinishedOps(nullptr, db, 1, kRead);
      }
    }

    char msg[100];
    snprintf(msg, sizeof(msg),
             "(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64
             " hits:%" PRIu64 " maxlength:%" ROCKSDB_PRIszt ")",
             num_gets, num_merges, readwrites_, num_hits, max_length);
    thread->stats.AddMessage(msg);
  }

  void WriteSeqSeekSeq(ThreadState* thread) {
    writes_ = FLAGS_num;
    DoWrite(thread, SEQUENTIAL);
    // exclude writes from the ops/sec calculation
    thread->stats.Start(thread->tid);

    DB* db = SelectDB(thread);
    std::unique_ptr<Iterator> iter(
      db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)));

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    for (int64_t i = 0; i < FLAGS_num; ++i) {
      GenerateKeyFromInt(i, FLAGS_num, &key);
      iter->Seek(key);
      assert(iter->Valid() && iter->key() == key);
      thread->stats.FinishedOps(nullptr, db, 1, kSeek);

      for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) {
        if (!FLAGS_reverse_iterator) {
          iter->Next();
        } else {
          iter->Prev();
        }
        GenerateKeyFromInt(++i, FLAGS_num, &key);
        assert(iter->Valid() && iter->key() == key);
        thread->stats.FinishedOps(nullptr, db, 1, kSeek);
      }

      iter->Seek(key);
      assert(iter->Valid() && iter->key() == key);
      thread->stats.FinishedOps(nullptr, db, 1, kSeek);
    }
  }

#ifndef ROCKSDB_LITE
  // This benchmark stress tests Transactions.  For a given --duration (or
  // total number of --writes), a Transaction will perform a read-modify-write
  // to increment the value of a key in each of N (--transaction_sets) sets of
  // keys (where each set has --num keys).  If --threads is set, this will be
  // done in parallel.
  //
  // To test transactions, use --transaction_db=true.  Not setting this
  // parameter will run the same benchmark without transactions.
  //
  // RandomTransactionVerify() will then validate the correctness of the results
  // by checking if the sum of all keys in each set is the same.
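  //
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=randomtransaction --transaction_db=true \
  //              --transaction_sets=4 --num=10000 --threads=4 --duration=30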
  void RandomTransaction(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    Duration duration(FLAGS_duration, readwrites_);
    ReadOptions read_options(FLAGS_verify_checksum, true);
    uint16_t num_prefix_ranges = static_cast<uint16_t>(FLAGS_transaction_sets);
    uint64_t transactions_done = 0;

    if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) {
      fprintf(stderr, "invalid value for transaction_sets\n");
      abort();
    }

    TransactionOptions txn_options;
    txn_options.lock_timeout = FLAGS_transaction_lock_timeout;
    txn_options.set_snapshot = FLAGS_transaction_set_snapshot;

    RandomTransactionInserter inserter(&thread->rand, write_options_,
                                       read_options, FLAGS_num,
                                       num_prefix_ranges);

    if (FLAGS_num_multi_db > 1) {
      fprintf(stderr,
              "Cannot run RandomTransaction benchmark with "
              "FLAGS_multi_db > 1.");
      abort();
    }

    while (!duration.Done(1)) {
      bool success;

      // RandomTransactionInserter will attempt to insert a key for each
      // # of FLAGS_transaction_sets
      if (FLAGS_optimistic_transaction_db) {
        success = inserter.OptimisticTransactionDBInsert(db_.opt_txn_db);
      } else if (FLAGS_transaction_db) {
        TransactionDB* txn_db = reinterpret_cast<TransactionDB*>(db_.db);
        success = inserter.TransactionDBInsert(txn_db, txn_options);
      } else {
        success = inserter.DBInsert(db_.db);
      }

      if (!success) {
        fprintf(stderr, "Unexpected error: %s\n",
                inserter.GetLastStatus().ToString().c_str());
        abort();
      }

      thread->stats.FinishedOps(nullptr, db_.db, 1, kOthers);
      transactions_done++;
    }

    char msg[100];
    if (FLAGS_optimistic_transaction_db || FLAGS_transaction_db) {
      snprintf(msg, sizeof(msg),
               "( transactions:%" PRIu64 " aborts:%" PRIu64 ")",
               transactions_done, inserter.GetFailureCount());
    } else {
      snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done);
    }
    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                               get_perf_context()->ToString());
    }
    thread->stats.AddBytes(static_cast<int64_t>(inserter.GetBytesInserted()));
  }

  // Verifies consistency of data after RandomTransaction() has been run.
  // Since each iteration of RandomTransaction() incremented a key in each set
  // by the same value, the sum of the keys in each set should be the same.
  void RandomTransactionVerify() {
    if (!FLAGS_transaction_db && !FLAGS_optimistic_transaction_db) {
      // transactions not used, nothing to verify.
      return;
    }

    Status s =
        RandomTransactionInserter::Verify(db_.db,
                            static_cast<uint16_t>(FLAGS_transaction_sets));

    if (s.ok()) {
      fprintf(stdout, "RandomTransactionVerify Success.\n");
    } else {
      fprintf(stdout, "RandomTransactionVerify FAILED!!\n");
    }
  }
#endif  // ROCKSDB_LITE

  // Writes and deletes random keys without overwriting keys.
  //
  // This benchmark is intended to partially replicate the behavior of MyRocks
  // secondary indices: All data is stored in keys and updates happen by
  // deleting the old version of the key and inserting the new version.
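  //
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=randomreplacekeys --use_single_deletes=true \
  //              --numdistinct=1000 --duration=60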
  void RandomReplaceKeys(ThreadState* thread) {
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::vector<uint32_t> counters(FLAGS_numdistinct, 0);
    size_t max_counter = 50;
    RandomGenerator gen;

    Status s;
    DB* db = SelectDB(thread);
    for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
      GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
      s = db->Put(write_options_, key, gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
        exit(1);
      }
    }

    db->GetSnapshot();

    std::default_random_engine generator;
    std::normal_distribution<double> distribution(FLAGS_numdistinct / 2.0,
                                                  FLAGS_stddev);
    Duration duration(FLAGS_duration, FLAGS_num);
    while (!duration.Done(1)) {
      int64_t rnd_id = static_cast<int64_t>(distribution(generator));
      int64_t key_id = std::max(std::min(FLAGS_numdistinct - 1, rnd_id),
                                static_cast<int64_t>(0));
      GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                         &key);
      s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key)
                                   : db->Delete(write_options_, key);
      if (s.ok()) {
        counters[key_id] = (counters[key_id] + 1) % max_counter;
        GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                           &key);
        s = db->Put(write_options_, key, Slice());
      }

      if (!s.ok()) {
        fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
        exit(1);
      }

      thread->stats.FinishedOps(nullptr, db, 1, kOthers);
    }

    char msg[200];
    snprintf(msg, sizeof(msg),
             "use single deletes: %d, "
             "standard deviation: %lf\n",
             FLAGS_use_single_deletes, FLAGS_stddev);
    thread->stats.AddMessage(msg);
  }

  void TimeSeriesReadOrDelete(ThreadState* thread, bool do_deletion) {
    ReadOptions options(FLAGS_verify_checksum, true);
    int64_t read = 0;
    int64_t found = 0;
    int64_t bytes = 0;

    Iterator* iter = nullptr;
    // Only works on a single database
    assert(db_.db != nullptr);
    iter = db_.db->NewIterator(options);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    char value_buffer[256];
    while (true) {
      {
        MutexLock l(&thread->shared->mu);
        if (thread->shared->num_done >= 1) {
          // Write thread has finished
          break;
        }
      }
      if (!FLAGS_use_tailing_iterator) {
        delete iter;
        iter = db_.db->NewIterator(options);
      }
      // Pick an Iterator to use

      int64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
      GenerateKeyFromInt(key_id, FLAGS_num, &key);
      // Reset last 8 bytes to 0
      char* start = const_cast<char*>(key.data());
      start += key.size() - 8;
      memset(start, 0, 8);
      ++read;

      bool key_found = false;
      // Seek the prefix
      for (iter->Seek(key); iter->Valid() && iter->key().starts_with(key);
           iter->Next()) {
        key_found = true;
        // Copy out iterator's value to make sure we read them.
        if (do_deletion) {
          bytes += iter->key().size();
          if (KeyExpired(timestamp_emulator_.get(), iter->key())) {
            thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
            db_.db->Delete(write_options_, iter->key());
          } else {
            break;
          }
        } else {
          bytes += iter->key().size() + iter->value().size();
          thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
          Slice value = iter->value();
          memcpy(value_buffer, value.data(),
                 std::min(value.size(), sizeof(value_buffer)));

          assert(iter->status().ok());
        }
      }
      found += key_found;

      if (thread->shared->read_rate_limiter.get() != nullptr) {
        thread->shared->read_rate_limiter->Request(
            1, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead);
      }
    }
    delete iter;

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", found,
             read);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") +
                               get_perf_context()->ToString());
    }
  }

  void TimeSeriesWrite(ThreadState* thread) {
    // Special thread that keeps writing until other threads are done.
    RandomGenerator gen;
    int64_t bytes = 0;

    // Don't merge stats from this thread with the readers.
    thread->stats.SetExcludeFromMerge();

    std::unique_ptr<RateLimiter> write_rate_limiter;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    Duration duration(FLAGS_duration, writes_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);

      uint64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
      // Write key id
      GenerateKeyFromInt(key_id, FLAGS_num, &key);
      // Write timestamp

      char* start = const_cast<char*>(key.data());
      char* pos = start + 8;
      int bytes_to_fill =
          std::min(key_size_ - static_cast<int>(pos - start), 8);
      uint64_t timestamp_value = timestamp_emulator_->Get();
      if (port::kLittleEndian) {
        for (int i = 0; i < bytes_to_fill; ++i) {
          pos[i] = (timestamp_value >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
        }
      } else {
        memcpy(pos, static_cast<void*>(&timestamp_value), bytes_to_fill);
      }

      timestamp_emulator_->Inc();

      Status s;

      s = db->Put(write_options_, key, gen.Generate(value_size_));

      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes = key.size() + value_size_;
      thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
      thread->stats.AddBytes(bytes);

      if (FLAGS_benchmark_write_rate_limit > 0) {
        write_rate_limiter->Request(
            entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
            nullptr /* stats */, RateLimiter::OpType::kWrite);
      }
    }
  }

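  // Dispatches the time-series workload: thread 0 writes, the remaining
  // threads read (or delete expired entries when --expire_style=delete).
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=timeseries --threads=4 --expire_style=delete \
  //              --num_deletion_threads=1 --key_id_range=100000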
  void TimeSeries(ThreadState* thread) {
    if (thread->tid > 0) {
      bool do_deletion = FLAGS_expire_style == "delete" &&
                         thread->tid <= FLAGS_num_deletion_threads;
      TimeSeriesReadOrDelete(thread, do_deletion);
    } else {
      TimeSeriesWrite(thread);
      thread->stats.Stop();
      thread->stats.Report("timeseries write");
    }
  }

  void Compact(ThreadState* thread) {
    DB* db = SelectDB(thread);
    CompactRangeOptions cro;
    cro.bottommost_level_compaction =
        BottommostLevelCompaction::kForceOptimized;
    db->CompactRange(cro, nullptr, nullptr);
  }

  void CompactAll() {
    if (db_.db != nullptr) {
      db_.db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
    }
    for (const auto& db_with_cfh : multi_dbs_) {
      db_with_cfh.db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
    }
  }

  void ResetStats() {
    if (db_.db != nullptr) {
      db_.db->ResetStats();
    }
    for (const auto& db_with_cfh : multi_dbs_) {
      db_with_cfh.db->ResetStats();
    }
  }

  void PrintStats(const char* key) {
    if (db_.db != nullptr) {
      PrintStats(db_.db, key, false);
    }
    for (const auto& db_with_cfh : multi_dbs_) {
      PrintStats(db_with_cfh.db, key, true);
    }
  }

  void PrintStats(DB* db, const char* key, bool print_header = false) {
    if (print_header) {
      fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
    }
    std::string stats;
    if (!db->GetProperty(key, &stats)) {
      stats = "(failed)";
    }
    fprintf(stdout, "\n%s\n", stats.c_str());
  }

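  // Replays a previously captured trace against the open DB.
  // A hypothetical invocation (flag values illustrative only):
  //   ./db_bench --benchmarks=replay --use_existing_db=true \
  //              --trace_file=/path/to/trace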
  void Replay(ThreadState* thread) {
    if (db_.db != nullptr) {
      Replay(thread, &db_);
    }
  }

  void Replay(ThreadState* /*thread*/, DBWithColumnFamilies* db_with_cfh) {
    Status s;
    std::unique_ptr<TraceReader> trace_reader;
    s = NewFileTraceReader(FLAGS_env, EnvOptions(), FLAGS_trace_file,
                           &trace_reader);
    if (!s.ok()) {
      fprintf(
          stderr,
          "Encountered an error creating a TraceReader from the trace file. "
          "Error: %s\n",
          s.ToString().c_str());
      exit(1);
    }
    Replayer replayer(db_with_cfh->db, db_with_cfh->cfh,
                      std::move(trace_reader));
    replayer.SetFastForward(
        static_cast<uint32_t>(FLAGS_trace_replay_fast_forward));
    s = replayer.Replay();
    if (s.ok()) {
      fprintf(stdout, "Replay started from trace_file: %s\n",
              FLAGS_trace_file.c_str());
    } else {
      fprintf(stderr, "Starting replay failed. Error: %s\n",
              s.ToString().c_str());
    }
  }
};

int db_bench_tool(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  static bool initialized = false;
  if (!initialized) {
    SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                    " [OPTIONS]...");
    initialized = true;
  }
  ParseCommandLineFlags(&argc, &argv, true);
  FLAGS_compaction_style_e = (rocksdb::CompactionStyle) FLAGS_compaction_style;
#ifndef ROCKSDB_LITE
  if (FLAGS_statistics && !FLAGS_statistics_string.empty()) {
    fprintf(stderr,
            "Cannot provide both --statistics and --statistics_string.\n");
    exit(1);
  }
  if (!FLAGS_statistics_string.empty()) {
    std::unique_ptr<Statistics> custom_stats_guard;
    dbstats.reset(NewCustomObject<Statistics>(FLAGS_statistics_string,
                                              &custom_stats_guard));
    custom_stats_guard.release();
    if (dbstats == nullptr) {
      fprintf(stderr, "No Statistics registered matching string: %s\n",
              FLAGS_statistics_string.c_str());
      exit(1);
    }
  }
#endif  // ROCKSDB_LITE
  if (FLAGS_statistics) {
    dbstats = rocksdb::CreateDBStatistics();
  }
  if (dbstats) {
    dbstats->set_stats_level(static_cast<StatsLevel>(FLAGS_stats_level));
  }
  FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;

  std::vector<std::string> fanout = rocksdb::StringSplit(
      FLAGS_max_bytes_for_level_multiplier_additional, ',');
  for (size_t j = 0; j < fanout.size(); j++) {
    FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
#ifndef CYGWIN
        std::stoi(fanout[j]));
#else
        stoi(fanout[j]));
#endif
  }

  FLAGS_compression_type_e =
    StringToCompressionType(FLAGS_compression_type.c_str());

#ifndef ROCKSDB_LITE
  std::unique_ptr<Env> custom_env_guard;
  if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
    fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
    exit(1);
  } else if (!FLAGS_env_uri.empty()) {
    FLAGS_env = NewCustomObject<Env>(FLAGS_env_uri, &custom_env_guard);
    if (FLAGS_env == nullptr) {
      fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
      exit(1);
    }
  }
#endif  // ROCKSDB_LITE
  if (FLAGS_use_existing_keys && !FLAGS_use_existing_db) {
    fprintf(stderr,
            "`-use_existing_db` must be true for `-use_existing_keys` to be "
            "settable\n");
    exit(1);
  }

  if (!FLAGS_hdfs.empty()) {
    FLAGS_env  = new rocksdb::HdfsEnv(FLAGS_hdfs);
  }

  if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NORMAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::SEQUENTIAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::WILLNEED;
  else {
    fprintf(stdout, "Unknown compaction fadvice:%s\n",
            FLAGS_compaction_fadvice.c_str());
  }

  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());

  // Note options sanitization may increase thread pool sizes according to
  // max_background_flushes/max_background_compactions/max_background_jobs
  FLAGS_env->SetBackgroundThreads(FLAGS_num_high_pri_threads,
                                  rocksdb::Env::Priority::HIGH);
  FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
                                  rocksdb::Env::Priority::BOTTOM);
  FLAGS_env->SetBackgroundThreads(FLAGS_num_low_pri_threads,
                                  rocksdb::Env::Priority::LOW);

  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db.empty()) {
    std::string default_db_path;
    rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path;
  }

  if (FLAGS_stats_interval_seconds > 0) {
    // When both are set then FLAGS_stats_interval determines the frequency
    // at which the timer is checked for FLAGS_stats_interval_seconds
    FLAGS_stats_interval = 1000;
  }

  if (FLAGS_seek_missing_prefix && FLAGS_prefix_size <= 8) {
    fprintf(stderr, "prefix_size > 8 required by --seek_missing_prefix\n");
    exit(1);
  }

  rocksdb::Benchmark benchmark;
  benchmark.Run();

#ifndef ROCKSDB_LITE
  if (FLAGS_print_malloc_stats) {
    std::string stats_string;
    rocksdb::DumpMallocStats(&stats_string);
    fprintf(stdout, "Malloc stats:\n%s\n", stats_string.c_str());
  }
#endif  // ROCKSDB_LITE

  return 0;
}
}  // namespace rocksdb
#endif