db_bench_tool.cc 144.6 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 3 4 5
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

L
liuhuahang 已提交
10
#ifndef __STDC_FORMAT_MACROS
11
#define __STDC_FORMAT_MACROS
L
liuhuahang 已提交
12
#endif
13

J
Jonathan Wiepert 已提交
14
#ifdef GFLAGS
15 16 17 18 19
#ifdef NUMA
#include <numa.h>
#include <numaif.h>
#endif

D
Dmitri Smirnov 已提交
20
#ifndef OS_WIN
21
#include <unistd.h>
D
Dmitri Smirnov 已提交
22
#endif
23
#include <fcntl.h>
24
#include <inttypes.h>
25
#include <cstddef>
J
jorlow@chromium.org 已提交
26 27 28
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
29
#include <gflags/gflags.h>
30 31 32 33 34

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
35
#include <unordered_map>
36

J
jorlow@chromium.org 已提交
37 38
#include "db/db_impl.h"
#include "db/version_set.h"
A
agiardullo 已提交
39 40 41
#include "hdfs/env_hdfs.h"
#include "port/port.h"
#include "port/stack_trace.h"
42 43 44
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
45
#include "rocksdb/filter_policy.h"
A
agiardullo 已提交
46 47 48
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
S
sdong 已提交
49
#include "rocksdb/rate_limiter.h"
A
agiardullo 已提交
50
#include "rocksdb/slice.h"
51
#include "rocksdb/slice_transform.h"
52
#include "rocksdb/utilities/env_registry.h"
53
#include "rocksdb/utilities/flashcache.h"
A
agiardullo 已提交
54
#include "rocksdb/utilities/optimistic_transaction_db.h"
55
#include "rocksdb/utilities/options_util.h"
56
#include "rocksdb/utilities/sim_cache.h"
A
agiardullo 已提交
57 58
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
A
agiardullo 已提交
59
#include "rocksdb/write_batch.h"
I
Igor Canadi 已提交
60
#include "util/compression.h"
A
agiardullo 已提交
61
#include "util/crc32c.h"
J
jorlow@chromium.org 已提交
62
#include "util/histogram.h"
63
#include "util/mutexlock.h"
J
jorlow@chromium.org 已提交
64
#include "util/random.h"
I
Igor Canadi 已提交
65
#include "util/statistics.h"
A
agiardullo 已提交
66
#include "util/string_util.h"
J
jorlow@chromium.org 已提交
67
#include "util/testutil.h"
A
agiardullo 已提交
68
#include "util/transaction_test_util.h"
I
xxHash  
Igor Canadi 已提交
69
#include "util/xxhash.h"
D
Deon Nicholas 已提交
70
#include "utilities/merge_operators.h"
J
jorlow@chromium.org 已提交
71

D
Dmitri Smirnov 已提交
72
#ifdef OS_WIN
S
sdong 已提交
73
#include <io.h>  // open/close
D
Dmitri Smirnov 已提交
74 75
#endif

76
namespace {
77 78 79
using GFLAGS::ParseCommandLineFlags;
using GFLAGS::RegisterFlagValidator;
using GFLAGS::SetUsageMessage;
T
Tyler Harter 已提交
80

81 82 83 84 85 86
DEFINE_string(benchmarks,
              "fillseq,"
              "fillsync,"
              "fillrandom,"
              "overwrite,"
              "readrandom,"
87 88
              "newiterator,"
              "newiteratorwhilewriting,"
L
Lei Jin 已提交
89 90
              "seekrandom,"
              "seekrandomwhilewriting,"
91
              "seekrandomwhilemerging,"
92 93 94 95
              "readseq,"
              "readreverse,"
              "compact,"
              "readrandom,"
L
Lei Jin 已提交
96
              "multireadrandom,"
97
              "readseq,"
M
Mark Callaghan 已提交
98
              "readtocache,"
99 100
              "readreverse,"
              "readwhilewriting,"
M
Mark Callaghan 已提交
101
              "readwhilemerging,"
102 103 104 105 106
              "readrandomwriterandom,"
              "updaterandom,"
              "randomwithverify,"
              "fill100K,"
              "crc32c,"
I
xxHash  
Igor Canadi 已提交
107
              "xxhash,"
A
Albert Strasheim 已提交
108 109
              "compress,"
              "uncompress,"
T
Tomislav Novak 已提交
110
              "acquireload,"
A
agiardullo 已提交
111
              "fillseekseq,"
A
Andres Noetzli 已提交
112 113
              "randomtransaction,"
              "randomreplacekeys",
114

115 116
              "Comma-separated list of operations to run in the specified"
              " order. Available benchmarks:\n"
117 118 119 120 121 122 123 124 125 126 127 128 129
              "\tfillseq       -- write N values in sequential key"
              " order in async mode\n"
              "\tfillrandom    -- write N values in random key order in async"
              " mode\n"
              "\toverwrite     -- overwrite N values in random key order in"
              " async mode\n"
              "\tfillsync      -- write N/100 values in random key order in "
              "sync mode\n"
              "\tfill100K      -- write N/1000 100K values in random order in"
              " async mode\n"
              "\tdeleteseq     -- delete N keys in sequential order\n"
              "\tdeleterandom  -- delete N keys in random order\n"
              "\treadseq       -- read N times sequentially\n"
M
Mark Callaghan 已提交
130
              "\treadtocache   -- 1 thread reading database sequentially\n"
131 132 133 134 135
              "\treadreverse   -- read N times in reverse order\n"
              "\treadrandom    -- read N times in random order\n"
              "\treadmissing   -- read N missing keys in random order\n"
              "\treadwhilewriting      -- 1 writer, N threads doing random "
              "reads\n"
M
Mark Callaghan 已提交
136 137
              "\treadwhilemerging      -- 1 merger, N threads doing random "
              "reads\n"
138 139 140 141 142 143 144 145 146 147
              "\treadrandomwriterandom -- N threads doing random-read, "
              "random-write\n"
              "\tprefixscanrandom      -- prefix scan N times in random order\n"
              "\tupdaterandom  -- N threads doing read-modify-write for random "
              "keys\n"
              "\tappendrandom  -- N threads doing read-modify-write with "
              "growing values\n"
              "\tmergerandom   -- same as updaterandom/appendrandom using merge"
              " operator. "
              "Must be used with merge_operator\n"
148 149
              "\treadrandommergerandom -- perform N random read-or-merge "
              "operations. Must be used with merge_operator\n"
150
              "\tnewiterator   -- repeated iterator creation\n"
151 152 153 154 155 156
              "\tseekrandom    -- N random seeks, call Next seek_nexts times "
              "per seek\n"
              "\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
              "overwrite\n"
              "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
              "merge\n"
157
              "\tcrc32c        -- repeated crc32c of 4K of data\n"
I
xxHash  
Igor Canadi 已提交
158
              "\txxhash        -- repeated xxHash of 4K of data\n"
159
              "\tacquireload   -- load N*1000 times\n"
T
Tomislav Novak 已提交
160 161
              "\tfillseekseq   -- write N values in sequential key, then read "
              "them by seeking to each key\n"
A
agiardullo 已提交
162 163
              "\trandomtransaction     -- execute N random transactions and "
              "verify correctness\n"
A
Andres Noetzli 已提交
164 165
              "\trandomreplacekeys     -- randomly replaces N keys by deleting "
              "the old version and putting the new version\n\n"
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
              "Meta operations:\n"
              "\tcompact     -- Compact the entire DB\n"
              "\tstats       -- Print DB stats\n"
              "\tlevelstats  -- Print the number of files and bytes per level\n"
              "\tsstables    -- Print sstable info\n"
              "\theapprofile -- Dump a heap profile (if supported by this"
              " port)\n");

DEFINE_int64(num, 1000000, "Number of key/values to place in database");

DEFINE_int64(numdistinct, 1000,
             "Number of distinct keys to use. Used in RandomWithVerify to "
             "read/write on fewer keys so that gets are more likely to find the"
             " key and puts are more likely to update the same key");

181 182 183 184
DEFINE_int64(merge_keys, -1,
             "Number of distinct keys to use for MergeRandom and "
             "ReadRandomMergeRandom. "
             "If negative, there will be FLAGS_num keys.");
185
DEFINE_int32(num_column_families, 1, "Number of Column Families to use.");
186

187
DEFINE_int32(
188
    num_hot_column_families, 0,
189 190 191 192 193
    "Number of Hot Column Families. If more than 0, only write to this "
    "number of column families. After finishing all the writes to them, "
    "create new set of column families and insert to them. Only used "
    "when num_column_families > 1.");

194 195 196
DEFINE_int64(reads, -1, "Number of read operations to do.  "
             "If negative, do FLAGS_num reads.");

Y
Yueh-Hsuan Chiang 已提交
197 198 199
DEFINE_int64(deletes, -1, "Number of delete operations to do.  "
             "If negative, do FLAGS_num deletions.");

L
Lei Jin 已提交
200 201
DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality");

202 203 204 205 206 207 208 209 210
DEFINE_int64(seed, 0, "Seed base for random number generators. "
             "When 0 it is deterministic.");

DEFINE_int32(threads, 1, "Number of concurrent threads to run.");

DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
             " When 0 then num & reads determine the test duration");

DEFINE_int32(value_size, 100, "Size of each value");
T
Tyler Harter 已提交
211

212 213
DEFINE_int32(seek_nexts, 0,
             "How many times to call Next() after Seek() in "
214 215
             "fillseekseq, seekrandom, seekrandomwhilewriting and "
             "seekrandomwhilemerging");
T
Tomislav Novak 已提交
216

M
Mark Callaghan 已提交
217 218 219 220
DEFINE_bool(reverse_iterator, false,
            "When true use Prev rather than Next for iterators that do "
            "Seek and then Next");

221
DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");
222

223 224
DEFINE_int64(batch_size, 1, "Batch size");

225 226 227
// Gflags-style validator that places no restriction on the key size:
// every value is accepted, so neither argument is inspected.
static bool ValidateKeySize(const char* flagname, int32_t value) {
  (void)flagname;
  (void)value;
  return true;  // any key size is considered valid
}
228

229 230
// Gflags validator for uint64 flags whose value must fit in 32 bits
// (registered for --subcompactions in this file). Prints a diagnostic to
// stderr and returns false when the value exceeds UINT32_MAX.
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    // PRIu64 prints the full 64-bit value portably; the former "%lu" with an
    // (unsigned long) cast truncated the value on LLP64 platforms (Windows),
    // where unsigned long is only 32 bits wide.
    fprintf(stderr, "Invalid value for --%s: %" PRIu64 ", overflow\n",
            flagname, value);
    return false;
  }
  return true;
}

238
DEFINE_int32(key_size, 16, "size of each key");
239

240 241 242
DEFINE_int32(num_multi_db, 0,
             "Number of DBs used in the benchmark. 0 means single DB.");

243 244
DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
              " to this fraction of their original size after compression");
J
jorlow@chromium.org 已提交
245

246 247
DEFINE_double(read_random_exp_range, 0.0,
              "Read random's key will be generated using distribution of "
248
              "num * exp(-r) where r is uniform number from 0 to this value. "
249 250 251
              "The larger the number is, the more skewed the reads are. "
              "Only used in readrandom and multireadrandom benchmarks.");

252
DEFINE_bool(histogram, false, "Print histogram of operation timings");
J
jorlow@chromium.org 已提交
253

254 255 256 257 258 259 260 261
DEFINE_bool(enable_numa, false,
            "Make operations aware of NUMA architecture and bind memory "
            "and cpus corresponding to nodes together. In NUMA, memory "
            "in same node as CPUs are closer when compared to memory in "
            "other nodes. Reads can be faster when the process is bound to "
            "CPU and memory of same node. Use \"$numactl --hardware\" command "
            "to see NUMA memory architecture.");

262 263 264
DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
             "Number of bytes to buffer in all memtables before compacting");

265
DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
266
             "Number of bytes to buffer in memtable before compacting");
267

268 269 270 271
DEFINE_int32(max_write_buffer_number,
             rocksdb::Options().max_write_buffer_number,
             "The number of in-memory memtables. Each memtable is of size "
             "write_buffer_size.");
272

273 274 275 276 277 278 279 280 281 282
DEFINE_int32(min_write_buffer_number_to_merge,
             rocksdb::Options().min_write_buffer_number_to_merge,
             "The minimum number of write buffers that will be merged "
             "together before writing to storage. This is cheap because it "
             "is an in-memory merge. If this feature is not enabled, then all "
             "these write buffers are flushed to L0 as separate files and "
             "this increases read amplification because a get request has to "
             "check in all of these files. Also, an in-memory merge may "
             "result in writing less data to storage if there are duplicate "
             "records in each of these individual write buffers.");
283

284 285 286 287 288 289 290 291 292 293 294 295 296 297
DEFINE_int32(max_write_buffer_number_to_maintain,
             rocksdb::Options().max_write_buffer_number_to_maintain,
             "The total maximum number of write buffers to maintain in memory "
             "including copies of buffers that have already been flushed. "
             "Unlike max_write_buffer_number, this parameter does not affect "
             "flushing. This controls the minimum amount of write history "
             "that will be available in memory for conflict checking when "
             "Transactions are used. If this value is too low, some "
             "transactions may fail at commit time due to not being able to "
             "determine whether there were any write conflicts. Setting this "
             "value to 0 will cause write buffers to be freed immediately "
             "after they are flushed.  If this value is set to -1, "
             "'max_write_buffer_number' will be used.");

298 299 300 301
DEFINE_int32(max_background_compactions,
             rocksdb::Options().max_background_compactions,
             "The maximum number of concurrent background compactions"
             " that can occur in parallel.");
302

303
DEFINE_uint64(subcompactions, 1,
304 305 306 307 308
              "Maximum number of subcompactions to divide L0-L1 compactions "
              "into.");
static const bool FLAGS_subcompactions_dummy
    __attribute__((unused)) = RegisterFlagValidator(&FLAGS_subcompactions,
                                                    &ValidateUint32Range);
309

310 311 312 313 314
DEFINE_int32(max_background_flushes,
             rocksdb::Options().max_background_flushes,
             "The maximum number of concurrent background flushes"
             " that can occur in parallel.");

315 316 317
static rocksdb::CompactionStyle FLAGS_compaction_style_e;
DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
             "style of compaction: level-based vs universal");
318

319
static rocksdb::CompactionPri FLAGS_compaction_pri_e;
320
DEFINE_int32(compaction_pri, (int32_t)rocksdb::Options().compaction_pri,
321 322
             "priority of files to compaction: by size or by data age");

323 324 325
DEFINE_int32(universal_size_ratio, 0,
             "Percentage flexibility while comparing file size"
             " (for universal compaction only).");
326

327 328
DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files in a"
             " single compaction run (for universal compaction only).");
329

330 331
DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
             " in universal style compaction");
332

333 334
DEFINE_int32(universal_max_size_amplification_percent, 0,
             "The max size amplification for universal style compaction");
335

336 337 338 339
DEFINE_int32(universal_compression_size_percent, -1,
             "The percentage of the database to compress for universal "
             "compaction. -1 means compress everything.");

340
DEFINE_bool(universal_allow_trivial_move, false,
341
            "Allow trivial move in universal compaction.");
342

343 344 345 346 347 348 349
DEFINE_int64(cache_size, -1,
             "Number of bytes to use as a cache of uncompressed"
             " data. Negative means use default settings.");

DEFINE_int64(simcache_size, -1,
             "Number of bytes to use as a simcache of "
             "uncompressed data. Negative means use default settings.");
J
jorlow@chromium.org 已提交
350

351 352 353
DEFINE_bool(cache_index_and_filter_blocks, false,
            "Cache index/filter blocks in block cache.");

354 355 356
DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false,
            "Pin index/filter blocks of L0 files in block cache.");

357 358
DEFINE_int32(block_size,
             static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
359
             "Number of bytes in a block.");
360

361 362
DEFINE_int32(block_restart_interval,
             rocksdb::BlockBasedTableOptions().block_restart_interval,
363
             "Number of keys between restart points "
364 365 366 367 368 369
             "for delta encoding of keys in data block.");

DEFINE_int32(index_block_restart_interval,
             rocksdb::BlockBasedTableOptions().index_block_restart_interval,
             "Number of keys between restart points "
             "for delta encoding of keys in index block.");
370

371 372 373
DEFINE_int64(compressed_cache_size, -1,
             "Number of bytes to use as a cache of compressed data.");

374 375 376 377
DEFINE_int64(row_cache_size, 0,
             "Number of bytes to use as a cache of individual rows"
             " (0 = disabled).");

378 379 380
DEFINE_int32(open_files, rocksdb::Options().max_open_files,
             "Maximum number of files to keep open at the same time"
             " (use default if == 0)");
381

382 383 384 385
DEFINE_int32(file_opening_threads, rocksdb::Options().max_file_opening_threads,
             "If open_files is set to -1, this option set the number of "
             "threads that will be used to open files during DB::Open()");

386 387 388 389 390
DEFINE_int32(new_table_reader_for_compaction_inputs, true,
             "If true, uses a separate file handle for compaction inputs");

DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");

D
Dmitri Smirnov 已提交
391 392
DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
             "Maximum windows randomaccess buffer size");
393

I
Islam AbdelRahman 已提交
394 395
DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024,
             "Maximum write buffer for Writable File");
396 397 398 399

DEFINE_int32(skip_table_builder_flush, false, "Skip flushing block in "
             "table builder ");

400 401
DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
             " use default settings.");
402 403 404
DEFINE_double(memtable_bloom_size_ratio, 0,
              "Ratio of memtable size used for bloom filter. 0 means no bloom "
              "filter.");
S
Sanjay Ghemawat 已提交
405

406 407 408
DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
            " database.  If you set this flag and also specify a benchmark that"
            " wants a fresh database, that benchmark will fail.");
409

410 411 412 413 414
DEFINE_bool(show_table_properties, false,
            "If true, then per-level table"
            " properties will be printed on every stats-interval when"
            " stats_interval is set and stats_per_interval is on.");

415
DEFINE_string(db, "", "Use the db with the following name.");
416

417 418 419 420 421 422 423 424 425 426 427
// Gflags validator for a shard-bit count: any value below 20 is accepted
// (including negatives); 20 or more is rejected with a message on stderr.
static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
  const bool below_limit = value < 20;
  if (!below_limit) {
    fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n",
            flagname, value);
  }
  return below_limit;
}
DEFINE_int32(cache_numshardbits, -1, "Number of shards for the block cache"
             " is 2 ** cache_numshardbits. Negative means use default settings."
             " This is applied only if FLAGS_cache_size is non-negative.");
428

429 430
DEFINE_bool(verify_checksum, false, "Verify checksum for every block read"
            " from storage");
431

432
DEFINE_bool(statistics, false, "Database statistics");
433
static class std::shared_ptr<rocksdb::Statistics> dbstats;
434

435 436
DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
             " --num writes.");
H
heyongqiang 已提交
437

438
DEFINE_bool(sync, false, "Sync all writes to disk");
H
heyongqiang 已提交
439

440 441
DEFINE_bool(disable_data_sync, false, "If true, do not wait until data is"
            " synced to disk.");
M
Mark Callaghan 已提交
442

443
DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
M
Mark Callaghan 已提交
444

445
DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
446

L
Lei Jin 已提交
447 448
DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");

449
DEFINE_int32(num_levels, 7, "The total number of levels");
H
heyongqiang 已提交
450

451
DEFINE_int64(target_file_size_base, 2 * 1048576, "Target file size at level-1");
H
heyongqiang 已提交
452

453 454
DEFINE_int32(target_file_size_multiplier, 1,
             "A multiplier to compute target level-N file size (N >= 2)");
455

456
DEFINE_uint64(max_bytes_for_level_base,  10 * 1048576, "Max bytes for level-1");
H
heyongqiang 已提交
457

458 459 460
DEFINE_bool(level_compaction_dynamic_level_bytes, false,
            "Whether level size base is dynamic");

461 462
DEFINE_int32(max_bytes_for_level_multiplier, 10,
             "A multiplier to compute max bytes for level-N (N >= 2)");
H
heyongqiang 已提交
463

464 465 466
static std::vector<int> FLAGS_max_bytes_for_level_multiplier_additional_v;
DEFINE_string(max_bytes_for_level_multiplier_additional, "",
              "A vector that specifies additional fanout per level");
467

468 469 470
DEFINE_int32(level0_stop_writes_trigger,
             rocksdb::Options().level0_stop_writes_trigger,
             "Number of files in level-0"
471
             " that will trigger put stop.");
