//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#ifdef GFLAGS
#ifdef NUMA
#include <numa.h>
#include <numaif.h>
#endif
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <fcntl.h>
#include <gflags/gflags.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <unordered_map>

#include "db/db_impl.h"
#include "db/version_set.h"
#include "hdfs/env_hdfs.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/utilities/env_registry.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/sim_cache.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/write_batch.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/statistics.h"
#include "util/stderr_logger.h"
#include "util/string_util.h"
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "util/xxhash.h"
#include "utilities/blob_db/blob_db.h"
#include "utilities/merge_operators.h"

#ifdef OS_WIN
#include <io.h>  // open/close
#endif

namespace {
using GFLAGS::ParseCommandLineFlags;
using GFLAGS::RegisterFlagValidator;
using GFLAGS::SetUsageMessage;

DEFINE_string(
    benchmarks,
    "fillseq,"
    "fillseqdeterministic,"
    "fillsync,"
    "fillrandom,"
    "filluniquerandomdeterministic,"
    "overwrite,"
    "readrandom,"
    "newiterator,"
    "newiteratorwhilewriting,"
    "seekrandom,"
    "seekrandomwhilewriting,"
    "seekrandomwhilemerging,"
    "readseq,"
    "readreverse,"
    "compact,"
    "readrandom,"
    "multireadrandom,"
    "readseq,"
    "readtocache,"
    "readreverse,"
    "readwhilewriting,"
    "readwhilemerging,"
    "readrandomwriterandom,"
    "updaterandom,"
    "randomwithverify,"
    "fill100K,"
    "crc32c,"
    "xxhash,"
    "compress,"
    "uncompress,"
    "acquireload,"
    "fillseekseq,"
    "randomtransaction,"
    "randomreplacekeys,"
    "timeseries",

    "Comma-separated list of operations to run in the specified"
    " order. Available benchmarks:\n"
    "\tfillseq       -- write N values in sequential key"
    " order in async mode\n"
    "\tfillseqdeterministic       -- write N values in the specified"
    " key order and keep the shape of the LSM tree\n"
    "\tfillrandom    -- write N values in random key order in async"
    " mode\n"
    "\tfilluniquerandomdeterministic       -- write N values in a random"
    " key order and keep the shape of the LSM tree\n"
    "\toverwrite     -- overwrite N values in random key order in"
    " async mode\n"
    "\tfillsync      -- write N/100 values in random key order in "
    "sync mode\n"
    "\tfill100K      -- write N/1000 100K values in random order in"
    " async mode\n"
    "\tdeleteseq     -- delete N keys in sequential order\n"
    "\tdeleterandom  -- delete N keys in random order\n"
    "\treadseq       -- read N times sequentially\n"
    "\treadtocache   -- 1 thread reading database sequentially\n"
    "\treadreverse   -- read N times in reverse order\n"
    "\treadrandom    -- read N times in random order\n"
    "\treadmissing   -- read N missing keys in random order\n"
    "\treadwhilewriting      -- 1 writer, N threads doing random "
    "reads\n"
    "\treadwhilemerging      -- 1 merger, N threads doing random "
    "reads\n"
    "\treadrandomwriterandom -- N threads doing random-read, "
    "random-write\n"
    "\tprefixscanrandom      -- prefix scan N times in random order\n"
    "\tupdaterandom  -- N threads doing read-modify-write for random "
    "keys\n"
    "\tappendrandom  -- N threads doing read-modify-write with "
    "growing values\n"
    "\tmergerandom   -- same as updaterandom/appendrandom using merge"
    " operator. "
    "Must be used with merge_operator\n"
    "\treadrandommergerandom -- perform N random read-or-merge "
    "operations. Must be used with merge_operator\n"
    "\tnewiterator   -- repeated iterator creation\n"
    "\tseekrandom    -- N random seeks, call Next seek_nexts times "
    "per seek\n"
    "\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
    "overwrite\n"
    "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
    "merge\n"
    "\tcrc32c        -- repeated crc32c of 4K of data\n"
    "\txxhash        -- repeated xxHash of 4K of data\n"
    "\tacquireload   -- load N*1000 times\n"
    "\tfillseekseq   -- write N values in sequential key, then read "
    "them by seeking to each key\n"
    "\trandomtransaction     -- execute N random transactions and "
    "verify correctness\n"
    "\trandomreplacekeys     -- randomly replaces N keys by deleting "
    "the old version and putting the new version\n\n"
    "\ttimeseries            -- 1 writer generates time series data "
    "and multiple readers doing random reads on id\n\n"
    "Meta operations:\n"
    "\tcompact     -- Compact the entire DB\n"
    "\tstats       -- Print DB stats\n"
    "\tlevelstats  -- Print the number of files and bytes per level\n"
    "\tsstables    -- Print sstable info\n"
    "\theapprofile -- Dump a heap profile (if supported by this"
    " port)\n");

DEFINE_int64(num, 1000000, "Number of key/values to place in database");

DEFINE_int64(numdistinct, 1000,
             "Number of distinct keys to use. Used in RandomWithVerify to "
             "read/write on fewer keys so that gets are more likely to find the"
             " key and puts are more likely to update the same key");

DEFINE_int64(merge_keys, -1,
             "Number of distinct keys to use for MergeRandom and "
             "ReadRandomMergeRandom. "
             "If negative, there will be FLAGS_num keys.");
DEFINE_int32(num_column_families, 1, "Number of Column Families to use.");

DEFINE_int32(
    num_hot_column_families, 0,
    "Number of Hot Column Families. If more than 0, only write to this "
    "number of column families. After finishing all the writes to them, "
    "create new set of column families and insert to them. Only used "
    "when num_column_families > 1.");

DEFINE_int64(reads, -1, "Number of read operations to do.  "
             "If negative, do FLAGS_num reads.");

DEFINE_int64(deletes, -1, "Number of delete operations to do.  "
             "If negative, do FLAGS_num deletions.");

DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality");

DEFINE_int64(seed, 0, "Seed base for random number generators. "
             "When 0 it is deterministic.");

DEFINE_int32(threads, 1, "Number of concurrent threads to run.");

DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
             " When 0 then num & reads determine the test duration");

DEFINE_int32(value_size, 100, "Size of each value");

DEFINE_int32(seek_nexts, 0,
             "How many times to call Next() after Seek() in "
             "fillseekseq, seekrandom, seekrandomwhilewriting and "
             "seekrandomwhilemerging");

DEFINE_bool(reverse_iterator, false,
            "When true use Prev rather than Next for iterators that do "
            "Seek and then Next");

DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");

DEFINE_int64(batch_size, 1, "Batch size");

static bool ValidateKeySize(const char* flagname, int32_t value) {
  return true;
}

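// Flag validator: rejects values that do not fit in 32 bits.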
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    fprintf(stderr, "Invalid value for --%s: %lu, overflow\n", flagname,
            (unsigned long)value);
    return false;
  }
  return true;
}

DEFINE_int32(key_size, 16, "size of each key");

DEFINE_int32(num_multi_db, 0,
             "Number of DBs used in the benchmark. 0 means single DB.");

DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
              " to this fraction of their original size after compression");

DEFINE_double(read_random_exp_range, 0.0,
              "Read random's key will be generated using distribution of "
              "num * exp(-r) where r is uniform number from 0 to this value. "
              "The larger the number is, the more skewed the reads are. "
              "Only used in readrandom and multireadrandom benchmarks.");

DEFINE_bool(histogram, false, "Print histogram of operation timings");

DEFINE_bool(enable_numa, false,
            "Make operations aware of NUMA architecture and bind memory "
            "and cpus corresponding to nodes together. In NUMA, memory "
            "in same node as CPUs are closer when compared to memory in "
            "other nodes. Reads can be faster when the process is bound to "
            "CPU and memory of same node. Use \"$numactl --hardware\" command "
            "to see NUMA memory architecture.");

DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
             "Number of bytes to buffer in all memtables before compacting");

DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
             "Number of bytes to buffer in memtable before compacting");

DEFINE_int32(max_write_buffer_number,
             rocksdb::Options().max_write_buffer_number,
             "The number of in-memory memtables. Each memtable is of size "
             "write_buffer_size.");

DEFINE_int32(min_write_buffer_number_to_merge,
             rocksdb::Options().min_write_buffer_number_to_merge,
             "The minimum number of write buffers that will be merged together"
             "before writing to storage. This is cheap because it is an"
             "in-memory merge. If this feature is not enabled, then all these"
             "write buffers are flushed to L0 as separate files and this "
             "increases read amplification because a get request has to check"
             " in all of these files. Also, an in-memory merge may result in"
             " writing less data to storage if there are duplicate records "
             " in each of these individual write buffers.");
DEFINE_int32(max_write_buffer_number_to_maintain,
             rocksdb::Options().max_write_buffer_number_to_maintain,
             "The total maximum number of write buffers to maintain in memory "
             "including copies of buffers that have already been flushed. "
             "Unlike max_write_buffer_number, this parameter does not affect "
             "flushing. This controls the minimum amount of write history "
             "that will be available in memory for conflict checking when "
             "Transactions are used. If this value is too low, some "
             "transactions may fail at commit time due to not being able to "
             "determine whether there were any write conflicts. Setting this "
             "value to 0 will cause write buffers to be freed immediately "
             "after they are flushed.  If this value is set to -1, "
             "'max_write_buffer_number' will be used.");

DEFINE_int32(max_background_compactions,
             rocksdb::Options().max_background_compactions,
             "The maximum number of concurrent background compactions"
             " that can occur in parallel.");

DEFINE_int32(base_background_compactions,
             rocksdb::Options().base_background_compactions,
             "The base number of concurrent background compactions"
             " to occur in parallel.");

DEFINE_uint64(subcompactions, 1,
              "Maximum number of subcompactions to divide L0-L1 compactions "
              "into.");
static const bool FLAGS_subcompactions_dummy
    __attribute__((unused)) = RegisterFlagValidator(&FLAGS_subcompactions,
                                                    &ValidateUint32Range);

DEFINE_int32(max_background_flushes,
             rocksdb::Options().max_background_flushes,
             "The maximum number of concurrent background flushes"
             " that can occur in parallel.");

static rocksdb::CompactionStyle FLAGS_compaction_style_e;
DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
             "style of compaction: level-based vs universal");

static rocksdb::CompactionPri FLAGS_compaction_pri_e;
DEFINE_int32(compaction_pri, (int32_t)rocksdb::Options().compaction_pri,
             "priority of files to compaction: by size or by data age");

DEFINE_int32(universal_size_ratio, 0,
             "Percentage flexibility while comparing file size"
             " (for universal compaction only).");

DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files in a"
             " single compaction run (for universal compaction only).");

DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
             " in universal style compaction");

DEFINE_int32(universal_max_size_amplification_percent, 0,
             "The max size amplification for universal style compaction");

DEFINE_int32(universal_compression_size_percent, -1,
             "The percentage of the database to compress for universal "
             "compaction. -1 means compress everything.");

DEFINE_bool(universal_allow_trivial_move, false,
            "Allow trivial move in universal compaction.");

DEFINE_int64(cache_size, 8 << 20,  // 8MB
             "Number of bytes to use as a cache of uncompressed data");

DEFINE_int32(cache_numshardbits, 6,
             "Number of shards for the block cache"
             " is 2 ** cache_numshardbits. Negative means use default settings."
             " This is applied only if FLAGS_cache_size is non-negative.");

DEFINE_bool(use_clock_cache, false,
            "Replace default LRU block cache with clock cache.");

DEFINE_int64(simcache_size, -1,
             "Number of bytes to use as a simcache of "
             "uncompressed data. Negative value disables simcache.");

DEFINE_bool(cache_index_and_filter_blocks, false,
            "Cache index/filter blocks in block cache.");

DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false,
            "Pin index/filter blocks of L0 files in block cache.");

DEFINE_int32(block_size,
             static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
             "Number of bytes in a block.");

DEFINE_int32(block_restart_interval,
             rocksdb::BlockBasedTableOptions().block_restart_interval,
             "Number of keys between restart points "
             "for delta encoding of keys in data block.");

DEFINE_int32(index_block_restart_interval,
             rocksdb::BlockBasedTableOptions().index_block_restart_interval,
             "Number of keys between restart points "
             "for delta encoding of keys in index block.");

DEFINE_int32(read_amp_bytes_per_bit,
             rocksdb::BlockBasedTableOptions().read_amp_bytes_per_bit,
             "Number of bytes per bit to be used in block read-amp bitmap");

DEFINE_int64(compressed_cache_size, -1,
             "Number of bytes to use as a cache of compressed data.");

DEFINE_int64(row_cache_size, 0,
             "Number of bytes to use as a cache of individual rows"
             " (0 = disabled).");

DEFINE_int32(open_files, rocksdb::Options().max_open_files,
             "Maximum number of files to keep open at the same time"
             " (use default if == 0)");

DEFINE_int32(file_opening_threads, rocksdb::Options().max_file_opening_threads,
             "If open_files is set to -1, this option sets the number of "
             "threads that will be used to open files during DB::Open()");

DEFINE_int32(new_table_reader_for_compaction_inputs, true,
             "If true, uses a separate file handle for compaction inputs");

DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");

DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
             "Maximum windows randomaccess buffer size");

DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024,
             "Maximum write buffer for Writable File");

DEFINE_bool(skip_table_builder_flush, false, "Skip flushing block in "
            "table builder ");

DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
             " use default settings.");
DEFINE_double(memtable_bloom_size_ratio, 0,
              "Ratio of memtable size used for bloom filter. 0 means no bloom "
              "filter.");
DEFINE_bool(memtable_use_huge_page, false,
            "Try to use huge page in memtables.");

DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
            " database.  If you set this flag and also specify a benchmark that"
            " wants a fresh database, that benchmark will fail.");

DEFINE_bool(show_table_properties, false,
            "If true, then per-level table"
            " properties will be printed on every stats-interval when"
            " stats_interval is set and stats_per_interval is on.");

DEFINE_string(db, "", "Use the db with the following name.");

static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
  if (value >= 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n",
            flagname, value);
    return false;
  }
  return true;
}

DEFINE_bool(verify_checksum, false, "Verify checksum for every block read"
            " from storage");

DEFINE_bool(statistics, false, "Database statistics");
static class std::shared_ptr<rocksdb::Statistics> dbstats;

DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
             " --num writes.");

DEFINE_bool(sync, false, "Sync all writes to disk");

DEFINE_bool(disable_data_sync, false, "If true, do not wait until data is"
            " synced to disk.");

DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");

DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");

DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");

DEFINE_int32(num_levels, 7, "The total number of levels");

DEFINE_int64(target_file_size_base, 2 * 1048576, "Target file size at level-1");

DEFINE_int32(target_file_size_multiplier, 1,
             "A multiplier to compute target level-N file size (N >= 2)");

DEFINE_uint64(max_bytes_for_level_base,  10 * 1048576, "Max bytes for level-1");

DEFINE_bool(level_compaction_dynamic_level_bytes, false,
            "Whether level size base is dynamic");

DEFINE_double(max_bytes_for_level_multiplier, 10,
              "A multiplier to compute max bytes for level-N (N >= 2)");

static std::vector<int> FLAGS_max_bytes_for_level_multiplier_additional_v;
DEFINE_string(max_bytes_for_level_multiplier_additional, "",
              "A vector that specifies additional fanout per level");

DEFINE_int32(level0_stop_writes_trigger,
             rocksdb::Options().level0_stop_writes_trigger,
             "Number of files in level-0"
             " that will trigger put stop.");

DEFINE_int32(level0_slowdown_writes_trigger,
             rocksdb::Options().level0_slowdown_writes_trigger,
             "Number of files in level-0"
             " that will slow down writes.");

DEFINE_int32(level0_file_num_compaction_trigger,
             rocksdb::Options().level0_file_num_compaction_trigger,
             "Number of files in level-0"
             " when compactions start");

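// Flag validator shared by the percentage flags below: values must lie
// strictly between 0 and 100.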
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value <= 0 || value>=100) {
    fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
             " as percentage) for the ReadRandomWriteRandom workload. The "
             "default value 90 means 90% operations out of all reads and writes"
             " operations are reads. In other words, 9 gets for every 1 put.");

DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
             " as percentage) for the ReadRandomMergeRandom workload. The"
             " default value 70 means 70% out of all read and merge operations"
             " are merges. In other words, 7 merges for every 3 gets.");

DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
             "deletes (used in RandomWithVerify only). RandomWithVerify "
             "calculates writepercent as (100 - FLAGS_readwritepercent - "
             "deletepercent), so deletepercent must be smaller than (100 - "
             "FLAGS_readwritepercent)");

DEFINE_bool(optimize_filters_for_hits, false,
            "Optimizes bloom filters for workloads where most lookups return "
            "a value. For now this doesn't create bloom filters for the max "
            "level of the LSM to reduce metadata that should fit in RAM. ");

DEFINE_uint64(delete_obsolete_files_period_micros, 0,
              "Ignored. Left here for backward compatibility");

DEFINE_int64(writes_per_range_tombstone, 0,
             "Number of writes between range "
             "tombstones");

DEFINE_int64(range_tombstone_width, 100, "Number of keys in tombstone's range");

DEFINE_int64(max_num_range_tombstones, 0,
             "Maximum number of range tombstones "
             "to insert.");

DEFINE_bool(expand_range_tombstones, false,
            "Expand range tombstone into sequential regular tombstones.");

#ifndef ROCKSDB_LITE
DEFINE_bool(optimistic_transaction_db, false,
            "Open an OptimisticTransactionDB instance. "
            "Required for randomtransaction benchmark.");

DEFINE_bool(transaction_db, false,
            "Open a TransactionDB instance. "
            "Required for randomtransaction benchmark.");

DEFINE_uint64(transaction_sets, 2,
              "Number of keys each transaction will "
              "modify (use in RandomTransaction only).  Max: 9999");

DEFINE_bool(transaction_set_snapshot, false,
            "Setting to true will have each transaction call SetSnapshot()"
            " upon creation.");

DEFINE_int32(transaction_sleep, 0,
             "Max microseconds to sleep in between "
             "reading and writing a value (used in RandomTransaction only). ");

DEFINE_uint64(transaction_lock_timeout, 100,
              "If using a transaction_db, specifies the lock wait timeout in"
              " milliseconds before failing a transaction waiting on a lock");
DEFINE_string(
    options_file, "",
    "The path to a RocksDB options file.  If specified, then db_bench will "
    "run with the RocksDB options in the default column family of the "
    "specified options file. "
    "Note that with this setting, db_bench will ONLY accept the following "
    "RocksDB options related command-line arguments, all other arguments "
    "that are related to RocksDB options will be ignored:\n"
    "\t--use_existing_db\n"
    "\t--statistics\n"
    "\t--row_cache_size\n"
    "\t--row_cache_numshardbits\n"
    "\t--enable_io_prio\n"
    "\t--dump_malloc_stats\n"
    "\t--num_multi_db\n");
#endif  // ROCKSDB_LITE

DEFINE_bool(report_bg_io_stats, false,
            "Measure time spent on I/O while in compactions. ");

DEFINE_bool(use_stderr_info_logger, false,
            "Write info logs to stderr instead of to LOG file. ");

DEFINE_bool(use_blob_db, false, "Whether to use BlobDB. ");

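// Maps a --compression_type name to the corresponding CompressionType;
// unrecognized names fall back to Snappy (see the fprintf below).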
enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "none"))
    return rocksdb::kNoCompression;
  else if (!strcasecmp(ctype, "snappy"))
    return rocksdb::kSnappyCompression;
  else if (!strcasecmp(ctype, "zlib"))
    return rocksdb::kZlibCompression;
  else if (!strcasecmp(ctype, "bzip2"))
    return rocksdb::kBZip2Compression;
  else if (!strcasecmp(ctype, "lz4"))
    return rocksdb::kLZ4Compression;
  else if (!strcasecmp(ctype, "lz4hc"))
    return rocksdb::kLZ4HCCompression;
  else if (!strcasecmp(ctype, "xpress"))
    return rocksdb::kXpressCompression;
  else if (!strcasecmp(ctype, "zstd"))
    return rocksdb::kZSTD;

  fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
  return rocksdb::kSnappyCompression;  // default value
}

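// Returns the name used for column family i: the default column family name
// for i == 0, otherwise "column_family_name_<i>" (zero-padded to six digits).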
std::string ColumnFamilyName(size_t i) {
  if (i == 0) {
    return rocksdb::kDefaultColumnFamilyName;
  } else {
    char name[100];
    snprintf(name, sizeof(name), "column_family_name_%06zu", i);
    return std::string(name);
  }
}

DEFINE_string(compression_type, "snappy",
              "Algorithm to use to compress the database");
static enum rocksdb::CompressionType FLAGS_compression_type_e =
    rocksdb::kSnappyCompression;

DEFINE_int32(compression_level, -1,
             "Compression level. For zlib this should be -1 for the "
             "default level, or between 0 and 9.");

DEFINE_int32(compression_max_dict_bytes, 0,
             "Maximum size of dictionary used to prime the compression "
             "library.");

static bool ValidateCompressionLevel(const char* flagname, int32_t value) {
  if (value < -1 || value > 9) {
    fprintf(stderr, "Invalid value for --%s: %d, must be between -1 and 9\n",
            flagname, value);
    return false;
  }
  return true;
}

static const bool FLAGS_compression_level_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_compression_level, &ValidateCompressionLevel);

DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
             " from this level. Levels with number < min_level_to_compress are"
             " not compressed. Otherwise, apply compression_type to "
             "all levels.");

static bool ValidateTableCacheNumshardbits(const char* flagname,
                                           int32_t value) {
  if (0 >= value || value > 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be  0 < val <= 20\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(table_cache_numshardbits, 4, "");
#ifndef ROCKSDB_LITE
DEFINE_string(env_uri, "", "URI for registry Env lookup. Mutually exclusive"
              " with --hdfs.");
#endif  // ROCKSDB_LITE
DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with"
              " --env_uri.");
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();

DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
             "this is greater than zero. When 0 the interval grows over time.");

DEFINE_int64(stats_interval_seconds, 0, "Report stats every N seconds. This "
             "overrides stats_interval when both are > 0.");

DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
             " this is greater than 0.");

DEFINE_int64(report_interval_seconds, 0,
             "If greater than zero, it will write simple stats in CVS format "
             "to --report_file every N seconds");

DEFINE_string(report_file, "report.csv",
              "Filename where some simple stats are reported to (if "
              "--report_interval_seconds is bigger than 0)");

701 702 703 704
DEFINE_int32(thread_status_per_interval, 0,
             "Takes and report a snapshot of the current status of each thread"
             " when this is greater than 0.");

705
DEFINE_int32(perf_level, rocksdb::PerfLevel::kDisable, "Level of perf collection");
706

707
static bool ValidateRateLimit(const char* flagname, double value) {
D
Dmitri Smirnov 已提交
708
  const double EPSILON = 1e-10;
709 710 711 712 713 714 715
  if ( value < -EPSILON ) {
    fprintf(stderr, "Invalid value for --%s: %12.6f, must be >= 0.0\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_double(soft_rate_limit, 0.0, "DEPRECATED");

DEFINE_double(hard_rate_limit, 0.0, "DEPRECATED");

DEFINE_uint64(soft_pending_compaction_bytes_limit, 64ull * 1024 * 1024 * 1024,
              "Slowdown writes if pending compaction bytes exceed this number");

DEFINE_uint64(hard_pending_compaction_bytes_limit, 128ull * 1024 * 1024 * 1024,
              "Stop writes if pending compaction bytes exceed this number");

DEFINE_uint64(delayed_write_rate, 8388608u,
              "Limited bytes allowed to DB when soft_rate_limit or "
              "level0_slowdown_writes_trigger triggers");

DEFINE_bool(allow_concurrent_memtable_write, false,
            "Allow multi-writers to update mem tables in parallel.");

DEFINE_bool(enable_write_thread_adaptive_yield, false,
            "Use a yielding spin loop for brief writer thread waits.");

DEFINE_uint64(
    write_thread_max_yield_usec, 100,
    "Maximum microseconds for enable_write_thread_adaptive_yield operation.");

DEFINE_uint64(write_thread_slow_yield_usec, 3,
              "The threshold at which a slow yield is considered a signal that "
              "other processes or threads want the core.");

DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
             "When hard_rate_limit is set then this is the max time a put will"
             " be stalled.");

DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");

DEFINE_uint64(
    benchmark_write_rate_limit, 0,
    "If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
    "is the global rate in bytes/second.");

DEFINE_uint64(
    benchmark_read_rate_limit, 0,
    "If non-zero, db_bench will rate-limit the reads from RocksDB. This "
    "is the global rate in ops/second.");

DEFINE_uint64(max_compaction_bytes, rocksdb::Options().max_compaction_bytes,
              "Max bytes allowed in one compaction");

#ifndef ROCKSDB_LITE
DEFINE_bool(readonly, false, "Run read only benchmarks.");
#endif  // ROCKSDB_LITE

DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");

DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
              " in MB.");
DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");

DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer,
            "Allow buffered io using OS buffers");

DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
            "Allow reads to occur via mmap-ing files");

DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes,
            "Allow writes to occur via mmap-ing files");

DEFINE_bool(use_direct_reads, rocksdb::EnvOptions().use_direct_reads,
            "Use O_DIRECT for reading data");

DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
            "Advise random access on table file open");

DEFINE_string(compaction_fadvice, "NORMAL",
              "Access pattern advice when a file is compacted");
static auto FLAGS_compaction_fadvice_e =
  rocksdb::Options().access_hint_on_compaction_start;

DEFINE_bool(use_tailing_iterator, false,
            "Use tailing iterator to access a series of keys instead of get");

DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
            "Use adaptive mutex");

DEFINE_uint64(bytes_per_sync,  rocksdb::Options().bytes_per_sync,
              "Allows OS to incrementally sync SST files to disk while they are"
              " being written, in the background. Issue one request for every"
              " bytes_per_sync written. 0 turns it off.");

DEFINE_uint64(wal_bytes_per_sync,  rocksdb::Options().wal_bytes_per_sync,
              "Allows OS to incrementally sync WAL files to disk while they are"
              " being written, in the background. Issue one request for every"
              " wal_bytes_per_sync written. 0 turns it off.");

DEFINE_bool(use_single_deletes, true,
            "Use single deletes (used in RandomReplaceKeys only).");

DEFINE_double(stddev, 2000.0,
              "Standard deviation of normal distribution used for picking keys"
              " (used in RandomReplaceKeys only).");

DEFINE_int32(key_id_range, 100000,
             "Range of possible value of key id (used in TimeSeries only).");

DEFINE_string(expire_style, "none",
              "Style to remove expired time entries. Can be one of the options "
              "below: none (do not expired data), compaction_filter (use a "
              "compaction filter to remove expired data), delete (seek IDs and "
              "remove expired data) (used in TimeSeries only).");

DEFINE_uint64(
    time_range, 100000,
    "Range of timestamp that store in the database (used in TimeSeries"
    " only).");

DEFINE_int32(num_deletion_threads, 1,
             "Number of threads to do deletion (used in TimeSeries and delete "
             "expire_style only).");

835 836 837
DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
             " operations on a key in the memtable");

838 839 840 841 842 843 844 845
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  if (value < 0 || value>=2000000000) {
    fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
             "plain table");
DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
             "per prefix, 0 means no special handling of the prefix, "
             "i.e. use the prefix comes with the generated random number.");
DEFINE_int32(memtable_insert_with_hint_prefix_size, 0,
             "If non-zero, enable "
             "memtable insert with hint with the given prefix size.");
DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
            "threads' IO priority");
DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
            "table becomes an identity function. This is only valid when key "
            "is 8 bytes");
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");

enum RepFactory {
  kSkipList,
  kPrefixHash,
  kVectorRep,
  kHashLinkedList,
  kCuckoo
};

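// Maps a --memtablerep name to a RepFactory value; unknown names fall back to
// the skip-list representation.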
enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "skip_list"))
    return kSkipList;
  else if (!strcasecmp(ctype, "prefix_hash"))
    return kPrefixHash;
  else if (!strcasecmp(ctype, "vector"))
    return kVectorRep;
  else if (!strcasecmp(ctype, "hash_linkedlist"))
    return kHashLinkedList;
  else if (!strcasecmp(ctype, "cuckoo"))
    return kCuckoo;

  fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
  return kSkipList;
}

static enum RepFactory FLAGS_rep_factory;
DEFINE_string(memtablerep, "skip_list", "");
DEFINE_int64(hash_bucket_count, 1024 * 1024, "hash bucket count");
DEFINE_bool(use_plain_table, false, "if use plain table "
            "instead of block-based table format");
DEFINE_bool(use_cuckoo_table, false, "if use cuckoo table format");
DEFINE_double(cuckoo_hash_ratio, 0.9, "Hash ratio for Cuckoo SST table.");
DEFINE_bool(use_hash_search, false, "if use kHashSearch "
            "instead of kBinarySearch. "
            "This is only valid if we use BlockTable");
DEFINE_bool(use_block_based_filter, false, "if use kBlockBasedFilter "
            "instead of kFullFilter for filter block. "
            "This is only valid if we use BlockTable");
DEFINE_string(merge_operator, "", "The merge operator to use with the database. "
              "If a new merge operator is specified, be sure to use a fresh"
              " database. The possible merge operators are defined in"
              " utilities/merge_operators.h");
DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try "
             "linear search first for this many steps from the previous "
             "position");
DEFINE_bool(report_file_operations, false, "if report number of file "
            "operations");

static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);

static const bool FLAGS_hard_rate_limit_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_hard_rate_limit, &ValidateRateLimit);

static const bool FLAGS_prefix_size_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);

static const bool FLAGS_key_size_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_key_size, &ValidateKeySize);

static const bool FLAGS_cache_numshardbits_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_cache_numshardbits,
                          &ValidateCacheNumshardbits);

static const bool FLAGS_readwritepercent_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_readwritepercent, &ValidateInt32Percent);

DEFINE_int32(disable_seek_compaction, false,
             "Not used, left here for backwards compatibility");

static const bool FLAGS_deletepercent_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_deletepercent, &ValidateInt32Percent);
static const bool FLAGS_table_cache_numshardbits_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
                          &ValidateTableCacheNumshardbits);
}  // namespace

namespace rocksdb {

namespace {
struct ReportFileOpCounters {
  std::atomic<int> open_counter_;
  std::atomic<int> read_counter_;
  std::atomic<int> append_counter_;
  std::atomic<uint64_t> bytes_read_;
  std::atomic<uint64_t> bytes_written_;
};

// A special Env to records and report file operations in db_bench
class ReportFileOpEnv : public EnvWrapper {
 public:
  explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); }

  void reset() {
    counters_.open_counter_ = 0;
    counters_.read_counter_ = 0;
    counters_.append_counter_ = 0;
    counters_.bytes_read_ = 0;
    counters_.bytes_written_ = 0;
  }

  Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
                           const EnvOptions& soptions) override {
    class CountingFile : public SequentialFile {
     private:
      unique_ptr<SequentialFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(unique_ptr<SequentialFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}

I
      virtual Status Read(size_t n, Slice* result, char* scratch) override {
        Status rv = target_->Read(n, result, scratch);
        counters_->bytes_read_.fetch_add(result->size(),
                                         std::memory_order_relaxed);
        return rv;
      }


      virtual Status Skip(uint64_t n) override { return target_->Skip(n); }
    };

    Status s = target()->NewSequentialFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  Status NewRandomAccessFile(const std::string& f,
                             unique_ptr<RandomAccessFile>* r,
                             const EnvOptions& soptions) override {
    class CountingFile : public RandomAccessFile {
     private:
      unique_ptr<RandomAccessFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(unique_ptr<RandomAccessFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}
      virtual Status Read(uint64_t offset, size_t n, Slice* result,
                          char* scratch) const override {
        counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Read(offset, n, result, scratch);
        counters_->bytes_read_.fetch_add(result->size(),
                                         std::memory_order_relaxed);
        return rv;
      }
    };

    Status s = target()->NewRandomAccessFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
                         const EnvOptions& soptions) override {
    class CountingFile : public WritableFile {
     private:
      unique_ptr<WritableFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(unique_ptr<WritableFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}

      Status Append(const Slice& data) override {
        counters_->append_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Append(data);
        counters_->bytes_written_.fetch_add(data.size(),
                                            std::memory_order_relaxed);
        return rv;
      }

      Status Truncate(uint64_t size) override { return target_->Truncate(size); }
      Status Close() override { return target_->Close(); }
      Status Flush() override { return target_->Flush(); }
      Status Sync() override { return target_->Sync(); }
    };

    Status s = target()->NewWritableFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  // getter
  ReportFileOpCounters* counters() { return &counters_; }

 private:
  ReportFileOpCounters counters_;
};

}  // namespace

// Helper for quickly generating random data.
class RandomGenerator {
 private:
  std::string data_;
  unsigned int pos_;

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
    while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

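  // Returns a slice of `len` bytes from the pre-generated buffer, wrapping
  // back to the start once the remaining data would be too short.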
  Slice Generate(unsigned int len) {
    assert(len <= data_.size());
    if (pos_ + len > data_.size()) {
      pos_ = 0;
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
};
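// Appends msg to *str, inserting a single space when both are non-empty.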
static void AppendWithSpace(std::string* str, Slice msg) {
  if (msg.empty()) return;
  if (!str->empty()) {
    str->push_back(' ');
  }
  str->append(msg.data(), msg.size());
}

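// Bundles a DB (or an OptimisticTransactionDB when transactions are enabled)
// with its column family handles and the bookkeeping used to rotate "hot"
// column families during the benchmark.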
struct DBWithColumnFamilies {
  std::vector<ColumnFamilyHandle*> cfh;
  DB* db;
#ifndef ROCKSDB_LITE
  OptimisticTransactionDB* opt_txn_db;
#endif  // ROCKSDB_LITE
  std::atomic<size_t> num_created;  // Need to be updated after all the
                                    // new entries in cfh are set.
  size_t num_hot;  // Number of column families to be queried at each moment.
                   // After each CreateNewCf(), another num_hot number of new
                   // Column families will be created and used to be queried.
  port::Mutex create_cf_mutex;  // Only one thread can execute CreateNewCf()

1120 1121 1122 1123 1124 1125
  DBWithColumnFamilies()
      : db(nullptr)
#ifndef ROCKSDB_LITE
        , opt_txn_db(nullptr)
#endif  // ROCKSDB_LITE
  {
1126
    cfh.clear();
1127 1128
    num_created = 0;
    num_hot = 0;
1129
  }
1130 1131 1132 1133

  DBWithColumnFamilies(const DBWithColumnFamilies& other)
      : cfh(other.cfh),
        db(other.db),
1134
#ifndef ROCKSDB_LITE
A
agiardullo 已提交
1135
        opt_txn_db(other.opt_txn_db),
1136
#endif  // ROCKSDB_LITE
1137 1138 1139
        num_created(other.num_created.load()),
        num_hot(other.num_hot) {}

A
agiardullo 已提交
1140 1141 1142 1143
  void DeleteDBs() {
    std::for_each(cfh.begin(), cfh.end(),
                  [](ColumnFamilyHandle* cfhi) { delete cfhi; });
    cfh.clear();
#ifndef ROCKSDB_LITE
    if (opt_txn_db) {
      delete opt_txn_db;
      opt_txn_db = nullptr;
    } else {
      delete db;
      db = nullptr;
    }
#else
    delete db;
    db = nullptr;
#endif  // ROCKSDB_LITE
  }

  ColumnFamilyHandle* GetCfh(int64_t rand_num) {
    assert(num_hot > 0);
    return cfh[num_created.load(std::memory_order_acquire) - num_hot +
               rand_num % num_hot];
  }

  // stage: assume CF from 0 to stage * num_hot has be created. Need to create
  //        stage * num_hot + 1 to stage * (num_hot + 1).
  void CreateNewCf(ColumnFamilyOptions options, int64_t stage) {
    MutexLock l(&create_cf_mutex);
    if ((stage + 1) * num_hot <= num_created) {
      // Already created.
      return;
    }
    auto new_num_created = num_created + num_hot;
    assert(new_num_created <= cfh.size());
    for (size_t i = num_created; i < new_num_created; i++) {
      Status s =
          db->CreateColumnFamily(options, ColumnFamilyName(i), &(cfh[i]));
      if (!s.ok()) {
        fprintf(stderr, "create column family error: %s\n",
                s.ToString().c_str());
        abort();
      }
    }
    num_created.store(new_num_created, std::memory_order_release);
  }
};

// a class that reports stats to CSV file
class ReporterAgent {
 public:
  ReporterAgent(Env* env, const std::string& fname,
                uint64_t report_interval_secs)
      : env_(env),
        total_ops_done_(0),
        last_report_(0),
        report_interval_secs_(report_interval_secs),
        stop_(false) {
    auto s = env_->NewWritableFile(fname, &report_file_, EnvOptions());
    if (s.ok()) {
      s = report_file_->Append(Header() + "\n");
    }
    if (s.ok()) {
      s = report_file_->Flush();
    }
    if (!s.ok()) {
      fprintf(stderr, "Can't open %s: %s\n", fname.c_str(),
              s.ToString().c_str());
      abort();
    }

    reporting_thread_ = std::thread([&]() { SleepAndReport(); });
  }

  ~ReporterAgent() {
    {
      std::unique_lock<std::mutex> lk(mutex_);
      stop_ = true;
      stop_cv_.notify_all();
    }
    reporting_thread_.join();
  }

  // thread safe
  void ReportFinishedOps(int64_t num_ops) {
    total_ops_done_.fetch_add(num_ops);
  }

 private:
  std::string Header() const { return "secs_elapsed,interval_qps"; }
  void SleepAndReport() {
    uint64_t kMicrosInSecond = 1000 * 1000;
    auto time_started = env_->NowMicros();
    while (true) {
      {
        std::unique_lock<std::mutex> lk(mutex_);
        if (stop_ ||
            stop_cv_.wait_for(lk, std::chrono::seconds(report_interval_secs_),
                              [&]() { return stop_; })) {
          // stopping
          break;
        }
        // else -> timeout, which means time for a report!
      }
      auto total_ops_done_snapshot = total_ops_done_.load();
      // round the seconds elapsed
      auto secs_elapsed =
          (env_->NowMicros() - time_started + kMicrosInSecond / 2) /
          kMicrosInSecond;
      std::string report = ToString(secs_elapsed) + "," +
                           ToString(total_ops_done_snapshot - last_report_) +
                           "\n";
      auto s = report_file_->Append(report);
      if (s.ok()) {
        s = report_file_->Flush();
      }
      if (!s.ok()) {
        fprintf(stderr,
                "Can't write to report file (%s), stopping the reporting\n",
                s.ToString().c_str());
        break;
      }
      last_report_ = total_ops_done_snapshot;
    }
  }

  Env* env_;
  std::unique_ptr<WritableFile> report_file_;
  std::atomic<int64_t> total_ops_done_;
  int64_t last_report_;
  const uint64_t report_interval_secs_;
  std::thread reporting_thread_;
  std::mutex mutex_;
  // will notify on stop
  std::condition_variable stop_cv_;
  bool stop_;
};

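// Operation categories used to key the per-operation latency histograms.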
enum OperationType : unsigned char {
  kRead = 0,
  kWrite,
  kDelete,
  kSeek,
  kMerge,
  kUpdate,
  kCompress,
  kUncompress,
  kCrc,
  kHash,
  kOthers
};

static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
                          OperationTypeString = {
  {kRead, "read"},
  {kWrite, "write"},
  {kDelete, "delete"},
  {kSeek, "seek"},
  {kMerge, "merge"},
  {kUpdate, "update"},
  {kCompress, "compress"},
  {kUncompress, "uncompress"},
  {kCrc, "crc"},
  {kHash, "hash"},
  {kOthers, "op"}
};

class CombinedStats;
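
// Per-thread benchmark statistics: operation counts, bytes processed and
// latency histograms. Per-thread Stats objects are combined via Merge() when
// a benchmark finishes.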
class Stats {
 private:
  int id_;
  uint64_t start_;
  uint64_t finish_;
  double seconds_;
  uint64_t done_;
  uint64_t last_report_done_;
  uint64_t next_report_;
  uint64_t bytes_;
  uint64_t last_op_finish_;
  uint64_t last_report_finish_;
1319
  std::unordered_map<OperationType, std::shared_ptr<HistogramImpl>,
1320
                     std::hash<unsigned char>> hist_;
  std::string message_;
  bool exclude_from_merge_;
  ReporterAgent* reporter_agent_;  // does not own
  friend class CombinedStats;

 public:
  Stats() { Start(-1); }

  void SetReporterAgent(ReporterAgent* reporter_agent) {
    reporter_agent_ = reporter_agent;
  }

  void Start(int id) {
    id_ = id;
    next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
    last_op_finish_ = start_;
    hist_.clear();
    done_ = 0;
    last_report_done_ = 0;
    bytes_ = 0;
    seconds_ = 0;
    start_ = FLAGS_env->NowMicros();
    finish_ = start_;
    last_report_finish_ = start_;
    message_.clear();
    // When set, stats from this thread won't be merged with others.
    exclude_from_merge_ = false;
  }

  void Merge(const Stats& other) {
    if (other.exclude_from_merge_)
      return;

    for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
      auto this_it = hist_.find(it->first);
      if (this_it != hist_.end()) {
        this_it->second->Merge(*(other.hist_.at(it->first)));
      } else {
        hist_.insert({ it->first, it->second });
      }
    }

    done_ += other.done_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.empty()) message_ = other.message_;
  }

  void Stop() {
    finish_ = FLAGS_env->NowMicros();
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void AddMessage(Slice msg) {
    AppendWithSpace(&message_, msg);
  }

  void SetId(int id) { id_ = id; }
  void SetExcludeFromMerge() { exclude_from_merge_ = true; }

  void PrintThreadStatus() {
    std::vector<ThreadStatus> thread_list;
    FLAGS_env->GetThreadList(&thread_list);

    fprintf(stderr, "\n%18s %10s %12s %20s %13s %45s %12s %s\n",
        "ThreadID", "ThreadType", "cfName", "Operation",
        "ElapsedTime", "Stage", "State", "OperationProperties");

    int64_t current_time = 0;
    Env::Default()->GetCurrentTime(&current_time);
    for (auto ts : thread_list) {
      fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
          ts.thread_id,
          ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(),
          ts.cf_name.c_str(),
          ThreadStatus::GetOperationName(ts.operation_type).c_str(),
          ThreadStatus::MicrosToString(ts.op_elapsed_micros).c_str(),
          ThreadStatus::GetOperationStageName(ts.operation_stage).c_str(),
          ThreadStatus::GetStateName(ts.state_type).c_str());

      auto op_properties = ThreadStatus::InterpretOperationProperties(
          ts.operation_type, ts.op_properties);
      for (const auto& op_prop : op_properties) {
        fprintf(stderr, " %s %" PRIu64" |",
            op_prop.first.c_str(), op_prop.second);
      }
      fprintf(stderr, "\n");
    }
  }

  void ResetLastOpTime() {
    // Set to now to avoid latency from calls to SleepForMicroseconds
    last_op_finish_ = FLAGS_env->NowMicros();
  }

  void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
                   enum OperationType op_type = kOthers) {
    if (reporter_agent_) {
      reporter_agent_->ReportFinishedOps(num_ops);
    }
    if (FLAGS_histogram) {
      uint64_t now = FLAGS_env->NowMicros();
      uint64_t micros = now - last_op_finish_;

      if (hist_.find(op_type) == hist_.end()) {
        auto hist_temp = std::make_shared<HistogramImpl>();
        hist_.insert({op_type, std::move(hist_temp)});
      }
      hist_[op_type]->Add(micros);

      if (micros > 20000 && !FLAGS_stats_interval) {
        fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, "");
        fflush(stderr);
      }
      last_op_finish_ = now;
    }

    done_ += num_ops;
    if (done_ >= next_report_) {
      if (!FLAGS_stats_interval) {
        if      (next_report_ < 1000)   next_report_ += 100;
        else if (next_report_ < 5000)   next_report_ += 500;
        else if (next_report_ < 10000)  next_report_ += 1000;
        else if (next_report_ < 50000)  next_report_ += 5000;
        else if (next_report_ < 100000) next_report_ += 10000;
        else if (next_report_ < 500000) next_report_ += 50000;
        else                            next_report_ += 100000;
        fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, "");
      } else {
        uint64_t now = FLAGS_env->NowMicros();
        int64_t usecs_since_last = now - last_report_finish_;

        // Determine whether to print status now: the reporting interval is
        // either every N operations or every N seconds.

        if (FLAGS_stats_interval_seconds &&
            usecs_since_last < (FLAGS_stats_interval_seconds * 1000000)) {
          // Don't check again for this many operations
          next_report_ += FLAGS_stats_interval;

        } else {

          fprintf(stderr,
                  "%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and "
                  "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
                  FLAGS_env->TimeToString(now/1000000).c_str(),
                  id_,
                  done_ - last_report_done_, done_,
                  (done_ - last_report_done_) /
                  (usecs_since_last / 1000000.0),
                  done_ / ((now - start_) / 1000000.0),
                  (now - last_report_finish_) / 1000000.0,
                  (now - start_) / 1000000.0);

          if (id_ == 0 && FLAGS_stats_per_interval) {
            std::string stats;

            if (db_with_cfh && db_with_cfh->num_created.load()) {
              for (size_t i = 0; i < db_with_cfh->num_created.load(); ++i) {
                if (db->GetProperty(db_with_cfh->cfh[i], "rocksdb.cfstats",
                                    &stats))
                  fprintf(stderr, "%s\n", stats.c_str());
                if (FLAGS_show_table_properties) {
                  for (int level = 0; level < FLAGS_num_levels; ++level) {
                    if (db->GetProperty(
                            db_with_cfh->cfh[i],
                            "rocksdb.aggregated-table-properties-at-level" +
                                ToString(level),
                            &stats)) {
                      if (stats.find("# entries=0") == std::string::npos) {
                        fprintf(stderr, "Level[%d]: %s\n", level,
                                stats.c_str());
                      }
                    }
                  }
                }
              }
            } else if (db) {
              if (db->GetProperty("rocksdb.stats", &stats)) {
                fprintf(stderr, "%s\n", stats.c_str());
              }
              if (FLAGS_show_table_properties) {
                for (int level = 0; level < FLAGS_num_levels; ++level) {
                  if (db->GetProperty(
                          "rocksdb.aggregated-table-properties-at-level" +
                              ToString(level),
                          &stats)) {
                    if (stats.find("# entries=0") == std::string::npos) {
                      fprintf(stderr, "Level[%d]: %s\n", level, stats.c_str());
                    }
                  }
                }
              }
            }
          }

          next_report_ += FLAGS_stats_interval;
          last_report_finish_ = now;
          last_report_done_ = done_;
        }
      }
      if (id_ == 0 && FLAGS_thread_status_per_interval) {
        PrintThreadStatus();
      }
      fflush(stderr);
    }
  }

  void AddBytes(int64_t n) {
    bytes_ += n;
  }

  void Report(const Slice& name) {
    // Pretend at least one op was done in case we are running a benchmark
    // that does not call FinishedOps().
    if (done_ < 1) done_ = 1;

    std::string extra;
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-6;
      char rate[100];
      snprintf(rate, sizeof(rate), "%6.1f MB/s",
               (bytes_ / 1048576.0) / elapsed);
      extra = rate;
    }
    AppendWithSpace(&extra, message_);
    double elapsed = (finish_ - start_) * 1e-6;
    double throughput = (double)done_/elapsed;

    fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
            name.ToString().c_str(),
            elapsed * 1e6 / done_,
            (long)throughput,
            (extra.empty() ? "" : " "),
            extra.c_str());
    if (FLAGS_histogram) {
      for (auto it = hist_.begin(); it != hist_.end(); ++it) {
        fprintf(stdout, "Microseconds per %s:\n%s\n",
                OperationTypeString[it->first].c_str(),
                it->second->ToString().c_str());
      }
    }
    if (FLAGS_report_file_operations) {
      ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env);
      ReportFileOpCounters* counters = env->counters();
      fprintf(stdout, "Num files opened: %d\n",
              counters->open_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num Read(): %d\n",
              counters->read_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num Append(): %d\n",
              counters->append_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num bytes read: %" PRIu64 "\n",
              counters->bytes_read_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num bytes written: %" PRIu64 "\n",
              counters->bytes_written_.load(std::memory_order_relaxed));
      env->reset();
    }
    fflush(stdout);
  }
};
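// Summary of how Stats instances flow: each worker thread owns one Stats;
// once all threads finish, RunBenchmark() folds them together with Merge()
// (skipping any thread that called SetExcludeFromMerge()) and prints a single
// line via Report().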

class CombinedStats {
 public:
  void AddStats(const Stats& stat) {
    uint64_t total_ops = stat.done_;
    uint64_t total_bytes_ = stat.bytes_;
    double elapsed;

    if (total_ops < 1) {
      total_ops = 1;
    }

    elapsed = (stat.finish_ - stat.start_) * 1e-6;
    throughput_ops_.emplace_back(total_ops / elapsed);

    if (total_bytes_ > 0) {
      double mbs = (total_bytes_ / 1048576.0);
      throughput_mbs_.emplace_back(mbs / elapsed);
    }
  }

  void Report(const std::string& bench_name) {
    const char* name = bench_name.c_str();
    int num_runs = static_cast<int>(throughput_ops_.size());

    if (throughput_mbs_.size() == throughput_ops_.size()) {
      fprintf(stdout,
              "%s [AVG    %d runs] : %d ops/sec; %6.1f MB/sec\n"
              "%s [MEDIAN %d runs] : %d ops/sec; %6.1f MB/sec\n",
              name, num_runs, static_cast<int>(CalcAvg(throughput_ops_)),
              CalcAvg(throughput_mbs_), name, num_runs,
              static_cast<int>(CalcMedian(throughput_ops_)),
              CalcMedian(throughput_mbs_));
    } else {
      fprintf(stdout,
              "%s [AVG    %d runs] : %d ops/sec\n"
              "%s [MEDIAN %d runs] : %d ops/sec\n",
              name, num_runs, static_cast<int>(CalcAvg(throughput_ops_)), name,
              num_runs, static_cast<int>(CalcMedian(throughput_ops_)));
    }
  }

 private:
  double CalcAvg(std::vector<double> data) {
    double avg = 0;
    for (double x : data) {
      avg += x;
    }
    avg = avg / data.size();
    return avg;
  }

  double CalcMedian(std::vector<double> data) {
    assert(data.size() > 0);
    std::sort(data.begin(), data.end());

    size_t mid = data.size() / 2;
    if (data.size() % 2 == 1) {
      // Odd number of entries
      return data[mid];
    } else {
      // Even number of entries
      return (data[mid] + data[mid - 1]) / 2;
    }
  }

  std::vector<double> throughput_ops_;
  std::vector<double> throughput_mbs_;
};
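// CombinedStats only comes into play when a benchmark name carries a repeat
// suffix (e.g. "readrandom[X5]", parsed in Run() below): one Stats per repeat
// is added, and the average and median throughput across runs are reported.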

class TimestampEmulator {
 private:
  std::atomic<uint64_t> timestamp_;

 public:
  TimestampEmulator() : timestamp_(0) {}
  uint64_t Get() const { return timestamp_.load(); }
  void Inc() { timestamp_++; }
};
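// TimestampEmulator backs the "timeseries" benchmark (see Run() below); the
// ExpiredTimeFilter defined inside Benchmark compares the timestamp embedded
// in each key against this counter to expire entries older than --time_range.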

// State shared by all concurrent executions of the same benchmark.
struct SharedState {
  port::Mutex mu;
  port::CondVar cv;
  int total;
  int perf_level;
  std::shared_ptr<RateLimiter> write_rate_limiter;
  std::shared_ptr<RateLimiter> read_rate_limiter;

  // Each thread goes through the following states:
  //    (1) initializing
  //    (2) waiting for others to be initialized
  //    (3) running
  //    (4) done

  long num_initialized;
  long num_done;
  bool start;

  SharedState() : cv(&mu), perf_level(FLAGS_perf_level) { }
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;             // 0..n-1 when running in n threads
  Random64 rand;         // Has different seeds for different threads
  Stats stats;
  SharedState* shared;

  /* implicit */ ThreadState(int index)
      : tid(index),
        rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {
  }
};

class Duration {
 public:
  Duration(uint64_t max_seconds, int64_t max_ops, int64_t ops_per_stage = 0) {
    max_seconds_ = max_seconds;
    max_ops_= max_ops;
    ops_per_stage_ = (ops_per_stage > 0) ? ops_per_stage : max_ops;
    ops_ = 0;
    start_at_ = FLAGS_env->NowMicros();
  }

  int64_t GetStage() { return std::min(ops_, max_ops_ - 1) / ops_per_stage_; }

  bool Done(int64_t increment) {
    if (increment <= 0) increment = 1;    // avoid Done(0) and infinite loops
    ops_ += increment;

    if (max_seconds_) {
      // Recheck roughly every 1000 ops (exact iff increment is a factor of 1000)
      if ((ops_/1000) != ((ops_-increment)/1000)) {
        uint64_t now = FLAGS_env->NowMicros();
        return ((now - start_at_) / 1000000) >= max_seconds_;
      } else {
        return false;
      }
    } else {
      return ops_ > max_ops_;
    }
  }

 private:
  uint64_t max_seconds_;
  int64_t max_ops_;
  int64_t ops_per_stage_;
  int64_t ops_;
  uint64_t start_at_;
};
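// Illustrative usage: Duration(0, 1000000) makes Done() return true after one
// million ops, while a non-zero max_seconds makes it time-bounded instead,
// rechecking NowMicros() only about once every 1000 ops to stay off the hot
// path.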

class Benchmark {
 private:
  std::shared_ptr<Cache> cache_;
  std::shared_ptr<Cache> compressed_cache_;
  std::shared_ptr<const FilterPolicy> filter_policy_;
  const SliceTransform* prefix_extractor_;
  DBWithColumnFamilies db_;
  std::vector<DBWithColumnFamilies> multi_dbs_;
  int64_t num_;
  int value_size_;
  int key_size_;
  int prefix_size_;
  int64_t keys_per_prefix_;
  int64_t entries_per_batch_;
  int64_t writes_per_range_tombstone_;
  int64_t range_tombstone_width_;
  int64_t max_num_range_tombstones_;
  WriteOptions write_options_;
  Options open_options_;  // keep options around to properly destroy db later
  int64_t reads_;
  int64_t deletes_;
  double read_random_exp_range_;
  int64_t writes_;
  int64_t readwrites_;
  int64_t merge_keys_;
  bool report_file_operations_;

  bool SanityCheck() {
    if (FLAGS_compression_ratio > 1) {
      fprintf(stderr, "compression_ratio should be between 0 and 1\n");
      return false;
    }
    return true;
  }

  inline bool CompressSlice(const Slice& input, std::string* compressed) {
    bool ok = true;
    switch (FLAGS_compression_type_e) {
      case rocksdb::kSnappyCompression:
        ok = Snappy_Compress(Options().compression_opts, input.data(),
                             input.size(), compressed);
        break;
      case rocksdb::kZlibCompression:
        ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
                           input.size(), compressed);
        break;
      case rocksdb::kBZip2Compression:
        ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
                            input.size(), compressed);
        break;
      case rocksdb::kLZ4Compression:
        ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
                          input.size(), compressed);
        break;
      case rocksdb::kLZ4HCCompression:
        ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
                            input.size(), compressed);
        break;
      case rocksdb::kXpressCompression:
        ok = XPRESS_Compress(input.data(),
          input.size(), compressed);
        break;
      case rocksdb::kZSTD:
        ok = ZSTD_Compress(Options().compression_opts, input.data(),
                           input.size(), compressed);
        break;
      default:
        ok = false;
    }
    return ok;
  }
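  // CompressSlice() is shared by PrintWarnings(), the "compress" benchmark and
  // the "uncompress" benchmark; the compression_opts it uses come from a
  // default-constructed Options on every call.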

  void PrintHeader() {
    PrintEnvironment();
    fprintf(stdout, "Keys:       %d bytes each\n", FLAGS_key_size);
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    fprintf(stdout, "Entries:    %" PRIu64 "\n", num_);
    fprintf(stdout, "Prefix:    %d bytes\n", FLAGS_prefix_size);
    fprintf(stdout, "Keys per prefix:    %" PRIu64 "\n", keys_per_prefix_);
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
            ((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
             / 1048576.0));
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
            (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
              * num_)
             / 1048576.0));
    fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
            FLAGS_benchmark_write_rate_limit);
    fprintf(stdout, "Read rate: %" PRIu64 " ops/second\n",
            FLAGS_benchmark_read_rate_limit);
    if (FLAGS_enable_numa) {
      fprintf(stderr, "Running in NUMA enabled mode.\n");
#ifndef NUMA
      fprintf(stderr, "NUMA is not defined in the system.\n");
      exit(1);
#else
      if (numa_available() == -1) {
        fprintf(stderr, "NUMA is not supported by the system.\n");
        exit(1);
      }
#endif
    }

    auto compression = CompressionTypeToString(FLAGS_compression_type_e);
    fprintf(stdout, "Compression: %s\n", compression.c_str());

    switch (FLAGS_rep_factory) {
      case kPrefixHash:
        fprintf(stdout, "Memtablerep: prefix_hash\n");
        break;
      case kSkipList:
        fprintf(stdout, "Memtablerep: skip_list\n");
        break;
      case kVectorRep:
        fprintf(stdout, "Memtablerep: vector\n");
        break;
      case kHashLinkedList:
        fprintf(stdout, "Memtablerep: hash_linkedlist\n");
        break;
      case kCuckoo:
        fprintf(stdout, "Memtablerep: cuckoo\n");
        break;
    }
    fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);

    PrintWarnings(compression.c_str());
    fprintf(stdout, "------------------------------------------------\n");
  }

  void PrintWarnings(const char* compression) {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
    if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
      // The test string should not be too small.
      const int len = FLAGS_block_size;
      std::string input_str(len, 'y');
      std::string compressed;
      bool result = CompressSlice(Slice(input_str), &compressed);

      if (!result) {
        fprintf(stdout, "WARNING: %s compression is not enabled\n",
                compression);
      } else if (compressed.size() >= input_str.size()) {
        fprintf(stdout, "WARNING: %s compression is not effective\n",
                compression);
      }
    }
  }

// Currently the following isn't equivalent to OS_LINUX.
#if defined(__linux)
  static Slice TrimSpace(Slice s) {
    unsigned int start = 0;
    while (start < s.size() && isspace(s[start])) {
      start++;
    }
    unsigned int limit = static_cast<unsigned int>(s.size());
    while (limit > start && isspace(s[limit-1])) {
      limit--;
    }
    return Slice(s.data() + start, limit - start);
  }
#endif

  void PrintEnvironment() {
    fprintf(stderr, "RocksDB:    version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
    time_t now = time(nullptr);
    char buf[52];
    // Lint complains about ctime() usage, so replace it with ctime_r(). The
    // requirement is to provide a buffer which is at least 26 bytes.
    fprintf(stderr, "Date:       %s",
            ctime_r(&now, buf));  // ctime_r() adds newline

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    if (cpuinfo != nullptr) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
        const char* sep = strchr(line, ':');
        if (sep == nullptr) {
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }

  static bool KeyExpired(const TimestampEmulator* timestamp_emulator,
                         const Slice& key) {
    const char* pos = key.data();
    pos += 8;
    uint64_t timestamp = 0;
    if (port::kLittleEndian) {
      int bytes_to_fill = 8;
      for (int i = 0; i < bytes_to_fill; ++i) {
        timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
                      << ((bytes_to_fill - i - 1) << 3));
      }
    } else {
      memcpy(&timestamp, pos, sizeof(timestamp));
    }
    return timestamp_emulator->Get() - timestamp > FLAGS_time_range;
  }

  class ExpiredTimeFilter : public CompactionFilter {
   public:
    explicit ExpiredTimeFilter(
        const std::shared_ptr<TimestampEmulator>& timestamp_emulator)
        : timestamp_emulator_(timestamp_emulator) {}
    bool Filter(int level, const Slice& key, const Slice& existing_value,
                std::string* new_value, bool* value_changed) const override {
      return KeyExpired(timestamp_emulator_.get(), key);
    }
    const char* Name() const override { return "ExpiredTimeFilter"; }

   private:
    std::shared_ptr<TimestampEmulator> timestamp_emulator_;
  };
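  // Note: KeyExpired() assumes the key layout produced by the timeseries
  // write path (not shown in this excerpt): an 8-byte prefix followed by an
  // 8-byte big-endian timestamp drawn from TimestampEmulator.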

  std::shared_ptr<Cache> NewCache(int64_t capacity) {
    if (capacity <= 0) {
      return nullptr;
    }
    if (FLAGS_use_clock_cache) {
      auto cache = NewClockCache((size_t)capacity, FLAGS_cache_numshardbits);
      if (!cache) {
        fprintf(stderr, "Clock cache not supported.");
        exit(1);
      }
      return cache;
    } else {
      return NewLRUCache((size_t)capacity, FLAGS_cache_numshardbits);
    }
  }
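  // NewClockCache() may return nullptr when the build or platform lacks clock
  // cache support, hence the explicit failure above rather than a silent fall
  // back to LRU.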

 public:
  Benchmark()
      : cache_(NewCache(FLAGS_cache_size)),
        compressed_cache_(NewCache(FLAGS_compressed_cache_size)),
        filter_policy_(FLAGS_bloom_bits >= 0
                           ? NewBloomFilterPolicy(FLAGS_bloom_bits,
                                                  FLAGS_use_block_based_filter)
                           : nullptr),
        prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
        num_(FLAGS_num),
        value_size_(FLAGS_value_size),
        key_size_(FLAGS_key_size),
        prefix_size_(FLAGS_prefix_size),
        keys_per_prefix_(FLAGS_keys_per_prefix),
        entries_per_batch_(1),
        reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
        read_random_exp_range_(0.0),
        writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
        readwrites_(
            (FLAGS_writes < 0 && FLAGS_reads < 0)
                ? FLAGS_num
                : ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)),
        merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
        report_file_operations_(FLAGS_report_file_operations) {
    // use simcache instead of cache
    if (FLAGS_simcache_size >= 0) {
      if (FLAGS_cache_numshardbits >= 1) {
        cache_ =
            NewSimCache(cache_, FLAGS_simcache_size, FLAGS_cache_numshardbits);
      } else {
        cache_ = NewSimCache(cache_, FLAGS_simcache_size, 0);
      }
    }

    if (report_file_operations_) {
      if (!FLAGS_hdfs.empty()) {
        fprintf(stderr,
                "--hdfs and --report_file_operations cannot be enabled "
                "at the same time");
        exit(1);
      }
      FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
    }

    if (FLAGS_prefix_size > FLAGS_key_size) {
      fprintf(stderr, "prefix size is larger than key size");
      exit(1);
    }

    std::vector<std::string> files;
    FLAGS_env->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      Options options;
      if (!FLAGS_wal_dir.empty()) {
        options.wal_dir = FLAGS_wal_dir;
      }
      DestroyDB(FLAGS_db, options);
      if (!FLAGS_wal_dir.empty()) {
        FLAGS_env->DeleteDir(FLAGS_wal_dir);
      }

      if (FLAGS_num_multi_db > 1) {
        FLAGS_env->CreateDir(FLAGS_db);
        if (!FLAGS_wal_dir.empty()) {
          FLAGS_env->CreateDir(FLAGS_wal_dir);
        }
      }
    }
  }

  ~Benchmark() {
    db_.DeleteDBs();
    delete prefix_extractor_;
    if (cache_.get() != nullptr) {
      // this will leak, but we're shutting down so nobody cares
      cache_->DisownData();
    }
  }

  Slice AllocateKey(std::unique_ptr<const char[]>* key_guard) {
    char* data = new char[key_size_];
    const char* const_data = data;
    key_guard->reset(const_data);
    return Slice(key_guard->get(), key_size_);
  }

  // Generate key according to the given specification and random number.
  // The resulting key will have the following format (if keys_per_prefix_
  // is positive), extra trailing bytes are either cut off or padded with '0'.
  // The prefix value is derived from key value.
  //   ----------------------------
  //   | prefix 00000 | key 00000 |
  //   ----------------------------
  // If keys_per_prefix_ is 0, the key is simply a binary representation of
  // random number followed by trailing '0's
  //   ----------------------------
  //   |        key 00000         |
  //   ----------------------------
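  // Illustrative example (hypothetical flag values): with key_size_=16,
  // prefix_size_=8 and keys_per_prefix_=10, GenerateKeyFromInt() writes an
  // 8-byte big-endian prefix equal to v % (num_keys / 10) followed by the
  // 8-byte big-endian encoding of v; any remaining bytes up to key_size_ are
  // padded with '0'.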
  void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
    char* start = const_cast<char*>(key->data());
    char* pos = start;
    if (keys_per_prefix_ > 0) {
      int64_t num_prefix = num_keys / keys_per_prefix_;
      int64_t prefix = v % num_prefix;
      int bytes_to_fill = std::min(prefix_size_, 8);
      if (port::kLittleEndian) {
        for (int i = 0; i < bytes_to_fill; ++i) {
          pos[i] = (prefix >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
        }
      } else {
        memcpy(pos, static_cast<void*>(&prefix), bytes_to_fill);
      }
      if (prefix_size_ > 8) {
        // fill the rest with 0s
        memset(pos + 8, '0', prefix_size_ - 8);
      }
      pos += prefix_size_;
    }

    int bytes_to_fill = std::min(key_size_ - static_cast<int>(pos - start), 8);
    if (port::kLittleEndian) {
      for (int i = 0; i < bytes_to_fill; ++i) {
        pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
      }
    } else {
      memcpy(pos, static_cast<void*>(&v), bytes_to_fill);
    }
    pos += bytes_to_fill;
    if (key_size_ > pos - start) {
      memset(pos, '0', key_size_ - (pos - start));
    }
  }

  std::string GetPathForMultiple(std::string base_name, size_t id) {
    if (!base_name.empty()) {
#ifndef OS_WIN
      if (base_name.back() != '/') {
        base_name += '/';
      }
#else
      if (base_name.back() != '\\') {
        base_name += '\\';
      }
#endif
    }
    return base_name + ToString(id);
  }
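  // For example, GetPathForMultiple("/tmp/dbbench/wal", 2) returns
  // "/tmp/dbbench/wal/2" (with '\' as the separator on Windows); Run() uses
  // this to give each DB in --num_multi_db its own data and WAL directory.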

  void Run() {
    if (!SanityCheck()) {
      exit(1);
    }
    Open(&open_options_);
    PrintHeader();
    std::stringstream benchmark_stream(FLAGS_benchmarks);
    std::string name;
    std::unique_ptr<ExpiredTimeFilter> filter;
    while (std::getline(benchmark_stream, name, ',')) {
      // Sanitize parameters
      num_ = FLAGS_num;
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
      writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
      deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
      value_size_ = FLAGS_value_size;
      key_size_ = FLAGS_key_size;
      entries_per_batch_ = FLAGS_batch_size;
      writes_per_range_tombstone_ = FLAGS_writes_per_range_tombstone;
      range_tombstone_width_ = FLAGS_range_tombstone_width;
      max_num_range_tombstones_ = FLAGS_max_num_range_tombstones;
      write_options_ = WriteOptions();
      read_random_exp_range_ = FLAGS_read_random_exp_range;
      if (FLAGS_sync) {
        write_options_.sync = true;
      }
      write_options_.disableWAL = FLAGS_disable_wal;

      void (Benchmark::*method)(ThreadState*) = nullptr;
      void (Benchmark::*post_process_method)() = nullptr;

      bool fresh_db = false;
      int num_threads = FLAGS_threads;

      int num_repeat = 1;
      int num_warmup = 0;
      if (!name.empty() && *name.rbegin() == ']') {
        auto it = name.find('[');
        if (it == std::string::npos) {
          fprintf(stderr, "unknown benchmark arguments '%s'\n", name.c_str());
          exit(1);
        }
        std::string args = name.substr(it + 1);
        args.resize(args.size() - 1);
        name.resize(it);

        std::string bench_arg;
        std::stringstream args_stream(args);
        while (std::getline(args_stream, bench_arg, '-')) {
          if (bench_arg.empty()) {
            continue;
          }
          if (bench_arg[0] == 'X') {
            // Repeat the benchmark n times
            std::string num_str = bench_arg.substr(1);
            num_repeat = std::stoi(num_str);
          } else if (bench_arg[0] == 'W') {
            // Warm up the benchmark for n times
            std::string num_str = bench_arg.substr(1);
            num_warmup = std::stoi(num_str);
          }
        }
      }

      // Both fillseqdeterministic and filluniquerandomdeterministic
      // fill the levels except the max level with UNIQUE_RANDOM
      // and fill the max level with fillseq and filluniquerandom, respectively
      if (name == "fillseqdeterministic" ||
          name == "filluniquerandomdeterministic") {
        if (!FLAGS_disable_auto_compactions) {
          fprintf(stderr,
                  "Please disable_auto_compactions in FillDeterministic "
                  "benchmark\n");
          exit(1);
        }
        if (num_threads > 1) {
          fprintf(stderr,
                  "filldeterministic multithreaded not supported"
                  ", use 1 thread\n");
          num_threads = 1;
        }
        fresh_db = true;
        if (name == "fillseqdeterministic") {
          method = &Benchmark::WriteSeqDeterministic;
        } else {
          method = &Benchmark::WriteUniqueRandomDeterministic;
        }
      } else if (name == "fillseq") {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == "fillbatch") {
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
      } else if (name == "fillrandom") {
        fresh_db = true;
        method = &Benchmark::WriteRandom;
      } else if (name == "filluniquerandom") {
        fresh_db = true;
        if (num_threads > 1) {
          fprintf(stderr,
                  "filluniquerandom multithreaded not supported"
                  ", use 1 thread");
          num_threads = 1;
        }
        method = &Benchmark::WriteUniqueRandom;
      } else if (name == "overwrite") {
        method = &Benchmark::WriteRandom;
      } else if (name == "fillsync") {
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
      } else if (name == "fill100K") {
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
      } else if (name == "readseq") {
        method = &Benchmark::ReadSequential;
      } else if (name == "readtocache") {
        method = &Benchmark::ReadSequential;
        num_threads = 1;
        reads_ = num_;
      } else if (name == "readreverse") {
        method = &Benchmark::ReadReverse;
      } else if (name == "readrandom") {
        method = &Benchmark::ReadRandom;
      } else if (name == "readrandomfast") {
        method = &Benchmark::ReadRandomFast;
      } else if (name == "multireadrandom") {
        fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
                entries_per_batch_);
        method = &Benchmark::MultiReadRandom;
      } else if (name == "readmissing") {
        ++key_size_;
        method = &Benchmark::ReadRandom;
      } else if (name == "newiterator") {
        method = &Benchmark::IteratorCreation;
      } else if (name == "newiteratorwhilewriting") {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::IteratorCreationWhileWriting;
      } else if (name == "seekrandom") {
        method = &Benchmark::SeekRandom;
      } else if (name == "seekrandomwhilewriting") {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::SeekRandomWhileWriting;
      } else if (name == "seekrandomwhilemerging") {
        num_threads++;  // Add extra thread for merging
        method = &Benchmark::SeekRandomWhileMerging;
      } else if (name == "readrandomsmall") {
        reads_ /= 1000;
        method = &Benchmark::ReadRandom;
      } else if (name == "deleteseq") {
        method = &Benchmark::DeleteSeq;
      } else if (name == "deleterandom") {
        method = &Benchmark::DeleteRandom;
      } else if (name == "readwhilewriting") {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
      } else if (name == "readwhilemerging") {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileMerging;
      } else if (name == "readrandomwriterandom") {
        method = &Benchmark::ReadRandomWriteRandom;
      } else if (name == "readrandommergerandom") {
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
                  name.c_str());
          exit(1);
        }
        method = &Benchmark::ReadRandomMergeRandom;
      } else if (name == "updaterandom") {
        method = &Benchmark::UpdateRandom;
      } else if (name == "appendrandom") {
        method = &Benchmark::AppendRandom;
      } else if (name == "mergerandom") {
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
                  name.c_str());
          exit(1);
        }
        method = &Benchmark::MergeRandom;
      } else if (name == "randomwithverify") {
        method = &Benchmark::RandomWithVerify;
      } else if (name == "fillseekseq") {
        method = &Benchmark::WriteSeqSeekSeq;
      } else if (name == "compact") {
        method = &Benchmark::Compact;
      } else if (name == "crc32c") {
        method = &Benchmark::Crc32c;
      } else if (name == "xxhash") {
        method = &Benchmark::xxHash;
      } else if (name == "acquireload") {
        method = &Benchmark::AcquireLoad;
      } else if (name == "compress") {
        method = &Benchmark::Compress;
      } else if (name == "uncompress") {
        method = &Benchmark::Uncompress;
#ifndef ROCKSDB_LITE
      } else if (name == "randomtransaction") {
        method = &Benchmark::RandomTransaction;
        post_process_method = &Benchmark::RandomTransactionVerify;
#endif  // ROCKSDB_LITE
      } else if (name == "randomreplacekeys") {
        fresh_db = true;
        method = &Benchmark::RandomReplaceKeys;
      } else if (name == "timeseries") {
        timestamp_emulator_.reset(new TimestampEmulator());
        if (FLAGS_expire_style == "compaction_filter") {
          filter.reset(new ExpiredTimeFilter(timestamp_emulator_));
          fprintf(stdout, "Compaction filter is used to remove expired data");
          open_options_.compaction_filter = filter.get();
        }
        fresh_db = true;
        method = &Benchmark::TimeSeries;
      } else if (name == "stats") {
        PrintStats("rocksdb.stats");
      } else if (name == "levelstats") {
        PrintStats("rocksdb.levelstats");
      } else if (name == "sstables") {
        PrintStats("rocksdb.sstables");
      } else if (!name.empty()) {  // No error message for empty name
        fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
        exit(1);
      }

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                  name.c_str());
          method = nullptr;
        } else {
          if (db_.db != nullptr) {
            db_.DeleteDBs();
            DestroyDB(FLAGS_db, open_options_);
          }
          Options options = open_options_;
          for (size_t i = 0; i < multi_dbs_.size(); i++) {
            delete multi_dbs_[i].db;
            if (!open_options_.wal_dir.empty()) {
              options.wal_dir = GetPathForMultiple(open_options_.wal_dir, i);
            }
            DestroyDB(GetPathForMultiple(FLAGS_db, i), options);
          }
          multi_dbs_.clear();
        }
        Open(&open_options_);  // use open_options for the last accessed
      }

      if (method != nullptr) {
        fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
        if (num_warmup > 0) {
          printf("Warming up benchmark by running %d times\n", num_warmup);
        }

        for (int i = 0; i < num_warmup; i++) {
          RunBenchmark(num_threads, name, method);
        }

        if (num_repeat > 1) {
          printf("Running benchmark for %d times\n", num_repeat);
        }

        CombinedStats combined_stats;
        for (int i = 0; i < num_repeat; i++) {
          Stats stats = RunBenchmark(num_threads, name, method);
          combined_stats.AddStats(stats);
        }
        if (num_repeat > 1) {
          combined_stats.Report(name);
        }
      }
      if (post_process_method != nullptr) {
        (this->*post_process_method)();
      }
    }
    if (FLAGS_statistics) {
      fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
    }
    if (FLAGS_simcache_size >= 0) {
      fprintf(stdout, "SIMULATOR CACHE STATISTICS:\n%s\n",
              std::dynamic_pointer_cast<SimCache>(cache_)->ToString().c_str());
    }
  }

 private:
  std::shared_ptr<TimestampEmulator> timestamp_emulator_;

  struct ThreadArg {
    Benchmark* bm;
    SharedState* shared;
    ThreadState* thread;
    void (Benchmark::*method)(ThreadState*);
  };

  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

    SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
    thread->stats.Start(thread->tid);
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }

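  // Thread handshake (summary): RunBenchmark() spawns n copies of ThreadBody()
  // above, waits for num_initialized to reach n, flips shared.start, then
  // waits for num_done before merging the per-thread Stats it returns.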
  Stats RunBenchmark(int n, Slice name,
                     void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      shared.write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }
    if (FLAGS_benchmark_read_rate_limit > 0) {
      shared.read_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_read_rate_limit));
    }

    std::unique_ptr<ReporterAgent> reporter_agent;
    if (FLAGS_report_interval_seconds > 0) {
      reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file,
                                             FLAGS_report_interval_seconds));
    }

    ThreadArg* arg = new ThreadArg[n];

    for (int i = 0; i < n; i++) {
#ifdef NUMA
      if (FLAGS_enable_numa) {
        // Performs a local allocation of memory to threads in numa node.
        int n_nodes = numa_num_task_nodes();  // Number of nodes in NUMA.
        numa_exit_on_error = 1;
        int numa_node = i % n_nodes;
        bitmask* nodes = numa_allocate_nodemask();
        numa_bitmask_clearall(nodes);
        numa_bitmask_setbit(nodes, numa_node);
        // numa_bind() call binds the process to the node and these
        // properties are passed on to the thread that is created in
        // StartThread method called later in the loop.
        numa_bind(nodes);
        numa_set_strict(1);
        numa_free_nodemask(nodes);
      }
#endif
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      arg[i].thread = new ThreadState(i);
      arg[i].thread->stats.SetReporterAgent(reporter_agent.get());
      arg[i].thread->shared = &shared;
      FLAGS_env->StartThread(ThreadBody, &arg[i]);
    }

    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    // Stats for some threads can be excluded.
    Stats merge_stats;
    for (int i = 0; i < n; i++) {
      merge_stats.Merge(arg[i].thread->stats);
    }
    merge_stats.Report(name);

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;

    return merge_stats;
  }

  void Crc32c(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    uint32_t crc = 0;
    while (bytes < 500 * 1048576) {
      crc = crc32c::Value(data.data(), size);
      thread->stats.FinishedOps(nullptr, nullptr, 1, kCrc);
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

  void xxHash(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    unsigned int xxh32 = 0;
    while (bytes < 500 * 1048576) {
      xxh32 = XXH32(data.data(), size, 0);
      thread->stats.FinishedOps(nullptr, nullptr, 1, kHash);
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... xxh32=0x%x\r", static_cast<unsigned int>(xxh32));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

  void AcquireLoad(ThreadState* thread) {
    int dummy;
    std::atomic<void*> ap(&dummy);
    int count = 0;
    void *ptr = nullptr;
    thread->stats.AddMessage("(each op is 1000 loads)");
    while (count < 100000) {
      for (int i = 0; i < 1000; i++) {
        ptr = ap.load(std::memory_order_acquire);
      }
      count++;
      thread->stats.FinishedOps(nullptr, nullptr, 1, kOthers);
    }
    if (ptr == nullptr) exit(1);  // Disable unused variable warning.
  }

  void Compress(ThreadState *thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(FLAGS_block_size);
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;

    // Compress 1G
    while (ok && bytes < int64_t(1) << 30) {
      compressed.clear();
      ok = CompressSlice(input, &compressed);
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
    }

    if (!ok) {
      thread->stats.AddMessage("(compression failure)");
    } else {
      char buf[340];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    }
  }

  void Uncompress(ThreadState *thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(FLAGS_block_size);
    std::string compressed;

    bool ok = CompressSlice(input, &compressed);
    int64_t bytes = 0;
    int decompress_size;
    while (ok && bytes < 1024 * 1048576) {
      char *uncompressed = nullptr;
      switch (FLAGS_compression_type_e) {
        case rocksdb::kSnappyCompression: {
          // get size and allocate here to make comparison fair
          size_t ulength = 0;
          if (!Snappy_GetUncompressedLength(compressed.data(),
                                            compressed.size(), &ulength)) {
            ok = false;
            break;
          }
          uncompressed = new char[ulength];
          ok = Snappy_Uncompress(compressed.data(), compressed.size(),
                                 uncompressed);
          break;
        }
      case rocksdb::kZlibCompression:
        uncompressed = Zlib_Uncompress(compressed.data(), compressed.size(),
                                       &decompress_size, 2);
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kBZip2Compression:
        uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
                                        &decompress_size, 2);
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kLZ4Compression:
        uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(),
                                      &decompress_size, 2);
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kLZ4HCCompression:
        uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(),
                                      &decompress_size, 2);
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kXpressCompression:
        uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
          &decompress_size);
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kZSTD:
        uncompressed = ZSTD_Uncompress(compressed.data(), compressed.size(),
                                       &decompress_size);
        ok = uncompressed != nullptr;
        break;
      default:
        ok = false;
      }
      delete[] uncompressed;
      bytes += input.size();
      thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
    }

    if (!ok) {
      thread->stats.AddMessage("(compression failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }

  // Returns true if the options is initialized from the specified
  // options file.
  bool InitializeOptionsFromFile(Options* opts) {
#ifndef ROCKSDB_LITE
    printf("Initializing RocksDB Options from the specified file\n");
    DBOptions db_opts;
    std::vector<ColumnFamilyDescriptor> cf_descs;
    if (FLAGS_options_file != "") {
      auto s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), &db_opts,
                                   &cf_descs);
      if (s.ok()) {
        *opts = Options(db_opts, cf_descs[0].options);
        return true;
      }
      fprintf(stderr, "Unable to load options file %s --- %s\n",
              FLAGS_options_file.c_str(), s.ToString().c_str());
      exit(1);
    }
#endif
    return false;
  }

  void InitializeOptionsFromFlags(Options* opts) {
    printf("Initializing RocksDB Options from command-line flags\n");
    Options& options = *opts;

2733
    assert(db_.db == nullptr);
2734

2735
    options.create_missing_column_families = FLAGS_num_column_families > 1;
2736
    options.max_open_files = FLAGS_open_files;
2737
    options.db_write_buffer_size = FLAGS_db_write_buffer_size;
2738
    options.write_buffer_size = FLAGS_write_buffer_size;
2739
    options.max_write_buffer_number = FLAGS_max_write_buffer_number;
2740 2741
    options.min_write_buffer_number_to_merge =
      FLAGS_min_write_buffer_number_to_merge;
2742 2743
    options.max_write_buffer_number_to_maintain =
        FLAGS_max_write_buffer_number_to_maintain;
2744
    options.base_background_compactions = FLAGS_base_background_compactions;
2745
    options.max_background_compactions = FLAGS_max_background_compactions;
2746
    options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
2747
    options.max_background_flushes = FLAGS_max_background_flushes;
2748
    options.compaction_style = FLAGS_compaction_style_e;
2749
    options.compaction_pri = FLAGS_compaction_pri_e;
2750 2751 2752
    options.allow_mmap_reads = FLAGS_mmap_read;
    options.allow_mmap_writes = FLAGS_mmap_write;
    options.use_direct_reads = FLAGS_use_direct_reads;
2753
    if (FLAGS_prefix_size != 0) {
2754 2755 2756
      options.prefix_extractor.reset(
          NewFixedPrefixTransform(FLAGS_prefix_size));
    }
    if (FLAGS_use_uint64_comparator) {
      options.comparator = test::Uint64Comparator();
      if (FLAGS_key_size != 8) {
        fprintf(stderr, "Using Uint64 comparator but key size is not 8.\n");
        exit(1);
      }
    }
2764 2765 2766
    if (FLAGS_use_stderr_info_logger) {
      options.info_log.reset(new StderrLogger());
    }
2767
    options.memtable_huge_page_size = FLAGS_memtable_use_huge_page ? 2048 : 0;
2768
    options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
    if (FLAGS_memtable_insert_with_hint_prefix_size > 0) {
      options.memtable_insert_with_hint_prefix_extractor.reset(
          NewCappedPrefixTransform(
              FLAGS_memtable_insert_with_hint_prefix_size));
    }
    options.bloom_locality = FLAGS_bloom_locality;
2775
    options.max_file_opening_threads = FLAGS_file_opening_threads;
2776 2777 2778
    options.new_table_reader_for_compaction_inputs =
        FLAGS_new_table_reader_for_compaction_inputs;
    options.compaction_readahead_size = FLAGS_compaction_readahead_size;
2779
    options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
2780
    options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
    options.disableDataSync = FLAGS_disable_data_sync;
2782
    options.use_fsync = FLAGS_use_fsync;
2783
    options.num_levels = FLAGS_num_levels;
    options.target_file_size_base = FLAGS_target_file_size_base;
    options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
2787 2788
    options.level_compaction_dynamic_level_bytes =
        FLAGS_level_compaction_dynamic_level_bytes;
    options.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
    if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash ||
                                     FLAGS_rep_factory == kHashLinkedList)) {
      fprintf(stderr, "prefix_size should be non-zero if PrefixHash or "
                      "HashLinkedList memtablerep is used\n");
      exit(1);
    }
    switch (FLAGS_rep_factory) {
      case kSkipList:
        options.memtable_factory.reset(new SkipListFactory(
            FLAGS_skip_list_lookahead));
        break;
#ifndef ROCKSDB_LITE
      case kPrefixHash:
        options.memtable_factory.reset(
            NewHashSkipListRepFactory(FLAGS_hash_bucket_count));
        break;
      case kHashLinkedList:
        options.memtable_factory.reset(NewHashLinkListRepFactory(
            FLAGS_hash_bucket_count));
        break;
      case kVectorRep:
        options.memtable_factory.reset(
          new VectorRepFactory
        );
        break;
      case kCuckoo:
        options.memtable_factory.reset(NewHashCuckooRepFactory(
            options.write_buffer_size, FLAGS_key_size + FLAGS_value_size));
        break;
#else
      default:
        fprintf(stderr, "Only skip list is supported in lite mode\n");
        exit(1);
#endif  // ROCKSDB_LITE
    }
    if (FLAGS_use_plain_table) {
#ifndef ROCKSDB_LITE
2828 2829
      if (FLAGS_rep_factory != kPrefixHash &&
          FLAGS_rep_factory != kHashLinkedList) {
        fprintf(stderr, "Warning: plain table is used with skipList\n");
      }
      if (!FLAGS_mmap_read && !FLAGS_mmap_write) {
        fprintf(stderr, "plain table format requires mmap to operate\n");
        exit(1);
      }

      int bloom_bits_per_key = FLAGS_bloom_bits;
      if (bloom_bits_per_key < 0) {
        bloom_bits_per_key = 0;
      }

      PlainTableOptions plain_table_options;
      plain_table_options.user_key_len = FLAGS_key_size;
      plain_table_options.bloom_bits_per_key = bloom_bits_per_key;
      plain_table_options.hash_table_ratio = 0.75;
      options.table_factory = std::shared_ptr<TableFactory>(
          NewPlainTableFactory(plain_table_options));
#else
      fprintf(stderr, "Plain table is not supported in lite mode\n");
      exit(1);
#endif  // ROCKSDB_LITE
2852
    } else if (FLAGS_use_cuckoo_table) {
#ifndef ROCKSDB_LITE
      if (FLAGS_cuckoo_hash_ratio > 1 || FLAGS_cuckoo_hash_ratio < 0) {
        fprintf(stderr, "Invalid cuckoo_hash_ratio\n");
        exit(1);
      }
2858 2859 2860
      rocksdb::CuckooTableOptions table_options;
      table_options.hash_table_ratio = FLAGS_cuckoo_hash_ratio;
      table_options.identity_as_first_hash = FLAGS_identity_as_first_hash;
2861
      options.table_factory = std::shared_ptr<TableFactory>(
2862
          NewCuckooTableFactory(table_options));
#else
      fprintf(stderr, "Cuckoo table is not supported in lite mode\n");
      exit(1);
#endif  // ROCKSDB_LITE
2867 2868 2869
    } else {
      BlockBasedTableOptions block_based_options;
      if (FLAGS_use_hash_search) {
        if (FLAGS_prefix_size == 0) {
          fprintf(stderr,
              "prefix_size must be non-zero when use_hash_search is enabled\n");
          exit(1);
        }
        block_based_options.index_type = BlockBasedTableOptions::kHashSearch;
      } else {
        block_based_options.index_type = BlockBasedTableOptions::kBinarySearch;
      }
2879 2880 2881
      if (cache_ == nullptr) {
        block_based_options.no_block_cache = true;
      }
2882 2883
      block_based_options.cache_index_and_filter_blocks =
          FLAGS_cache_index_and_filter_blocks;
2884 2885
      block_based_options.pin_l0_filter_and_index_blocks_in_cache =
          FLAGS_pin_l0_filter_and_index_blocks_in_cache;
2886 2887 2888 2889
      block_based_options.block_cache = cache_;
      block_based_options.block_cache_compressed = compressed_cache_;
      block_based_options.block_size = FLAGS_block_size;
      block_based_options.block_restart_interval = FLAGS_block_restart_interval;
2890 2891
      block_based_options.index_block_restart_interval =
          FLAGS_index_block_restart_interval;
2892
      block_based_options.filter_policy = filter_policy_;
      block_based_options.skip_table_builder_flush =
2894
          FLAGS_skip_table_builder_flush;
2895
      block_based_options.format_version = 2;
2896
      block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
2897 2898
      options.table_factory.reset(
          NewBlockBasedTableFactory(block_based_options));
    }
2900 2901
    if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
      if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
2902 2903
          (unsigned int)FLAGS_num_levels) {
        fprintf(stderr, "Insufficient number of fanouts specified %d\n",
2904
                (int)FLAGS_max_bytes_for_level_multiplier_additional_v.size());
2905 2906 2907
        exit(1);
      }
      options.max_bytes_for_level_multiplier_additional =
2908
        FLAGS_max_bytes_for_level_multiplier_additional_v;
2909
    }
    options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
    options.level0_file_num_compaction_trigger =
2912
        FLAGS_level0_file_num_compaction_trigger;
    options.level0_slowdown_writes_trigger =
      FLAGS_level0_slowdown_writes_trigger;
2915
    options.compression = FLAGS_compression_type_e;
2916
    options.compression_opts.level = FLAGS_compression_level;
2917
    options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
2918 2919
    options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
    options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
2920 2921
    options.max_total_wal_size = FLAGS_max_total_wal_size;

2922 2923
    if (FLAGS_min_level_to_compress >= 0) {
      assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
2924
      options.compression_per_level.resize(FLAGS_num_levels);
2925
      for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
2926 2927
        options.compression_per_level[i] = kNoCompression;
      }
2928
      for (int i = FLAGS_min_level_to_compress;
2929
           i < FLAGS_num_levels; i++) {
2930
        options.compression_per_level[i] = FLAGS_compression_type_e;
2931 2932
      }
    }
    options.soft_rate_limit = FLAGS_soft_rate_limit;
    options.hard_rate_limit = FLAGS_hard_rate_limit;
2935 2936
    options.soft_pending_compaction_bytes_limit =
        FLAGS_soft_pending_compaction_bytes_limit;
2937 2938
    options.hard_pending_compaction_bytes_limit =
        FLAGS_hard_pending_compaction_bytes_limit;
    options.delayed_write_rate = FLAGS_delayed_write_rate;
    options.allow_concurrent_memtable_write =
        FLAGS_allow_concurrent_memtable_write;
    options.enable_write_thread_adaptive_yield =
        FLAGS_enable_write_thread_adaptive_yield;
    options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
    options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
    options.rate_limit_delay_max_milliseconds =
      FLAGS_rate_limit_delay_max_milliseconds;
2948
    options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
2949
    options.max_compaction_bytes = FLAGS_max_compaction_bytes;
2950
    options.disable_auto_compactions = FLAGS_disable_auto_compactions;
2951
    options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;
2952 2953

    // fill storage options
2954 2955 2956
    options.allow_os_buffer = FLAGS_bufferedio;
    options.allow_mmap_reads = FLAGS_mmap_read;
    options.allow_mmap_writes = FLAGS_mmap_write;
2957
    options.advise_random_on_open = FLAGS_advise_random_on_open;
2958
    options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;
    options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
    options.bytes_per_sync = FLAGS_bytes_per_sync;
2961
    options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;

    // merge operator options
2964 2965 2966
    options.merge_operator = MergeOperators::CreateFromStringId(
        FLAGS_merge_operator);
    if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
      fprintf(stderr, "invalid merge operator: %s\n",
              FLAGS_merge_operator.c_str());
      exit(1);
    }
2971
    options.max_successive_merges = FLAGS_max_successive_merges;
2972
    options.report_bg_io_stats = FLAGS_report_bg_io_stats;

    // set universal style compaction configurations, if applicable
    if (FLAGS_universal_size_ratio != 0) {
      options.compaction_options_universal.size_ratio =
        FLAGS_universal_size_ratio;
    }
    if (FLAGS_universal_min_merge_width != 0) {
      options.compaction_options_universal.min_merge_width =
        FLAGS_universal_min_merge_width;
    }
    if (FLAGS_universal_max_merge_width != 0) {
      options.compaction_options_universal.max_merge_width =
        FLAGS_universal_max_merge_width;
    }
    if (FLAGS_universal_max_size_amplification_percent != 0) {
      options.compaction_options_universal.max_size_amplification_percent =
        FLAGS_universal_max_size_amplification_percent;
    }
    if (FLAGS_universal_compression_size_percent != -1) {
      options.compaction_options_universal.compression_size_percent =
        FLAGS_universal_compression_size_percent;
    }
2995 2996
    options.compaction_options_universal.allow_trivial_move =
        FLAGS_universal_allow_trivial_move;
2997 2998 2999
    if (FLAGS_thread_status_per_interval > 0) {
      options.enable_thread_tracking = true;
    }
    if (FLAGS_rate_limiter_bytes_per_sec > 0) {
      options.rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec));
    }
3004

3005
#ifndef ROCKSDB_LITE
    if (FLAGS_readonly && FLAGS_transaction_db) {
      fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
      exit(1);
    }
3010
#endif  // ROCKSDB_LITE

  }

  void InitializeOptionsGeneral(Options* opts) {
    Options& options = *opts;

    options.statistics = dbstats;
    options.wal_dir = FLAGS_wal_dir;
    options.create_if_missing = !FLAGS_use_existing_db;
3020
    options.dump_malloc_stats = FLAGS_dump_malloc_stats;

    if (FLAGS_row_cache_size) {
      if (FLAGS_cache_numshardbits >= 1) {
        options.row_cache =
            NewLRUCache(FLAGS_row_cache_size, FLAGS_cache_numshardbits);
      } else {
        options.row_cache = NewLRUCache(FLAGS_row_cache_size);
      }
    }
    if (FLAGS_enable_io_prio) {
      FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
      FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
    }
    options.env = FLAGS_env;
3035

3036 3037 3038 3039
    if (FLAGS_num_multi_db <= 1) {
      OpenDb(options, FLAGS_db, &db_);
    } else {
      multi_dbs_.clear();
3040
      multi_dbs_.resize(FLAGS_num_multi_db);
3041
      auto wal_dir = options.wal_dir;
3042
      for (int i = 0; i < FLAGS_num_multi_db; i++) {
3043 3044 3045 3046
        if (!wal_dir.empty()) {
          options.wal_dir = GetPathForMultiple(wal_dir, i);
        }
        OpenDb(options, GetPathForMultiple(FLAGS_db, i), &multi_dbs_[i]);
3047
      }
3048
      options.wal_dir = wal_dir;
3049
    }
3050 3051 3052 3053 3054
  }

  void Open(Options* opts) {
    if (!InitializeOptionsFromFile(opts)) {
      InitializeOptionsFromFlags(opts);
3055
    }
3056

3057
    InitializeOptionsGeneral(opts);
3058 3059
  }
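
  // Note on Open() above (a sketch; the flag values are illustrative only):
  //   ./db_bench --benchmarks=fillseq,readrandom --num=1000000 \
  //              --options_file=OPTIONS-000005
  // If --options_file is set and loads successfully, the tuning flags handled
  // by InitializeOptionsFromFlags() are ignored; InitializeOptionsGeneral()
  // runs in either case.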

  void OpenDb(const Options& options, const std::string& db_name,
      DBWithColumnFamilies* db) {
    Status s;
3063 3064
    // Open with column families if necessary.
    if (FLAGS_num_column_families > 1) {
      size_t num_hot = FLAGS_num_column_families;
      if (FLAGS_num_hot_column_families > 0 &&
          FLAGS_num_hot_column_families < FLAGS_num_column_families) {
        num_hot = FLAGS_num_hot_column_families;
      } else {
        FLAGS_num_hot_column_families = FLAGS_num_column_families;
      }
3072
      std::vector<ColumnFamilyDescriptor> column_families;
3073
      for (size_t i = 0; i < num_hot; i++) {
3074 3075 3076
        column_families.push_back(ColumnFamilyDescriptor(
              ColumnFamilyName(i), ColumnFamilyOptions(options)));
      }
3077
#ifndef ROCKSDB_LITE
3078 3079 3080
      if (FLAGS_readonly) {
        s = DB::OpenForReadOnly(options, db_name, column_families,
            &db->cfh, &db->db);
      } else if (FLAGS_optimistic_transaction_db) {
        s = OptimisticTransactionDB::Open(options, db_name, column_families,
                                          &db->cfh, &db->opt_txn_db);
        if (s.ok()) {
          db->db = db->opt_txn_db->GetBaseDB();
        }
      } else if (FLAGS_transaction_db) {
        TransactionDB* ptr;
        TransactionDBOptions txn_db_options;
        s = TransactionDB::Open(options, txn_db_options, db_name,
                                column_families, &db->cfh, &ptr);
        if (s.ok()) {
          db->db = ptr;
        }
3095 3096 3097
      } else {
        s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
      }
3098 3099 3100
#else
      s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
#endif  // ROCKSDB_LITE
3101 3102 3103
      db->cfh.resize(FLAGS_num_column_families);
      db->num_created = num_hot;
      db->num_hot = num_hot;
3104
#ifndef ROCKSDB_LITE
3105 3106
    } else if (FLAGS_readonly) {
      s = DB::OpenForReadOnly(options, db_name, &db->db);
    } else if (FLAGS_optimistic_transaction_db) {
      s = OptimisticTransactionDB::Open(options, db_name, &db->opt_txn_db);
      if (s.ok()) {
        db->db = db->opt_txn_db->GetBaseDB();
      }
    } else if (FLAGS_transaction_db) {
      TransactionDB* ptr;
      TransactionDBOptions txn_db_options;
      s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
      if (s.ok()) {
        db->db = ptr;
      }
3119
#endif  // ROCKSDB_LITE
3120 3121
    } else if (FLAGS_use_blob_db) {
      s = NewBlobDB(options, db_name, &db->db);
    } else {
3123
      s = DB::Open(options, db_name, &db->db);
    }
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

  enum WriteMode {
    RANDOM, SEQUENTIAL, UNIQUE_RANDOM
  };

  void WriteSeqDeterministic(ThreadState* thread) {
    DoDeterministicCompact(thread, open_options_.compaction_style, SEQUENTIAL);
  }

  void WriteUniqueRandomDeterministic(ThreadState* thread) {
    DoDeterministicCompact(thread, open_options_.compaction_style,
                           UNIQUE_RANDOM);
  }

3144
  void WriteSeq(ThreadState* thread) {
3145
    DoWrite(thread, SEQUENTIAL);
3146
  }
3147

3148
  void WriteRandom(ThreadState* thread) {
3149
    DoWrite(thread, RANDOM);
3150 3151
  }

3152 3153 3154 3155
  void WriteUniqueRandom(ThreadState* thread) {
    DoWrite(thread, UNIQUE_RANDOM);
  }

  class KeyGenerator {
   public:
    KeyGenerator(Random64* rand, WriteMode mode,
        uint64_t num, uint64_t num_per_set = 64 * 1024)
      : rand_(rand),
        mode_(mode),
        num_(num),
        next_(0) {
      if (mode_ == UNIQUE_RANDOM) {
        // NOTE: if memory consumption of this approach becomes a concern,
        // we can either break it into pieces and only random shuffle a section
        // each time. Alternatively, use a bit map implementation
        // (https://reviews.facebook.net/differential/diff/54627/)
        values_.resize(num_);
        for (uint64_t i = 0; i < num_; ++i) {
          values_[i] = i;
        }
3173 3174 3175
        std::shuffle(
            values_.begin(), values_.end(),
            std::default_random_engine(static_cast<unsigned int>(FLAGS_seed)));
      }
    }

    uint64_t Next() {
      switch (mode_) {
        case SEQUENTIAL:
          return next_++;
        case RANDOM:
          return rand_->Next() % num_;
        case UNIQUE_RANDOM:
          assert(next_ < num_);
          return values_[next_++];
      }
      assert(false);
      return std::numeric_limits<uint64_t>::max();
    }

   private:
    Random64* rand_;
    WriteMode mode_;
    const uint64_t num_;
    uint64_t next_;
    std::vector<uint64_t> values_;
  };
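
  // KeyGenerator usage (sketch): DoWrite() builds one generator per DB so
  // that UNIQUE_RANDOM emits every key in [0, num_) exactly once, e.g.
  //   KeyGenerator key_gen(&thread->rand, UNIQUE_RANDOM, num_);
  //   GenerateKeyFromInt(key_gen.Next(), FLAGS_num, &key);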

  DB* SelectDB(ThreadState* thread) {
    return SelectDBWithCfh(thread)->db;
  }

  DBWithColumnFamilies* SelectDBWithCfh(ThreadState* thread) {
    return SelectDBWithCfh(thread->rand.Next());
  }

  DBWithColumnFamilies* SelectDBWithCfh(uint64_t rand_int) {
    if (db_.db != nullptr) {
      return &db_;
    } else  {
      return &multi_dbs_[rand_int % multi_dbs_.size()];
3214 3215
    }
  }

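  // DoWrite drives fillseq/fillrandom/filluniquerandom: one KeyGenerator per
  // DB, writes batched entries_per_batch_ keys at a time, adds optional
  // DeleteRange tombstones every writes_per_range_tombstone_ writes, and
  // honors the shared write rate limiter if one is configured.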
  void DoWrite(ThreadState* thread, WriteMode write_mode) {
    const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
3219
    const int64_t num_ops = writes_ == 0 ? num_ : writes_;
3220

3221
    size_t num_key_gens = 1;
3222
    if (db_.db == nullptr) {
3223 3224 3225
      num_key_gens = multi_dbs_.size();
    }
    std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
    int64_t max_ops = num_ops * num_key_gens;
    int64_t ops_per_stage = max_ops;
    if (FLAGS_num_column_families > 1 && FLAGS_num_hot_column_families > 0) {
      ops_per_stage = (max_ops - 1) / (FLAGS_num_column_families /
                                       FLAGS_num_hot_column_families) +
                      1;
    }

    Duration duration(test_duration, max_ops, ops_per_stage);
3235
    for (size_t i = 0; i < num_key_gens; i++) {
3236
      key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode, num_,
3237
                                         ops_per_stage));
3238
    }

3240
    if (num_ != FLAGS_num) {
3241
      char msg[100];
3242
      snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_);
3243
      thread->stats.AddMessage(msg);
3244 3245
    }

3246
    RandomGenerator gen;
    WriteBatch batch;
    Status s;
3249
    int64_t bytes = 0;

3251 3252
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::unique_ptr<const char[]> begin_key_guard;
    Slice begin_key = AllocateKey(&begin_key_guard);
    std::unique_ptr<const char[]> end_key_guard;
    Slice end_key = AllocateKey(&end_key_guard);
    std::vector<std::unique_ptr<const char[]>> expanded_key_guards;
    std::vector<Slice> expanded_keys;
    if (FLAGS_expand_range_tombstones) {
      expanded_key_guards.resize(range_tombstone_width_);
      for (auto& expanded_key_guard : expanded_key_guards) {
        expanded_keys.emplace_back(AllocateKey(&expanded_key_guard));
      }
    }

3266
    int64_t stage = 0;
    int64_t num_written = 0;
    while (!duration.Done(entries_per_batch_)) {
      if (duration.GetStage() != stage) {
        stage = duration.GetStage();
        if (db_.db != nullptr) {
          db_.CreateNewCf(open_options_, stage);
        } else {
          for (auto& db : multi_dbs_) {
            db.CreateNewCf(open_options_, stage);
          }
        }
      }
3279

3280 3281
      size_t id = thread->rand.Next() % num_key_gens;
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
      batch.Clear();

      if (thread->shared->write_rate_limiter.get() != nullptr) {
        thread->shared->write_rate_limiter->Request(
            entries_per_batch_ * (value_size_ + key_size_),
            Env::IO_HIGH);
        // Set time at which last op finished to Now() to hide latency and
        // sleep from rate limiter. Also, do the check once per batch, not
        // once per write.
        thread->stats.ResetLastOpTime();
      }

      for (int64_t j = 0; j < entries_per_batch_; j++) {
3295 3296
        int64_t rand_num = key_gens[id]->Next();
        GenerateKeyFromInt(rand_num, FLAGS_num, &key);
        if (FLAGS_use_blob_db) {
          s = db_with_cfh->db->Put(write_options_, key,
                                   gen.Generate(value_size_));
        } else if (FLAGS_num_column_families <= 1) {
          batch.Put(key, gen.Generate(value_size_));
        } else {
          // We use same rand_num as seed for key and column family so that we
          // can deterministically find the cfh corresponding to a particular
          // key while reading the key.
3306 3307
          batch.Put(db_with_cfh->GetCfh(rand_num), key,
                    gen.Generate(value_size_));
3308
        }
        bytes += value_size_ + key_size_;
        ++num_written;
        if (writes_per_range_tombstone_ > 0 &&
            num_written / writes_per_range_tombstone_ <
                max_num_range_tombstones_ &&
            num_written % writes_per_range_tombstone_ == 0) {
          int64_t begin_num = key_gens[id]->Next();
          if (FLAGS_expand_range_tombstones) {
            for (int64_t offset = 0; offset < range_tombstone_width_;
                 ++offset) {
              GenerateKeyFromInt(begin_num + offset, FLAGS_num,
                                 &expanded_keys[offset]);
              if (FLAGS_use_blob_db) {
                s = db_with_cfh->db->Delete(write_options_,
                                            expanded_keys[offset]);
              } else if (FLAGS_num_column_families <= 1) {
                batch.Delete(expanded_keys[offset]);
              } else {
                batch.Delete(db_with_cfh->GetCfh(rand_num),
                             expanded_keys[offset]);
              }
            }
          } else {
            GenerateKeyFromInt(begin_num, FLAGS_num, &begin_key);
            GenerateKeyFromInt(begin_num + range_tombstone_width_, FLAGS_num,
                               &end_key);
            if (FLAGS_use_blob_db) {
              s = db_with_cfh->db->DeleteRange(
                  write_options_, db_with_cfh->db->DefaultColumnFamily(),
                  begin_key, end_key);
            } else if (FLAGS_num_column_families <= 1) {
              batch.DeleteRange(begin_key, end_key);
            } else {
              batch.DeleteRange(db_with_cfh->GetCfh(rand_num), begin_key,
                                end_key);
            }
          }
        }
3347
      }
3348 3349 3350
      if (!FLAGS_use_blob_db) {
        s = db_with_cfh->db->Write(write_options_, &batch);
      }
3351
      thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
3352
                                entries_per_batch_, kWrite);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
3358
    thread->stats.AddBytes(bytes);
  }
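
  // DoDeterministicCompact (below) builds a repeatable LSM shape: auto
  // compactions are disabled, writes are flushed into sorted runs, and
  // CompactFiles() places each run on its target level (level-style and
  // universal-style compaction only). Debug builds then verify the seqno and
  // key range of every resulting sorted run.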

  Status DoDeterministicCompact(ThreadState* thread,
                                CompactionStyle compaction_style,
                                WriteMode write_mode) {
#ifndef ROCKSDB_LITE
    ColumnFamilyMetaData meta;
    std::vector<DB*> db_list;
    if (db_.db != nullptr) {
      db_list.push_back(db_.db);
    } else {
      for (auto& db : multi_dbs_) {
        db_list.push_back(db.db);
      }
    }
    std::vector<Options> options_list;
    for (auto db : db_list) {
      options_list.push_back(db->GetOptions());
      db->SetOptions({{"disable_auto_compactions", "1"},
                      {"level0_slowdown_writes_trigger", "400000000"},
                      {"level0_stop_writes_trigger", "400000000"}});
    }

    assert(!db_list.empty());
    auto num_db = db_list.size();
    size_t num_levels = static_cast<size_t>(open_options_.num_levels);
    size_t output_level = open_options_.num_levels - 1;
    std::vector<std::vector<std::vector<SstFileMetaData>>> sorted_runs(num_db);
    std::vector<size_t> num_files_at_level0(num_db, 0);

    if (compaction_style == kCompactionStyleLevel) {
      if (num_levels == 0) {
        return Status::InvalidArgument("num_levels should be larger than 1");
      }
      bool should_stop = false;
      while (!should_stop) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        for (size_t i = 0; i < num_db; i++) {
          auto db = db_list[i];
          db->Flush(FlushOptions());
          db->GetColumnFamilyMetaData(&meta);
          if (num_files_at_level0[i] == meta.levels[0].files.size() ||
              writes_ == 0) {
            should_stop = true;
            continue;
          }
          sorted_runs[i].emplace_back(
              meta.levels[0].files.begin(),
              meta.levels[0].files.end() - num_files_at_level0[i]);
          num_files_at_level0[i] = meta.levels[0].files.size();
          if (sorted_runs[i].back().size() == 1) {
            should_stop = true;
            continue;
          }
          if (sorted_runs[i].size() == output_level) {
            auto& L1 = sorted_runs[i].back();
            L1.erase(L1.begin(), L1.begin() + L1.size() / 3);
            should_stop = true;
            continue;
          }
        }
        writes_ /= open_options_.max_bytes_for_level_multiplier;
      }
      for (size_t i = 0; i < num_db; i++) {
        if (sorted_runs[i].size() < num_levels - 1) {
          fprintf(stderr, "n is too small to fill %lu levels\n", num_levels);
          exit(1);
        }
      }
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        auto compactionOptions = CompactionOptions();
        auto options = db->GetOptions();
        MutableCFOptions mutable_cf_options(options);
        for (size_t j = 0; j < sorted_runs[i].size(); j++) {
          compactionOptions.output_file_size_limit =
              mutable_cf_options.MaxFileSizeForLevel(
                  static_cast<int>(output_level));
          std::cout << sorted_runs[i][j].size() << std::endl;
          db->CompactFiles(compactionOptions, {sorted_runs[i][j].back().name,
                                               sorted_runs[i][j].front().name},
                           static_cast<int>(output_level - j) /*level*/);
        }
      }
    } else if (compaction_style == kCompactionStyleUniversal) {
      auto ratio = open_options_.compaction_options_universal.size_ratio;
      bool should_stop = false;
      while (!should_stop) {
        if (sorted_runs[0].empty()) {
          DoWrite(thread, write_mode);
        } else {
          DoWrite(thread, UNIQUE_RANDOM);
        }
        for (size_t i = 0; i < num_db; i++) {
          auto db = db_list[i];
          db->Flush(FlushOptions());
          db->GetColumnFamilyMetaData(&meta);
          if (num_files_at_level0[i] == meta.levels[0].files.size() ||
              writes_ == 0) {
            should_stop = true;
            continue;
          }
          sorted_runs[i].emplace_back(
              meta.levels[0].files.begin(),
              meta.levels[0].files.end() - num_files_at_level0[i]);
          num_files_at_level0[i] = meta.levels[0].files.size();
          if (sorted_runs[i].back().size() == 1) {
            should_stop = true;
            continue;
          }
          num_files_at_level0[i] = meta.levels[0].files.size();
        }
        writes_ *= static_cast<double>(100) / (ratio + 200);
      }
      for (size_t i = 0; i < num_db; i++) {
        if (sorted_runs[i].size() < num_levels) {
          fprintf(stderr, "n is too small to fill %lu levels\n", num_levels);
          exit(1);
        }
      }
      for (size_t i = 0; i < num_db; i++) {
        auto db = db_list[i];
        auto compactionOptions = CompactionOptions();
        auto options = db->GetOptions();
        MutableCFOptions mutable_cf_options(options);
        for (size_t j = 0; j < sorted_runs[i].size(); j++) {
          compactionOptions.output_file_size_limit =
              mutable_cf_options.MaxFileSizeForLevel(
                  static_cast<int>(output_level));
          db->CompactFiles(
              compactionOptions,
              {sorted_runs[i][j].back().name, sorted_runs[i][j].front().name},
              (output_level > j ? static_cast<int>(output_level - j)
                                : 0) /*level*/);
        }
      }
    } else if (compaction_style == kCompactionStyleFIFO) {
      return Status::InvalidArgument("FIFO compaction is not supported");
    } else {
      fprintf(stdout,
              "%-12s : skipped (-compaction_style=kCompactionStyleNone)\n",
              "filldeterministic");
      return Status::InvalidArgument("None compaction is not supported");
    }

// Verify seqno and key range
// Note: the seqno get changed at the max level by implementation
// optimization, so skip the check of the max level.
#ifndef NDEBUG
    for (size_t k = 0; k < num_db; k++) {
      auto db = db_list[k];
      db->GetColumnFamilyMetaData(&meta);
      // verify the number of sorted runs
      if (compaction_style == kCompactionStyleLevel) {
        assert(num_levels - 1 == sorted_runs[k].size());
      } else if (compaction_style == kCompactionStyleUniversal) {
        assert(meta.levels[0].files.size() + num_levels - 1 ==
               sorted_runs[k].size());
      } else if (compaction_style == kCompactionStyleFIFO) {
        // TODO(gzh): FIFO compaction
      }

      // verify smallest/largest seqno and key range of each sorted run
      auto max_level = num_levels - 1;
      int level;
      for (size_t i = 0; i < sorted_runs[k].size(); i++) {
        level = static_cast<int>(max_level - i);
        SequenceNumber sorted_run_smallest_seqno = kMaxSequenceNumber;
        SequenceNumber sorted_run_largest_seqno = 0;
        std::string sorted_run_smallest_key, sorted_run_largest_key;
        bool first_key = true;
        for (auto fileMeta : sorted_runs[k][i]) {
          sorted_run_smallest_seqno =
              std::min(sorted_run_smallest_seqno, fileMeta.smallest_seqno);
          sorted_run_largest_seqno =
              std::max(sorted_run_largest_seqno, fileMeta.largest_seqno);
          if (first_key ||
              db->DefaultColumnFamily()->GetComparator()->Compare(
                  fileMeta.smallestkey, sorted_run_smallest_key) < 0) {
            sorted_run_smallest_key = fileMeta.smallestkey;
          }
          if (first_key ||
              db->DefaultColumnFamily()->GetComparator()->Compare(
                  fileMeta.largestkey, sorted_run_largest_key) > 0) {
            sorted_run_largest_key = fileMeta.largestkey;
          }
          first_key = false;
        }
        if (compaction_style == kCompactionStyleLevel ||
            (compaction_style == kCompactionStyleUniversal && level > 0)) {
          SequenceNumber level_smallest_seqno = kMaxSequenceNumber;
          SequenceNumber level_largest_seqno = 0;
          for (auto fileMeta : meta.levels[level].files) {
            level_smallest_seqno =
                std::min(level_smallest_seqno, fileMeta.smallest_seqno);
            level_largest_seqno =
                std::max(level_largest_seqno, fileMeta.largest_seqno);
          }
          assert(sorted_run_smallest_key ==
                 meta.levels[level].files.front().smallestkey);
          assert(sorted_run_largest_key ==
                 meta.levels[level].files.back().largestkey);
          if (level != static_cast<int>(max_level)) {
            // compaction at max_level would change sequence number
            assert(sorted_run_smallest_seqno == level_smallest_seqno);
            assert(sorted_run_largest_seqno == level_largest_seqno);
          }
        } else if (compaction_style == kCompactionStyleUniversal) {
          // level <= 0 means sorted runs on level 0
          auto level0_file =
              meta.levels[0].files[sorted_runs[k].size() - 1 - i];
          assert(sorted_run_smallest_key == level0_file.smallestkey);
          assert(sorted_run_largest_key == level0_file.largestkey);
          if (level != static_cast<int>(max_level)) {
            assert(sorted_run_smallest_seqno == level0_file.smallest_seqno);
            assert(sorted_run_largest_seqno == level0_file.largest_seqno);
          }
        }
      }
    }
#endif
    // print the size of each sorted_run
    for (size_t k = 0; k < num_db; k++) {
      auto db = db_list[k];
      fprintf(stdout,
              "---------------------- DB %lu LSM ---------------------\n", k);
      db->GetColumnFamilyMetaData(&meta);
      for (auto& levelMeta : meta.levels) {
        if (levelMeta.files.empty()) {
          continue;
        }
        if (levelMeta.level == 0) {
          for (auto& fileMeta : levelMeta.files) {
3596 3597
            fprintf(stdout, "Level[%d]: %s(size: %" PRIu64 " bytes)\n",
                    levelMeta.level, fileMeta.name.c_str(), fileMeta.size);
3598 3599
          }
        } else {
3600
          fprintf(stdout, "Level[%d]: %s - %s(total size: %" PRIi64 " bytes)\n",
                  levelMeta.level, levelMeta.files.front().name.c_str(),
                  levelMeta.files.back().name.c_str(), levelMeta.size);
        }
      }
    }
    for (size_t i = 0; i < num_db; i++) {
      db_list[i]->SetOptions(
          {{"disable_auto_compactions",
            std::to_string(options_list[i].disable_auto_compactions)},
           {"level0_slowdown_writes_trigger",
            std::to_string(options_list[i].level0_slowdown_writes_trigger)},
           {"level0_stop_writes_trigger",
            std::to_string(options_list[i].level0_stop_writes_trigger)}});
    }
    return Status::OK();
#else
    fprintf(stderr, "Rocksdb Lite doesn't support filldeterministic\n");
    return Status::NotSupported(
        "Rocksdb Lite doesn't support filldeterministic");
#endif  // ROCKSDB_LITE
  }

3623
  void ReadSequential(ThreadState* thread) {
3624 3625
    if (db_.db != nullptr) {
      ReadSequential(thread, db_.db);
3626
    } else {
3627 3628
      for (const auto& db_with_cfh : multi_dbs_) {
        ReadSequential(thread, db_with_cfh.db);
      }
    }
  }

  void ReadSequential(ThreadState* thread, DB* db) {
3634 3635 3636 3637
    ReadOptions options(FLAGS_verify_checksum, true);
    options.tailing = FLAGS_use_tailing_iterator;

    Iterator* iter = db->NewIterator(options);
3638
    int64_t i = 0;
3639
    int64_t bytes = 0;
3640
    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
3641
      bytes += iter->key().size() + iter->value().size();
3642
      thread->stats.FinishedOps(nullptr, db, 1, kRead);
3643
      ++i;

      if (thread->shared->read_rate_limiter.get() != nullptr &&
          i % 1024 == 1023) {
        thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH);
      }
3649
    }
3650

3651
    delete iter;
3652
    thread->stats.AddBytes(bytes);
3653 3654 3655
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
3656 3657
  }

3658
  void ReadReverse(ThreadState* thread) {
3659 3660
    if (db_.db != nullptr) {
      ReadReverse(thread, db_.db);
3661
    } else {
3662 3663
      for (const auto& db_with_cfh : multi_dbs_) {
        ReadReverse(thread, db_with_cfh.db);
      }
    }
  }

  void ReadReverse(ThreadState* thread, DB* db) {
    Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
3670
    int64_t i = 0;
3671
    int64_t bytes = 0;
3672
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
3673
      bytes += iter->key().size() + iter->value().size();
3674
      thread->stats.FinishedOps(nullptr, db, 1, kRead);
3675
      ++i;
3676 3677 3678 3679
      if (thread->shared->read_rate_limiter.get() != nullptr &&
          i % 1024 == 1023) {
        thread->shared->read_rate_limiter->Request(1024, Env::IO_HIGH);
      }
3680 3681
    }
    delete iter;
3682
    thread->stats.AddBytes(bytes);
3683 3684
  }
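
  // ReadRandomFast (below) draws keys uniformly from the smallest power of
  // two >= FLAGS_num, so some generated keys intentionally do not exist; the
  // number of such non-existent keys issued is reported next to found/read.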

  void ReadRandomFast(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
3688
    int64_t nonexist = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
3690 3691
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::string value;
    DB* db = SelectDBWithCfh(thread)->db;

    int64_t pot = 1;
    while (pot < FLAGS_num) {
      pot <<= 1;
    }

    Duration duration(FLAGS_duration, reads_);
    do {
      for (int i = 0; i < 100; ++i) {
        int64_t key_rand = thread->rand.Next() & (pot - 1);
        GenerateKeyFromInt(key_rand, FLAGS_num, &key);
        ++read;
3706 3707
        auto status = db->Get(options, key, &value);
        if (status.ok()) {
          ++found;
3709
        } else if (!status.IsNotFound()) {
          fprintf(stderr, "Get returned an error: %s\n",
                  status.ToString().c_str());
3712
          abort();
        }
3714 3715 3716
        if (key_rand >= FLAGS_num) {
          ++nonexist;
        }
      }
3718 3719 3720 3721
      if (thread->shared->read_rate_limiter.get() != nullptr) {
        thread->shared->read_rate_limiter->Request(100, Env::IO_HIGH);
      }

3722
      thread->stats.FinishedOps(nullptr, db, 100, kRead);
    } while (!duration.Done(100));

    char msg[100];
3726 3727 3728
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found, "
             "issued %" PRIu64 " non-exist keys)\n",
             found, read, nonexist);

    thread->stats.AddMessage(msg);

3732
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }
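
  // GetRandomKey: with -read_random_exp_range=0 keys are uniform over
  // [0, FLAGS_num). Otherwise an exponentially distributed fraction is drawn
  // and then scrambled with a large multiplier (mod FLAGS_num) so that hot
  // keys are not clustered together.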

  int64_t GetRandomKey(Random64* rand) {
    uint64_t rand_int = rand->Next();
    int64_t key_rand;
    if (read_random_exp_range_ == 0) {
      key_rand = rand_int % FLAGS_num;
    } else {
      const uint64_t kBigInt = static_cast<uint64_t>(1U) << 62;
      long double order = -static_cast<long double>(rand_int % kBigInt) /
                          static_cast<long double>(kBigInt) *
                          read_random_exp_range_;
      long double exp_ran = std::exp(order);
3748
      uint64_t rand_num =
3749
          static_cast<int64_t>(exp_ran * static_cast<long double>(FLAGS_num));
3750 3751 3752 3753
      // Map to a different number to avoid locality.
      const uint64_t kBigPrime = 0x5bd1e995;
      // Overflow is like %(2^64). Will have little impact of results.
      key_rand = static_cast<int64_t>((rand_num * kBigPrime) % FLAGS_num);
3754 3755 3756 3757
    }
    return key_rand;
  }

3758
  void ReadRandom(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
3761
    int64_t bytes = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
3763 3764
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::string value;
3766

    Duration duration(FLAGS_duration, reads_);
    while (!duration.Done(1)) {
3769 3770 3771 3772
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
      // We use same key_rand as seed for key and column family so that we can
      // deterministically find the cfh corresponding to a particular key, as it
      // is done in DoWrite method.
3773
      int64_t key_rand = GetRandomKey(&thread->rand);
3774
      GenerateKeyFromInt(key_rand, FLAGS_num, &key);
      read++;
3776 3777
      Status s;
      if (FLAGS_num_column_families > 1) {
3778 3779
        s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
                                 &value);
3780 3781 3782 3783
      } else {
        s = db_with_cfh->db->Get(options, key, &value);
      }
      if (s.ok()) {
        found++;
3785
        bytes += key.size() + value.size();
3786
      } else if (!s.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
3788
        abort();
      }

      if (thread->shared->read_rate_limiter.get() != nullptr &&
          read % 256 == 255) {
        thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH);
      }

3796
      thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
    }
3798

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
             found, read);
3802

3803
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
3805

3806
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
3807 3808
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

  // Calls MultiGet over a list of keys from a random distribution.
  // Returns the total number of keys found.
  void MultiReadRandom(ThreadState* thread) {
    int64_t read = 0;
3815
    int64_t num_multireads = 0;
3816
    int64_t found = 0;
3817
    ReadOptions options(FLAGS_verify_checksum, true);
    std::vector<Slice> keys;
3819
    std::vector<std::unique_ptr<const char[]> > key_guards;
    std::vector<std::string> values(entries_per_batch_);
3821
    while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
3822
      key_guards.push_back(std::unique_ptr<const char[]>());
3823
      keys.push_back(AllocateKey(&key_guards.back()));
    }

    Duration duration(FLAGS_duration, reads_);
    while (!duration.Done(1)) {
3828
      DB* db = SelectDB(thread);
      for (int64_t i = 0; i < entries_per_batch_; ++i) {
3830
        GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
3831
      }
3832
      std::vector<Status> statuses = db->MultiGet(options, keys, &values);
3833
      assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);

      read += entries_per_batch_;
3836
      num_multireads++;
      for (int64_t i = 0; i < entries_per_batch_; ++i) {
        if (statuses[i].ok()) {
3839
          ++found;
3840 3841 3842 3843
        } else if (!statuses[i].IsNotFound()) {
          fprintf(stderr, "MultiGet returned an error: %s\n",
                  statuses[i].ToString().c_str());
          abort();
3844 3845
        }
      }
      if (thread->shared->read_rate_limiter.get() != nullptr &&
          num_multireads % 256 == 255) {
        thread->shared->read_rate_limiter->Request(256 * entries_per_batch_,
                                                   Env::IO_HIGH);
      }
3851
      thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
3852
    }
3853 3854

    char msg[100];
3855
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
             found, read);
3857
    thread->stats.AddMessage(msg);
3858 3859
  }

3860 3861 3862 3863
  void IteratorCreation(ThreadState* thread) {
    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);
    while (!duration.Done(1)) {
3864 3865
      DB* db = SelectDB(thread);
      Iterator* iter = db->NewIterator(options);
3866
      delete iter;
3867
      thread->stats.FinishedOps(nullptr, db, 1, kOthers);
3868 3869 3870
    }
  }

3871 3872 3873 3874
  void IteratorCreationWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      IteratorCreation(thread);
    } else {
3875
      BGWriter(thread, kWrite);
3876 3877 3878
    }
  }

  void SeekRandom(ThreadState* thread) {
    int64_t read = 0;
3881
    int64_t found = 0;
3882
    int64_t bytes = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    options.tailing = FLAGS_use_tailing_iterator;
3885 3886 3887

    Iterator* single_iter = nullptr;
    std::vector<Iterator*> multi_iters;
3888 3889
    if (db_.db != nullptr) {
      single_iter = db_.db->NewIterator(options);
3890
    } else {
3891 3892
      for (const auto& db_with_cfh : multi_dbs_) {
        multi_iters.push_back(db_with_cfh.db->NewIterator(options));
3893 3894 3895
      }
    }

3896 3897
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    Duration duration(FLAGS_duration, reads_);
3900
    char value_buffer[256];
    while (!duration.Done(1)) {
      if (!FLAGS_use_tailing_iterator) {
        if (db_.db != nullptr) {
          delete single_iter;
          single_iter = db_.db->NewIterator(options);
        } else {
          for (auto iter : multi_iters) {
            delete iter;
          }
          multi_iters.clear();
          for (const auto& db_with_cfh : multi_dbs_) {
            multi_iters.push_back(db_with_cfh.db->NewIterator(options));
3913 3914 3915
          }
        }
      }
      // Pick a Iterator to use
      Iterator* iter_to_use = single_iter;
      if (single_iter == nullptr) {
        iter_to_use = multi_iters[thread->rand.Next() % multi_iters.size()];
      }

      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
3923
      iter_to_use->Seek(key);
      read++;
3925
      if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
        found++;
      }

      for (int j = 0; j < FLAGS_seek_nexts && iter_to_use->Valid(); ++j) {
        // Copy out iterator's value to make sure we read them.
        Slice value = iter_to_use->value();
        memcpy(value_buffer, value.data(),
               std::min(value.size(), sizeof(value_buffer)));
3934
        bytes += iter_to_use->key().size() + iter_to_use->value().size();

        if (!FLAGS_reverse_iterator) {
          iter_to_use->Next();
        } else {
          iter_to_use->Prev();
        }
3941 3942 3943
        assert(iter_to_use->status().ok());
      }

      if (thread->shared->read_rate_limiter.get() != nullptr &&
          read % 256 == 255) {
        thread->shared->read_rate_limiter->Request(256, Env::IO_HIGH);
      }

3949
      thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
    }
3951 3952 3953 3954
    delete single_iter;
    for (auto iter : multi_iters) {
      delete iter;
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
             found, read);
3959
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
3961
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

  void SeekRandomWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      SeekRandom(thread);
    } else {
3970
      BGWriter(thread, kWrite);
    }
  }

  void SeekRandomWhileMerging(ThreadState* thread) {
    if (thread->tid > 0) {
      SeekRandom(thread);
    } else {
      BGWriter(thread, kMerge);
    }
  }

  void DoDelete(ThreadState* thread, bool seq) {
    WriteBatch batch;
    Duration duration(seq ? 0 : FLAGS_duration, deletes_);
    int64_t i = 0;
3986 3987
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    while (!duration.Done(entries_per_batch_)) {
3990
      DB* db = SelectDB(thread);
      batch.Clear();
      for (int64_t j = 0; j < entries_per_batch_; ++j) {
        const int64_t k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
        GenerateKeyFromInt(k, FLAGS_num, &key);
3995
        batch.Delete(key);
      }
3997
      auto s = db->Write(write_options_, &batch);
3998
      thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete);
      if (!s.ok()) {
        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        exit(1);
      }
      i += entries_per_batch_;
    }
  }

  void DeleteSeq(ThreadState* thread) {
    DoDelete(thread, true);
  }

  void DeleteRandom(ThreadState* thread) {
    DoDelete(thread, false);
  }

4015 4016 4017 4018
  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
4019
      BGWriter(thread, kWrite);
4020 4021
    }
  }
4022

  void ReadWhileMerging(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      BGWriter(thread, kMerge);
    }
  }
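
  // BGWriter runs on thread 0 of the *WhileWriting / *WhileMerging
  // benchmarks: it keeps issuing Put or Merge until all reader threads have
  // finished, optionally rate-limited, and its stats are excluded from the
  // merged reader statistics.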

  void BGWriter(ThreadState* thread, enum OperationType write_merge) {
4032 4033
    // Special thread that keeps writing until other threads are done.
    RandomGenerator gen;
4034
    int64_t bytes = 0;
4035

4036 4037 4038 4039 4040
    std::unique_ptr<RateLimiter> write_rate_limiter;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }
4041 4042 4043 4044

    // Don't merge stats from this thread with the readers.
    thread->stats.SetExcludeFromMerge();

4045 4046
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
4047 4048

    while (true) {
4049
      DB* db = SelectDB(thread);
4050 4051 4052 4053 4054
      {
        MutexLock l(&thread->shared->mu);
        if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
          // Other threads have finished
          break;
        }
      }

      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
      Status s;

      if (write_merge == kWrite) {
        s = db->Put(write_options_, key, gen.Generate(value_size_));
      } else {
        s = db->Merge(write_options_, key, gen.Generate(value_size_));
      }

      if (!s.ok()) {
        fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);

      if (FLAGS_benchmark_write_rate_limit > 0) {
        write_rate_limiter->Request(
            entries_per_batch_ * (value_size_ + key_size_),
            Env::IO_HIGH);
      }
    }
    thread->stats.AddBytes(bytes);
  }

  // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
  // in the DB atomically, i.e., in a single write batch. See also GetMany().
  Status PutMany(DB* db, const WriteOptions& writeoptions, const Slice& key,
                 const Slice& value) {
    std::string suffixes[3] = {"2", "1", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Put(keys[i], value);
    }

    s = db->Write(writeoptions, &batch);
    return s;
  }
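  // A minimal, self-contained sketch of the same atomic multi-key write using
  // only the public API (kept as a comment so it does not affect this tool;
  // the path "/tmp/putmany_example" and the keys are made up):
  //
  //   #include "rocksdb/db.h"
  //   #include "rocksdb/write_batch.h"
  //
  //   rocksdb::DB* db;
  //   rocksdb::Options opts;
  //   opts.create_if_missing = true;
  //   rocksdb::DB::Open(opts, "/tmp/putmany_example", &db);
  //   rocksdb::WriteBatch batch;
  //   batch.Put("key0", "value");
  //   batch.Put("key1", "value");
  //   batch.Put("key2", "value");
  //   // All three puts become visible atomically, or not at all.
  //   rocksdb::Status s = db->Write(rocksdb::WriteOptions(), &batch);
  //   delete db;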


  // Given a key K, this deletes K+"0", K+"1" and K+"2" from the DB
  // atomically, i.e., in a single write batch. See also GetMany().
  Status DeleteMany(DB* db, const WriteOptions& writeoptions,
                    const Slice& key) {
    std::string suffixes[3] = {"1", "2", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Delete(keys[i]);
    }

    s = db->Write(writeoptions, &batch);
    return s;
  }

  // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
  // in the same snapshot, and verifies that all the values are identical.
  // ASSUMES that PutMany was used to put (K, V) into the DB.
  Status GetMany(DB* db, const ReadOptions& readoptions, const Slice& key,
                 std::string* value) {
    std::string suffixes[3] = {"0", "1", "2"};
    std::string keys[3];
    Slice key_slices[3];
    std::string values[3];
    ReadOptions readoptionscopy = readoptions;
    readoptionscopy.snapshot = db->GetSnapshot();
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      key_slices[i] = keys[i];
      s = db->Get(readoptionscopy, key_slices[i], value);
      if (!s.ok() && !s.IsNotFound()) {
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
        values[i] = "";
        // we continue after error rather than exiting so that we can
        // find more errors if any
      } else if (s.IsNotFound()) {
        values[i] = "";
      } else {
        values[i] = *value;
      }
    }
    db->ReleaseSnapshot(readoptionscopy.snapshot);

    if ((values[0] != values[1]) || (values[1] != values[2])) {
      fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n",
              key.ToString().c_str(), values[0].c_str(), values[1].c_str(),
              values[2].c_str());
      // we continue after error rather than exiting so that we can
      // find more errors if any
    }

    return s;
  }
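  // A minimal sketch of the snapshot-consistent read pattern GetMany relies
  // on, using only the public API (comment-only illustration; "db" is assumed
  // to be an open rocksdb::DB* and the keys are made up):
  //
  //   rocksdb::ReadOptions ropts;
  //   ropts.snapshot = db->GetSnapshot();
  //   std::string v0, v1, v2;
  //   db->Get(ropts, "key0", &v0);  // all three reads observe the same
  //   db->Get(ropts, "key1", &v1);  // point-in-time view of the database
  //   db->Get(ropts, "key2", &v2);
  //   db->ReleaseSnapshot(ropts.snapshot);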

  // Differs from readrandomwriterandom in the following ways:
  // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
  // (b) Does deletes as well (per FLAGS_deletepercent)
  // (c) In order to achieve high % of 'found' during lookups, and to do
  //     multiple writes (including puts and deletes) it uses up to
  //     FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
  // (d) Does not have a MultiGet option.
  void RandomWithVerify(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int get_weight = 0;
    int put_weight = 0;
    int delete_weight = 0;
    int64_t gets_done = 0;
    int64_t puts_done = 0;
    int64_t deletes_done = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    // the number of iterations is the larger of read_ or write_
    for (int64_t i = 0; i < readwrites_; i++) {
      DB* db = SelectDB(thread);
      if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
        // one batch completed, reinitialize for next batch
        get_weight = FLAGS_readwritepercent;
        delete_weight = FLAGS_deletepercent;
        put_weight = 100 - get_weight - delete_weight;
      }
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_numdistinct,
          FLAGS_numdistinct, &key);
      if (get_weight > 0) {
        // do all the gets first
        Status s = GetMany(db, options, key, &value);
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
        get_weight--;
        gets_done++;
        thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
      } else if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
        Status s = PutMany(db, write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        puts_done++;
        thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
      } else if (delete_weight > 0) {
        Status s = DeleteMany(db, write_options_, key);
        if (!s.ok()) {
          fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
          exit(1);
        }
        delete_weight--;
        deletes_done++;
        thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
      }
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( get:%" PRIu64 " put:%" PRIu64 " del:%" PRIu64 " total:%" \
             PRIu64 " found:%" PRIu64 ")",
             gets_done, puts_done, deletes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }
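  // Illustrative usage sketch (not part of the benchmark code), assuming the
  // usual benchmark-name mapping in --benchmarks:
  //
  //   ./db_bench --benchmarks=randomwithverify --numdistinct=1000 --deletepercent=2
  //
  // The flag values are examples only.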

  // This is different from ReadWhileWriting because it does not use
  // an extra thread.
  void ReadRandomWriteRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int get_weight = 0;
    int put_weight = 0;
    int64_t reads_done = 0;
    int64_t writes_done = 0;
    Duration duration(FLAGS_duration, readwrites_);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
      if (get_weight == 0 && put_weight == 0) {
        // one batch completed, reinitialize for next batch
        get_weight = FLAGS_readwritepercent;
        put_weight = 100 - get_weight;
      }
      if (get_weight > 0) {
        // do all the gets first
        Status s = db->Get(options, key, &value);
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
        get_weight--;
        reads_done++;
        thread->stats.FinishedOps(nullptr, db, 1, kRead);
      } else if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
        Status s = db->Put(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        writes_done++;
        thread->stats.FinishedOps(nullptr, db, 1, kWrite);
      }
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
             " total:%" PRIu64 " found:%" PRIu64 ")",
             reads_done, writes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }
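  // Illustrative usage sketch (not part of the benchmark code): with the
  // usual benchmark-name mapping, --readwritepercent sets the read share of
  // the mix, e.g. a 90/10 read/write workload:
  //
  //   ./db_bench --benchmarks=readrandomwriterandom --readwritepercent=90 --threads=4
  //
  // The flag values are examples only.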

  //
  // Read-modify-write for random keys
  void UpdateRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int64_t bytes = 0;
    Duration duration(FLAGS_duration, readwrites_);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

      auto status = db->Get(options, key, &value);
      if (status.ok()) {
        ++found;
        bytes += key.size() + value.size();
      } else if (!status.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n",
                status.ToString().c_str());
        abort();
      }

      Status s = db->Put(write_options_, key, gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }
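  // Illustrative usage sketch (not part of the benchmark code): updaterandom
  // exercises the Get-then-Put read-modify-write above, the baseline that
  // merge-based workloads (MergeRandom below) avoid:
  //
  //   ./db_bench --benchmarks=updaterandom --num=1000000 --duration=60
  //
  // The flag values are examples only.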

  // Read-modify-write for random keys.
  // Each operation causes the value to grow by value_size (simulating an
  // append).
  // Generally used for benchmarking against merges of similar type
  void AppendRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int64_t bytes = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

      auto status = db->Get(options, key, &value);
      if (status.ok()) {
        ++found;
        bytes += key.size() + value.size();
      } else if (!status.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n",
                status.ToString().c_str());
        abort();
      } else {
        // If not existing, then just assume an empty string of data
        value.clear();
      }

      // Update the value (by appending data)
      Slice operand = gen.Generate(value_size_);
      if (value.size() > 0) {
        // Use a delimiter to match the semantics for StringAppendOperator
        value.append(1, ',');
      }
      value.append(operand.data(), operand.size());

      // Write back to the database
      Status s = db->Put(write_options_, key, value);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value.size();
      thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")",
            readwrites_, found);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }
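  // Illustrative usage sketch (not part of the benchmark code): appendrandom
  // emulates, via Get + Put, what a StringAppendOperator-based merge would do
  // in a single Merge call, so the two are often compared:
  //
  //   ./db_bench --benchmarks=appendrandom --num=1000000 --value_size=32
  //
  // The flag values are examples only.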

  // Read-modify-write for random keys (using MergeOperator)
  // The merge operator to use should be defined by FLAGS_merge_operator
  // Adjust FLAGS_value_size so that the values are reasonable for this operator
  // Assumes that the merge operator is non-null (i.e.: is well-defined)
  //
  // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
  // to simulate random additions over 64-bit integers using merge.
  //
  // The number of merges on the same key can be controlled by adjusting
  // FLAGS_merge_keys.
  void MergeRandom(ThreadState* thread) {
    RandomGenerator gen;
    int64_t bytes = 0;
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);

      Status s = db->Merge(write_options_, key, gen.Generate(value_size_));

      if (!s.ok()) {
        fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(nullptr, db, 1, kMerge);
    }

    // Print some statistics
    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%" PRIu64 ")", readwrites_);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }
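  // A minimal, self-contained sketch of the uint64add setup suggested in the
  // comment above, using only the public API (comment-only illustration; the
  // path "/tmp/merge_example" and the key are made up):
  //
  //   #include "rocksdb/db.h"
  //   #include "utilities/merge_operators.h"
  //
  //   rocksdb::Options opts;
  //   opts.create_if_missing = true;
  //   opts.merge_operator = rocksdb::MergeOperators::CreateUInt64AddOperator();
  //   rocksdb::DB* db;
  //   rocksdb::DB::Open(opts, "/tmp/merge_example", &db);
  //   uint64_t delta = 1;
  //   rocksdb::Slice operand(reinterpret_cast<const char*>(&delta),
  //                          sizeof(delta));
  //   // Each Merge appends an 8-byte operand; the operator sums them on read.
  //   db->Merge(rocksdb::WriteOptions(), "counter", operand);
  //   delete db;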

  // Read and merge random keys. The number of reads and merges is controlled
  // by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
  // keys (and thus also the number of reads and merges on the same key) can be
  // adjusted with FLAGS_merge_keys.
  //
  // As with MergeRandom, the merge operator to use should be defined by
  // FLAGS_merge_operator.
  void ReadRandomMergeRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t num_hits = 0;
    int64_t num_gets = 0;
    int64_t num_merges = 0;
    size_t max_length = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // the number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);

      bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;

      if (do_merge) {
        Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
          exit(1);
        }
        num_merges++;
        thread->stats.FinishedOps(nullptr, db, 1, kMerge);
      } else {
        Status s = db->Get(options, key, &value);
        if (value.length() > max_length)
          max_length = value.length();

        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          num_hits++;
        }
        num_gets++;
        thread->stats.FinishedOps(nullptr, db, 1, kRead);
      }
    }

    char msg[100];
    snprintf(msg, sizeof(msg),
             "(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64
             " hits:%" PRIu64 " maxlength:%" ROCKSDB_PRIszt ")",
             num_gets, num_merges, readwrites_, num_hits, max_length);
    thread->stats.AddMessage(msg);
  }
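  // Illustrative usage sketch (not part of the benchmark code): combine with
  // --merge_operator and --merge_keys as described above; --mergereadpercent
  // sets the merge share of the mix. Assuming the usual benchmark-name
  // mapping:
  //
  //   ./db_bench --benchmarks=readrandommergerandom --mergereadpercent=70
  //
  // The flag values are examples only.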

  void WriteSeqSeekSeq(ThreadState* thread) {
    writes_ = FLAGS_num;
    DoWrite(thread, SEQUENTIAL);
    // exclude writes from the ops/sec calculation
    thread->stats.Start(thread->tid);

    DB* db = SelectDB(thread);
    std::unique_ptr<Iterator> iter(
      db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)));

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    for (int64_t i = 0; i < FLAGS_num; ++i) {
      GenerateKeyFromInt(i, FLAGS_num, &key);
      iter->Seek(key);
      assert(iter->Valid() && iter->key() == key);
      thread->stats.FinishedOps(nullptr, db, 1, kSeek);

      for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) {
        if (!FLAGS_reverse_iterator) {
          iter->Next();
        } else {
          iter->Prev();
        }
        GenerateKeyFromInt(++i, FLAGS_num, &key);
        assert(iter->Valid() && iter->key() == key);
        thread->stats.FinishedOps(nullptr, db, 1, kSeek);
      }

      iter->Seek(key);
      assert(iter->Valid() && iter->key() == key);
      thread->stats.FinishedOps(nullptr, db, 1, kSeek);
    }
  }

#ifndef ROCKSDB_LITE
  // This benchmark stress tests Transactions.  For a given --duration (or
  // total number of --writes), a Transaction will perform a read-modify-write
  // to increment the value of a key in each of N (--transaction_sets) sets of
  // keys (where each set has --num keys).  If --threads is set, this will be
  // done in parallel.
  //
  // To test transactions, use --transaction_db=true.  Not setting this
  // parameter will run the same benchmark without transactions.
  //
  // RandomTransactionVerify() will then validate the correctness of the results
  // by checking if the sum of all keys in each set is the same.
  void RandomTransaction(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    Duration duration(FLAGS_duration, readwrites_);
    ReadOptions read_options(FLAGS_verify_checksum, true);
    uint16_t num_prefix_ranges = static_cast<uint16_t>(FLAGS_transaction_sets);
    uint64_t transactions_done = 0;

    if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) {
      fprintf(stderr, "invalid value for transaction_sets\n");
      abort();
    }

    TransactionOptions txn_options;
    txn_options.lock_timeout = FLAGS_transaction_lock_timeout;
    txn_options.set_snapshot = FLAGS_transaction_set_snapshot;

    RandomTransactionInserter inserter(&thread->rand, write_options_,
                                       read_options, FLAGS_num,
                                       num_prefix_ranges);

    if (FLAGS_num_multi_db > 1) {
      fprintf(stderr,
              "Cannot run RandomTransaction benchmark with "
              "--num_multi_db > 1.");
      abort();
    }

    while (!duration.Done(1)) {
      bool success;

      // RandomTransactionInserter will attempt to insert a key for each
      // # of FLAGS_transaction_sets
      if (FLAGS_optimistic_transaction_db) {
        success = inserter.OptimisticTransactionDBInsert(db_.opt_txn_db);
      } else if (FLAGS_transaction_db) {
        TransactionDB* txn_db = reinterpret_cast<TransactionDB*>(db_.db);
        success = inserter.TransactionDBInsert(txn_db, txn_options);
      } else {
        success = inserter.DBInsert(db_.db);
      }

      if (!success) {
        fprintf(stderr, "Unexpected error: %s\n",
                inserter.GetLastStatus().ToString().c_str());
        abort();
      }

      thread->stats.FinishedOps(nullptr, db_.db, 1, kOthers);
      transactions_done++;
    }

    char msg[100];
    if (FLAGS_optimistic_transaction_db || FLAGS_transaction_db) {
      snprintf(msg, sizeof(msg),
               "( transactions:%" PRIu64 " aborts:%" PRIu64 ")",
               transactions_done, inserter.GetFailureCount());
    } else {
      snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done);
    }
    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }
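  // A minimal, self-contained sketch of the per-key read-modify-write that
  // RandomTransactionInserter performs, written against the public
  // TransactionDB API (comment-only illustration; the path and key are made
  // up and error handling is omitted):
  //
  //   #include "rocksdb/utilities/transaction.h"
  //   #include "rocksdb/utilities/transaction_db.h"
  //
  //   rocksdb::Options opts;
  //   opts.create_if_missing = true;
  //   rocksdb::TransactionDBOptions txn_db_opts;
  //   rocksdb::TransactionDB* txn_db;
  //   rocksdb::TransactionDB::Open(opts, txn_db_opts, "/tmp/txn_example",
  //                                &txn_db);
  //   rocksdb::Transaction* txn =
  //       txn_db->BeginTransaction(rocksdb::WriteOptions());
  //   std::string value;
  //   txn->GetForUpdate(rocksdb::ReadOptions(), "set0_key", &value);
  //   txn->Put("set0_key", value /* incremented by the caller */);
  //   txn->Commit();
  //   delete txn;
  //   delete txn_db;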

  // Verifies consistency of data after RandomTransaction() has been run.
  // Since each iteration of RandomTransaction() incremented a key in each set
  // by the same value, the sum of the keys in each set should be the same.
  void RandomTransactionVerify() {
    if (!FLAGS_transaction_db && !FLAGS_optimistic_transaction_db) {
      // transactions not used, nothing to verify.
      return;
    }

    Status s =
        RandomTransactionInserter::Verify(db_.db,
                            static_cast<uint16_t>(FLAGS_transaction_sets));

    if (s.ok()) {
      fprintf(stdout, "RandomTransactionVerify Success.\n");
    } else {
      fprintf(stdout, "RandomTransactionVerify FAILED!!\n");
    }
  }
#endif  // ROCKSDB_LITE

  // Writes and deletes random keys without overwriting keys.
  //
  // This benchmark is intended to partially replicate the behavior of MyRocks
  // secondary indices: All data is stored in keys and updates happen by
  // deleting the old version of the key and inserting the new version.
  void RandomReplaceKeys(ThreadState* thread) {
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::vector<uint32_t> counters(FLAGS_numdistinct, 0);
    size_t max_counter = 50;
    RandomGenerator gen;

    Status s;
    DB* db = SelectDB(thread);
    for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
      GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
      s = db->Put(write_options_, key, gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
        exit(1);
      }
    }

    db->GetSnapshot();

    std::default_random_engine generator;
    std::normal_distribution<double> distribution(FLAGS_numdistinct / 2.0,
                                                  FLAGS_stddev);
    Duration duration(FLAGS_duration, FLAGS_num);
    while (!duration.Done(1)) {
      int64_t rnd_id = static_cast<int64_t>(distribution(generator));
      int64_t key_id = std::max(std::min(FLAGS_numdistinct - 1, rnd_id),
                                static_cast<int64_t>(0));
      GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                         &key);
      s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key)
                                   : db->Delete(write_options_, key);
      if (s.ok()) {
        counters[key_id] = (counters[key_id] + 1) % max_counter;
        GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                           &key);
        s = db->Put(write_options_, key, Slice());
      }

      if (!s.ok()) {
        fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
        exit(1);
      }

      thread->stats.FinishedOps(nullptr, db, 1, kOthers);
    }

    char msg[200];
    snprintf(msg, sizeof(msg),
             "use single deletes: %d, "
             "standard deviation: %lf\n",
             FLAGS_use_single_deletes, FLAGS_stddev);
    thread->stats.AddMessage(msg);
  }
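  // Illustrative usage sketch (not part of the benchmark code): with the
  // usual benchmark-name mapping, --use_single_deletes toggles between
  // SingleDelete() and Delete() for the replacement pattern above:
  //
  //   ./db_bench --benchmarks=randomreplacekeys --use_single_deletes=true
  //
  // The flag values are examples only.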

  void TimeSeriesReadOrDelete(ThreadState* thread, bool do_deletion) {
    ReadOptions options(FLAGS_verify_checksum, true);
    int64_t read = 0;
    int64_t found = 0;
    int64_t bytes = 0;

    Iterator* iter = nullptr;
    // Only work on single database
    assert(db_.db != nullptr);
    iter = db_.db->NewIterator(options);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    char value_buffer[256];
    while (true) {
      {
        MutexLock l(&thread->shared->mu);
        if (thread->shared->num_done >= 1) {
          // Write thread have finished
          break;
        }
      }
      if (!FLAGS_use_tailing_iterator) {
        delete iter;
        iter = db_.db->NewIterator(options);
      }
      // Pick a Iterator to use

      int64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
      GenerateKeyFromInt(key_id, FLAGS_num, &key);
      // Reset last 8 bytes to 0
      char* start = const_cast<char*>(key.data());
      start += key.size() - 8;
      memset(start, 0, 8);
      ++read;

      bool key_found = false;
      // Seek the prefix
      for (iter->Seek(key); iter->Valid() && iter->key().starts_with(key);
           iter->Next()) {
        key_found = true;
        // Copy out iterator's value to make sure we read them.
        if (do_deletion) {
          bytes += iter->key().size();
          if (KeyExpired(timestamp_emulator_.get(), iter->key())) {
            thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
            db_.db->Delete(write_options_, iter->key());
          } else {
            break;
          }
        } else {
          bytes += iter->key().size() + iter->value().size();
          thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
          Slice value = iter->value();
          memcpy(value_buffer, value.data(),
                 std::min(value.size(), sizeof(value_buffer)));

          assert(iter->status().ok());
        }
      }
      found += key_found;

      if (thread->shared->read_rate_limiter.get() != nullptr) {
        thread->shared->read_rate_limiter->Request(1, Env::IO_HIGH);
      }
    }
    delete iter;

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", found,
             read);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

  void TimeSeriesWrite(ThreadState* thread) {
    // Special thread that keeps writing until other threads are done.
    RandomGenerator gen;
    int64_t bytes = 0;

    // Don't merge stats from this thread with the readers.
    thread->stats.SetExcludeFromMerge();

    std::unique_ptr<RateLimiter> write_rate_limiter;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    Duration duration(FLAGS_duration, writes_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);

      uint64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
      // Write key id
      GenerateKeyFromInt(key_id, FLAGS_num, &key);
      // Write timestamp

      char* start = const_cast<char*>(key.data());
      char* pos = start + 8;
      int bytes_to_fill =
          std::min(key_size_ - static_cast<int>(pos - start), 8);
      uint64_t timestamp_value = timestamp_emulator_->Get();
      if (port::kLittleEndian) {
        for (int i = 0; i < bytes_to_fill; ++i) {
          pos[i] = (timestamp_value >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
        }
      } else {
        memcpy(pos, static_cast<void*>(&timestamp_value), bytes_to_fill);
      }

      timestamp_emulator_->Inc();

      Status s;

      s = db->Put(write_options_, key, gen.Generate(value_size_));

      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes = key.size() + value_size_;
      thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
      thread->stats.AddBytes(bytes);

      if (FLAGS_benchmark_write_rate_limit > 0) {
        write_rate_limiter->Request(
            entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH);
      }
    }
  }

  void TimeSeries(ThreadState* thread) {
    if (thread->tid > 0) {
      bool do_deletion = FLAGS_expire_style == "delete" &&
                         thread->tid <= FLAGS_num_deletion_threads;
      TimeSeriesReadOrDelete(thread, do_deletion);
    } else {
      TimeSeriesWrite(thread);
      thread->stats.Stop();
      thread->stats.Report("timeseries write");
    }
  }
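  // Illustrative usage sketch (not part of the benchmark code): thread 0
  // writes timestamped keys while the other threads read them back; with
  // --expire_style=delete, up to --num_deletion_threads of those threads
  // delete expired entries instead. Assuming the usual benchmark-name
  // mapping:
  //
  //   ./db_bench --benchmarks=timeseries --threads=4 --expire_style=delete
  //
  // The flag values are examples only.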

  void Compact(ThreadState* thread) {
    DB* db = SelectDB(thread);
    db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  }

  void PrintStats(const char* key) {
    if (db_.db != nullptr) {
      PrintStats(db_.db, key, false);
    }
    for (const auto& db_with_cfh : multi_dbs_) {
      PrintStats(db_with_cfh.db, key, true);
    }
  }

  void PrintStats(DB* db, const char* key, bool print_header = false) {
    if (print_header) {
      fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
    }
    std::string stats;
    if (!db->GetProperty(key, &stats)) {
      stats = "(failed)";
    }
    fprintf(stdout, "\n%s\n", stats.c_str());
  }
};

int db_bench_tool(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  static bool initialized = false;
  if (!initialized) {
    SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                    " [OPTIONS]...");
    initialized = true;
  }
  ParseCommandLineFlags(&argc, &argv, true);

  FLAGS_compaction_style_e = (rocksdb::CompactionStyle) FLAGS_compaction_style;
  if (FLAGS_statistics) {
    dbstats = rocksdb::CreateDBStatistics();
  }
  FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;

  std::vector<std::string> fanout = rocksdb::StringSplit(
      FLAGS_max_bytes_for_level_multiplier_additional, ',');
  for (size_t j = 0; j < fanout.size(); j++) {
    FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
#ifndef CYGWIN
        std::stoi(fanout[j]));
#else
        stoi(fanout[j]));
#endif
  }

  FLAGS_compression_type_e =
    StringToCompressionType(FLAGS_compression_type.c_str());

#ifndef ROCKSDB_LITE
  std::unique_ptr<Env> custom_env_guard;
  if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
    fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
    exit(1);
  } else if (!FLAGS_env_uri.empty()) {
    FLAGS_env = NewEnvFromUri(FLAGS_env_uri, &custom_env_guard);
    if (FLAGS_env == nullptr) {
      fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
      exit(1);
    }
  }
#endif  // ROCKSDB_LITE
  if (!FLAGS_hdfs.empty()) {
    FLAGS_env  = new rocksdb::HdfsEnv(FLAGS_hdfs);
  }

  if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NORMAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::SEQUENTIAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::WILLNEED;
  else {
    fprintf(stdout, "Unknown compaction fadvice:%s\n",
            FLAGS_compaction_fadvice.c_str());
  }

  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());

  // The number of background threads should be at least as large as the
  // max number of concurrent compactions.
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes,
                                  rocksdb::Env::Priority::HIGH);

  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db.empty()) {
    std::string default_db_path;
    rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path;
  }

  if (FLAGS_stats_interval_seconds > 0) {
    // When both are set then FLAGS_stats_interval determines the frequency
    // at which the timer is checked for FLAGS_stats_interval_seconds
    FLAGS_stats_interval = 1000;
  }

  rocksdb::Benchmark benchmark;
  benchmark.Run();
  return 0;
}
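
// A minimal embedding sketch (comment-only): the db_bench binary itself is
// expected to be little more than a thin wrapper forwarding to this entry
// point, roughly:
//
//   int main(int argc, char** argv) {
//     return rocksdb::db_bench_tool(argc, argv);
//   }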
}  // namespace rocksdb
#endif