472

473 474 475
DEFINE_int32(level0_slowdown_writes_trigger,
             rocksdb::Options().level0_slowdown_writes_trigger,
             "Number of files in level-0"
476
             " that will slow down writes.");
477

478 479 480
DEFINE_int32(level0_file_num_compaction_trigger,
             rocksdb::Options().level0_file_num_compaction_trigger,
             "Number of files in level-0"
481
             " when compactions start");
482

483 484 485 486 487 488 489 490 491 492 493 494 495
// Gflags validator for percentages that must lie strictly between 0 and
// 100; both endpoints are rejected with a message on stderr.
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value > 0 && value < 100) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n",
          flagname, value);
  return false;
}
DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
             " as percentage) for the ReadRandomWriteRandom workload. The "
             "default value 90 means 90% operations out of all reads and writes"
             " operations are reads. In other words, 9 gets for every 1 put.");

496 497 498 499 500
DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
             " as percentage) for the ReadRandomMergeRandom workload. The"
             " default value 70 means 70% out of all read and merge operations"
             " are merges. In other words, 7 merges for every 3 gets.");

501 502 503 504 505 506
DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
             "deletes (used in RandomWithVerify only). RandomWithVerify "
             "calculates writepercent as (100 - FLAGS_readwritepercent - "
             "deletepercent), so deletepercent must be smaller than (100 - "
             "FLAGS_readwritepercent)");

507 508 509 510 511
DEFINE_bool(optimize_filters_for_hits, false,
            "Optimizes bloom filters for workloads where most lookups return "
            "a value. For now this doesn't create bloom filters for the max "
            "level of the LSM to reduce metadata that should fit in RAM. ");

I
Igor Canadi 已提交
512 513
DEFINE_uint64(delete_obsolete_files_period_micros, 0,
              "Ignored. Left here for backward compatibility");
514

515
#ifndef ROCKSDB_LITE
A
agiardullo 已提交
516
DEFINE_bool(optimistic_transaction_db, false,
            "Open an OptimisticTransactionDB instance. "
            "Required for randomtransaction benchmark.");

A
agiardullo 已提交
520 521 522 523
DEFINE_bool(transaction_db, false,
            "Open a TransactionDB instance. "
            "Required for randomtransaction benchmark.");

A
agiardullo 已提交
524 525 526 527
DEFINE_uint64(transaction_sets, 2,
              "Number of keys each transaction will "
              "modify (use in RandomTransaction only).  Max: 9999");

528 529 530 531
DEFINE_bool(transaction_set_snapshot, false,
            "Setting to true will have each transaction call SetSnapshot()"
            " upon creation.");

A
agiardullo 已提交
532 533 534 535
DEFINE_int32(transaction_sleep, 0,
             "Max microseconds to sleep in between "
             "reading and writing a value (used in RandomTransaction only). ");

536 537 538
DEFINE_uint64(transaction_lock_timeout, 100,
              "If using a transaction_db, specifies the lock wait timeout in"
              " milliseconds before failing a transaction waiting on a lock");
539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555
DEFINE_string(
    options_file, "",
    "The path to a RocksDB options file.  If specified, then db_bench will "
    "run with the RocksDB options in the default column family of the "
    "specified options file. "
    "Note that with this setting, db_bench will ONLY accept the following "
    "RocksDB options related command-line arguments, all other arguments "
    "that are related to RocksDB options will be ignored:\n"
    "\t--use_existing_db\n"
    "\t--statistics\n"
    "\t--row_cache_size\n"
    "\t--row_cache_numshardbits\n"
    "\t--enable_io_prio\n"
    "\t--disable_flashcache_for_background_threads\n"
    "\t--flashcache_dev\n"
    "\t--dump_malloc_stats\n"
    "\t--num_multi_db\n");
556
#endif  // ROCKSDB_LITE
557

558
DEFINE_bool(report_bg_io_stats, false,
            "Measure times spent on I/Os while in compactions.");

561 562 563 564 565 566 567 568 569 570 571
enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "none"))
    return rocksdb::kNoCompression;
  else if (!strcasecmp(ctype, "snappy"))
    return rocksdb::kSnappyCompression;
  else if (!strcasecmp(ctype, "zlib"))
    return rocksdb::kZlibCompression;
  else if (!strcasecmp(ctype, "bzip2"))
    return rocksdb::kBZip2Compression;
A
Albert Strasheim 已提交
572 573 574 575
  else if (!strcasecmp(ctype, "lz4"))
    return rocksdb::kLZ4Compression;
  else if (!strcasecmp(ctype, "lz4hc"))
    return rocksdb::kLZ4HCCompression;
576 577
  else if (!strcasecmp(ctype, "xpress"))
    return rocksdb::kXpressCompression;
578 579
  else if (!strcasecmp(ctype, "zstd"))
    return rocksdb::kZSTDNotFinalCompression;
580 581

  fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
582
  return rocksdb::kSnappyCompression;  // default value
583
}
584

S
sdong 已提交
585
std::string ColumnFamilyName(size_t i) {
586 587 588 589
  if (i == 0) {
    return rocksdb::kDefaultColumnFamilyName;
  } else {
    char name[100];
S
sdong 已提交
590
    snprintf(name, sizeof(name), "column_family_name_%06zu", i);
591 592 593
    return std::string(name);
  }
}
I
Igor Canadi 已提交
594

595 596 597
DEFINE_string(compression_type, "snappy",
              "Algorithm to use to compress the database");
static enum rocksdb::CompressionType FLAGS_compression_type_e =
598
    rocksdb::kSnappyCompression;
599

600 601 602 603
DEFINE_int32(compression_level, -1,
             "Compression level. For zlib this should be -1 for the "
             "default level, or between 0 and 9.");

604 605 606 607
DEFINE_int32(compression_max_dict_bytes, 0,
             "Maximum size of dictionary used to prime the compression "
             "library.");

608 609 610 611 612 613 614 615 616
// Gflags validator for --compression_level: accepts -1 through 9 inclusive
// and rejects anything else with a message on stderr.
static bool ValidateCompressionLevel(const char* flagname, int32_t value) {
  const bool in_range = (value >= -1) && (value <= 9);
  if (in_range) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d, must be between -1 and 9\n",
          flagname, value);
  return false;
}

I
Igor Canadi 已提交
617
static const bool FLAGS_compression_level_dummy __attribute__((unused)) =
618
    RegisterFlagValidator(&FLAGS_compression_level, &ValidateCompressionLevel);
619

620 621 622 623 624 625 626 627 628 629 630 631 632 633 634
DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
             " from this level. Levels with number < min_level_to_compress are"
             " not compressed. Otherwise, apply compression_type to "
             "all levels.");

// Gflags validator for a table-cache shard-bit count: accepts values in
// the half-open range (0, 20]; anything else is reported on stderr.
static bool ValidateTableCacheNumshardbits(const char* flagname,
                                           int32_t value) {
  if (value > 0 && value <= 20) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d, must be  0 < val <= 20\n",
          flagname, value);
  return false;
}
DEFINE_int32(table_cache_numshardbits, 4,
             "Number of shards for the table cache is "
             "2 ** table_cache_numshardbits.");
635

636 637 638 639 640 641
#ifndef ROCKSDB_LITE
DEFINE_string(env_uri, "", "URI for registry Env lookup. Mutually exclusive"
              " with --hdfs.");
#endif  // ROCKSDB_LITE
DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with"
              " --env_uri.");
642
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
643

644 645
DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
             "this is greater than zero. When 0 the interval grows over time.");
646

647 648 649
DEFINE_int64(stats_interval_seconds, 0, "Report stats every N seconds. This "
             "overrides stats_interval when both are > 0.");

650 651
DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
             " this is greater than 0.");
652

653 654 655 656 657 658 659 660
DEFINE_int64(report_interval_seconds, 0,
             "If greater than zero, it will write simple stats in CSV format "
             "to --report_file every N seconds");

DEFINE_string(report_file, "report.csv",
              "Filename where some simple stats are reported to (if "
              "--report_interval_seconds is bigger than 0)");

661 662 663 664
DEFINE_int32(thread_status_per_interval, 0,
             "Takes and reports a snapshot of the current status of each "
             "thread when this is greater than 0.");

665 666
DEFINE_int32(perf_level, 0, "Level of perf collection");

667
// gflags validator for the (deprecated) --soft_rate_limit / --hard_rate_limit
// flags. Negative values are rejected; a tiny epsilon below zero is tolerated
// so floating-point round-off around 0.0 is not reported as an error.
static bool ValidateRateLimit(const char* flagname, double value) {
  const double EPSILON = 1e-10;
  if (value < -EPSILON) {
    fprintf(stderr, "Invalid value for --%s: %12.6f, must be >= 0.0\n",
            flagname, value);
    return false;
  }
  return true;
}

DEFINE_double(soft_rate_limit, 0.0, "DEPRECATED");
J
Jim Paton 已提交
677

678 679
DEFINE_double(hard_rate_limit, 0.0, "DEPRECATED");

680 681 682
DEFINE_uint64(soft_pending_compaction_bytes_limit, 64ull * 1024 * 1024 * 1024,
              "Slowdown writes if pending compaction bytes exceed this number");

683
DEFINE_uint64(hard_pending_compaction_bytes_limit, 128ull * 1024 * 1024 * 1024,
684
              "Stop writes if pending compaction bytes exceed this number");
685

S
sdong 已提交
686
DEFINE_uint64(delayed_write_rate, 8388608u,
S
sdong 已提交
687 688 689
              "Limited bytes allowed to DB when soft_rate_limit or "
              "level0_slowdown_writes_trigger triggers");

690 691 692 693 694 695 696 697 698 699 700 701 702 703
DEFINE_bool(allow_concurrent_memtable_write, false,
            "Allow multi-writers to update mem tables in parallel.");

DEFINE_bool(enable_write_thread_adaptive_yield, false,
            "Use a yielding spin loop for brief writer thread waits.");

DEFINE_uint64(
    write_thread_max_yield_usec, 100,
    "Maximum microseconds for enable_write_thread_adaptive_yield operation.");

DEFINE_uint64(write_thread_slow_yield_usec, 3,
              "The threshold at which a slow yield is considered a signal that "
              "other processes or threads want the core.");

704 705 706
DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
             "When hard_rate_limit is set then this is the max time a put will"
             " be stalled.");
707

S
sdong 已提交
708 709
DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");

710 711
DEFINE_uint64(
    benchmark_write_rate_limit, 0,
712 713
    "If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
    "is the global rate in bytes/second.");
714

715 716 717
DEFINE_int32(max_grandparent_overlap_factor, 10, "Control maximum bytes of "
             "overlaps in grandparent (i.e., level+2) before we stop building a"
             " single file in a level->level+1 compaction.");
718

719
#ifndef ROCKSDB_LITE
720
DEFINE_bool(readonly, false, "Run read only benchmarks.");
721
#endif  // ROCKSDB_LITE
H
heyongqiang 已提交
722

723
DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");
724

725 726 727
DEFINE_int32(source_compaction_factor, 1, "Cap the size of data in level-K for"
             " a compaction run that compacts Level-K with Level-(K+1) (for"
             " K >= 1)");
728

729 730 731
DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
              " in MB.");
732
DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");
733

734 735
DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer,
            "Allow buffered io using OS buffers");
736

737 738
DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
            "Allow reads to occur via mmap-ing files");
739

740 741
DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes,
            "Allow writes to occur via mmap-ing files");
742

743 744
DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
            "Advise random access on table file open");
745

746 747 748
DEFINE_string(compaction_fadvice, "NORMAL",
              "Access pattern advice when a file is compacted");
static auto FLAGS_compaction_fadvice_e =
749
  rocksdb::Options().access_hint_on_compaction_start;
750

751 752 753 754 755
DEFINE_bool(disable_flashcache_for_background_threads, false,
            "Disable flashcache for background threads");

DEFINE_string(flashcache_dev, "", "Path to flashcache device");

I
Igor Canadi 已提交
756 757 758
DEFINE_bool(use_tailing_iterator, false,
            "Use tailing iterator to access a series of keys instead of get");

759 760 761 762
DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
            "Use adaptive mutex");

DEFINE_uint64(bytes_per_sync,  rocksdb::Options().bytes_per_sync,
763
              "Allows OS to incrementally sync SST files to disk while they are"
764 765
              " being written, in the background. Issue one request for every"
              " bytes_per_sync written. 0 turns it off.");
766 767 768 769 770 771

DEFINE_uint64(wal_bytes_per_sync,  rocksdb::Options().wal_bytes_per_sync,
              "Allows OS to incrementally sync WAL files to disk while they are"
              " being written, in the background. Issue one request for every"
              " wal_bytes_per_sync written. 0 turns it off.");

A
Andres Noetzli 已提交
772 773 774 775 776 777 778
DEFINE_bool(use_single_deletes, true,
            "Use single deletes (used in RandomReplaceKeys only).");

DEFINE_double(stddev, 2000.0,
              "Standard deviation of normal distribution used for picking keys"
              " (used in RandomReplaceKeys only).");

779 780 781
DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
             " operations on a key in the memtable");

782 783 784 785 786 787 788 789
// gflags validator for --prefix_size. Accepts 0 <= value < 2000000000; the
// diagnostic now states the same (exclusive) upper bound that the check
// enforces.
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  if (value < 0 || value >= 2000000000) {
    fprintf(stderr, "Invalid value for --%s: %d. 0 <= PrefixSize < 2000000000\n",
            flagname, value);
    return false;
  }
  return true;
}

DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
             "plain table");
DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
             "per prefix, 0 means no special handling of the prefix, "
             "i.e. use the prefix that comes with the generated random "
             "number.");
DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
            "threads' IO priority");
DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
            "table becomes an identity function. This is only valid when key "
            "is 8 bytes");
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");

// Memtable representations selectable with --memtablerep.
enum RepFactory {
  kSkipList,
  kPrefixHash,
  kVectorRep,
  kHashLinkedList,
  kCuckoo
};

// Maps a --memtablerep name (compared case-insensitively) to its RepFactory.
// Unknown names are reported on stderr, like every other flag diagnostic,
// instead of polluting the benchmark results on stdout, and fall back to
// kSkipList so the run can proceed with the default memtable.
enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  static const struct {
    const char* name;
    RepFactory factory;
  } kRepFactoryNames[] = {
      {"skip_list", kSkipList},
      {"prefix_hash", kPrefixHash},
      {"vector", kVectorRep},
      {"hash_linkedlist", kHashLinkedList},
      {"cuckoo", kCuckoo},
  };

  for (const auto& entry : kRepFactoryNames) {
    if (!strcasecmp(ctype, entry.name)) {
      return entry.factory;
    }
  }

  fprintf(stderr, "Cannot parse memreptable %s\n", ctype);
  return kSkipList;
}

static enum RepFactory FLAGS_rep_factory;
829
DEFINE_string(memtablerep, "skip_list", "");
830
DEFINE_int64(hash_bucket_count, 1024 * 1024, "hash bucket count");
L
Lei Jin 已提交
831 832
DEFINE_bool(use_plain_table, false, "if use plain table "
            "instead of block-based table format");
833 834
DEFINE_bool(use_cuckoo_table, false, "if use cuckoo table format");
DEFINE_double(cuckoo_hash_ratio, 0.9, "Hash ratio for Cuckoo SST table.");
835 836 837
DEFINE_bool(use_hash_search, false, "if use kHashSearch "
            "instead of kBinarySearch. "
            "This is valid if only we use BlockTable");
838 839 840
DEFINE_bool(use_block_based_filter, false, "if use kBlockBasedFilter "
            "instead of kFullFilter for filter block. "
            "This is valid if only we use BlockTable");
841 842 843 844
DEFINE_string(merge_operator, "", "The merge operator to use with the database."
              "If a new merge operator is specified, be sure to use fresh"
              " database The possible merge operators are defined in"
              " utilities/merge_operators.h");
T
Tomislav Novak 已提交
845 846 847
DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try "
             "linear search first for this many steps from the previous "
             "position");
848 849
DEFINE_bool(report_file_operations, false, "if report number of file "
            "operations");
D
Deon Nicholas 已提交
850

K
kailiu 已提交
851
static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) =
852
    RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);
K
kailiu 已提交
853 854

static const bool FLAGS_hard_rate_limit_dummy __attribute__((unused)) =
855
    RegisterFlagValidator(&FLAGS_hard_rate_limit, &ValidateRateLimit);
K
kailiu 已提交
856 857

static const bool FLAGS_prefix_size_dummy __attribute__((unused)) =
858
    RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
K
kailiu 已提交
859 860

static const bool FLAGS_key_size_dummy __attribute__((unused)) =
861
    RegisterFlagValidator(&FLAGS_key_size, &ValidateKeySize);
K
kailiu 已提交
862 863

static const bool FLAGS_cache_numshardbits_dummy __attribute__((unused)) =
864 865
    RegisterFlagValidator(&FLAGS_cache_numshardbits,
                          &ValidateCacheNumshardbits);
K
kailiu 已提交
866 867

static const bool FLAGS_readwritepercent_dummy __attribute__((unused)) =
868
    RegisterFlagValidator(&FLAGS_readwritepercent, &ValidateInt32Percent);
K
kailiu 已提交
869

I
Igor Canadi 已提交
870 871 872
DEFINE_int32(disable_seek_compaction, false,
             "Not used, left here for backwards compatibility");

K
kailiu 已提交
873
static const bool FLAGS_deletepercent_dummy __attribute__((unused)) =
874 875 876 877
    RegisterFlagValidator(&FLAGS_deletepercent, &ValidateInt32Percent);
static const bool FLAGS_table_cache_numshardbits_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
                          &ValidateTableCacheNumshardbits);
878
}  // namespace
K
kailiu 已提交
879

880
namespace rocksdb {
J
jorlow@chromium.org 已提交
881

882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904
namespace {
// Running totals of the file operations observed by ReportFileOpEnv. The
// counters are bumped with relaxed atomics from whichever threads use the
// wrapped files; only the totals are reported, so no ordering is needed.
struct ReportFileOpCounters {
  std::atomic<int> open_counter_{0};
  std::atomic<int> read_counter_{0};
  std::atomic<int> append_counter_{0};
  std::atomic<uint64_t> bytes_read_{0};
  std::atomic<uint64_t> bytes_written_{0};
};

// A special Env that records and reports file operations in db_bench.
// Files created through NewSequentialFile / NewRandomAccessFile /
// NewWritableFile are wrapped in a local CountingFile that tallies opens,
// Read()/Append() calls and byte counts into counters().
class ReportFileOpEnv : public EnvWrapper {
 public:
  explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); }

  // Zeroes every counter; Stats::Report() calls this after printing them.
  void reset() {
    counters_.open_counter_ = 0;
    counters_.read_counter_ = 0;
    counters_.append_counter_ = 0;
    counters_.bytes_read_ = 0;
    counters_.bytes_written_ = 0;
  }

  Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
                           const EnvOptions& soptions) override {
    // Counts Read() calls and the bytes they return; Skip() is forwarded.
    class CountingFile : public SequentialFile {
     private:
      unique_ptr<SequentialFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(unique_ptr<SequentialFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}

      Status Read(size_t n, Slice* result, char* scratch) override {
        counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Read(n, result, scratch);
        counters_->bytes_read_.fetch_add(result->size(),
                                         std::memory_order_relaxed);
        return rv;
      }

      Status Skip(uint64_t n) override { return target_->Skip(n); }
    };

    Status s = target()->NewSequentialFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  Status NewRandomAccessFile(const std::string& f,
                             unique_ptr<RandomAccessFile>* r,
                             const EnvOptions& soptions) override {
    // Counts positional Read() calls and the bytes they return.
    class CountingFile : public RandomAccessFile {
     private:
      unique_ptr<RandomAccessFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(unique_ptr<RandomAccessFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}

      Status Read(uint64_t offset, size_t n, Slice* result,
                  char* scratch) const override {
        counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Read(offset, n, result, scratch);
        counters_->bytes_read_.fetch_add(result->size(),
                                         std::memory_order_relaxed);
        return rv;
      }
    };

    Status s = target()->NewRandomAccessFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
                         const EnvOptions& soptions) override {
    // Counts Append() calls and bytes appended; Truncate/Close/Flush/Sync are
    // forwarded unchanged.
    class CountingFile : public WritableFile {
     private:
      unique_ptr<WritableFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(unique_ptr<WritableFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}

      Status Append(const Slice& data) override {
        counters_->append_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Append(data);
        counters_->bytes_written_.fetch_add(data.size(),
                                            std::memory_order_relaxed);
        return rv;
      }

      Status Truncate(uint64_t size) override { return target_->Truncate(size); }
      Status Close() override { return target_->Close(); }
      Status Flush() override { return target_->Flush(); }
      Status Sync() override { return target_->Sync(); }
    };

    Status s = target()->NewWritableFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  // getter
  ReportFileOpCounters* counters() { return &counters_; }

 private:
  ReportFileOpCounters counters_;
};

}  // namespace

// Helper for quickly generating random data.
J
jorlow@chromium.org 已提交
1009 1010 1011
class RandomGenerator {
 private:
  std::string data_;
1012
  unsigned int pos_;
J
jorlow@chromium.org 已提交
1013 1014 1015 1016 1017 1018 1019 1020

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
1021
    while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
J
jorlow@chromium.org 已提交
1022 1023 1024 1025 1026 1027 1028 1029
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

1030
  Slice Generate(unsigned int len) {
1031
    assert(len <= data_.size());
J
jorlow@chromium.org 已提交
1032 1033 1034 1035 1036 1037
    if (pos_ + len > data_.size()) {
      pos_ = 0;
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
1038
};
X
Xing Jin 已提交
1039

1040 1041 1042 1043 1044 1045 1046 1047
// Appends |msg| to |*str|, separating it from any existing content with a
// single space. An empty |msg| leaves |*str| untouched.
static void AppendWithSpace(std::string* str, Slice msg) {
  if (!msg.empty()) {
    if (str->size() > 0) {
      str->append(1, ' ');
    }
    str->append(msg.data(), msg.size());
  }
}

1048 1049 1050
struct DBWithColumnFamilies {
  std::vector<ColumnFamilyHandle*> cfh;
  DB* db;
1051
#ifndef ROCKSDB_LITE
A
agiardullo 已提交
1052
  OptimisticTransactionDB* opt_txn_db;
1053
#endif  // ROCKSDB_LITE
1054 1055 1056 1057 1058 1059 1060
  std::atomic<size_t> num_created;  // Need to be updated after all the
                                    // new entries in cfh are set.
  size_t num_hot;  // Number of column families to be queried at each moment.
                   // After each CreateNewCf(), another num_hot number of new
                   // Column families will be created and used to be queried.
  port::Mutex create_cf_mutex;  // Only one thread can execute CreateNewCf()

1061 1062 1063 1064 1065 1066
  DBWithColumnFamilies()
      : db(nullptr)
#ifndef ROCKSDB_LITE
        , opt_txn_db(nullptr)
#endif  // ROCKSDB_LITE
  {
1067
    cfh.clear();
1068 1069
    num_created = 0;
    num_hot = 0;
1070
  }
1071 1072 1073 1074

  DBWithColumnFamilies(const DBWithColumnFamilies& other)
      : cfh(other.cfh),
        db(other.db),
1075
#ifndef ROCKSDB_LITE
A
agiardullo 已提交
1076
        opt_txn_db(other.opt_txn_db),
1077
#endif  // ROCKSDB_LITE
1078 1079 1080
        num_created(other.num_created.load()),
        num_hot(other.num_hot) {}

A
agiardullo 已提交
1081 1082 1083 1084
  void DeleteDBs() {
    std::for_each(cfh.begin(), cfh.end(),
                  [](ColumnFamilyHandle* cfhi) { delete cfhi; });
    cfh.clear();
1085
#ifndef ROCKSDB_LITE
A
agiardullo 已提交
1086 1087 1088
    if (opt_txn_db) {
      delete opt_txn_db;
      opt_txn_db = nullptr;
A
agiardullo 已提交
1089 1090
    } else {
      delete db;
1091
      db = nullptr;
A
agiardullo 已提交
1092
    }
1093 1094
#else
    delete db;
A
agiardullo 已提交
1095
    db = nullptr;
1096
#endif  // ROCKSDB_LITE
A
agiardullo 已提交
1097 1098
  }

1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125
  ColumnFamilyHandle* GetCfh(int64_t rand_num) {
    assert(num_hot > 0);
    return cfh[num_created.load(std::memory_order_acquire) - num_hot +
               rand_num % num_hot];
  }

  // stage: assume CF from 0 to stage * num_hot has be created. Need to create
  //        stage * num_hot + 1 to stage * (num_hot + 1).
  void CreateNewCf(ColumnFamilyOptions options, int64_t stage) {
    MutexLock l(&create_cf_mutex);
    if ((stage + 1) * num_hot <= num_created) {
      // Already created.
      return;
    }
    auto new_num_created = num_created + num_hot;
    assert(new_num_created <= cfh.size());
    for (size_t i = num_created; i < new_num_created; i++) {
      Status s =
          db->CreateColumnFamily(options, ColumnFamilyName(i), &(cfh[i]));
      if (!s.ok()) {
        fprintf(stderr, "create column family error: %s\n",
                s.ToString().c_str());
        abort();
      }
    }
    num_created.store(new_num_created, std::memory_order_release);
  }
1126 1127
};

1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217
// Periodically appends "secs_elapsed,interval_qps" rows to a CSV report file.
// A background thread wakes up every report_interval_secs seconds and writes
// how many ops were passed to ReportFinishedOps() since its previous row.
class ReporterAgent {
 public:
  // Creates |fname| through |env|, writes the CSV header and starts the
  // reporting thread. Aborts the process if the file cannot be created,
  // written or flushed.
  ReporterAgent(Env* env, const std::string& fname,
                uint64_t report_interval_secs)
      : env_(env),
        total_ops_done_(0),
        last_report_(0),
        report_interval_secs_(report_interval_secs),
        stop_(false) {
    auto s = env_->NewWritableFile(fname, &report_file_, EnvOptions());
    if (s.ok()) {
      s = report_file_->Append(Header() + "\n");
    }
    if (s.ok()) {
      s = report_file_->Flush();
    }
    if (!s.ok()) {
      fprintf(stderr, "Can't open %s: %s\n", fname.c_str(),
              s.ToString().c_str());
      abort();
    }

    // The lambda captures |this| by reference; that is safe because the
    // destructor joins this thread before any member is destroyed.
    reporting_thread_ = std::thread([&]() { SleepAndReport(); });
  }

  // Signals the reporting thread to stop and waits for it to exit.
  ~ReporterAgent() {
    {
      std::unique_lock<std::mutex> lk(mutex_);
      stop_ = true;
      stop_cv_.notify_all();
    }
    reporting_thread_.join();
  }

  // Thread safe: adds |num_ops| to an atomic running total.
  void ReportFinishedOps(int64_t num_ops) {
    total_ops_done_.fetch_add(num_ops);
  }

 private:
  std::string Header() const { return "secs_elapsed,interval_qps"; }
  // Body of reporting_thread_. Waits up to report_interval_secs_ for a stop
  // request; each timeout produces one row holding the seconds since start
  // (rounded to the nearest second) and the ops finished since the previous
  // row. Exits on stop or on the first Append/Flush failure.
  // NOTE(review): the second column is an op count per interval, which only
  // equals QPS when report_interval_secs_ is 1.
  void SleepAndReport() {
    uint64_t kMicrosInSecond = 1000 * 1000;
    auto time_started = env_->NowMicros();
    while (true) {
      {
        std::unique_lock<std::mutex> lk(mutex_);
        if (stop_ ||
            stop_cv_.wait_for(lk, std::chrono::seconds(report_interval_secs_),
                              [&]() { return stop_; })) {
          // stopping
          break;
        }
        // else -> timeout, which means time for a report!
      }
      auto total_ops_done_snapshot = total_ops_done_.load();
      // round the seconds elapsed
      auto secs_elapsed =
          (env_->NowMicros() - time_started + kMicrosInSecond / 2) /
          kMicrosInSecond;
      std::string report = ToString(secs_elapsed) + "," +
                           ToString(total_ops_done_snapshot - last_report_) +
                           "\n";
      auto s = report_file_->Append(report);
      if (s.ok()) {
        s = report_file_->Flush();
      }
      if (!s.ok()) {
        fprintf(stderr,
                "Can't write to report file (%s), stopping the reporting\n",
                s.ToString().c_str());
        break;
      }
      last_report_ = total_ops_done_snapshot;
    }
  }

  Env* env_;
  std::unique_ptr<WritableFile> report_file_;
  std::atomic<int64_t> total_ops_done_;
  // Only accessed by the reporting thread once construction finishes.
  int64_t last_report_;
  const uint64_t report_interval_secs_;
  std::thread reporting_thread_;
  std::mutex mutex_;
  // will notify on stop
  std::condition_variable stop_cv_;
  // Guarded by mutex_.
  bool stop_;
};

// Kinds of operations whose latencies Stats keeps in separate histograms.
enum OperationType : unsigned char {
  kRead = 0,
  kWrite,
  kDelete,
  kSeek,
  kMerge,
  kUpdate,
  kCompress,
  kUncompress,
  kCrc,
  kHash,
  kOthers
};

// Label printed for each OperationType in histogram reports. The enum's
// underlying type is unsigned char, so std::hash<unsigned char> is the hasher.
// Every key must be unique: duplicate keys in the initializer list are
// silently dropped, leaving the later type without a label.
static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
                          OperationTypeString = {
  {kRead, "read"},
  {kWrite, "write"},
  {kDelete, "delete"},
  {kSeek, "seek"},
  {kMerge, "merge"},
  {kUpdate, "update"},
  {kCompress, "compress"},
  {kUncompress, "uncompress"},
  {kCrc, "crc"},
  {kHash, "hash"},
  {kOthers, "op"}
};

class Stats {
 private:
1249
  int id_;
D
Dmitri Smirnov 已提交
1250 1251
  uint64_t start_;
  uint64_t finish_;
1252
  double seconds_;
D
Dmitri Smirnov 已提交
1253 1254 1255 1256 1257 1258
  uint64_t done_;
  uint64_t last_report_done_;
  uint64_t next_report_;
  uint64_t bytes_;
  uint64_t last_op_finish_;
  uint64_t last_report_finish_;
1259
  std::unordered_map<OperationType, std::shared_ptr<HistogramImpl>,
1260
                     std::hash<unsigned char>> hist_;
1261
  std::string message_;
1262
  bool exclude_from_merge_;
1263
  ReporterAgent* reporter_agent_;  // does not own
1264 1265

 public:
1266
  Stats() { Start(-1); }
1267

1268 1269 1270 1271
  void SetReporterAgent(ReporterAgent* reporter_agent) {
    reporter_agent_ = reporter_agent;
  }

1272 1273 1274
  void Start(int id) {
    id_ = id;
    next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
1275
    last_op_finish_ = start_;
1276
    hist_.clear();
1277
    done_ = 0;
1278
    last_report_done_ = 0;
1279 1280
    bytes_ = 0;
    seconds_ = 0;
1281
    start_ = FLAGS_env->NowMicros();
1282
    finish_ = start_;
1283
    last_report_finish_ = start_;
1284
    message_.clear();
1285 1286
    // When set, stats from this thread won't be merged with others.
    exclude_from_merge_ = false;
1287 1288 1289
  }

  void Merge(const Stats& other) {
1290 1291 1292
    if (other.exclude_from_merge_)
      return;

1293 1294 1295
    for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
      auto this_it = hist_.find(it->first);
      if (this_it != hist_.end()) {
1296
        this_it->second->Merge(*(other.hist_.at(it->first)));
1297 1298 1299 1300 1301
      } else {
        hist_.insert({ it->first, it->second });
      }
    }

1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312
    done_ += other.done_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.empty()) message_ = other.message_;
  }

  void Stop() {
1313
    finish_ = FLAGS_env->NowMicros();
1314 1315 1316 1317 1318 1319 1320
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void AddMessage(Slice msg) {
    AppendWithSpace(&message_, msg);
  }

1321
  void SetId(int id) { id_ = id; }
1322
  void SetExcludeFromMerge() { exclude_from_merge_ = true; }
1323

1324 1325 1326 1327
  void PrintThreadStatus() {
    std::vector<ThreadStatus> thread_list;
    FLAGS_env->GetThreadList(&thread_list);

1328
    fprintf(stderr, "\n%18s %10s %12s %20s %13s %45s %12s %s\n",
1329
        "ThreadID", "ThreadType", "cfName", "Operation",
1330
        "ElapsedTime", "Stage", "State", "OperationProperties");
1331

1332 1333
    int64_t current_time = 0;
    Env::Default()->GetCurrentTime(&current_time);
1334
    for (auto ts : thread_list) {
1335
      fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
1336 1337 1338 1339
          ts.thread_id,
          ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(),
          ts.cf_name.c_str(),
          ThreadStatus::GetOperationName(ts.operation_type).c_str(),
1340
          ThreadStatus::MicrosToString(ts.op_elapsed_micros).c_str(),
1341
          ThreadStatus::GetOperationStageName(ts.operation_stage).c_str(),
1342
          ThreadStatus::GetStateName(ts.state_type).c_str());
1343 1344 1345 1346 1347 1348 1349 1350

      auto op_properties = ThreadStatus::InterpretOperationProperties(
          ts.operation_type, ts.op_properties);
      for (const auto& op_prop : op_properties) {
        fprintf(stderr, " %s %" PRIu64" |",
            op_prop.first.c_str(), op_prop.second);
      }
      fprintf(stderr, "\n");
1351 1352 1353
    }
  }

1354 1355 1356 1357 1358
  void ResetLastOpTime() {
    // Set to now to avoid latency from calls to SleepForMicroseconds
    last_op_finish_ = FLAGS_env->NowMicros();
  }

1359 1360
  void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
                   enum OperationType op_type = kOthers) {
1361 1362 1363
    if (reporter_agent_) {
      reporter_agent_->ReportFinishedOps(num_ops);
    }
1364
    if (FLAGS_histogram) {
D
Dmitri Smirnov 已提交
1365 1366
      uint64_t now = FLAGS_env->NowMicros();
      uint64_t micros = now - last_op_finish_;
1367 1368 1369

      if (hist_.find(op_type) == hist_.end())
      {
1370 1371
        auto hist_temp = std::make_shared<HistogramImpl>();
        hist_.insert({op_type, std::move(hist_temp)});
1372
      }
1373
      hist_[op_type]->Add(micros);
1374

1375
      if (micros > 20000 && !FLAGS_stats_interval) {
D
Dmitri Smirnov 已提交
1376
        fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, "");
1377 1378 1379 1380 1381
        fflush(stderr);
      }
      last_op_finish_ = now;
    }

1382
    done_ += num_ops;
1383
    if (done_ >= next_report_) {
1384 1385 1386 1387 1388 1389 1390 1391
      if (!FLAGS_stats_interval) {
        if      (next_report_ < 1000)   next_report_ += 100;
        else if (next_report_ < 5000)   next_report_ += 500;
        else if (next_report_ < 10000)  next_report_ += 1000;
        else if (next_report_ < 50000)  next_report_ += 5000;
        else if (next_report_ < 100000) next_report_ += 10000;
        else if (next_report_ < 500000) next_report_ += 50000;
        else                            next_report_ += 100000;
1392
        fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, "");
1393
      } else {
D
Dmitri Smirnov 已提交
1394
        uint64_t now = FLAGS_env->NowMicros();
1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405
        int64_t usecs_since_last = now - last_report_finish_;

        // Determine whether to print status where interval is either
        // each N operations or each N seconds.

        if (FLAGS_stats_interval_seconds &&
            usecs_since_last < (FLAGS_stats_interval_seconds * 1000000)) {
          // Don't check again for this many operations
          next_report_ += FLAGS_stats_interval;

        } else {
1406

1407 1408 1409
          fprintf(stderr,
                  "%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and "
                  "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
D
Dmitri Smirnov 已提交
1410
                  FLAGS_env->TimeToString(now/1000000).c_str(),
1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426
                  id_,
                  done_ - last_report_done_, done_,
                  (done_ - last_report_done_) /
                  (usecs_since_last / 1000000.0),
                  done_ / ((now - start_) / 1000000.0),
                  (now - last_report_finish_) / 1000000.0,
                  (now - start_) / 1000000.0);

          if (FLAGS_stats_per_interval) {
            std::string stats;

            if (db_with_cfh && db_with_cfh->num_created.load()) {
              for (size_t i = 0; i < db_with_cfh->num_created.load(); ++i) {
                if (db->GetProperty(db_with_cfh->cfh[i], "rocksdb.cfstats",
                                    &stats))
                  fprintf(stderr, "%s\n", stats.c_str());
1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456
                if (FLAGS_show_table_properties) {
                  for (int level = 0; level < FLAGS_num_levels; ++level) {
                    if (db->GetProperty(
                            db_with_cfh->cfh[i],
                            "rocksdb.aggregated-table-properties-at-level" +
                                ToString(level),
                            &stats)) {
                      if (stats.find("# entries=0") == std::string::npos) {
                        fprintf(stderr, "Level[%d]: %s\n", level,
                                stats.c_str());
                      }
                    }
                  }
                }
              }
            } else if (db) {
              if (db->GetProperty("rocksdb.stats", &stats)) {
                fprintf(stderr, "%s\n", stats.c_str());
              }
              if (FLAGS_show_table_properties) {
                for (int level = 0; level < FLAGS_num_levels; ++level) {
                  if (db->GetProperty(
                          "rocksdb.aggregated-table-properties-at-level" +
                              ToString(level),
                          &stats)) {
                    if (stats.find("# entries=0") == std::string::npos) {
                      fprintf(stderr, "Level[%d]: %s\n", level, stats.c_str());
                    }
                  }
                }
1457 1458
              }
            }
1459
          }
M
Mark Callaghan 已提交
1460

1461 1462 1463 1464
          next_report_ += FLAGS_stats_interval;
          last_report_finish_ = now;
          last_report_done_ = done_;
        }
1465
      }
1466 1467 1468 1469
      if (id_ == 0 && FLAGS_thread_status_per_interval) {
        PrintThreadStatus();
      }
      fflush(stderr);
1470 1471 1472 1473 1474 1475 1476 1477 1478
    }
  }

  void AddBytes(int64_t n) {
    bytes_ += n;
  }

  void Report(const Slice& name) {
    // Pretend at least one op was done in case we are running a benchmark
1479
    // that does not call FinishedOps().
1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492
    if (done_ < 1) done_ = 1;

    std::string extra;
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-6;
      char rate[100];
      snprintf(rate, sizeof(rate), "%6.1f MB/s",
               (bytes_ / 1048576.0) / elapsed);
      extra = rate;
    }
    AppendWithSpace(&extra, message_);
1493 1494
    double elapsed = (finish_ - start_) * 1e-6;
    double throughput = (double)done_/elapsed;
1495

D
Dhruba Borthakur 已提交
1496
    fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
1497
            name.ToString().c_str(),
1498
            elapsed * 1e6 / done_,
D
Dhruba Borthakur 已提交
1499
            (long)throughput,
1500 1501 1502
            (extra.empty() ? "" : " "),
            extra.c_str());
    if (FLAGS_histogram) {
1503 1504 1505
      for (auto it = hist_.begin(); it != hist_.end(); ++it) {
        fprintf(stdout, "Microseconds per %s:\n%s\n",
                OperationTypeString[it->first].c_str(),
1506
                it->second->ToString().c_str());
1507
      }
1508
    }
1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523
    if (FLAGS_report_file_operations) {
      ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env);
      ReportFileOpCounters* counters = env->counters();
      fprintf(stdout, "Num files opened: %d\n",
              counters->open_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num Read(): %d\n",
              counters->read_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num Append(): %d\n",
              counters->append_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num bytes read: %" PRIu64 "\n",
              counters->bytes_read_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num bytes written: %" PRIu64 "\n",
              counters->bytes_written_.load(std::memory_order_relaxed));
      env->reset();
    }
1524 1525 1526 1527 1528 1529 1530 1531 1532
    fflush(stdout);
  }
};

// State shared by all concurrent executions of the same benchmark.
struct SharedState {
  port::Mutex mu;
  port::CondVar cv;
  int total;
1533
  int perf_level;
1534
  std::shared_ptr<RateLimiter> write_rate_limiter;
1535 1536 1537 1538 1539 1540 1541

  // Each thread goes through the following states:
  //    (1) initializing
  //    (2) waiting for others to be initialized
  //    (3) running
  //    (4) done

1542 1543
  long num_initialized;
  long num_done;
1544 1545
  bool start;

1546
  SharedState() : cv(&mu), perf_level(FLAGS_perf_level) { }
1547 1548 1549 1550 1551
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;             // 0..n-1 when running in n threads
1552
  Random64 rand;         // Has different seeds for different threads
1553
  Stats stats;
1554
  SharedState* shared;
1555

A
Abhishek Kona 已提交
1556
  /* implicit */ ThreadState(int index)
1557
      : tid(index),
1558
        rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {
1559 1560 1561
  }
};

M
Mark Callaghan 已提交
1562 1563
class Duration {
 public:
D
Dmitri Smirnov 已提交
1564
  Duration(uint64_t max_seconds, int64_t max_ops, int64_t ops_per_stage = 0) {
M
Mark Callaghan 已提交
1565 1566
    max_seconds_ = max_seconds;
    max_ops_= max_ops;
1567
    ops_per_stage_ = (ops_per_stage > 0) ? ops_per_stage : max_ops;
M
Mark Callaghan 已提交
1568 1569 1570 1571
    ops_ = 0;
    start_at_ = FLAGS_env->NowMicros();
  }

1572 1573
  int64_t GetStage() { return std::min(ops_, max_ops_ - 1) / ops_per_stage_; }

L
Lei Jin 已提交
1574
  bool Done(int64_t increment) {
1575
    if (increment <= 0) increment = 1;    // avoid Done(0) and infinite loops
M
Mark Callaghan 已提交
1576 1577 1578
    ops_ += increment;

    if (max_seconds_) {
1579 1580
      // Recheck every appx 1000 ops (exact iff increment is factor of 1000)
      if ((ops_/1000) != ((ops_-increment)/1000)) {
D
Dmitri Smirnov 已提交
1581 1582
        uint64_t now = FLAGS_env->NowMicros();
        return ((now - start_at_) / 1000000) >= max_seconds_;
M
Mark Callaghan 已提交
1583 1584 1585 1586 1587 1588 1589 1590 1591
      } else {
        return false;
      }
    } else {
      return ops_ > max_ops_;
    }
  }

 private:
D
Dmitri Smirnov 已提交
1592
  uint64_t max_seconds_;
1593
  int64_t max_ops_;
1594
  int64_t ops_per_stage_;
1595
  int64_t ops_;
D
Dmitri Smirnov 已提交
1596
  uint64_t start_at_;
M
Mark Callaghan 已提交
1597 1598
};

J
jorlow@chromium.org 已提交
1599 1600
class Benchmark {
 private:
1601 1602 1603
  std::shared_ptr<Cache> cache_;
  std::shared_ptr<Cache> compressed_cache_;
  std::shared_ptr<const FilterPolicy> filter_policy_;
T
Tyler Harter 已提交
1604
  const SliceTransform* prefix_extractor_;
1605 1606
  DBWithColumnFamilies db_;
  std::vector<DBWithColumnFamilies> multi_dbs_;
1607
  int64_t num_;
1608
  int value_size_;
1609
  int key_size_;
1610 1611
  int prefix_size_;
  int64_t keys_per_prefix_;
L
Lei Jin 已提交
1612
  int64_t entries_per_batch_;
1613
  WriteOptions write_options_;
1614
  Options open_options_;  // keep options around to properly destroy db later
1615
  int64_t reads_;
Y
Yueh-Hsuan Chiang 已提交
1616
  int64_t deletes_;
1617
  double read_random_exp_range_;
1618 1619 1620
  int64_t writes_;
  int64_t readwrites_;
  int64_t merge_keys_;
1621
  bool report_file_operations_;
1622
  int cachedev_fd_;
1623 1624 1625 1626 1627 1628 1629 1630 1631

  // Validates flag values before any benchmark runs.
  // Returns false (after printing the reason to stderr) if a flag is out of
  // range.
  bool SanityCheck() {
    // compression_ratio scales value_size to the post-compression size (see
    // PrintHeader), so only the closed range [0, 1] is meaningful.
    if (FLAGS_compression_ratio < 0 || FLAGS_compression_ratio > 1) {
      fprintf(stderr, "compression_ratio should be between 0 and 1\n");
      return false;
    }
    return true;
  }

1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654
  inline bool CompressSlice(const Slice& input, std::string* compressed) {
    bool ok = true;
    switch (FLAGS_compression_type_e) {
      case rocksdb::kSnappyCompression:
        ok = Snappy_Compress(Options().compression_opts, input.data(),
                             input.size(), compressed);
        break;
      case rocksdb::kZlibCompression:
        ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
                           input.size(), compressed);
        break;
      case rocksdb::kBZip2Compression:
        ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
                            input.size(), compressed);
        break;
      case rocksdb::kLZ4Compression:
        ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
                          input.size(), compressed);
        break;
      case rocksdb::kLZ4HCCompression:
        ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
                            input.size(), compressed);
        break;
1655 1656 1657 1658
      case rocksdb::kXpressCompression:
        ok = XPRESS_Compress(input.data(),
          input.size(), compressed);
        break;
1659 1660 1661 1662
      case rocksdb::kZSTDNotFinalCompression:
        ok = ZSTD_Compress(Options().compression_opts, input.data(),
                           input.size(), compressed);
        break;
1663 1664 1665 1666 1667 1668
      default:
        ok = false;
    }
    return ok;
  }

1669 1670
  void PrintHeader() {
    PrintEnvironment();
1671
    fprintf(stdout, "Keys:       %d bytes each\n", FLAGS_key_size);
1672 1673 1674
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
1675 1676 1677
    fprintf(stdout, "Entries:    %" PRIu64 "\n", num_);
    fprintf(stdout, "Prefix:    %d bytes\n", FLAGS_prefix_size);
    fprintf(stdout, "Keys per prefix:    %" PRIu64 "\n", keys_per_prefix_);
1678
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
1679
            ((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
1680
             / 1048576.0));
1681
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
1682 1683
            (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
              * num_)
1684
             / 1048576.0));
1685 1686
    fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
            FLAGS_benchmark_write_rate_limit);
1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698
    if (FLAGS_enable_numa) {
      fprintf(stderr, "Running in NUMA enabled mode.\n");
#ifndef NUMA
      fprintf(stderr, "NUMA is not defined in the system.\n");
      exit(1);
#else
      if (numa_available() == -1) {
        fprintf(stderr, "NUMA is not supported by the system.\n");
        exit(1);
      }
#endif
    }
1699

N
Nathan Bronson 已提交
1700 1701
    auto compression = CompressionTypeToString(FLAGS_compression_type_e);
    fprintf(stdout, "Compression: %s\n", compression.c_str());
1702

J
Jim Paton 已提交
1703 1704 1705 1706 1707 1708 1709 1710 1711 1712
    switch (FLAGS_rep_factory) {
      case kPrefixHash:
        fprintf(stdout, "Memtablerep: prefix_hash\n");
        break;
      case kSkipList:
        fprintf(stdout, "Memtablerep: skip_list\n");
        break;
      case kVectorRep:
        fprintf(stdout, "Memtablerep: vector\n");
        break;
1713 1714 1715
      case kHashLinkedList:
        fprintf(stdout, "Memtablerep: hash_linkedlist\n");
        break;
1716 1717 1718
      case kCuckoo:
        fprintf(stdout, "Memtablerep: cuckoo\n");
        break;
J
Jim Paton 已提交
1719
    }
1720
    fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);
J
Jim Paton 已提交
1721

N
Nathan Bronson 已提交
1722
    PrintWarnings(compression.c_str());
1723 1724 1725
    fprintf(stdout, "------------------------------------------------\n");
  }

1726
  void PrintWarnings(const char* compression) {
1727 1728 1729 1730 1731 1732 1733 1734 1735
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
1736
    if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
1737 1738
      // The test string should not be too small.
      const int len = FLAGS_block_size;
1739
      std::string input_str(len, 'y');
1740
      std::string compressed;
1741
      bool result = CompressSlice(Slice(input_str), &compressed);
1742 1743

      if (!result) {
1744 1745 1746 1747 1748
        fprintf(stdout, "WARNING: %s compression is not enabled\n",
                compression);
      } else if (compressed.size() >= input_str.size()) {
        fprintf(stdout, "WARNING: %s compression is not effective\n",
                compression);
1749
      }
1750
    }
1751 1752
  }

K
kailiu 已提交
1753 1754 1755 1756 1757 1758 1759
// Currently the following isn't equivalent to OS_LINUX.
#if defined(__linux)
  // Returns the sub-slice of `s` with leading and trailing whitespace
  // removed. The result points into s's storage; nothing is copied.
  static Slice TrimSpace(Slice s) {
    size_t start = 0;
    // isspace() has undefined behavior for negative char values (e.g. UTF-8
    // bytes in /proc/cpuinfo model names), so widen via unsigned char.
    while (start < s.size() &&
           isspace(static_cast<unsigned char>(s[start]))) {
      start++;
    }
    size_t limit = s.size();
    while (limit > start &&
           isspace(static_cast<unsigned char>(s[limit - 1]))) {
      limit--;
    }
    return Slice(s.data() + start, limit - start);
  }
#endif

1768
  void PrintEnvironment() {
H
Hyunyoung Lee 已提交
1769
    fprintf(stderr, "RocksDB:    version %d.%d\n",
1770 1771 1772
            kMajorVersion, kMinorVersion);

#if defined(__linux)
1773
    time_t now = time(nullptr);
1774 1775 1776 1777 1778
    char buf[52];
    // Lint complains about ctime() usage, so replace it with ctime_r(). The
    // requirement is to provide a buffer which is at least 26 bytes.
    fprintf(stderr, "Date:       %s",
            ctime_r(&now, buf));  // ctime_r() adds newline
1779 1780

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
1781
    if (cpuinfo != nullptr) {
1782 1783 1784 1785
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
1786
      while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
1787
        const char* sep = strchr(line, ':');
1788
        if (sep == nullptr) {
1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }

J
jorlow@chromium.org 已提交
1807
 public:
  // Builds the caches, filter policy and prefix extractor from flags.
  // Optionally wraps FLAGS_env to count file operations, deletes leftover
  // "heap-" files in FLAGS_db, and destroys any existing DB unless
  // --use_existing_db is set. Exits on incompatible flag combinations.
  Benchmark()
      : cache_(
            FLAGS_cache_size >= 0
                ? (FLAGS_cache_numshardbits >= 1
                       ? NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits)
                       : NewLRUCache(FLAGS_cache_size))
                : nullptr),
        compressed_cache_(FLAGS_compressed_cache_size >= 0
                              ? (FLAGS_cache_numshardbits >= 1
                                     ? NewLRUCache(FLAGS_compressed_cache_size,
                                                   FLAGS_cache_numshardbits)
                                     : NewLRUCache(FLAGS_compressed_cache_size))
                              : nullptr),
        filter_policy_(FLAGS_bloom_bits >= 0
                           ? NewBloomFilterPolicy(FLAGS_bloom_bits,
                                                  FLAGS_use_block_based_filter)
                           : nullptr),
        prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
        num_(FLAGS_num),
        value_size_(FLAGS_value_size),
        key_size_(FLAGS_key_size),
        prefix_size_(FLAGS_prefix_size),
        keys_per_prefix_(FLAGS_keys_per_prefix),
        entries_per_batch_(1),
        reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
        read_random_exp_range_(0.0),
        writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
        readwrites_(
            (FLAGS_writes < 0 && FLAGS_reads < 0)
                ? FLAGS_num
                : ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)),
        merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
        report_file_operations_(FLAGS_report_file_operations),
        cachedev_fd_(-1) {
    // use simcache instead of cache
    if (FLAGS_simcache_size >= 0) {
      if (FLAGS_cache_numshardbits >= 1) {
        cache_ =
            NewSimCache(cache_, FLAGS_simcache_size, FLAGS_cache_numshardbits);
      } else {
        cache_ = NewSimCache(cache_, FLAGS_simcache_size, 0);
      }
    }

    if (report_file_operations_) {
      if (!FLAGS_hdfs.empty()) {
        fprintf(stderr,
                "--hdfs and --report_file_operations cannot be enabled "
                "at the same time\n");
        exit(1);
      }
      FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
    }

    if (FLAGS_prefix_size > FLAGS_key_size) {
      fprintf(stderr, "prefix size is larger than key size\n");
      exit(1);
    }

    // Remove stale "heap-" files. The GetChildren status is not checked:
    // an absent DB directory simply yields no files to delete.
    std::vector<std::string> files;
    FLAGS_env->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      Options options;
      if (!FLAGS_wal_dir.empty()) {
        options.wal_dir = FLAGS_wal_dir;
      }
      DestroyDB(FLAGS_db, options);
    }
  }

  // Closes the DB(s) and releases the prefix extractor. The block cache's
  // contents are disowned rather than freed. If a cache device fd is held,
  // the flashcache-aware env is torn down before that fd is closed.
  ~Benchmark() {
    db_.DeleteDBs();
    delete prefix_extractor_;

    if (cache_ != nullptr) {
      // Intentionally leaks the cached data; we are shutting down, so the
      // cost of freeing it is not worth paying.
      cache_->DisownData();
    }

    const bool holds_cachedev = (cachedev_fd_ != -1);
    if (FLAGS_disable_flashcache_for_background_threads && holds_cachedev) {
      // The env's destructor must run while cachedev_fd_ is still open.
      flashcache_aware_env_.reset();
      close(cachedev_fd_);
    }
  }

1897
  // Allocates an uninitialized buffer of key_size_ bytes, transfers its
  // ownership to *key_guard, and returns a Slice viewing that buffer.
  Slice AllocateKey(std::unique_ptr<const char[]>* key_guard) {
    // Hand reset() a pointer of exactly its `pointer` type (const char*);
    // under C++11 the array form of unique_ptr deletes the converting
    // reset overload.
    const char* buffer = new char[key_size_];
    key_guard->reset(buffer);
    return Slice(buffer, key_size_);
  }

1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915
  // Generate key according to the given specification and random number.
  // The resulting key will have the following format (if keys_per_prefix_
  // is positive), extra trailing bytes are either cut off or paddd with '0'.
  // The prefix value is derived from key value.
  //   ----------------------------
  //   | prefix 00000 | key 00000 |
  //   ----------------------------
  // If keys_per_prefix_ is 0, the key is simply a binary representation of
  // random number followed by trailing '0's
  //   ----------------------------
  //   |        key 00000         |
  //   ----------------------------
L
Lei Jin 已提交
1916 1917
  void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
    char* start = const_cast<char*>(key->data());
1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948
    char* pos = start;
    if (keys_per_prefix_ > 0) {
      int64_t num_prefix = num_keys / keys_per_prefix_;
      int64_t prefix = v % num_prefix;
      int bytes_to_fill = std::min(prefix_size_, 8);
      if (port::kLittleEndian) {
        for (int i = 0; i < bytes_to_fill; ++i) {
          pos[i] = (prefix >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
        }
      } else {
        memcpy(pos, static_cast<void*>(&prefix), bytes_to_fill);
      }
      if (prefix_size_ > 8) {
        // fill the rest with 0s
        memset(pos + 8, '0', prefix_size_ - 8);
      }
      pos += prefix_size_;
    }

    int bytes_to_fill = std::min(key_size_ - static_cast<int>(pos - start), 8);
    if (port::kLittleEndian) {
      for (int i = 0; i < bytes_to_fill; ++i) {
        pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
      }
    } else {
      memcpy(pos, static_cast<void*>(&v), bytes_to_fill);
    }
    pos += bytes_to_fill;
    if (key_size_ > pos - start) {
      memset(pos, '0', key_size_ - (pos - start));
    }
X
Xing Jin 已提交
1949 1950
  }

1951
  std::string GetDbNameForMultiple(std::string base_name, size_t id) {
1952
    return base_name + ToString(id);
1953 1954
  }

J
jorlow@chromium.org 已提交
1955
  void Run() {
1956 1957 1958
    if (!SanityCheck()) {
      exit(1);
    }
1959
    Open(&open_options_);
1960
    PrintHeader();
1961 1962 1963
    std::stringstream benchmark_stream(FLAGS_benchmarks);
    std::string name;
    while (std::getline(benchmark_stream, name, ',')) {
X
Xing Jin 已提交
1964
      // Sanitize parameters
1965
      num_ = FLAGS_num;
1966
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
1967
      writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
Y
Yueh-Hsuan Chiang 已提交
1968
      deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
1969
      value_size_ = FLAGS_value_size;
1970
      key_size_ = FLAGS_key_size;
1971
      entries_per_batch_ = FLAGS_batch_size;
1972
      write_options_ = WriteOptions();
1973
      read_random_exp_range_ = FLAGS_read_random_exp_range;
1974 1975 1976
      if (FLAGS_sync) {
        write_options_.sync = true;
      }
H
heyongqiang 已提交
1977 1978
      write_options_.disableWAL = FLAGS_disable_wal;

1979
      void (Benchmark::*method)(ThreadState*) = nullptr;
A
agiardullo 已提交
1980 1981
      void (Benchmark::*post_process_method)() = nullptr;

1982
      bool fresh_db = false;
1983
      int num_threads = FLAGS_threads;
1984

1985
      if (name == "fillseq") {
1986 1987
        fresh_db = true;
        method = &Benchmark::WriteSeq;
1988
      } else if (name == "fillbatch") {
1989 1990 1991
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
1992
      } else if (name == "fillrandom") {
1993 1994
        fresh_db = true;
        method = &Benchmark::WriteRandom;
1995
      } else if (name == "filluniquerandom") {
1996 1997
        fresh_db = true;
        if (num_threads > 1) {
1998 1999 2000
          fprintf(stderr,
                  "filluniquerandom multithreaded not supported"
                  ", use 1 thread");
2001
          num_threads = 1;
2002 2003
        }
        method = &Benchmark::WriteUniqueRandom;
2004
      } else if (name == "overwrite") {
2005
        method = &Benchmark::WriteRandom;
2006
      } else if (name == "fillsync") {
2007 2008 2009 2010
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
2011
      } else if (name == "fill100K") {
2012 2013 2014 2015
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
2016
      } else if (name == "readseq") {
2017
        method = &Benchmark::ReadSequential;
2018
      } else if (name == "readtocache") {
M
Mark Callaghan 已提交
2019 2020 2021
        method = &Benchmark::ReadSequential;
        num_threads = 1;
        reads_ = num_;
2022
      } else if (name == "readreverse") {
2023
        method = &Benchmark::ReadReverse;
2024
      } else if (name == "readrandom") {
2025
        method = &Benchmark::ReadRandom;
2026
      } else if (name == "readrandomfast") {
L
Lei Jin 已提交
2027
        method = &Benchmark::ReadRandomFast;
2028
      } else if (name == "multireadrandom") {
M
mike@arpaia.co 已提交
2029 2030
        fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
                entries_per_batch_);
L
Lei Jin 已提交
2031
        method = &Benchmark::MultiReadRandom;
2032
      } else if (name == "readmissing") {
L
Lei Jin 已提交
2033 2034
        ++key_size_;
        method = &Benchmark::ReadRandom;
2035
      } else if (name == "newiterator") {
2036
        method = &Benchmark::IteratorCreation;
2037
      } else if (name == "newiteratorwhilewriting") {
2038 2039
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::IteratorCreationWhileWriting;
2040
      } else if (name == "seekrandom") {
S
Sanjay Ghemawat 已提交
2041
        method = &Benchmark::SeekRandom;
2042
      } else if (name == "seekrandomwhilewriting") {
L
Lei Jin 已提交
2043 2044
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::SeekRandomWhileWriting;
2045
      } else if (name == "seekrandomwhilemerging") {
2046 2047
        num_threads++;  // Add extra thread for merging
        method = &Benchmark::SeekRandomWhileMerging;
2048
      } else if (name == "readrandomsmall") {
2049
        reads_ /= 1000;
2050
        method = &Benchmark::ReadRandom;
2051
      } else if (name == "deleteseq") {
S
Sanjay Ghemawat 已提交
2052
        method = &Benchmark::DeleteSeq;
2053
      } else if (name == "deleterandom") {
S
Sanjay Ghemawat 已提交
2054
        method = &Benchmark::DeleteRandom;
2055
      } else if (name == "readwhilewriting") {
2056 2057
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
2058
      } else if (name == "readwhilemerging") {
M
Mark Callaghan 已提交
2059 2060
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileMerging;
2061
      } else if (name == "readrandomwriterandom") {
2062
        method = &Benchmark::ReadRandomWriteRandom;
2063
      } else if (name == "readrandommergerandom") {
2064 2065
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
2066
                  name.c_str());
L
Lei Jin 已提交
2067
          exit(1);
2068
        }
L
Lei Jin 已提交
2069
        method = &Benchmark::ReadRandomMergeRandom;
2070
      } else if (name == "updaterandom") {
M
Mark Callaghan 已提交
2071
        method = &Benchmark::UpdateRandom;
2072
      } else if (name == "appendrandom") {
D
Deon Nicholas 已提交
2073
        method = &Benchmark::AppendRandom;
2074
      } else if (name == "mergerandom") {
D
Deon Nicholas 已提交
2075 2076
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
2077
                  name.c_str());
L
Lei Jin 已提交
2078
          exit(1);
D
Deon Nicholas 已提交
2079
        }
L
Lei Jin 已提交
2080
        method = &Benchmark::MergeRandom;
2081
      } else if (name == "randomwithverify") {
2082
        method = &Benchmark::RandomWithVerify;
2083
      } else if (name == "fillseekseq") {
T
Tomislav Novak 已提交
2084
        method = &Benchmark::WriteSeqSeekSeq;
2085
      } else if (name == "compact") {
2086
        method = &Benchmark::Compact;
2087
      } else if (name == "crc32c") {
2088
        method = &Benchmark::Crc32c;
2089
      } else if (name == "xxhash") {
I
xxHash  
Igor Canadi 已提交
2090
        method = &Benchmark::xxHash;
2091
      } else if (name == "acquireload") {
2092
        method = &Benchmark::AcquireLoad;
2093
      } else if (name == "compress") {
A
Albert Strasheim 已提交
2094
        method = &Benchmark::Compress;
2095
      } else if (name == "uncompress") {
A
Albert Strasheim 已提交
2096
        method = &Benchmark::Uncompress;
2097
#ifndef ROCKSDB_LITE
2098
      } else if (name == "randomtransaction") {
A
agiardullo 已提交
2099 2100
        method = &Benchmark::RandomTransaction;
        post_process_method = &Benchmark::RandomTransactionVerify;
2101
#endif  // ROCKSDB_LITE
A
Andres Noetzli 已提交
2102 2103 2104
      } else if (name == "randomreplacekeys") {
        fresh_db = true;
        method = &Benchmark::RandomReplaceKeys;
2105
      } else if (name == "stats") {
2106
        PrintStats("rocksdb.stats");
2107
      } else if (name == "levelstats") {
2108
        PrintStats("rocksdb.levelstats");
2109
      } else if (name == "sstables") {
2110
        PrintStats("rocksdb.sstables");
2111 2112 2113
      } else if (!name.empty()) {  // No error message for empty name
        fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
        exit(1);
2114
      }
2115 2116 2117 2118

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
2119
                  name.c_str());
2120
          method = nullptr;
2121
        } else {
2122
          if (db_.db != nullptr) {
A
agiardullo 已提交
2123
            db_.DeleteDBs();
2124
            DestroyDB(FLAGS_db, open_options_);
2125 2126
          }
          for (size_t i = 0; i < multi_dbs_.size(); i++) {
2127
            delete multi_dbs_[i].db;
2128
            DestroyDB(GetDbNameForMultiple(FLAGS_db, i), open_options_);
2129 2130
          }
          multi_dbs_.clear();
2131
        }
2132
        Open(&open_options_);  // use open_options for the last accessed
2133 2134
      }

2135
      if (method != nullptr) {
2136
        fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2137
        RunBenchmark(num_threads, name, method);
J
jorlow@chromium.org 已提交
2138
      }
A
agiardullo 已提交
2139 2140 2141
      if (post_process_method != nullptr) {
        (this->*post_process_method)();
      }
J
jorlow@chromium.org 已提交
2142
    }
2143
    if (FLAGS_statistics) {
K
krad 已提交
2144
      fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
2145
    }
I
Islam AbdelRahman 已提交
2146
    if (FLAGS_simcache_size >= 0) {
2147 2148 2149
      fprintf(stdout, "SIMULATOR CACHE STATISTICS:\n%s\n",
              std::dynamic_pointer_cast<SimCache>(cache_)->ToString().c_str());
    }
J
jorlow@chromium.org 已提交
2150 2151
  }

2152
 private:
2153 2154
  std::unique_ptr<Env> flashcache_aware_env_;

2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176
  struct ThreadArg {
    Benchmark* bm;
    SharedState* shared;
    ThreadState* thread;
    void (Benchmark::*method)(ThreadState*);
  };

  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

2177
    SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
2178
    thread->stats.Start(thread->tid);
2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }

2191 2192
  // Runs `method` concurrently on n worker threads and reports the merged
  // per-thread stats under `name`. All workers are released together once
  // every worker has initialized. The call returns after all of them finish.
  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      shared.write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }

    std::unique_ptr<ReporterAgent> reporter_agent;
    if (FLAGS_report_interval_seconds > 0) {
      reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file,
                                             FLAGS_report_interval_seconds));
    }

    // Owned via RAII instead of naked new[]/delete. Both vectors are sized
    // once up front because worker threads keep pointers into them; they
    // must never reallocate. Declared after reporter_agent, so the thread
    // states (which reference it) are destroyed first.
    std::vector<ThreadArg> args(static_cast<size_t>(n));
    std::vector<std::unique_ptr<ThreadState>> states(static_cast<size_t>(n));

    for (int i = 0; i < n; i++) {
#ifdef NUMA
      if (FLAGS_enable_numa) {
        // Performs a local allocation of memory to threads in numa node.
        int n_nodes = numa_num_task_nodes();  // Number of nodes in NUMA.
        numa_exit_on_error = 1;
        int numa_node = i % n_nodes;
        bitmask* nodes = numa_allocate_nodemask();
        numa_bitmask_clearall(nodes);
        numa_bitmask_setbit(nodes, numa_node);
        // numa_bind() call binds the process to the node and these
        // properties are passed on to the thread that is created in
        // StartThread method called later in the loop.
        numa_bind(nodes);
        numa_set_strict(1);
        numa_free_nodemask(nodes);
      }
#endif
      states[i].reset(new ThreadState(i));
      states[i]->stats.SetReporterAgent(reporter_agent.get());
      states[i]->shared = &shared;

      args[i].bm = this;
      args[i].method = method;
      args[i].shared = &shared;
      args[i].thread = states[i].get();
      FLAGS_env->StartThread(ThreadBody, &args[i]);
    }

    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    // Stats for some threads can be excluded.
    Stats merge_stats;
    for (int i = 0; i < n; i++) {
      merge_stats.Merge(states[i]->stats);
    }
    merge_stats.Report(name);
  }

  // Benchmarks crc32c::Value over one 4 KiB buffer of 'x' bytes, repeated
  // until about 500 MB of input has been checksummed. Each 4 KiB checksum is
  // recorded as one kCrc op.
  void Crc32c(ThreadState* thread) {
    constexpr int kBlockBytes = 4096;
    constexpr int64_t kTotalBytes = 500 * 1048576;
    const std::string block(kBlockBytes, 'x');

    uint32_t checksum = 0;
    int64_t processed = 0;
    for (; processed < kTotalBytes; processed += kBlockBytes) {
      checksum = crc32c::Value(block.data(), kBlockBytes);
      thread->stats.FinishedOps(nullptr, nullptr, 1, kCrc);
    }
    // Emitting the final checksum keeps the loop from being optimized away.
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(checksum));

    thread->stats.AddBytes(processed);
    thread->stats.AddMessage("(4K per op)");
  }

I
xxHash  
Igor Canadi 已提交
2282 2283 2284 2285 2286 2287 2288 2289 2290
  // Benchmarks XXH32 (seed 0) over one 4 KiB buffer of 'x' bytes, repeated
  // until about 500 MB of input has been hashed. Each 4 KiB hash is recorded
  // as one kHash op.
  void xxHash(ThreadState* thread) {
    constexpr int kBlockBytes = 4096;
    constexpr int64_t kTotalBytes = 500 * 1048576;
    const std::string block(kBlockBytes, 'x');

    unsigned int digest = 0;
    int64_t processed = 0;
    for (; processed < kTotalBytes; processed += kBlockBytes) {
      digest = XXH32(block.data(), kBlockBytes, 0);
      thread->stats.FinishedOps(nullptr, nullptr, 1, kHash);
    }
    // Emitting the final hash keeps the loop from being optimized away.
    fprintf(stderr, "... xxh32=0x%x\r", static_cast<unsigned int>(digest));

    thread->stats.AddBytes(processed);
    thread->stats.AddMessage("(4K per op)");
  }

2301
  // Measures std::atomic<void*> loads with acquire ordering: 100000 ops, each
  // op being 1000 consecutive loads of the same atomic.
  void AcquireLoad(ThreadState* thread) {
    int target;
    std::atomic<void*> slot(&target);
    void* observed = nullptr;
    thread->stats.AddMessage("(each op is 1000 loads)");
    for (int op = 0; op < 100000; ++op) {
      for (int load = 0; load < 1000; ++load) {
        observed = slot.load(std::memory_order_acquire);
      }
      thread->stats.FinishedOps(nullptr, nullptr, 1, kOthers);
    }
    // Consuming the loaded value keeps the loads alive and silences the
    // unused-variable warning.
    if (observed == nullptr) exit(1);
  }

A
Albert Strasheim 已提交
2317
  // Benchmarks CompressSlice() on one generated block of FLAGS_block_size
  // bytes, compressing it repeatedly until about 1 GiB of input has been
  // processed or a call fails. On success the compressed-to-input size ratio
  // is attached to the report.
  void Compress(ThreadState *thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(FLAGS_block_size);
    const int64_t kTargetBytes = int64_t(1) << 30;

    std::string compressed;
    int64_t consumed = 0;
    int64_t produced = 0;
    bool ok = true;
    while (ok && consumed < kTargetBytes) {
      compressed.clear();
      ok = CompressSlice(input, &compressed);
      produced += compressed.size();
      consumed += input.size();
      thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
    }

    if (ok) {
      char buf[100];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / consumed);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(consumed);
    } else {
      thread->stats.AddMessage("(compression failure)");
    }
  }

A
Albert Strasheim 已提交
2345
  // Benchmarks decompression for FLAGS_compression_type_e. One generated
  // block of FLAGS_block_size bytes is compressed once, then decompressed
  // repeatedly until about 1 GiB of (uncompressed) input has been processed.
  // An unsupported compression type or any failed call stops the loop and
  // reports "(compression failure)".
  void Uncompress(ThreadState *thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(FLAGS_block_size);
    std::string compressed;

    bool ok = CompressSlice(input, &compressed);
    int64_t bytes = 0;
    int decompress_size;
    while (ok && bytes < 1024 * 1048576) {
      char* uncompressed = nullptr;
      switch (FLAGS_compression_type_e) {
        case rocksdb::kSnappyCompression: {
          // Query the size and allocate here so that the comparison with the
          // other codecs (which allocate internally) is fair.
          size_t ulength = 0;
          ok = Snappy_GetUncompressedLength(compressed.data(),
                                            compressed.size(), &ulength);
          if (ok) {
            uncompressed = new char[ulength];
            ok = Snappy_Uncompress(compressed.data(), compressed.size(),
                                   uncompressed);
          }
          break;
        }
        case rocksdb::kZlibCompression:
          uncompressed = Zlib_Uncompress(compressed.data(), compressed.size(),
                                         &decompress_size, 2);
          ok = uncompressed != nullptr;
          break;
        case rocksdb::kBZip2Compression:
          uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
                                          &decompress_size, 2);
          ok = uncompressed != nullptr;
          break;
        case rocksdb::kLZ4Compression:
        case rocksdb::kLZ4HCCompression:
          // LZ4HC output is decoded with the regular LZ4 decompressor.
          uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(),
                                        &decompress_size, 2);
          ok = uncompressed != nullptr;
          break;
        case rocksdb::kXpressCompression:
          uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
                                           &decompress_size);
          ok = uncompressed != nullptr;
          break;
        case rocksdb::kZSTDNotFinalCompression:
          uncompressed = ZSTD_Uncompress(compressed.data(), compressed.size(),
                                         &decompress_size);
          ok = uncompressed != nullptr;
          break;
        default:
          ok = false;
          break;
      }
      delete[] uncompressed;
      bytes += input.size();
      thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
    }

    if (ok) {
      thread->stats.AddBytes(bytes);
    } else {
      thread->stats.AddMessage("(compression failure)");
    }
  }

2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437
  // Initializes *opts from the RocksDB options file named by
  // --options_file.
  //
  // Returns true (and overwrites *opts with the loaded DB options plus the
  // options of the first loaded column family) when a file is given and
  // loads successfully. Returns false without touching *opts when no file
  // is given, and always in ROCKSDB_LITE builds. Exits the process if a file
  // is given but cannot be loaded.
  bool InitializeOptionsFromFile(Options* opts) {
#ifndef ROCKSDB_LITE
    if (!FLAGS_options_file.empty()) {
      // Announce file-based initialization only when a file is really used;
      // otherwise the caller falls back to flags and prints its own message.
      printf("Initializing RocksDB Options from the specified file\n");
      DBOptions db_opts;
      std::vector<ColumnFamilyDescriptor> cf_descs;
      auto s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), &db_opts,
                                   &cf_descs);
      if (s.ok()) {
        *opts = Options(db_opts, cf_descs[0].options);
        return true;
      }
      fprintf(stderr, "Unable to load options file %s --- %s\n",
              FLAGS_options_file.c_str(), s.ToString().c_str());
      exit(1);
    }
#endif
    return false;
  }

  // Populates *opts from the individual db_bench command-line flags.
  // Exits the process (after printing to stderr) on inconsistent flag
  // combinations. Must be called before the DB is opened.
  void InitializeOptionsFromFlags(Options* opts) {
    printf("Initializing RocksDB Options from command-line flags\n");
    Options& options = *opts;

    assert(db_.db == nullptr);

    options.create_missing_column_families = FLAGS_num_column_families > 1;
    options.max_open_files = FLAGS_open_files;
    options.db_write_buffer_size = FLAGS_db_write_buffer_size;
    options.write_buffer_size = FLAGS_write_buffer_size;
    options.max_write_buffer_number = FLAGS_max_write_buffer_number;
    options.min_write_buffer_number_to_merge =
        FLAGS_min_write_buffer_number_to_merge;
    options.max_write_buffer_number_to_maintain =
        FLAGS_max_write_buffer_number_to_maintain;
    options.max_background_compactions = FLAGS_max_background_compactions;
    options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
    options.max_background_flushes = FLAGS_max_background_flushes;
    options.compaction_style = FLAGS_compaction_style_e;
    options.compaction_pri = FLAGS_compaction_pri_e;
    if (FLAGS_prefix_size != 0) {
      options.prefix_extractor.reset(
          NewFixedPrefixTransform(FLAGS_prefix_size));
    }
    if (FLAGS_use_uint64_comparator) {
      options.comparator = test::Uint64Comparator();
      if (FLAGS_key_size != 8) {
        fprintf(stderr, "Using Uint64 comparator but key size is not 8.\n");
        exit(1);
      }
    }
    options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
    options.bloom_locality = FLAGS_bloom_locality;
    options.max_file_opening_threads = FLAGS_file_opening_threads;
    options.new_table_reader_for_compaction_inputs =
        FLAGS_new_table_reader_for_compaction_inputs;
    options.compaction_readahead_size = FLAGS_compaction_readahead_size;
    options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
    options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
    options.disableDataSync = FLAGS_disable_data_sync;
    options.use_fsync = FLAGS_use_fsync;
    options.num_levels = FLAGS_num_levels;
    options.target_file_size_base = FLAGS_target_file_size_base;
    options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
    options.level_compaction_dynamic_level_bytes =
        FLAGS_level_compaction_dynamic_level_bytes;
    options.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;

    // Memtable representation. The hash-based reps need a prefix extractor.
    if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash ||
                                     FLAGS_rep_factory == kHashLinkedList)) {
      fprintf(stderr, "prefix_size should be non-zero if PrefixHash or "
                      "HashLinkedList memtablerep is used\n");
      exit(1);
    }
    switch (FLAGS_rep_factory) {
      case kSkipList:
        options.memtable_factory.reset(new SkipListFactory(
            FLAGS_skip_list_lookahead));
        break;
#ifndef ROCKSDB_LITE
      case kPrefixHash:
        options.memtable_factory.reset(
            NewHashSkipListRepFactory(FLAGS_hash_bucket_count));
        break;
      case kHashLinkedList:
        options.memtable_factory.reset(NewHashLinkListRepFactory(
            FLAGS_hash_bucket_count));
        break;
      case kVectorRep:
        options.memtable_factory.reset(new VectorRepFactory);
        break;
      case kCuckoo:
        options.memtable_factory.reset(NewHashCuckooRepFactory(
            options.write_buffer_size, FLAGS_key_size + FLAGS_value_size));
        break;
#else
      default:
        fprintf(stderr, "Only skip list is supported in lite mode\n");
        exit(1);
#endif  // ROCKSDB_LITE
    }

    // Table format: plain table, cuckoo table, or block-based (default).
    if (FLAGS_use_plain_table) {
#ifndef ROCKSDB_LITE
      if (FLAGS_rep_factory != kPrefixHash &&
          FLAGS_rep_factory != kHashLinkedList) {
        fprintf(stderr, "Warning: plain table is used with skipList\n");
      }
      if (!FLAGS_mmap_read && !FLAGS_mmap_write) {
        fprintf(stderr, "plain table format requires mmap to operate\n");
        exit(1);
      }

      int bloom_bits_per_key = FLAGS_bloom_bits;
      if (bloom_bits_per_key < 0) {
        bloom_bits_per_key = 0;
      }

      PlainTableOptions plain_table_options;
      plain_table_options.user_key_len = FLAGS_key_size;
      plain_table_options.bloom_bits_per_key = bloom_bits_per_key;
      plain_table_options.hash_table_ratio = 0.75;
      options.table_factory = std::shared_ptr<TableFactory>(
          NewPlainTableFactory(plain_table_options));
#else
      fprintf(stderr, "Plain table is not supported in lite mode\n");
      exit(1);
#endif  // ROCKSDB_LITE
    } else if (FLAGS_use_cuckoo_table) {
#ifndef ROCKSDB_LITE
      if (FLAGS_cuckoo_hash_ratio > 1 || FLAGS_cuckoo_hash_ratio < 0) {
        fprintf(stderr, "Invalid cuckoo_hash_ratio\n");
        exit(1);
      }
      rocksdb::CuckooTableOptions table_options;
      table_options.hash_table_ratio = FLAGS_cuckoo_hash_ratio;
      table_options.identity_as_first_hash = FLAGS_identity_as_first_hash;
      options.table_factory = std::shared_ptr<TableFactory>(
          NewCuckooTableFactory(table_options));
#else
      fprintf(stderr, "Cuckoo table is not supported in lite mode\n");
      exit(1);
#endif  // ROCKSDB_LITE
    } else {
      BlockBasedTableOptions block_based_options;
      if (FLAGS_use_hash_search) {
        if (FLAGS_prefix_size == 0) {
          fprintf(stderr,
              "prefix_size not assigned when enable use_hash_search \n");
          exit(1);
        }
        block_based_options.index_type = BlockBasedTableOptions::kHashSearch;
      } else {
        block_based_options.index_type = BlockBasedTableOptions::kBinarySearch;
      }
      if (cache_ == nullptr) {
        block_based_options.no_block_cache = true;
      }
      block_based_options.cache_index_and_filter_blocks =
          FLAGS_cache_index_and_filter_blocks;
      block_based_options.pin_l0_filter_and_index_blocks_in_cache =
          FLAGS_pin_l0_filter_and_index_blocks_in_cache;
      block_based_options.block_cache = cache_;
      block_based_options.block_cache_compressed = compressed_cache_;
      block_based_options.block_size = FLAGS_block_size;
      block_based_options.block_restart_interval = FLAGS_block_restart_interval;
      block_based_options.index_block_restart_interval =
          FLAGS_index_block_restart_interval;
      block_based_options.filter_policy = filter_policy_;
      block_based_options.skip_table_builder_flush =
          FLAGS_skip_table_builder_flush;
      block_based_options.format_version = 2;
      options.table_factory.reset(
          NewBlockBasedTableFactory(block_based_options));
    }

    if (!FLAGS_max_bytes_for_level_multiplier_additional_v.empty()) {
      if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
          static_cast<size_t>(FLAGS_num_levels)) {
        fprintf(stderr, "Insufficient number of fanouts specified %d\n",
                static_cast<int>(
                    FLAGS_max_bytes_for_level_multiplier_additional_v.size()));
        exit(1);
      }
      options.max_bytes_for_level_multiplier_additional =
          FLAGS_max_bytes_for_level_multiplier_additional_v;
    }
    options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
    options.level0_file_num_compaction_trigger =
        FLAGS_level0_file_num_compaction_trigger;
    options.level0_slowdown_writes_trigger =
        FLAGS_level0_slowdown_writes_trigger;
    options.compression = FLAGS_compression_type_e;
    options.compression_opts.level = FLAGS_compression_level;
    options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
    options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
    options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
    options.max_total_wal_size = FLAGS_max_total_wal_size;

    // Per-level compression: levels [0, min_level_to_compress) are stored
    // uncompressed, the remaining levels use FLAGS_compression_type_e.
    // The result is left in place so that it reaches the DB when it is
    // opened; it must not be cleared before then.
    if (FLAGS_min_level_to_compress >= 0) {
      // A runtime check rather than an assert: in release builds a too-large
      // value would otherwise index past the end of compression_per_level.
      if (FLAGS_min_level_to_compress > FLAGS_num_levels) {
        fprintf(stderr,
                "min_level_to_compress (%d) must not exceed num_levels (%d)\n",
                static_cast<int>(FLAGS_min_level_to_compress),
                static_cast<int>(FLAGS_num_levels));
        exit(1);
      }
      options.compression_per_level.resize(FLAGS_num_levels);
      for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
        options.compression_per_level[i] = kNoCompression;
      }
      for (int i = FLAGS_min_level_to_compress; i < FLAGS_num_levels; i++) {
        options.compression_per_level[i] = FLAGS_compression_type_e;
      }
    }
    options.soft_rate_limit = FLAGS_soft_rate_limit;
    options.hard_rate_limit = FLAGS_hard_rate_limit;
    options.soft_pending_compaction_bytes_limit =
        FLAGS_soft_pending_compaction_bytes_limit;
    options.hard_pending_compaction_bytes_limit =
        FLAGS_hard_pending_compaction_bytes_limit;
    options.delayed_write_rate = FLAGS_delayed_write_rate;
    options.allow_concurrent_memtable_write =
        FLAGS_allow_concurrent_memtable_write;
    options.enable_write_thread_adaptive_yield =
        FLAGS_enable_write_thread_adaptive_yield;
    options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
    options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
    options.rate_limit_delay_max_milliseconds =
        FLAGS_rate_limit_delay_max_milliseconds;
    options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
    options.max_grandparent_overlap_factor =
        FLAGS_max_grandparent_overlap_factor;
    options.disable_auto_compactions = FLAGS_disable_auto_compactions;
    options.source_compaction_factor = FLAGS_source_compaction_factor;
    options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;

    // fill storage options
    options.allow_os_buffer = FLAGS_bufferedio;
    options.allow_mmap_reads = FLAGS_mmap_read;
    options.allow_mmap_writes = FLAGS_mmap_write;
    options.advise_random_on_open = FLAGS_advise_random_on_open;
    options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;
    options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
    options.bytes_per_sync = FLAGS_bytes_per_sync;
    options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;

    // merge operator options
    options.merge_operator = MergeOperators::CreateFromStringId(
        FLAGS_merge_operator);
    if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
      fprintf(stderr, "invalid merge operator: %s\n",
              FLAGS_merge_operator.c_str());
      exit(1);
    }
    options.max_successive_merges = FLAGS_max_successive_merges;
    options.report_bg_io_stats = FLAGS_report_bg_io_stats;

    // set universal style compaction configurations, if applicable
    if (FLAGS_universal_size_ratio != 0) {
      options.compaction_options_universal.size_ratio =
          FLAGS_universal_size_ratio;
    }
    if (FLAGS_universal_min_merge_width != 0) {
      options.compaction_options_universal.min_merge_width =
          FLAGS_universal_min_merge_width;
    }
    if (FLAGS_universal_max_merge_width != 0) {
      options.compaction_options_universal.max_merge_width =
          FLAGS_universal_max_merge_width;
    }
    if (FLAGS_universal_max_size_amplification_percent != 0) {
      options.compaction_options_universal.max_size_amplification_percent =
          FLAGS_universal_max_size_amplification_percent;
    }
    if (FLAGS_universal_compression_size_percent != -1) {
      options.compaction_options_universal.compression_size_percent =
          FLAGS_universal_compression_size_percent;
    }
    options.compaction_options_universal.allow_trivial_move =
        FLAGS_universal_allow_trivial_move;
    if (FLAGS_thread_status_per_interval > 0) {
      options.enable_thread_tracking = true;
    }
    if (FLAGS_rate_limiter_bytes_per_sec > 0) {
      options.rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec));
    }

#ifndef ROCKSDB_LITE
    if (FLAGS_readonly && FLAGS_transaction_db) {
      fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
      exit(1);
    }
#endif  // ROCKSDB_LITE
  }

  // Applies the settings common to both initialization paths (options file
  // or command-line flags) and then opens the database(s) from *opts.
  // Every option meant to influence the opened DB(s) must be assigned before
  // the OpenDb() calls at the end; later changes to `options` are not seen by
  // a DB that has already been opened.
  void InitializeOptionsGeneral(Options* opts) {
    Options& options = *opts;

    options.statistics = dbstats;
    options.wal_dir = FLAGS_wal_dir;
    options.create_if_missing = !FLAGS_use_existing_db;
    // Assigned before OpenDb() so that --dump_malloc_stats actually reaches
    // the DB being opened.
    options.dump_malloc_stats = FLAGS_dump_malloc_stats;

    if (FLAGS_row_cache_size) {
      if (FLAGS_cache_numshardbits >= 1) {
        options.row_cache =
            NewLRUCache(FLAGS_row_cache_size, FLAGS_cache_numshardbits);
      } else {
        options.row_cache = NewLRUCache(FLAGS_row_cache_size);
      }
    }
    if (FLAGS_enable_io_prio) {
      FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
      FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
    }
    if (FLAGS_disable_flashcache_for_background_threads) {
      // Open the flash device and build the flashcache-aware env only once
      // (cachedev_fd_ == -1 is treated as "not opened yet"). Later calls,
      // e.g. a reopen with use_existing_db, reuse the same env instead of
      // silently falling back to FLAGS_env.
      if (cachedev_fd_ == -1) {
        cachedev_fd_ = open(FLAGS_flashcache_dev.c_str(), O_RDONLY);
        if (cachedev_fd_ < 0) {
          fprintf(stderr, "Open flash device failed\n");
          exit(1);
        }
        flashcache_aware_env_ = NewFlashcacheAwareEnv(FLAGS_env, cachedev_fd_);
        if (flashcache_aware_env_.get() == nullptr) {
          fprintf(stderr, "Failed to open flashcache device at %s\n",
                  FLAGS_flashcache_dev.c_str());
          std::abort();
        }
      }
      options.env = flashcache_aware_env_.get();
    } else {
      options.env = FLAGS_env;
    }

    if (FLAGS_num_multi_db <= 1) {
      OpenDb(options, FLAGS_db, &db_);
    } else {
      multi_dbs_.clear();
      multi_dbs_.resize(FLAGS_num_multi_db);
      for (int i = 0; i < FLAGS_num_multi_db; i++) {
        OpenDb(options, GetDbNameForMultiple(FLAGS_db, i), &multi_dbs_[i]);
      }
    }
  }

  // Builds *opts, preferring the options file and otherwise falling back to
  // the individual command-line flags. Then applies the shared settings and
  // opens the database(s).
  void Open(Options* opts) {
    const bool from_file = InitializeOptionsFromFile(opts);
    if (!from_file) {
      InitializeOptionsFromFlags(opts);
    }
    InitializeOptionsGeneral(opts);
  }

2769 2770
  void OpenDb(const Options& options, const std::string& db_name,
      DBWithColumnFamilies* db) {
H
heyongqiang 已提交
2771
    Status s;
2772 2773
    // Open with column families if necessary.
    if (FLAGS_num_column_families > 1) {
2774 2775 2776 2777 2778 2779 2780
      size_t num_hot = FLAGS_num_column_families;
      if (FLAGS_num_hot_column_families > 0 &&
          FLAGS_num_hot_column_families < FLAGS_num_column_families) {
        num_hot = FLAGS_num_hot_column_families;
      } else {
        FLAGS_num_hot_column_families = FLAGS_num_column_families;
      }
2781
      std::vector<ColumnFamilyDescriptor> column_families;
2782
      for (size_t i = 0; i < num_hot; i++) {
2783 2784 2785
        column_families.push_back(ColumnFamilyDescriptor(
              ColumnFamilyName(i), ColumnFamilyOptions(options)));
      }
2786
#ifndef ROCKSDB_LITE
2787 2788 2789
      if (FLAGS_readonly) {
        s = DB::OpenForReadOnly(options, db_name, column_families,
            &db->cfh, &db->db);
A
agiardullo 已提交
2790
      } else if (FLAGS_optimistic_transaction_db) {
A
agiardullo 已提交
2791
        s = OptimisticTransactionDB::Open(options, db_name, column_families,
A
agiardullo 已提交
2792 2793 2794 2795 2796 2797 2798 2799 2800
                                          &db->cfh, &db->opt_txn_db);
        if (s.ok()) {
          db->db = db->opt_txn_db->GetBaseDB();
        }
      } else if (FLAGS_transaction_db) {
        TransactionDB* ptr;
        TransactionDBOptions txn_db_options;
        s = TransactionDB::Open(options, txn_db_options, db_name,
                                column_families, &db->cfh, &ptr);
A
agiardullo 已提交
2801
        if (s.ok()) {
A
agiardullo 已提交
2802
          db->db = ptr;
A
agiardullo 已提交
2803
        }
2804 2805 2806
      } else {
        s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
      }
2807 2808 2809
#else
      s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
#endif  // ROCKSDB_LITE
2810 2811 2812
      db->cfh.resize(FLAGS_num_column_families);
      db->num_created = num_hot;
      db->num_hot = num_hot;
2813
#ifndef ROCKSDB_LITE
2814 2815
    } else if (FLAGS_readonly) {
      s = DB::OpenForReadOnly(options, db_name, &db->db);
A
agiardullo 已提交
2816 2817 2818 2819 2820
    } else if (FLAGS_optimistic_transaction_db) {
      s = OptimisticTransactionDB::Open(options, db_name, &db->opt_txn_db);
      if (s.ok()) {
        db->db = db->opt_txn_db->GetBaseDB();
      }
A
agiardullo 已提交
2821
    } else if (FLAGS_transaction_db) {
A
agiardullo 已提交
2822 2823 2824
      TransactionDB* ptr;
      TransactionDBOptions txn_db_options;
      s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
A
agiardullo 已提交
2825
      if (s.ok()) {
A
agiardullo 已提交
2826
        db->db = ptr;
A
agiardullo 已提交
2827
      }
2828
#endif  // ROCKSDB_LITE
H
heyongqiang 已提交
2829
    } else {
2830
      s = DB::Open(options, db_name, &db->db);
H
heyongqiang 已提交
2831
    }
2832 2833 2834 2835 2836 2837
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

2838 2839 2840 2841
  // Order in which DoWrite / KeyGenerator produce the integers keys are
  // derived from.
  enum WriteMode {
    RANDOM,        // uniform in [0, num); the same value may repeat
    SEQUENTIAL,    // 0, 1, 2, ... in ascending order
    UNIQUE_RANDOM  // a shuffled permutation of [0, num)
  };

2842
  // Writes keys in ascending order via DoWrite.
  void WriteSeq(ThreadState* thread) {
    const WriteMode mode = SEQUENTIAL;
    DoWrite(thread, mode);
  }
2845

2846
  // Writes uniformly random keys (repeats possible) via DoWrite.
  void WriteRandom(ThreadState* thread) {
    const WriteMode mode = RANDOM;
    DoWrite(thread, mode);
  }

2850 2851 2852 2853
  // Writes each key of a shuffled permutation via DoWrite.
  void WriteUniqueRandom(ThreadState* thread) {
    const WriteMode mode = UNIQUE_RANDOM;
    DoWrite(thread, mode);
  }

2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870
  // Produces the integers that DoWrite turns into keys (via
  // GenerateKeyFromInt), in the order dictated by a WriteMode.
  class KeyGenerator {
   public:
    // rand: RNG used in RANDOM mode. It is not owned and must stay valid for
    //       as long as Next() is called.
    // mode: how successive values are chosen (see Next()).
    // num:  size of the value space [0, num) for RANDOM and UNIQUE_RANDOM.
    // The trailing argument is still accepted so existing callers compile,
    // but it is not used.
    KeyGenerator(Random64* rand, WriteMode mode, uint64_t num,
                 uint64_t /*num_per_set*/ = 64 * 1024)
        : rand_(rand), mode_(mode), num_(num), next_(0) {
      if (mode_ == UNIQUE_RANDOM) {
        // NOTE: if memory consumption of this approach becomes a concern,
        // we can either break it into pieces and only random shuffle a section
        // each time. Alternatively, use a bit map implementation
        // (https://reviews.facebook.net/differential/diff/54627/)
        values_.resize(num_);
        for (uint64_t i = 0; i < num_; ++i) {
          values_[i] = i;
        }
        // Seeded from FLAGS_seed, so generators built with the same num
        // produce the same permutation.
        std::shuffle(
            values_.begin(), values_.end(),
            std::default_random_engine(static_cast<unsigned int>(FLAGS_seed)));
      }
    }

    // Returns the next value:
    //   SEQUENTIAL:    0, 1, 2, ... (not bounded by num)
    //   RANDOM:        uniform in [0, num); repeats are possible
    //   UNIQUE_RANDOM: successive elements of a shuffled permutation of
    //                  [0, num). DoWrite can request more than num values
    //                  from one generator (writes_ > num_, or several DBs
    //                  choosing generators at random). In that case the
    //                  permutation is replayed from its start instead of
    //                  indexing past the end of values_.
    uint64_t Next() {
      switch (mode_) {
        case SEQUENTIAL:
          return next_++;
        case RANDOM:
          assert(num_ > 0);
          return rand_->Next() % num_;
        case UNIQUE_RANDOM:
          assert(!values_.empty());
          if (next_ >= values_.size()) {
            next_ = 0;
          }
          return values_[next_++];
      }
      assert(false);
      return std::numeric_limits<uint64_t>::max();
    }

   private:
    Random64* rand_;
    WriteMode mode_;
    const uint64_t num_;
    uint64_t next_;
    std::vector<uint64_t> values_;
  };

2898
  // Returns the DB an operation should target: the single open DB when there
  // is one, otherwise an entry of multi_dbs_ chosen with the thread's RNG.
  DB* SelectDB(ThreadState* thread) {
    DBWithColumnFamilies* const target = SelectDBWithCfh(thread);
    return target->db;
  }

  // Draws one value from the thread's RNG and uses it to pick a DB.
  DBWithColumnFamilies* SelectDBWithCfh(ThreadState* thread) {
    const uint64_t draw = thread->rand.Next();
    return SelectDBWithCfh(draw);
  }

  // Maps rand_int to a DB: &db_ when a single DB is open, otherwise
  // multi_dbs_[rand_int % multi_dbs_.size()].
  DBWithColumnFamilies* SelectDBWithCfh(uint64_t rand_int) {
    if (db_.db == nullptr) {
      const size_t index = rand_int % multi_dbs_.size();
      return &multi_dbs_[index];
    }
    return &db_;
  }
2913

2914 2915
  // Core write loop shared by WriteSeq / WriteRandom / WriteUniqueRandom.
  //
  // Writes WriteBatches of entries_per_batch_ keys. Each entry is a generated
  // key plus a value_size_-byte generated value.
  //
  // One KeyGenerator is built per target DB: a single one when db_ is open,
  // otherwise one per entry of multi_dbs_. Each batch picks a generator/DB
  // pair at random. FLAGS_duration only applies in RANDOM mode.
  //
  // With FLAGS_num_column_families > 1 and FLAGS_num_hot_column_families > 0
  // the run is split into stages. On every stage change CreateNewCf is called
  // on each open DB.
  //
  // Exits the process on a write error.
  void DoWrite(ThreadState* thread, WriteMode write_mode) {
    const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
    const int64_t num_ops = writes_ == 0 ? num_ : writes_;

    size_t num_key_gens = 1;
    if (db_.db == nullptr) {
      num_key_gens = multi_dbs_.size();
    }
    std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
    int64_t max_ops = num_ops * num_key_gens;
    int64_t ops_per_stage = max_ops;
    if (FLAGS_num_column_families > 1 && FLAGS_num_hot_column_families > 0) {
      // The number of stages is num_column_families / num_hot_column_families.
      // Clamp it to at least 1: with --num_hot_column_families larger than
      // --num_column_families the quotient is 0, which would otherwise cause
      // an integer division by zero below.
      const int64_t num_stages = std::max<int64_t>(
          1, FLAGS_num_column_families / FLAGS_num_hot_column_families);
      ops_per_stage = (max_ops - 1) / num_stages + 1;
    }

    Duration duration(test_duration, max_ops, ops_per_stage);
    for (size_t i = 0; i < num_key_gens; i++) {
      key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode, num_,
                                         ops_per_stage));
    }

    if (num_ != FLAGS_num) {
      char msg[100];
      snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    int64_t stage = 0;
    while (!duration.Done(entries_per_batch_)) {
      if (duration.GetStage() != stage) {
        stage = duration.GetStage();
        if (db_.db != nullptr) {
          db_.CreateNewCf(open_options_, stage);
        } else {
          for (auto& db : multi_dbs_) {
            db.CreateNewCf(open_options_, stage);
          }
        }
      }

      // The same index selects both the key generator and the DB, so each
      // generator always feeds the same DB.
      size_t id = thread->rand.Next() % num_key_gens;
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
      batch.Clear();

      if (thread->shared->write_rate_limiter.get() != nullptr) {
        thread->shared->write_rate_limiter->Request(
            entries_per_batch_ * (value_size_ + key_size_),
            Env::IO_HIGH);
        // Set time at which last op finished to Now() to hide latency and
        // sleep from rate limiter. Also, do the check once per batch, not
        // once per write.
        thread->stats.ResetLastOpTime();
      }

      for (int64_t j = 0; j < entries_per_batch_; j++) {
        int64_t rand_num = key_gens[id]->Next();
        GenerateKeyFromInt(rand_num, FLAGS_num, &key);
        if (FLAGS_num_column_families <= 1) {
          batch.Put(key, gen.Generate(value_size_));
        } else {
          // We use same rand_num as seed for key and column family so that we
          // can deterministically find the cfh corresponding to a particular
          // key while reading the key.
          batch.Put(db_with_cfh->GetCfh(rand_num), key,
                    gen.Generate(value_size_));
        }
        bytes += value_size_ + key_size_;
      }
      s = db_with_cfh->db->Write(write_options_, &batch);
      thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
                                entries_per_batch_, kWrite);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
    thread->stats.AddBytes(bytes);
  }

3002
  // Forward-scans every open DB: db_ when it is open, otherwise each entry of
  // multi_dbs_ in turn.
  void ReadSequential(ThreadState* thread) {
    if (db_.db == nullptr) {
      for (const auto& entry : multi_dbs_) {
        ReadSequential(thread, entry.db);
      }
      return;
    }
    ReadSequential(thread, db_.db);
  }

  // Iterates db forward from its first key, visiting at most reads_ entries.
  // Records one kRead op per entry and adds the key+value bytes seen.
  void ReadSequential(ThreadState* thread, DB* db) {
    ReadOptions read_opts(FLAGS_verify_checksum, true);
    read_opts.tailing = FLAGS_use_tailing_iterator;

    std::unique_ptr<Iterator> it(db->NewIterator(read_opts));
    int64_t visited = 0;
    int64_t total_bytes = 0;
    it->SeekToFirst();
    while (visited < reads_ && it->Valid()) {
      total_bytes += it->key().size() + it->value().size();
      thread->stats.FinishedOps(nullptr, db, 1, kRead);
      ++visited;
      it->Next();
    }
    // Release the iterator before reporting, as before.
    it.reset();
    thread->stats.AddBytes(total_bytes);
  }

3028
  // Backward-scans every open DB: db_ when it is open, otherwise each entry
  // of multi_dbs_ in turn.
  void ReadReverse(ThreadState* thread) {
    if (db_.db == nullptr) {
      for (const auto& entry : multi_dbs_) {
        ReadReverse(thread, entry.db);
      }
      return;
    }
    ReadReverse(thread, db_.db);
  }

  // Iterates db backward from its last key, visiting at most reads_ entries.
  // Records one kRead op per entry and adds the key+value bytes seen.
  void ReadReverse(ThreadState* thread, DB* db) {
    std::unique_ptr<Iterator> it(
        db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)));
    int64_t visited = 0;
    int64_t total_bytes = 0;
    it->SeekToLast();
    while (visited < reads_ && it->Valid()) {
      total_bytes += it->key().size() + it->value().size();
      thread->stats.FinishedOps(nullptr, db, 1, kRead);
      ++visited;
      it->Prev();
    }
    // Release the iterator before reporting, as before.
    it.reset();
    thread->stats.AddBytes(total_bytes);
  }

L
Lei Jin 已提交
3051 3052 3053
  // Random point lookups against one DB chosen up front, in groups of 100.
  // Key indices are masked to the smallest power of two >= FLAGS_num. Indices
  // at or above FLAGS_num are counted as "non-exist" keys.
  // Aborts on any Get error other than NotFound.
  void ReadRandomFast(ThreadState* thread) {
    int64_t num_read = 0;
    int64_t num_found = 0;
    int64_t num_nonexist = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::string value;
    DB* db = SelectDBWithCfh(thread)->db;

    int64_t pow2 = 1;
    while (pow2 < FLAGS_num) {
      pow2 <<= 1;
    }
    const int64_t mask = pow2 - 1;

    Duration duration(FLAGS_duration, reads_);
    do {
      for (int n = 0; n < 100; ++n) {
        const int64_t key_rand = thread->rand.Next() & mask;
        GenerateKeyFromInt(key_rand, FLAGS_num, &key);
        ++num_read;
        auto status = db->Get(options, key, &value);
        if (status.ok()) {
          ++num_found;
        } else if (!status.IsNotFound()) {
          fprintf(stderr, "Get returned an error: %s\n",
                  status.ToString().c_str());
          abort();
        }
        if (key_rand >= FLAGS_num) {
          ++num_nonexist;
        }
      }
      thread->stats.FinishedOps(nullptr, db, 100, kRead);
    } while (!duration.Done(100));

    char msg[100];
    snprintf(msg, sizeof(msg),
             "(%" PRIu64 " of %" PRIu64 " found, "
             "issued %" PRIu64 " non-exist keys)\n",
             num_found, num_read, num_nonexist);
    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > 0) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109
  // Picks a key index in [0, FLAGS_num) using one draw from rand.
  // With read_random_exp_range_ == 0 the index is uniform. Otherwise it is
  // FLAGS_num * exp(-u * read_random_exp_range_) for u in [0, 1) taken from
  // the low 62 bits of the draw. That skewed value is then scrambled by a
  // multiplicative hash so popular keys are not adjacent.
  int64_t GetRandomKey(Random64* rand) {
    const uint64_t rand_int = rand->Next();
    if (read_random_exp_range_ == 0) {
      return static_cast<int64_t>(rand_int % FLAGS_num);
    }

    const uint64_t kBigInt = static_cast<uint64_t>(1U) << 62;
    const long double order = -static_cast<long double>(rand_int % kBigInt) /
                              static_cast<long double>(kBigInt) *
                              read_random_exp_range_;
    const long double skewed =
        std::exp(order) * static_cast<long double>(FLAGS_num);
    const uint64_t rand_num = static_cast<int64_t>(skewed);

    // Scramble to avoid locality; unsigned overflow wraps mod 2^64, which has
    // little impact on the result.
    const uint64_t kBigPrime = 0x5bd1e995;
    return static_cast<int64_t>((rand_num * kBigPrime) % FLAGS_num);
  }

3120
  // Random point lookups. Each op first picks a DB, then a key index via
  // GetRandomKey. With several column families the same index also selects
  // the column family (GetCfh), mirroring DoWrite, so a lookup targets the
  // column family the key was written to.
  // Aborts on any Get error other than NotFound.
  void ReadRandom(ThreadState* thread) {
    int64_t num_read = 0;
    int64_t num_found = 0;
    int64_t total_bytes = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::string value;

    Duration duration(FLAGS_duration, reads_);
    while (!duration.Done(1)) {
      DBWithColumnFamilies* const target = SelectDBWithCfh(thread);
      const int64_t key_index = GetRandomKey(&thread->rand);
      GenerateKeyFromInt(key_index, FLAGS_num, &key);
      ++num_read;

      const Status s =
          FLAGS_num_column_families > 1
              ? target->db->Get(options, target->GetCfh(key_index), key,
                                &value)
              : target->db->Get(options, key, &value);
      if (s.ok()) {
        ++num_found;
        total_bytes += key.size() + value.size();
      } else if (!s.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
        abort();
      }
      thread->stats.FinishedOps(target, target->db, 1, kRead);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
             num_found, num_read);

    thread->stats.AddBytes(total_bytes);
    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > 0) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

L
Lei Jin 已提交
3167 3168 3169 3170
  // Random lookups issued in batches through MultiGet.
  // Each batch reads entries_per_batch_ keys (indices from GetRandomKey) from
  // one DB chosen per batch. reads_ (or FLAGS_duration) bounds the total
  // number of keys read. Every key in a batch counts toward it, as in
  // DoWrite and DoDelete.
  // Nothing is returned. A "(found of read found)" summary is added to the
  // thread's stats messages. Aborts on any status other than OK/NotFound.
  void MultiReadRandom(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    std::vector<Slice> keys;
    std::vector<std::unique_ptr<const char[]> > key_guards;
    std::vector<std::string> values(entries_per_batch_);
    while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
      key_guards.push_back(std::unique_ptr<const char[]>());
      keys.push_back(AllocateKey(&key_guards.back()));
    }

    Duration duration(FLAGS_duration, reads_);
    // Advance by the batch size: Done(1) would count MultiGet calls instead
    // of keys and perform reads_ * entries_per_batch_ lookups.
    while (!duration.Done(entries_per_batch_)) {
      DB* db = SelectDB(thread);
      for (int64_t i = 0; i < entries_per_batch_; ++i) {
        GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
      }
      std::vector<Status> statuses = db->MultiGet(options, keys, &values);
      assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);

      read += entries_per_batch_;
      for (int64_t i = 0; i < entries_per_batch_; ++i) {
        if (statuses[i].ok()) {
          ++found;
        } else if (!statuses[i].IsNotFound()) {
          fprintf(stderr, "MultiGet returned an error: %s\n",
                  statuses[i].ToString().c_str());
          abort();
        }
      }
      thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
             found, read);
    thread->stats.AddMessage(msg);
  }

3209 3210 3211 3212
  // Repeatedly creates and immediately destroys an iterator on a selected DB.
  // Records one kOthers op per create/destroy pair.
  void IteratorCreation(ThreadState* thread) {
    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      {
        // Destroyed at the end of this scope, before the op is recorded.
        std::unique_ptr<Iterator> it(db->NewIterator(options));
      }
      thread->stats.FinishedOps(nullptr, db, 1, kOthers);
    }
  }

3220 3221 3222 3223
  // Thread 0 runs the background writer; every other thread runs
  // IteratorCreation.
  void IteratorCreationWhileWriting(ThreadState* thread) {
    if (thread->tid <= 0) {
      BGWriter(thread, kWrite);
      return;
    }
    IteratorCreation(thread);
  }

S
Sanjay Ghemawat 已提交
3228
  // Random Seek() benchmark. Each op does the following:
  // - Seeks an iterator to a uniformly random key, counting an exact-key hit
  //   as "found".
  // - Steps up to FLAGS_seek_nexts entries (Prev() when
  //   FLAGS_reverse_iterator), copying each value into a local buffer so the
  //   data is actually touched.
  // Unless FLAGS_use_tailing_iterator is set, iterators are recreated before
  // every seek.
  void SeekRandom(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
    int64_t bytes = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    options.tailing = FLAGS_use_tailing_iterator;

    // Exactly one of these is populated: single_iter when db_ is open,
    // otherwise one iterator per entry of multi_dbs_.
    std::unique_ptr<Iterator> single_iter;
    std::vector<std::unique_ptr<Iterator>> multi_iters;
    // Drops the current iterators first, then opens fresh ones.
    auto open_iterators = [&]() {
      if (db_.db != nullptr) {
        single_iter.reset();
        single_iter.reset(db_.db->NewIterator(options));
      } else {
        multi_iters.clear();
        for (const auto& db_with_cfh : multi_dbs_) {
          multi_iters.push_back(
              std::unique_ptr<Iterator>(db_with_cfh.db->NewIterator(options)));
        }
      }
    };
    open_iterators();

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    Duration duration(FLAGS_duration, reads_);
    char value_buffer[256];
    while (!duration.Done(1)) {
      if (!FLAGS_use_tailing_iterator) {
        open_iterators();
      }

      Iterator* iter_to_use = single_iter.get();
      if (iter_to_use == nullptr) {
        iter_to_use =
            multi_iters[thread->rand.Next() % multi_iters.size()].get();
      }

      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
      iter_to_use->Seek(key);
      ++read;
      if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
        ++found;
      }

      for (int step = 0; step < FLAGS_seek_nexts && iter_to_use->Valid();
           ++step) {
        // Copy out iterator's value to make sure we read them.
        Slice current = iter_to_use->value();
        memcpy(value_buffer, current.data(),
               std::min(current.size(), sizeof(value_buffer)));
        bytes += iter_to_use->key().size() + iter_to_use->value().size();

        if (FLAGS_reverse_iterator) {
          iter_to_use->Prev();
        } else {
          iter_to_use->Next();
        }
        assert(iter_to_use->status().ok());
      }

      thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
    }
    single_iter.reset();
    multi_iters.clear();

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
             found, read);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
    if (FLAGS_perf_level > 0) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }
L
Lei Jin 已提交
3309 3310 3311 3312 3313

  // Thread 0 runs the background writer (Put); every other thread runs
  // SeekRandom.
  void SeekRandomWhileWriting(ThreadState* thread) {
    if (thread->tid <= 0) {
      BGWriter(thread, kWrite);
      return;
    }
    SeekRandom(thread);
  }
S
Sanjay Ghemawat 已提交
3317

3318 3319 3320 3321 3322 3323 3324 3325
  // Thread 0 runs the background writer in Merge mode; every other thread
  // runs SeekRandom.
  void SeekRandomWhileMerging(ThreadState* thread) {
    if (thread->tid <= 0) {
      BGWriter(thread, kMerge);
      return;
    }
    SeekRandom(thread);
  }

S
Sanjay Ghemawat 已提交
3326 3327
  // Deletes keys in WriteBatches of entries_per_batch_. With seq the keys are
  // 0, 1, 2, ... in order and FLAGS_duration is ignored. Otherwise each key
  // index is uniform in [0, FLAGS_num). Exits the process on a write error.
  void DoDelete(ThreadState* thread, bool seq) {
    WriteBatch batch;
    Duration duration(seq ? 0 : FLAGS_duration, deletes_);
    int64_t base = 0;
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    for (; !duration.Done(entries_per_batch_); base += entries_per_batch_) {
      DB* db = SelectDB(thread);
      batch.Clear();
      for (int64_t j = 0; j < entries_per_batch_; ++j) {
        int64_t k;
        if (seq) {
          k = base + j;
        } else {
          k = thread->rand.Next() % FLAGS_num;
        }
        GenerateKeyFromInt(k, FLAGS_num, &key);
        batch.Delete(key);
      }
      auto s = db->Write(write_options_, &batch);
      thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete);
      if (!s.ok()) {
        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
  }

  // Deletes keys in ascending order (see DoDelete).
  void DeleteSeq(ThreadState* thread) {
    DoDelete(thread, /*seq=*/true);
  }

  // Deletes uniformly random keys (see DoDelete).
  void DeleteRandom(ThreadState* thread) {
    DoDelete(thread, /*seq=*/false);
  }

3359 3360 3361 3362
  // Thread 0 runs the background writer (Put); every other thread runs
  // ReadRandom.
  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid <= 0) {
      BGWriter(thread, kWrite);
      return;
    }
    ReadRandom(thread);
  }
3366

M
Mark Callaghan 已提交
3367 3368 3369 3370 3371 3372 3373 3374
  // Thread 0 runs the background writer in Merge mode; every other thread
  // runs ReadRandom.
  void ReadWhileMerging(ThreadState* thread) {
    if (thread->tid <= 0) {
      BGWriter(thread, kMerge);
      return;
    }
    ReadRandom(thread);
  }

3375
  // Background writer for the *WhileWriting / *WhileMerging benchmarks.
  // Loops until every other thread has finished. Each loop issues one Put
  // (write_merge == kWrite) or Merge (otherwise) of a random key.
  // Throttled when FLAGS_benchmark_write_rate_limit > 0. This thread's stats
  // are excluded from the merged report. Exits the process on a write error.
  void BGWriter(ThreadState* thread, enum OperationType write_merge) {
    RandomGenerator gen;
    int64_t bytes = 0;

    std::unique_ptr<RateLimiter> limiter;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      limiter.reset(NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }

    // Don't merge stats from this thread with the readers.
    thread->stats.SetExcludeFromMerge();

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    for (;;) {
      DB* db = SelectDB(thread);
      bool others_done;
      {
        MutexLock l(&thread->shared->mu);
        others_done =
            thread->shared->num_done + 1 >= thread->shared->num_initialized;
      }
      if (others_done) {
        // Other threads have finished
        break;
      }

      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
      const Status s =
          (write_merge == kWrite)
              ? db->Put(write_options_, key, gen.Generate(value_size_))
              : db->Merge(write_options_, key, gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);

      if (FLAGS_benchmark_write_rate_limit > 0) {
        limiter->Request(entries_per_batch_ * (value_size_ + key_size_),
                         Env::IO_HIGH);
      }
    }
    thread->stats.AddBytes(bytes);
  }

3427
  // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
3428
  // in DB atomically i.e in a single batch. Also refer GetMany.
3429 3430
  Status PutMany(DB* db, const WriteOptions& writeoptions, const Slice& key,
                 const Slice& value) {
3431 3432 3433 3434 3435 3436 3437 3438 3439 3440
    std::string suffixes[3] = {"2", "1", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Put(keys[i], value);
    }

3441
    s = db->Write(writeoptions, &batch);
3442 3443 3444 3445 3446
    return s;
  }


  // Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V)
3447
  // in DB atomically i.e in a single batch. Also refer GetMany.
3448 3449
  Status DeleteMany(DB* db, const WriteOptions& writeoptions,
                    const Slice& key) {
3450 3451 3452 3453 3454 3455 3456 3457 3458 3459
    std::string suffixes[3] = {"1", "2", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Delete(keys[i]);
    }

3460
    s = db->Write(writeoptions, &batch);
3461 3462 3463 3464 3465
    return s;
  }

  // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
  // in the same snapshot, and verifies that all the values are identical.
3466
  // ASSUMES that PutMany was used to put (K, V) into the DB.
3467 3468
  Status GetMany(DB* db, const ReadOptions& readoptions, const Slice& key,
                 std::string* value) {
3469 3470 3471 3472 3473
    std::string suffixes[3] = {"0", "1", "2"};
    std::string keys[3];
    Slice key_slices[3];
    std::string values[3];
    ReadOptions readoptionscopy = readoptions;
3474
    readoptionscopy.snapshot = db->GetSnapshot();
3475 3476 3477 3478
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      key_slices[i] = keys[i];
3479
      s = db->Get(readoptionscopy, key_slices[i], value);
3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490
      if (!s.ok() && !s.IsNotFound()) {
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
        values[i] = "";
        // we continue after error rather than exiting so that we can
        // find more errors if any
      } else if (s.IsNotFound()) {
        values[i] = "";
      } else {
        values[i] = *value;
      }
    }
3491
    db->ReleaseSnapshot(readoptionscopy.snapshot);
3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504

    if ((values[0] != values[1]) || (values[1] != values[2])) {
      fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n",
              key.ToString().c_str(), values[0].c_str(), values[1].c_str(),
              values[2].c_str());
      // we continue after error rather than exiting so that we can
      // find more errors if any
    }

    return s;
  }

  // Differs from readrandomwriterandom in the following ways:
3505
  // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
3506 3507 3508 3509
  // (b) Does deletes as well (per FLAGS_deletepercent)
  // (c) In order to achieve high % of 'found' during lookups, and to do
  //     multiple writes (including puts and deletes) it uses upto
  //     FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
3510
  // (d) Does not have a MultiGet option.
3511 3512 3513 3514
  void RandomWithVerify(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
3515
    int64_t found = 0;
3516 3517 3518
    int get_weight = 0;
    int put_weight = 0;
    int delete_weight = 0;
3519 3520 3521
    int64_t gets_done = 0;
    int64_t puts_done = 0;
    int64_t deletes_done = 0;
3522

3523 3524
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
L
Lei Jin 已提交
3525

3526
    // the number of iterations is the larger of read_ or write_
3527
    for (int64_t i = 0; i < readwrites_; i++) {
3528
      DB* db = SelectDB(thread);
3529
      if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
3530
        // one batch completed, reinitialize for next batch
3531 3532 3533 3534
        get_weight = FLAGS_readwritepercent;
        delete_weight = FLAGS_deletepercent;
        put_weight = 100 - get_weight - delete_weight;
      }
L
Lei Jin 已提交
3535 3536
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_numdistinct,
          FLAGS_numdistinct, &key);
3537 3538
      if (get_weight > 0) {
        // do all the gets first
3539
        Status s = GetMany(db, options, key, &value);
3540
        if (!s.ok() && !s.IsNotFound()) {
3541
          fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
3542 3543 3544 3545 3546 3547 3548
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
        get_weight--;
        gets_done++;
3549
        thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
3550 3551 3552
      } else if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
3553
        Status s = PutMany(db, write_options_, key, gen.Generate(value_size_));
3554
        if (!s.ok()) {
3555
          fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
3556 3557 3558 3559
          exit(1);
        }
        put_weight--;
        puts_done++;
3560
        thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
3561
      } else if (delete_weight > 0) {
3562
        Status s = DeleteMany(db, write_options_, key);
3563
        if (!s.ok()) {
3564
          fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
3565 3566 3567 3568
          exit(1);
        }
        delete_weight--;
        deletes_done++;
3569
        thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
3570 3571 3572
      }
    }
    char msg[100];
3573
    snprintf(msg, sizeof(msg),
3574 3575
             "( get:%" PRIu64 " put:%" PRIu64 " del:%" PRIu64 " total:%" \
             PRIu64 " found:%" PRIu64 ")",
3576 3577 3578 3579
             gets_done, puts_done, deletes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }

X
Xing Jin 已提交
3580
  // This is different from ReadWhileWriting because it does not use
3581
  // an extra thread.
3582 3583 3584 3585
  void ReadRandomWriteRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
3586
    int64_t found = 0;
3587 3588
    int get_weight = 0;
    int put_weight = 0;
3589 3590
    int64_t reads_done = 0;
    int64_t writes_done = 0;
M
Mark Callaghan 已提交
3591 3592
    Duration duration(FLAGS_duration, readwrites_);

3593 3594
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
L
Lei Jin 已提交
3595

3596
    // the number of iterations is the larger of read_ or write_
M
Mark Callaghan 已提交
3597
    while (!duration.Done(1)) {
3598
      DB* db = SelectDB(thread);
L
Lei Jin 已提交
3599
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
3600
      if (get_weight == 0 && put_weight == 0) {
X
Xing Jin 已提交
3601
        // one batch completed, reinitialize for next batch
3602 3603 3604 3605 3606
        get_weight = FLAGS_readwritepercent;
        put_weight = 100 - get_weight;
      }
      if (get_weight > 0) {
        // do all the gets first
3607
        Status s = db->Get(options, key, &value);
3608 3609 3610 3611 3612 3613 3614
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
3615 3616
        get_weight--;
        reads_done++;
3617
        thread->stats.FinishedOps(nullptr, db, 1, kRead);
3618 3619 3620
      } else  if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
3621
        Status s = db->Put(write_options_, key, gen.Generate(value_size_));
3622 3623 3624 3625 3626 3627
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        writes_done++;
3628
        thread->stats.FinishedOps(nullptr, db, 1, kWrite);
3629 3630 3631
      }
    }
    char msg[100];
3632 3633
    snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
             " total:%" PRIu64 " found:%" PRIu64 ")",
3634
             reads_done, writes_done, readwrites_, found);
3635 3636 3637
    thread->stats.AddMessage(msg);
  }

M
Mark Callaghan 已提交
3638 3639 3640 3641 3642 3643
  //
  // Read-modify-write for random keys.
  // Each op Gets a uniformly random key and then Puts a fresh value for it.
  // Runs for readwrites_ ops, or FLAGS_duration seconds when set.
  // Get errors other than NotFound abort; Put errors exit the process.
  void UpdateRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int64_t bytes = 0;
    // Updates actually performed. This equals readwrites_ when no duration is
    // set, but differs under --duration, so it is counted explicitly for the
    // summary message.
    int64_t updates = 0;
    Duration duration(FLAGS_duration, readwrites_);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

      auto status = db->Get(options, key, &value);
      if (status.ok()) {
        ++found;
        bytes += key.size() + value.size();
      } else if (!status.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n",
                status.ToString().c_str());
        abort();
      }

      Status s = db->Put(write_options_, key, gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      ++updates;
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( updates:%" PRIu64 " found:%" PRIu64 ")", updates, found);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }

D
Deon Nicholas 已提交
3680 3681 3682 3683 3684 3685 3686
  // AppendRandom: read-modify-write where every update appends to the value.
  //
  // For a random key in [0, FLAGS_num) the current value is read (a missing
  // key is treated as an empty value). A ',' delimiter is added when the value
  // is non-empty, matching StringAppendOperator semantics. value_size_ freshly
  // generated bytes are then appended, and the result is written back with
  // Put. The *value* therefore grows with each update of the same key, which
  // makes this the Put-based counterpart for benchmarking against similar
  // append-style merges.
  // Iterates until `duration` (built from FLAGS_duration and readwrites_)
  // reports done. Get errors other than NotFound abort; a failed Put exits(1).
  void AppendRandom(ThreadState* thread) {
    ReadOptions read_opts(FLAGS_verify_checksum, true);
    RandomGenerator generator;
    std::string merged;
    int64_t hits = 0;
    int64_t byte_count = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* target = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

      Status read_status = target->Get(read_opts, key, &merged);
      if (read_status.IsNotFound()) {
        // Absent key: start appending from an empty value.
        merged.clear();
      } else if (read_status.ok()) {
        ++hits;
        byte_count += key.size() + merged.size();
      } else {
        fprintf(stderr, "Get returned an error: %s\n",
                read_status.ToString().c_str());
        abort();
      }

      Slice suffix = generator.Generate(value_size_);
      if (!merged.empty()) {
        merged.push_back(',');
      }
      merged.append(suffix.data(), suffix.size());

      Status write_status = target->Put(write_options_, key, merged);
      if (!write_status.ok()) {
        fprintf(stderr, "put error: %s\n", write_status.ToString().c_str());
        exit(1);
      }
      byte_count += key.size() + merged.size();
      thread->stats.FinishedOps(nullptr, target, 1, kUpdate);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")",
             readwrites_, hits);
    thread->stats.AddBytes(byte_count);
    thread->stats.AddMessage(msg);
  }

  // Read-modify-write for random keys (using MergeOperator)
  // The merge operator to use should be defined by FLAGS_merge_operator
  // Adjust FLAGS_value_size so that the keys are reasonable for this operator
  // Assumes that the merge operator is non-null (i.e.: is well-defined)
  //
  // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
  // to simulate random additions over 64-bit integers using merge.
3743 3744 3745
  //
  // The number of merges on the same key can be controlled by adjusting
  // FLAGS_merge_keys.
D
Deon Nicholas 已提交
3746 3747
  void MergeRandom(ThreadState* thread) {
    RandomGenerator gen;
3748
    int64_t bytes = 0;
3749 3750
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
D
Deon Nicholas 已提交
3751 3752 3753
    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
3754
      DB* db = SelectDB(thread);
L
Lei Jin 已提交
3755
      GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);
D
Deon Nicholas 已提交
3756

3757
      Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
D
Deon Nicholas 已提交
3758 3759 3760 3761 3762

      if (!s.ok()) {
        fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
        exit(1);
      }
3763
      bytes += key.size() + value_size_;
3764
      thread->stats.FinishedOps(nullptr, db, 1, kMerge);
D
Deon Nicholas 已提交
3765 3766 3767 3768
    }

    // Print some statistics
    char msg[100];
3769
    snprintf(msg, sizeof(msg), "( updates:%" PRIu64 ")", readwrites_);
3770
    thread->stats.AddBytes(bytes);
D
Deon Nicholas 已提交
3771 3772 3773
    thread->stats.AddMessage(msg);
  }

3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784
  // ReadRandomMergeRandom: mixed workload of point reads and merges.
  //
  // Each iteration picks a key in [0, merge_keys_). With probability
  // FLAGS_mergereadpercent / 100 it issues a Merge of value_size_ generated
  // bytes; otherwise it issues a Get. The number of distinct keys, and thus
  // how many reads and merges hit the same key, is controlled by
  // FLAGS_merge_keys. As with MergeRandom, the merge operator comes from
  // FLAGS_merge_operator. Iterates until `duration` (built from FLAGS_duration
  // and readwrites_) reports done.
  //
  // Get errors other than NotFound are reported and the run continues so
  // further errors can surface; a failed Merge exits(1).
  void ReadRandomMergeRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t num_hits = 0;
    int64_t num_gets = 0;
    int64_t num_merges = 0;
    size_t max_length = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);

      bool do_merge =
          static_cast<int>(thread->rand.Next() % 100) < FLAGS_mergereadpercent;

      if (do_merge) {
        Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
          exit(1);
        }
        num_merges++;
        thread->stats.FinishedOps(nullptr, db, 1, kMerge);
      } else {
        Status s = db->Get(options, key, &value);
        if (s.ok()) {
          num_hits++;
          // Measure the value only when the Get succeeded; the contents of
          // `value` are not relied upon after NotFound or an error.
          max_length = std::max(max_length, value.length());
        } else if (!s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        }
        num_gets++;
        thread->stats.FinishedOps(nullptr, db, 1, kRead);
      }
    }

    // Report the operations actually performed. readwrites_ only seeds the
    // Duration bound; when FLAGS_duration is in effect the iteration count
    // can differ from it.
    char msg[100];
    snprintf(msg, sizeof(msg),
             "(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64
             " hits:%" PRIu64 " maxlength:%" ROCKSDB_PRIszt ")",
             num_gets, num_merges, num_gets + num_merges, num_hits,
             max_length);
    thread->stats.AddMessage(msg);
  }

T
Tomislav Novak 已提交
3833 3834 3835 3836 3837 3838 3839 3840 3841 3842
  // WriteSeqSeekSeq: sequential load followed by a seek/step scan.
  //
  // First writes FLAGS_num keys in SEQUENTIAL order via DoWrite, then
  // restarts the stats clock so only the read phase is timed. The read phase
  // walks i over 0..FLAGS_num-1:
  //   - Seek() to key i.
  //   - Take up to FLAGS_seek_nexts Next() steps (Prev() with
  //     --reverse_iterator), advancing i once per step.
  //   - Seek() to the key it ended on.
  // Every Seek and every step is recorded as one kSeek op.
  void WriteSeqSeekSeq(ThreadState* thread) {
    writes_ = FLAGS_num;
    DoWrite(thread, SEQUENTIAL);
    // exclude writes from the ops/sec calculation
    thread->stats.Start(thread->tid);

    DB* db = SelectDB(thread);
    std::unique_ptr<Iterator> iter(
        db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)));

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    for (int64_t i = 0; i < FLAGS_num; ++i) {
      GenerateKeyFromInt(i, FLAGS_num, &key);
      iter->Seek(key);
      assert(iter->Valid() && iter->key() == key);
      thread->stats.FinishedOps(nullptr, db, 1, kSeek);

      // Stop stepping once the iterator is no longer valid: Next()/Prev()
      // require Valid(). With --reverse_iterator this happens when Prev()
      // moves before the first key.
      for (int j = 0;
           j < FLAGS_seek_nexts && i + 1 < FLAGS_num && iter->Valid(); ++j) {
        if (!FLAGS_reverse_iterator) {
          iter->Next();
        } else {
          iter->Prev();
        }
        GenerateKeyFromInt(++i, FLAGS_num, &key);
        // Next() from key i is expected to land on key i + 1, which `key`
        // now holds. Prev() moves to a smaller key instead, so the position
        // check only applies to forward iteration.
        assert(FLAGS_reverse_iterator ||
               (iter->Valid() && iter->key() == key));
        thread->stats.FinishedOps(nullptr, db, 1, kSeek);
      }

      iter->Seek(key);
      assert(iter->Valid() && iter->key() == key);
      thread->stats.FinishedOps(nullptr, db, 1, kSeek);
    }
  }

3868
#ifndef ROCKSDB_LITE
A
agiardullo 已提交
3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884
  // This benchmark stress tests Transactions.  For a given --duration (or
  // total number of --writes, a Transaction will perform a read-modify-write
  // to increment the value of a key in each of N(--transaction-sets) sets of
  // keys (where each set has --num keys).  If --threads is set, this will be
  // done in parallel.
  //
  // To test transactions, use --transaction_db=true.  Not setting this
  // parameter
  // will run the same benchmark without transactions.
  //
  // RandomTransactionVerify() will then validate the correctness of the results
  // by checking if the sum of all keys in each set is the same.
  void RandomTransaction(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    Duration duration(FLAGS_duration, readwrites_);
    ReadOptions read_options(FLAGS_verify_checksum, true);
S
SherlockNoMad 已提交
3885
    uint16_t num_prefix_ranges = static_cast<uint16_t>(FLAGS_transaction_sets);
A
agiardullo 已提交
3886
    uint64_t transactions_done = 0;
A
agiardullo 已提交
3887 3888 3889 3890 3891 3892

    if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) {
      fprintf(stderr, "invalid value for transaction_sets\n");
      abort();
    }

A
agiardullo 已提交
3893 3894 3895 3896 3897 3898 3899 3900
    TransactionOptions txn_options;
    txn_options.lock_timeout = FLAGS_transaction_lock_timeout;
    txn_options.set_snapshot = FLAGS_transaction_set_snapshot;

    RandomTransactionInserter inserter(&thread->rand, write_options_,
                                       read_options, FLAGS_num,
                                       num_prefix_ranges);

A
agiardullo 已提交
3901 3902 3903 3904 3905 3906 3907 3908
    if (FLAGS_num_multi_db > 1) {
      fprintf(stderr,
              "Cannot run RandomTransaction benchmark with "
              "FLAGS_multi_db > 1.");
      abort();
    }

    while (!duration.Done(1)) {
A
agiardullo 已提交
3909
      bool success;
A
agiardullo 已提交
3910

A
agiardullo 已提交
3911 3912
      // RandomTransactionInserter will attempt to insert a key for each
      // # of FLAGS_transaction_sets
A
agiardullo 已提交
3913
      if (FLAGS_optimistic_transaction_db) {
A
agiardullo 已提交
3914
        success = inserter.OptimisticTransactionDBInsert(db_.opt_txn_db);
A
agiardullo 已提交
3915 3916
      } else if (FLAGS_transaction_db) {
        TransactionDB* txn_db = reinterpret_cast<TransactionDB*>(db_.db);
A
agiardullo 已提交
3917
        success = inserter.TransactionDBInsert(txn_db, txn_options);
A
agiardullo 已提交
3918
      } else {
A
agiardullo 已提交
3919
        success = inserter.DBInsert(db_.db);
A
agiardullo 已提交
3920 3921
      }

A
agiardullo 已提交
3922 3923 3924 3925
      if (!success) {
        fprintf(stderr, "Unexpected error: %s\n",
                inserter.GetLastStatus().ToString().c_str());
        abort();
3926 3927
      }

A
agiardullo 已提交
3928
      thread->stats.FinishedOps(nullptr, db_.db, 1, kOthers);
A
agiardullo 已提交
3929 3930 3931 3932
      transactions_done++;
    }

    char msg[100];
A
agiardullo 已提交
3933
    if (FLAGS_optimistic_transaction_db || FLAGS_transaction_db) {
A
agiardullo 已提交
3934 3935
      snprintf(msg, sizeof(msg),
               "( transactions:%" PRIu64 " aborts:%" PRIu64 ")",
A
agiardullo 已提交
3936
               transactions_done, inserter.GetFailureCount());
A
agiardullo 已提交
3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948 3949 3950
    } else {
      snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done);
    }
    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > 0) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

  // Verifies consistency of data after RandomTransaction() has been run.
  // Since each iteration of RandomTransaction() incremented a key in each set
  // by the same value, the sum of the keys in each set should be the same.
  void RandomTransactionVerify() {
A
agiardullo 已提交
3951
    if (!FLAGS_transaction_db && !FLAGS_optimistic_transaction_db) {
A
agiardullo 已提交
3952 3953 3954 3955
      // transactions not used, nothing to verify.
      return;
    }

A
agiardullo 已提交
3956
    Status s =
S
SherlockNoMad 已提交
3957 3958
        RandomTransactionInserter::Verify(db_.db,
                            static_cast<uint16_t>(FLAGS_transaction_sets));
A
agiardullo 已提交
3959

A
agiardullo 已提交
3960 3961 3962 3963
    if (s.ok()) {
      fprintf(stdout, "RandomTransactionVerify Success.\n");
    } else {
      fprintf(stdout, "RandomTransactionVerify FAILED!!\n");
A
agiardullo 已提交
3964 3965
    }
  }
3966
#endif  // ROCKSDB_LITE
A
agiardullo 已提交
3967

A
Andres Noetzli 已提交
3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012 4013 4014 4015 4016
  // Writes and deletes random keys without overwriting keys.
  //
  // This benchmark is intended to partially replicate the behavior of MyRocks
  // secondary indices: all data is stored in keys, and updates happen by
  // deleting the old version of the key and inserting the new version.
  //
  // Each of FLAGS_numdistinct logical keys owns a slot of max_counter
  // physical keys (key_id * max_counter + counter).
  //
  // Setup phase: writes the first physical key of every slot, then takes a
  // snapshot. The snapshot is held through the timed loop (presumably so that
  // deleted versions are retained, as an open MyRocks read would require;
  // intent not stated in the code) and released afterwards.
  //
  // Timed loop: picks a logical key from a normal distribution centred on
  // FLAGS_numdistinct / 2 with standard deviation FLAGS_stddev, clamped to
  // [0, FLAGS_numdistinct). It deletes that key's current physical key
  // (SingleDelete with --use_single_deletes) and puts the next physical key
  // of the slot with an empty value.
  //
  // Requires FLAGS_numdistinct > 0; any failed operation exits(1).
  void RandomReplaceKeys(ThreadState* thread) {
    if (FLAGS_numdistinct <= 0) {
      // counters would be empty, yet key_id below is clamped to 0 and then
      // used as an index into it.
      fprintf(stderr, "RandomReplaceKeys requires --numdistinct > 0\n");
      exit(1);
    }

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::vector<uint32_t> counters(FLAGS_numdistinct, 0);
    size_t max_counter = 50;
    RandomGenerator gen;

    Status s;
    DB* db = SelectDB(thread);
    for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
      GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
      s = db->Put(write_options_, key, gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
        exit(1);
      }
    }

    // Held for the whole timed loop; every GetSnapshot() must be paired with
    // a ReleaseSnapshot(), done after the loop below.
    const Snapshot* snapshot = db->GetSnapshot();

    std::default_random_engine generator;
    std::normal_distribution<double> distribution(FLAGS_numdistinct / 2.0,
                                                  FLAGS_stddev);
    Duration duration(FLAGS_duration, FLAGS_num);
    while (!duration.Done(1)) {
      int64_t rnd_id = static_cast<int64_t>(distribution(generator));
      int64_t key_id = std::max(std::min(FLAGS_numdistinct - 1, rnd_id),
                                static_cast<int64_t>(0));
      GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                         &key);
      s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key)
                                   : db->Delete(write_options_, key);
      if (s.ok()) {
        counters[key_id] = (counters[key_id] + 1) % max_counter;
        GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                           &key);
        s = db->Put(write_options_, key, Slice());
      }

      if (!s.ok()) {
        fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
        exit(1);
      }

      thread->stats.FinishedOps(nullptr, db, 1, kOthers);
    }

    db->ReleaseSnapshot(snapshot);

    char msg[200];
    snprintf(msg, sizeof(msg),
             "use single deletes: %d, "
             "standard deviation: %lf\n",
             FLAGS_use_single_deletes, FLAGS_stddev);
    thread->stats.AddMessage(msg);
  }

4028
  // Compacts the full key range of the DB selected for this thread.
  // A failed compaction is reported on stderr but does not stop the run.
  void Compact(ThreadState* thread) {
    DB* db = SelectDB(thread);
    Status s = db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
    if (!s.ok()) {
      fprintf(stderr, "compact error: %s\n", s.ToString().c_str());
    }
  }

S
Sanjay Ghemawat 已提交
4033
  void PrintStats(const char* key) {
4034 4035
    if (db_.db != nullptr) {
      PrintStats(db_.db, key, false);
4036
    }
4037 4038
    for (const auto& db_with_cfh : multi_dbs_) {
      PrintStats(db_with_cfh.db, key, true);
4039 4040 4041 4042 4043 4044 4045
    }
  }

  // Writes the value of DB property `key` for `db` to stdout. If
  // GetProperty() reports the property as unavailable, "(failed)" is printed
  // instead. With print_header set, a "==== DB: <name> ===" banner comes
  // first.
  void PrintStats(DB* db, const char* key, bool print_header = false) {
    if (print_header) {
      fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
    }
    std::string stats;
    const bool found = db->GetProperty(key, &stats);
    fprintf(stdout, "\n%s\n", found ? stats.c_str() : "(failed)");
  }
J
jorlow@chromium.org 已提交
4052 4053
};

4054
int db_bench_tool(int argc, char** argv) {
I
Igor Canadi 已提交
4055
  rocksdb::port::InstallStackTraceHandler();
4056 4057 4058 4059 4060 4061
  static bool initialized = false;
  if (!initialized) {
    SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                    " [OPTIONS]...");
    initialized = true;
  }
4062
  ParseCommandLineFlags(&argc, &argv, true);
4063

4064 4065 4066
  FLAGS_compaction_style_e = (rocksdb::CompactionStyle) FLAGS_compaction_style;
  if (FLAGS_statistics) {
    dbstats = rocksdb::CreateDBStatistics();
J
jorlow@chromium.org 已提交
4067
  }
4068
  FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;
J
jorlow@chromium.org 已提交
4069

I
Igor Canadi 已提交
4070 4071
  std::vector<std::string> fanout = rocksdb::StringSplit(
      FLAGS_max_bytes_for_level_multiplier_additional, ',');
4072
  for (size_t j = 0; j < fanout.size(); j++) {
4073
    FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
S
sdong 已提交
4074 4075 4076 4077 4078
#ifndef CYGWIN
        std::stoi(fanout[j]));
#else
        stoi(fanout[j]));
#endif
4079 4080 4081 4082 4083
  }

  FLAGS_compression_type_e =
    StringToCompressionType(FLAGS_compression_type.c_str());

4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096
#ifndef ROCKSDB_LITE
  std::unique_ptr<Env> custom_env_guard;
  if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
    fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
    exit(1);
  } else if (!FLAGS_env_uri.empty()) {
    FLAGS_env = NewEnvFromUri(FLAGS_env_uri, &custom_env_guard);
    if (FLAGS_env == nullptr) {
      fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
      exit(1);
    }
  }
#endif  // ROCKSDB_LITE
4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115
  if (!FLAGS_hdfs.empty()) {
    FLAGS_env  = new rocksdb::HdfsEnv(FLAGS_hdfs);
  }

  if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NORMAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::SEQUENTIAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::WILLNEED;
  else {
    fprintf(stdout, "Unknown compaction fadvice:%s\n",
            FLAGS_compaction_fadvice.c_str());
  }

  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());

4116 4117 4118
  // The number of background threads should be at least as much the
  // max number of concurrent compactions.
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
4119 4120 4121
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes,
                                  rocksdb::Env::Priority::HIGH);

H
heyongqiang 已提交
4122
  // Choose a location for the test database if none given with --db=<path>
4123 4124 4125 4126 4127
  if (FLAGS_db.empty()) {
    std::string default_db_path;
    rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path;
H
heyongqiang 已提交
4128 4129
  }

4130 4131 4132 4133 4134 4135
  if (FLAGS_stats_interval_seconds > 0) {
    // When both are set then FLAGS_stats_interval determines the frequency
    // at which the timer is checked for FLAGS_stats_interval_seconds
    FLAGS_stats_interval = 1000;
  }

4136
  rocksdb::Benchmark benchmark;
J
jorlow@chromium.org 已提交
4137 4138 4139
  benchmark.Run();
  return 0;
}
4140
}  // namespace rocksdb
J
Jonathan Wiepert 已提交
4141
#endif