//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#ifdef GFLAGS
#ifdef NUMA
#include <numa.h>
#include <numaif.h>
#endif

#ifndef OS_WIN
#include <unistd.h>
#endif
#include <fcntl.h>
#include <inttypes.h>
#include <cstddef>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <gflags/gflags.h>

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <unordered_map>

#include "db/db_impl.h"
#include "db/version_set.h"
#include "hdfs/env_hdfs.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/utilities/env_registry.h"
#include "rocksdb/utilities/flashcache.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/sim_cache.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/write_batch.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/statistics.h"
#include "util/string_util.h"
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "util/xxhash.h"
#include "utilities/merge_operators.h"

#ifdef OS_WIN
#include <io.h>  // open/close
#endif

namespace {
using GFLAGS::ParseCommandLineFlags;
using GFLAGS::RegisterFlagValidator;
using GFLAGS::SetUsageMessage;

DEFINE_string(benchmarks,
              "fillseq,"
              "fillsync,"
              "fillrandom,"
              "overwrite,"
              "readrandom,"
              "newiterator,"
              "newiteratorwhilewriting,"
              "seekrandom,"
              "seekrandomwhilewriting,"
              "seekrandomwhilemerging,"
              "readseq,"
              "readreverse,"
              "compact,"
              "readrandom,"
              "multireadrandom,"
              "readseq,"
              "readtocache,"
              "readreverse,"
              "readwhilewriting,"
              "readwhilemerging,"
              "readrandomwriterandom,"
              "updaterandom,"
              "randomwithverify,"
              "fill100K,"
              "crc32c,"
              "xxhash,"
              "compress,"
              "uncompress,"
              "acquireload,"
              "fillseekseq,"
              "randomtransaction,"
              "randomreplacekeys,"
              "timeseries",

              "Comma-separated list of operations to run in the specified"
              " order. Available benchmarks:\n"
              "\tfillseq       -- write N values in sequential key"
              " order in async mode\n"
              "\tfillrandom    -- write N values in random key order in async"
              " mode\n"
              "\toverwrite     -- overwrite N values in random key order in"
              " async mode\n"
              "\tfillsync      -- write N/100 values in random key order in "
              "sync mode\n"
              "\tfill100K      -- write N/1000 100K values in random order in"
              " async mode\n"
              "\tdeleteseq     -- delete N keys in sequential order\n"
              "\tdeleterandom  -- delete N keys in random order\n"
              "\treadseq       -- read N times sequentially\n"
              "\treadtocache   -- 1 thread reading database sequentially\n"
              "\treadreverse   -- read N times in reverse order\n"
              "\treadrandom    -- read N times in random order\n"
              "\treadmissing   -- read N missing keys in random order\n"
              "\treadwhilewriting      -- 1 writer, N threads doing random "
              "reads\n"
              "\treadwhilemerging      -- 1 merger, N threads doing random "
              "reads\n"
              "\treadrandomwriterandom -- N threads doing random-read, "
              "random-write\n"
              "\tprefixscanrandom      -- prefix scan N times in random order\n"
              "\tupdaterandom  -- N threads doing read-modify-write for random "
              "keys\n"
              "\tappendrandom  -- N threads doing read-modify-write with "
              "growing values\n"
              "\tmergerandom   -- same as updaterandom/appendrandom using merge"
              " operator. "
              "Must be used with merge_operator\n"
              "\treadrandommergerandom -- perform N random read-or-merge "
              "operations. Must be used with merge_operator\n"
              "\tnewiterator   -- repeated iterator creation\n"
              "\tseekrandom    -- N random seeks, call Next seek_nexts times "
              "per seek\n"
              "\tseekrandomwhilewriting -- seekrandom and 1 thread doing "
              "overwrite\n"
              "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
              "merge\n"
              "\tcrc32c        -- repeated crc32c of 4K of data\n"
              "\txxhash        -- repeated xxHash of 4K of data\n"
              "\tacquireload   -- load N*1000 times\n"
              "\tfillseekseq   -- write N values in sequential key, then read "
              "them by seeking to each key\n"
              "\trandomtransaction     -- execute N random transactions and "
              "verify correctness\n"
              "\trandomreplacekeys     -- randomly replaces N keys by deleting "
              "the old version and putting the new version\n\n"
              "\ttimeseries            -- 1 writer generates time series data "
              "and multiple readers doing random reads on id\n\n"
              "Meta operations:\n"
              "\tcompact     -- Compact the entire DB\n"
              "\tstats       -- Print DB stats\n"
              "\tlevelstats  -- Print the number of files and bytes per level\n"
              "\tsstables    -- Print sstable info\n"
              "\theapprofile -- Dump a heap profile (if supported by this"
              " port)\n");

DEFINE_int64(num, 1000000, "Number of key/values to place in database");
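// Example invocation exercising flags defined in this file (an illustrative
// sketch; the values are arbitrary):
//   ./db_bench --benchmarks=fillseq,readrandom --num=1000000 --value_size=100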

DEFINE_int64(numdistinct, 1000,
             "Number of distinct keys to use. Used in RandomWithVerify to "
             "read/write on fewer keys so that gets are more likely to find the"
             " key and puts are more likely to update the same key");

DEFINE_int64(merge_keys, -1,
             "Number of distinct keys to use for MergeRandom and "
             "ReadRandomMergeRandom. "
             "If negative, there will be FLAGS_num keys.");
DEFINE_int32(num_column_families, 1, "Number of Column Families to use.");

DEFINE_int32(
    num_hot_column_families, 0,
    "Number of Hot Column Families. If more than 0, only write to this "
    "number of column families. After finishing all the writes to them, "
    "create new set of column families and insert to them. Only used "
    "when num_column_families > 1.");

DEFINE_int64(reads, -1, "Number of read operations to do.  "
             "If negative, do FLAGS_num reads.");

DEFINE_int64(deletes, -1, "Number of delete operations to do.  "
             "If negative, do FLAGS_num deletions.");

DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality");

DEFINE_int64(seed, 0, "Seed base for random number generators. "
             "When 0 it is deterministic.");

DEFINE_int32(threads, 1, "Number of concurrent threads to run.");

DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
             " When 0 then num & reads determine the test duration");

DEFINE_int32(value_size, 100, "Size of each value");

DEFINE_int32(seek_nexts, 0,
             "How many times to call Next() after Seek() in "
             "fillseekseq, seekrandom, seekrandomwhilewriting and "
             "seekrandomwhilemerging");

DEFINE_bool(reverse_iterator, false,
            "When true use Prev rather than Next for iterators that do "
            "Seek and then Next");

DEFINE_bool(use_uint64_comparator, false, "use Uint64 user comparator");

DEFINE_int64(batch_size, 1, "Batch size");

static bool ValidateKeySize(const char* flagname, int32_t value) {
  return true;
}

static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    fprintf(stderr, "Invalid value for --%s: %lu, overflow\n", flagname,
            (unsigned long)value);
    return false;
  }
  return true;
}
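// Validators such as the two above are attached to their flags through
// RegisterFlagValidator; see the FLAGS_*_dummy registrations near the end of
// this anonymous namespace.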

DEFINE_int32(key_size, 16, "size of each key");

DEFINE_int32(num_multi_db, 0,
             "Number of DBs used in the benchmark. 0 means single DB.");

DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
              " to this fraction of their original size after compression");

DEFINE_double(read_random_exp_range, 0.0,
              "Read random's key will be generated using distribution of "
              "num * exp(-r) where r is uniform number from 0 to this value. "
              "The larger the number is, the more skewed the reads are. "
              "Only used in readrandom and multireadrandom benchmarks.");

DEFINE_bool(histogram, false, "Print histogram of operation timings");

DEFINE_bool(enable_numa, false,
            "Make operations aware of NUMA architecture and bind memory "
            "and cpus corresponding to nodes together. In NUMA, memory "
            "in same node as CPUs are closer when compared to memory in "
            "other nodes. Reads can be faster when the process is bound to "
            "CPU and memory of same node. Use \"$numactl --hardware\" command "
            "to see NUMA memory architecture.");

DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
             "Number of bytes to buffer in all memtables before compacting");

DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
             "Number of bytes to buffer in memtable before compacting");

DEFINE_int32(max_write_buffer_number,
             rocksdb::Options().max_write_buffer_number,
             "The number of in-memory memtables. Each memtable is of size "
             "write_buffer_size.");

DEFINE_int32(min_write_buffer_number_to_merge,
             rocksdb::Options().min_write_buffer_number_to_merge,
             "The minimum number of write buffers that will be merged together "
             "before writing to storage. This is cheap because it is an "
             "in-memory merge. If this feature is not enabled, then all these "
             "write buffers are flushed to L0 as separate files and this "
             "increases read amplification because a get request has to check"
             " in all of these files. Also, an in-memory merge may result in"
             " writing less data to storage if there are duplicate records "
             " in each of these individual write buffers.");

DEFINE_int32(max_write_buffer_number_to_maintain,
             rocksdb::Options().max_write_buffer_number_to_maintain,
             "The total maximum number of write buffers to maintain in memory "
             "including copies of buffers that have already been flushed. "
             "Unlike max_write_buffer_number, this parameter does not affect "
             "flushing. This controls the minimum amount of write history "
             "that will be available in memory for conflict checking when "
             "Transactions are used. If this value is too low, some "
             "transactions may fail at commit time due to not being able to "
             "determine whether there were any write conflicts. Setting this "
             "value to 0 will cause write buffers to be freed immediately "
             "after they are flushed.  If this value is set to -1, "
             "'max_write_buffer_number' will be used.");

DEFINE_int32(max_background_compactions,
             rocksdb::Options().max_background_compactions,
             "The maximum number of concurrent background compactions"
             " that can occur in parallel.");

DEFINE_uint64(subcompactions, 1,
              "Maximum number of subcompactions to divide L0-L1 compactions "
              "into.");
static const bool FLAGS_subcompactions_dummy
    __attribute__((unused)) = RegisterFlagValidator(&FLAGS_subcompactions,
                                                    &ValidateUint32Range);

DEFINE_int32(max_background_flushes,
             rocksdb::Options().max_background_flushes,
             "The maximum number of concurrent background flushes"
             " that can occur in parallel.");

static rocksdb::CompactionStyle FLAGS_compaction_style_e;
DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
             "style of compaction: level-based vs universal");

static rocksdb::CompactionPri FLAGS_compaction_pri_e;
DEFINE_int32(compaction_pri, (int32_t)rocksdb::Options().compaction_pri,
             "priority of files to compaction: by size or by data age");

DEFINE_int32(universal_size_ratio, 0,
             "Percentage flexibility while comparing file size"
             " (for universal compaction only).");

DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files in a"
             " single compaction run (for universal compaction only).");

DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
             " in universal style compaction");

DEFINE_int32(universal_max_size_amplification_percent, 0,
             "The max size amplification for universal style compaction");

DEFINE_int32(universal_compression_size_percent, -1,
             "The percentage of the database to compress for universal "
             "compaction. -1 means compress everything.");

DEFINE_bool(universal_allow_trivial_move, false,
            "Allow trivial move in universal compaction.");

DEFINE_int64(cache_size, -1,
             "Number of bytes to use as a cache of uncompressed"
             " data. Negative means use default settings.");

DEFINE_int64(simcache_size, -1,
             "Number of bytes to use as a simcache of "
             "uncompressed data. Negative means use default settings.");

DEFINE_bool(cache_index_and_filter_blocks, false,
            "Cache index/filter blocks in block cache.");

DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false,
            "Pin index/filter blocks of L0 files in block cache.");

DEFINE_int32(block_size,
             static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
             "Number of bytes in a block.");

DEFINE_int32(block_restart_interval,
             rocksdb::BlockBasedTableOptions().block_restart_interval,
             "Number of keys between restart points "
             "for delta encoding of keys in data block.");

DEFINE_int32(index_block_restart_interval,
             rocksdb::BlockBasedTableOptions().index_block_restart_interval,
             "Number of keys between restart points "
             "for delta encoding of keys in index block.");

DEFINE_int64(compressed_cache_size, -1,
             "Number of bytes to use as a cache of compressed data.");

DEFINE_int64(row_cache_size, 0,
             "Number of bytes to use as a cache of individual rows"
             " (0 = disabled).");

DEFINE_int32(open_files, rocksdb::Options().max_open_files,
             "Maximum number of files to keep open at the same time"
             " (use default if == 0)");

DEFINE_int32(file_opening_threads, rocksdb::Options().max_file_opening_threads,
             "If open_files is set to -1, this option set the number of "
             "threads that will be used to open files during DB::Open()");

DEFINE_int32(new_table_reader_for_compaction_inputs, true,
             "If true, uses a separate file handle for compaction inputs");

DEFINE_int32(compaction_readahead_size, 0, "Compaction readahead size");

DEFINE_int32(random_access_max_buffer_size, 1024 * 1024,
             "Maximum windows randomaccess buffer size");

DEFINE_int32(writable_file_max_buffer_size, 1024 * 1024,
             "Maximum write buffer for Writable File");

DEFINE_int32(skip_table_builder_flush, false, "Skip flushing block in "
             "table builder ");

DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
             " use default settings.");
DEFINE_double(memtable_bloom_size_ratio, 0,
              "Ratio of memtable size used for bloom filter. 0 means no bloom "
              "filter.");
DEFINE_bool(memtable_use_huge_page, false,
            "Try to use huge page in memtables.");

DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
            " database.  If you set this flag and also specify a benchmark that"
            " wants a fresh database, that benchmark will fail.");

DEFINE_bool(show_table_properties, false,
            "If true, then per-level table"
            " properties will be printed on every stats-interval when"
            " stats_interval is set and stats_per_interval is on.");

DEFINE_string(db, "", "Use the db with the following name.");

static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
  if (value >= 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(cache_numshardbits, -1, "Number of shards for the block cache"
             " is 2 ** cache_numshardbits. Negative means use default settings."
             " This is applied only if FLAGS_cache_size is non-negative.");

DEFINE_bool(verify_checksum, false, "Verify checksum for every block read"
            " from storage");

DEFINE_bool(statistics, false, "Database statistics");
static class std::shared_ptr<rocksdb::Statistics> dbstats;

DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
             " --num writes.");

DEFINE_bool(sync, false, "Sync all writes to disk");

DEFINE_bool(disable_data_sync, false, "If true, do not wait until data is"
            " synced to disk.");

DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");

DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");

DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");

DEFINE_int32(num_levels, 7, "The total number of levels");

DEFINE_int64(target_file_size_base, 2 * 1048576, "Target file size at level-1");

DEFINE_int32(target_file_size_multiplier, 1,
             "A multiplier to compute target level-N file size (N >= 2)");

DEFINE_uint64(max_bytes_for_level_base,  10 * 1048576, "Max bytes for level-1");

DEFINE_bool(level_compaction_dynamic_level_bytes, false,
            "Whether level size base is dynamic");

DEFINE_int32(max_bytes_for_level_multiplier, 10,
             "A multiplier to compute max bytes for level-N (N >= 2)");

static std::vector<int> FLAGS_max_bytes_for_level_multiplier_additional_v;
DEFINE_string(max_bytes_for_level_multiplier_additional, "",
              "A vector that specifies additional fanout per level");

DEFINE_int32(level0_stop_writes_trigger,
             rocksdb::Options().level0_stop_writes_trigger,
             "Number of files in level-0"
             " that will trigger put stop.");

DEFINE_int32(level0_slowdown_writes_trigger,
             rocksdb::Options().level0_slowdown_writes_trigger,
             "Number of files in level-0"
             " that will slow down writes.");

DEFINE_int32(level0_file_num_compaction_trigger,
             rocksdb::Options().level0_file_num_compaction_trigger,
             "Number of files in level-0"
             " when compactions start");

static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value <= 0 || value>=100) {
    fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
             " as percentage) for the ReadRandomWriteRandom workload. The "
             "default value 90 means 90% operations out of all reads and writes"
             " operations are reads. In other words, 9 gets for every 1 put.");

DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
             " as percentage) for the ReadRandomMergeRandom workload. The"
             " default value 70 means 70% out of all read and merge operations"
             " are merges. In other words, 7 merges for every 3 gets.");

DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
             "deletes (used in RandomWithVerify only). RandomWithVerify "
             "calculates writepercent as (100 - FLAGS_readwritepercent - "
             "deletepercent), so deletepercent must be smaller than (100 - "
             "FLAGS_readwritepercent)");

DEFINE_bool(optimize_filters_for_hits, false,
            "Optimizes bloom filters for workloads where most lookups return "
            "a value. For now this doesn't create bloom filters for the max "
            "level of the LSM to reduce metadata that should fit in RAM. ");

DEFINE_uint64(delete_obsolete_files_period_micros, 0,
              "Ignored. Left here for backward compatibility");

#ifndef ROCKSDB_LITE
DEFINE_bool(optimistic_transaction_db, false,
            "Open a OptimisticTransactionDB instance. "
            "Required for randomtransaction benchmark.");

DEFINE_bool(transaction_db, false,
            "Open a TransactionDB instance. "
            "Required for randomtransaction benchmark.");

DEFINE_uint64(transaction_sets, 2,
              "Number of keys each transaction will "
              "modify (use in RandomTransaction only).  Max: 9999");

DEFINE_bool(transaction_set_snapshot, false,
            "Setting to true will have each transaction call SetSnapshot()"
            " upon creation.");

DEFINE_int32(transaction_sleep, 0,
             "Max microseconds to sleep in between "
             "reading and writing a value (used in RandomTransaction only). ");

DEFINE_uint64(transaction_lock_timeout, 100,
              "If using a transaction_db, specifies the lock wait timeout in"
              " milliseconds before failing a transaction waiting on a lock");
DEFINE_string(
    options_file, "",
    "The path to a RocksDB options file.  If specified, then db_bench will "
    "run with the RocksDB options in the default column family of the "
    "specified options file. "
    "Note that with this setting, db_bench will ONLY accept the following "
    "RocksDB options related command-line arguments, all other arguments "
    "that are related to RocksDB options will be ignored:\n"
    "\t--use_existing_db\n"
    "\t--statistics\n"
    "\t--row_cache_size\n"
    "\t--row_cache_numshardbits\n"
    "\t--enable_io_prio\n"
    "\t--disable_flashcache_for_background_threads\n"
    "\t--flashcache_dev\n"
    "\t--dump_malloc_stats\n"
    "\t--num_multi_db\n");
#endif  // ROCKSDB_LITE

DEFINE_bool(report_bg_io_stats, false,
            "Measure time spent on I/Os while in compactions.");

enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "none"))
    return rocksdb::kNoCompression;
  else if (!strcasecmp(ctype, "snappy"))
    return rocksdb::kSnappyCompression;
  else if (!strcasecmp(ctype, "zlib"))
    return rocksdb::kZlibCompression;
  else if (!strcasecmp(ctype, "bzip2"))
    return rocksdb::kBZip2Compression;
  else if (!strcasecmp(ctype, "lz4"))
    return rocksdb::kLZ4Compression;
  else if (!strcasecmp(ctype, "lz4hc"))
    return rocksdb::kLZ4HCCompression;
  else if (!strcasecmp(ctype, "xpress"))
    return rocksdb::kXpressCompression;
  else if (!strcasecmp(ctype, "zstd"))
    return rocksdb::kZSTDNotFinalCompression;

  fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
  return rocksdb::kSnappyCompression;  // default value
}
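// Typical use (a sketch, assuming the --compression_type flag and the
// FLAGS_compression_type_e variable declared below):
//   FLAGS_compression_type_e =
//       StringToCompressionType(FLAGS_compression_type.c_str());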

std::string ColumnFamilyName(size_t i) {
  if (i == 0) {
    return rocksdb::kDefaultColumnFamilyName;
  } else {
    char name[100];
    snprintf(name, sizeof(name), "column_family_name_%06zu", i);
    return std::string(name);
  }
}

DEFINE_string(compression_type, "snappy",
              "Algorithm to use to compress the database");
static enum rocksdb::CompressionType FLAGS_compression_type_e =
    rocksdb::kSnappyCompression;

DEFINE_int32(compression_level, -1,
             "Compression level. For zlib this should be -1 for the "
             "default level, or between 0 and 9.");

DEFINE_int32(compression_max_dict_bytes, 0,
             "Maximum size of dictionary used to prime the compression "
             "library.");

static bool ValidateCompressionLevel(const char* flagname, int32_t value) {
  if (value < -1 || value > 9) {
    fprintf(stderr, "Invalid value for --%s: %d, must be between -1 and 9\n",
            flagname, value);
    return false;
  }
  return true;
}

static const bool FLAGS_compression_level_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_compression_level, &ValidateCompressionLevel);

DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
             " from this level. Levels with number < min_level_to_compress are"
             " not compressed. Otherwise, apply compression_type to "
             "all levels.");

static bool ValidateTableCacheNumshardbits(const char* flagname,
                                           int32_t value) {
  if (0 >= value || value > 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be  0 < val <= 20\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(table_cache_numshardbits, 4, "");

#ifndef ROCKSDB_LITE
DEFINE_string(env_uri, "", "URI for registry Env lookup. Mutually exclusive"
              " with --hdfs.");
#endif  // ROCKSDB_LITE
DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with"
              " --env_uri.");
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();

DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
             "this is greater than zero. When 0 the interval grows over time.");

DEFINE_int64(stats_interval_seconds, 0, "Report stats every N seconds. This "
             "overrides stats_interval when both are > 0.");

DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
             " this is greater than 0.");

DEFINE_int64(report_interval_seconds, 0,
             "If greater than zero, it will write simple stats in CSV format "
             "to --report_file every N seconds");

DEFINE_string(report_file, "report.csv",
              "Filename where some simple stats are reported to (if "
              "--report_interval_seconds is bigger than 0)");

DEFINE_int32(thread_status_per_interval, 0,
             "Takes and reports a snapshot of the current status of each thread"
             " when this is greater than 0.");

DEFINE_int32(perf_level, rocksdb::PerfLevel::kDisable, "Level of perf collection");

static bool ValidateRateLimit(const char* flagname, double value) {
  const double EPSILON = 1e-10;
  if ( value < -EPSILON ) {
    fprintf(stderr, "Invalid value for --%s: %12.6f, must be >= 0.0\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_double(soft_rate_limit, 0.0, "DEPRECATED");

DEFINE_double(hard_rate_limit, 0.0, "DEPRECATED");

DEFINE_uint64(soft_pending_compaction_bytes_limit, 64ull * 1024 * 1024 * 1024,
              "Slowdown writes if pending compaction bytes exceed this number");

DEFINE_uint64(hard_pending_compaction_bytes_limit, 128ull * 1024 * 1024 * 1024,
              "Stop writes if pending compaction bytes exceed this number");

DEFINE_uint64(delayed_write_rate, 8388608u,
              "Limited bytes allowed to DB when soft_rate_limit or "
              "level0_slowdown_writes_trigger triggers");

DEFINE_bool(allow_concurrent_memtable_write, false,
            "Allow multi-writers to update mem tables in parallel.");

DEFINE_bool(enable_write_thread_adaptive_yield, false,
            "Use a yielding spin loop for brief writer thread waits.");

DEFINE_uint64(
    write_thread_max_yield_usec, 100,
    "Maximum microseconds for enable_write_thread_adaptive_yield operation.");

DEFINE_uint64(write_thread_slow_yield_usec, 3,
              "The threshold at which a slow yield is considered a signal that "
              "other processes or threads want the core.");

DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
             "When hard_rate_limit is set then this is the max time a put will"
             " be stalled.");

DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");

DEFINE_uint64(
    benchmark_write_rate_limit, 0,
    "If non-zero, db_bench will rate-limit the writes going into RocksDB. This "
    "is the global rate in bytes/second.");

DEFINE_int32(max_grandparent_overlap_factor, 10, "Control maximum bytes of "
             "overlaps in grandparent (i.e., level+2) before we stop building a"
             " single file in a level->level+1 compaction.");

#ifndef ROCKSDB_LITE
DEFINE_bool(readonly, false, "Run read only benchmarks.");
#endif  // ROCKSDB_LITE

DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");

DEFINE_int32(source_compaction_factor, 1, "Cap the size of data in level-K for"
             " a compaction run that compacts Level-K with Level-(K+1) (for"
             " K >= 1)");

DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
              " in MB.");
DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");

DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer,
            "Allow buffered io using OS buffers");

DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
            "Allow reads to occur via mmap-ing files");

DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes,
            "Allow writes to occur via mmap-ing files");

DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
            "Advise random access on table file open");

DEFINE_string(compaction_fadvice, "NORMAL",
              "Access pattern advice when a file is compacted");
static auto FLAGS_compaction_fadvice_e =
  rocksdb::Options().access_hint_on_compaction_start;

DEFINE_bool(disable_flashcache_for_background_threads, false,
            "Disable flashcache for background threads");

DEFINE_string(flashcache_dev, "", "Path to flashcache device");

DEFINE_bool(use_tailing_iterator, false,
            "Use tailing iterator to access a series of keys instead of get");

DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
            "Use adaptive mutex");

DEFINE_uint64(bytes_per_sync,  rocksdb::Options().bytes_per_sync,
              "Allows OS to incrementally sync SST files to disk while they are"
              " being written, in the background. Issue one request for every"
              " bytes_per_sync written. 0 turns it off.");

DEFINE_uint64(wal_bytes_per_sync,  rocksdb::Options().wal_bytes_per_sync,
              "Allows OS to incrementally sync WAL files to disk while they are"
              " being written, in the background. Issue one request for every"
              " wal_bytes_per_sync written. 0 turns it off.");

DEFINE_bool(use_single_deletes, true,
            "Use single deletes (used in RandomReplaceKeys only).");

DEFINE_double(stddev, 2000.0,
              "Standard deviation of normal distribution used for picking keys"
              " (used in RandomReplaceKeys only).");

DEFINE_int32(key_id_range, 100000,
             "Range of possible value of key id (used in TimeSeries only).");

DEFINE_string(expire_style, "none",
              "Style to remove expired time entries. Can be one of the options "
              "below: none (do not expired data), compaction_filter (use a "
              "compaction filter to remove expired data), delete (seek IDs and "
              "remove expired data) (used in TimeSeries only).");

DEFINE_uint64(
    time_range, 100000,
    "Range of timestamp that store in the database (used in TimeSeries"
    " only).");

DEFINE_int32(num_deletion_threads, 1,
             "Number of threads to do deletion (used in TimeSeries and delete "
             "expire_style only).");

DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
             " operations on a key in the memtable");

static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  if (value < 0 || value>=2000000000) {
    fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
             "plain table");
DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated "
             "per prefix, 0 means no special handling of the prefix, "
             "i.e. use the prefix that comes with the generated random number.");
DEFINE_bool(enable_io_prio, false, "Lower the background flush/compaction "
            "threads' IO priority");
DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
            "table becomes an identity function. This is only valid when key "
            "is 8 bytes");
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");

enum RepFactory {
  kSkipList,
  kPrefixHash,
  kVectorRep,
  kHashLinkedList,
  kCuckoo
};

enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "skip_list"))
    return kSkipList;
  else if (!strcasecmp(ctype, "prefix_hash"))
    return kPrefixHash;
  else if (!strcasecmp(ctype, "vector"))
    return kVectorRep;
  else if (!strcasecmp(ctype, "hash_linkedlist"))
    return kHashLinkedList;
  else if (!strcasecmp(ctype, "cuckoo"))
    return kCuckoo;

  fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
  return kSkipList;
}
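// Typical use (a sketch, assuming the --memtablerep flag and FLAGS_rep_factory
// declared below):
//   FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());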

static enum RepFactory FLAGS_rep_factory;
DEFINE_string(memtablerep, "skip_list", "");
DEFINE_int64(hash_bucket_count, 1024 * 1024, "hash bucket count");
DEFINE_bool(use_plain_table, false, "if use plain table "
            "instead of block-based table format");
DEFINE_bool(use_cuckoo_table, false, "if use cuckoo table format");
DEFINE_double(cuckoo_hash_ratio, 0.9, "Hash ratio for Cuckoo SST table.");
DEFINE_bool(use_hash_search, false, "if use kHashSearch "
            "instead of kBinarySearch. "
            "This is only valid when we use BlockTable");
DEFINE_bool(use_block_based_filter, false, "if use kBlockBasedFilter "
            "instead of kFullFilter for filter block. "
            "This is only valid when we use BlockTable");
DEFINE_string(merge_operator, "", "The merge operator to use with the database. "
              "If a new merge operator is specified, be sure to use a fresh"
              " database. The possible merge operators are defined in"
              " utilities/merge_operators.h");
DEFINE_int32(skip_list_lookahead, 0, "Used with skip_list memtablerep; try "
             "linear search first for this many steps from the previous "
             "position");
DEFINE_bool(report_file_operations, false, "if report number of file "
            "operations");

static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit);

static const bool FLAGS_hard_rate_limit_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_hard_rate_limit, &ValidateRateLimit);

static const bool FLAGS_prefix_size_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);

static const bool FLAGS_key_size_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_key_size, &ValidateKeySize);

static const bool FLAGS_cache_numshardbits_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_cache_numshardbits,
                          &ValidateCacheNumshardbits);

static const bool FLAGS_readwritepercent_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_readwritepercent, &ValidateInt32Percent);

DEFINE_int32(disable_seek_compaction, false,
             "Not used, left here for backwards compatibility");

static const bool FLAGS_deletepercent_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_deletepercent, &ValidateInt32Percent);
static const bool FLAGS_table_cache_numshardbits_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
                          &ValidateTableCacheNumshardbits);
}  // namespace

namespace rocksdb {

namespace {
struct ReportFileOpCounters {
  std::atomic<int> open_counter_;
  std::atomic<int> read_counter_;
  std::atomic<int> append_counter_;
  std::atomic<uint64_t> bytes_read_;
  std::atomic<uint64_t> bytes_written_;
};

// A special Env that records and reports file operations in db_bench
class ReportFileOpEnv : public EnvWrapper {
 public:
  explicit ReportFileOpEnv(Env* base) : EnvWrapper(base) { reset(); }

  void reset() {
    counters_.open_counter_ = 0;
    counters_.read_counter_ = 0;
    counters_.append_counter_ = 0;
    counters_.bytes_read_ = 0;
    counters_.bytes_written_ = 0;
  }

  Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
                           const EnvOptions& soptions) override {
    class CountingFile : public SequentialFile {
     private:
      unique_ptr<SequentialFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(unique_ptr<SequentialFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}

      virtual Status Read(size_t n, Slice* result, char* scratch) override {
        counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Read(n, result, scratch);
        counters_->bytes_read_.fetch_add(result->size(),
                                         std::memory_order_relaxed);
        return rv;
      }

      virtual Status Skip(uint64_t n) override { return target_->Skip(n); }
    };

    Status s = target()->NewSequentialFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  Status NewRandomAccessFile(const std::string& f,
                             unique_ptr<RandomAccessFile>* r,
                             const EnvOptions& soptions) override {
    class CountingFile : public RandomAccessFile {
     private:
      unique_ptr<RandomAccessFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(unique_ptr<RandomAccessFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}
      virtual Status Read(uint64_t offset, size_t n, Slice* result,
                          char* scratch) const override {
        counters_->read_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Read(offset, n, result, scratch);
        counters_->bytes_read_.fetch_add(result->size(),
                                         std::memory_order_relaxed);
        return rv;
      }
    };

    Status s = target()->NewRandomAccessFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
                         const EnvOptions& soptions) override {
    class CountingFile : public WritableFile {
     private:
      unique_ptr<WritableFile> target_;
      ReportFileOpCounters* counters_;

     public:
      CountingFile(unique_ptr<WritableFile>&& target,
                   ReportFileOpCounters* counters)
          : target_(std::move(target)), counters_(counters) {}

      Status Append(const Slice& data) override {
        counters_->append_counter_.fetch_add(1, std::memory_order_relaxed);
        Status rv = target_->Append(data);
        counters_->bytes_written_.fetch_add(data.size(),
                                            std::memory_order_relaxed);
        return rv;
      }

      Status Truncate(uint64_t size) override { return target_->Truncate(size); }
      Status Close() override { return target_->Close(); }
      Status Flush() override { return target_->Flush(); }
      Status Sync() override { return target_->Sync(); }
    };

    Status s = target()->NewWritableFile(f, r, soptions);
    if (s.ok()) {
      counters()->open_counter_.fetch_add(1, std::memory_order_relaxed);
      r->reset(new CountingFile(std::move(*r), counters()));
    }
    return s;
  }

  // getter
  ReportFileOpCounters* counters() { return &counters_; }

 private:
  ReportFileOpCounters counters_;
};
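// Wiring sketch (an assumption based on --report_file_operations above, not a
// statement of the exact call site):
//   if (FLAGS_report_file_operations) {
//     FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
//   }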

}  // namespace

// Helper for quickly generating random data.
class RandomGenerator {
 private:
  std::string data_;
  unsigned int pos_;

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
    while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

  Slice Generate(unsigned int len) {
    assert(len <= data_.size());
    if (pos_ + len > data_.size()) {
      pos_ = 0;
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
};
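// Usage sketch: a benchmark thread typically keeps one generator and draws
// each value from it, e.g.
//   RandomGenerator gen;
//   Slice val = gen.Generate(FLAGS_value_size);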

static void AppendWithSpace(std::string* str, Slice msg) {
  if (msg.empty()) return;
  if (!str->empty()) {
    str->push_back(' ');
  }
  str->append(msg.data(), msg.size());
}

struct DBWithColumnFamilies {
  std::vector<ColumnFamilyHandle*> cfh;
  DB* db;
#ifndef ROCKSDB_LITE
  OptimisticTransactionDB* opt_txn_db;
#endif  // ROCKSDB_LITE
  std::atomic<size_t> num_created;  // Need to be updated after all the
                                    // new entries in cfh are set.
  size_t num_hot;  // Number of column families to be queried at each moment.
                   // After each CreateNewCf(), another num_hot number of new
                   // Column families will be created and used to be queried.
  port::Mutex create_cf_mutex;  // Only one thread can execute CreateNewCf()

  DBWithColumnFamilies()
      : db(nullptr)
#ifndef ROCKSDB_LITE
        , opt_txn_db(nullptr)
#endif  // ROCKSDB_LITE
  {
    cfh.clear();
    num_created = 0;
    num_hot = 0;
  }

  DBWithColumnFamilies(const DBWithColumnFamilies& other)
      : cfh(other.cfh),
        db(other.db),
#ifndef ROCKSDB_LITE
        opt_txn_db(other.opt_txn_db),
#endif  // ROCKSDB_LITE
        num_created(other.num_created.load()),
        num_hot(other.num_hot) {}

  void DeleteDBs() {
    std::for_each(cfh.begin(), cfh.end(),
                  [](ColumnFamilyHandle* cfhi) { delete cfhi; });
    cfh.clear();
#ifndef ROCKSDB_LITE
    if (opt_txn_db) {
      delete opt_txn_db;
      opt_txn_db = nullptr;
    } else {
      delete db;
      db = nullptr;
    }
#else
    delete db;
    db = nullptr;
#endif  // ROCKSDB_LITE
  }

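  // Returns a handle picked from the num_hot most recently created column
  // families, selected by rand_num.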
  ColumnFamilyHandle* GetCfh(int64_t rand_num) {
    assert(num_hot > 0);
    return cfh[num_created.load(std::memory_order_acquire) - num_hot +
               rand_num % num_hot];
  }

  // stage: assume CF from 0 to stage * num_hot has be created. Need to create
  //        stage * num_hot + 1 to stage * (num_hot + 1).
  void CreateNewCf(ColumnFamilyOptions options, int64_t stage) {
    MutexLock l(&create_cf_mutex);
    if ((stage + 1) * num_hot <= num_created) {
      // Already created.
      return;
    }
    auto new_num_created = num_created + num_hot;
    assert(new_num_created <= cfh.size());
    for (size_t i = num_created; i < new_num_created; i++) {
      Status s =
          db->CreateColumnFamily(options, ColumnFamilyName(i), &(cfh[i]));
      if (!s.ok()) {
        fprintf(stderr, "create column family error: %s\n",
                s.ToString().c_str());
        abort();
      }
    }
    num_created.store(new_num_created, std::memory_order_release);
  }
};

// A class that reports stats to a CSV file.
class ReporterAgent {
 public:
  ReporterAgent(Env* env, const std::string& fname,
                uint64_t report_interval_secs)
      : env_(env),
        total_ops_done_(0),
        last_report_(0),
        report_interval_secs_(report_interval_secs),
        stop_(false) {
    auto s = env_->NewWritableFile(fname, &report_file_, EnvOptions());
    if (s.ok()) {
      s = report_file_->Append(Header() + "\n");
    }
    if (s.ok()) {
      s = report_file_->Flush();
    }
    if (!s.ok()) {
      fprintf(stderr, "Can't open %s: %s\n", fname.c_str(),
              s.ToString().c_str());
      abort();
    }

    reporting_thread_ = std::thread([&]() { SleepAndReport(); });
  }

  ~ReporterAgent() {
    {
      std::unique_lock<std::mutex> lk(mutex_);
      stop_ = true;
      stop_cv_.notify_all();
    }
    reporting_thread_.join();
  }

  // thread safe
  void ReportFinishedOps(int64_t num_ops) {
    total_ops_done_.fetch_add(num_ops);
  }

 private:
  std::string Header() const { return "secs_elapsed,interval_qps"; }
  void SleepAndReport() {
    uint64_t kMicrosInSecond = 1000 * 1000;
    auto time_started = env_->NowMicros();
    while (true) {
      {
        std::unique_lock<std::mutex> lk(mutex_);
        if (stop_ ||
            stop_cv_.wait_for(lk, std::chrono::seconds(report_interval_secs_),
                              [&]() { return stop_; })) {
          // stopping
          break;
        }
        // else -> timeout, which means time for a report!
      }
      auto total_ops_done_snapshot = total_ops_done_.load();
      // round the seconds elapsed
      auto secs_elapsed =
          (env_->NowMicros() - time_started + kMicrosInSecond / 2) /
          kMicrosInSecond;
      std::string report = ToString(secs_elapsed) + "," +
                           ToString(total_ops_done_snapshot - last_report_) +
                           "\n";
      auto s = report_file_->Append(report);
      if (s.ok()) {
        s = report_file_->Flush();
      }
      if (!s.ok()) {
        fprintf(stderr,
                "Can't write to report file (%s), stopping the reporting\n",
                s.ToString().c_str());
        break;
      }
      last_report_ = total_ops_done_snapshot;
    }
  }

  Env* env_;
  std::unique_ptr<WritableFile> report_file_;
  std::atomic<int64_t> total_ops_done_;
  int64_t last_report_;
  const uint64_t report_interval_secs_;
  std::thread reporting_thread_;
  std::mutex mutex_;
  // will notify on stop
  std::condition_variable stop_cv_;
  bool stop_;
};
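// Usage sketch: one ReporterAgent is created when --report_interval_seconds is
// greater than zero and is shared by the worker threads through
// Stats::SetReporterAgent(); each thread then calls ReportFinishedOps().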

enum OperationType : unsigned char {
  kRead = 0,
  kWrite,
  kDelete,
  kSeek,
  kMerge,
  kUpdate,
  kCompress,
  kUncompress,
  kCrc,
  kHash,
  kOthers
};

static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
                          OperationTypeString = {
  {kRead, "read"},
  {kWrite, "write"},
  {kDelete, "delete"},
  {kSeek, "seek"},
  {kMerge, "merge"},
  {kUpdate, "update"},
  {kCompress, "compress"},
  {kUncompress, "uncompress"},
  {kCrc, "crc"},
  {kHash, "hash"},
  {kOthers, "op"}
};

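// Per-thread benchmark statistics. Individual Stats objects are combined with
// Merge() unless SetExcludeFromMerge() has been called on them.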
class Stats {
 private:
  int id_;
  uint64_t start_;
  uint64_t finish_;
  double seconds_;
  uint64_t done_;
  uint64_t last_report_done_;
  uint64_t next_report_;
  uint64_t bytes_;
  uint64_t last_op_finish_;
  uint64_t last_report_finish_;
  std::unordered_map<OperationType, std::shared_ptr<HistogramImpl>,
                     std::hash<unsigned char>> hist_;
  std::string message_;
  bool exclude_from_merge_;
  ReporterAgent* reporter_agent_;  // does not own

 public:
  Stats() { Start(-1); }

  void SetReporterAgent(ReporterAgent* reporter_agent) {
    reporter_agent_ = reporter_agent;
  }

  void Start(int id) {
    id_ = id;
    next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
    last_op_finish_ = start_;
    hist_.clear();
    done_ = 0;
    last_report_done_ = 0;
    bytes_ = 0;
    seconds_ = 0;
    start_ = FLAGS_env->NowMicros();
    finish_ = start_;
    last_report_finish_ = start_;
    message_.clear();
    // When set, stats from this thread won't be merged with others.
    exclude_from_merge_ = false;
  }

  void Merge(const Stats& other) {
    if (other.exclude_from_merge_)
      return;

    for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
      auto this_it = hist_.find(it->first);
      if (this_it != hist_.end()) {
        this_it->second->Merge(*(other.hist_.at(it->first)));
      } else {
        hist_.insert({ it->first, it->second });
      }
    }

    done_ += other.done_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.empty()) message_ = other.message_;
  }

  void Stop() {
    finish_ = FLAGS_env->NowMicros();
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void AddMessage(Slice msg) {
    AppendWithSpace(&message_, msg);
  }

  void SetId(int id) { id_ = id; }
  void SetExcludeFromMerge() { exclude_from_merge_ = true; }

  void PrintThreadStatus() {
    std::vector<ThreadStatus> thread_list;
    FLAGS_env->GetThreadList(&thread_list);

    fprintf(stderr, "\n%18s %10s %12s %20s %13s %45s %12s %s\n",
        "ThreadID", "ThreadType", "cfName", "Operation",
        "ElapsedTime", "Stage", "State", "OperationProperties");

    int64_t current_time = 0;
    Env::Default()->GetCurrentTime(&current_time);
    for (auto ts : thread_list) {
      fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
          ts.thread_id,
          ThreadStatus::GetThreadTypeName(ts.thread_type).c_str(),
          ts.cf_name.c_str(),
          ThreadStatus::GetOperationName(ts.operation_type).c_str(),
          ThreadStatus::MicrosToString(ts.op_elapsed_micros).c_str(),
          ThreadStatus::GetOperationStageName(ts.operation_stage).c_str(),
          ThreadStatus::GetStateName(ts.state_type).c_str());

      auto op_properties = ThreadStatus::InterpretOperationProperties(
          ts.operation_type, ts.op_properties);
      for (const auto& op_prop : op_properties) {
        fprintf(stderr, " %s %" PRIu64" |",
            op_prop.first.c_str(), op_prop.second);
      }
      fprintf(stderr, "\n");
    }
  }

  void ResetLastOpTime() {
    // Set to now to avoid latency from calls to SleepForMicroseconds
    last_op_finish_ = FLAGS_env->NowMicros();
  }

  void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
                   enum OperationType op_type = kOthers) {
    if (reporter_agent_) {
      reporter_agent_->ReportFinishedOps(num_ops);
    }
    if (FLAGS_histogram) {
      uint64_t now = FLAGS_env->NowMicros();
      uint64_t micros = now - last_op_finish_;

      if (hist_.find(op_type) == hist_.end()) {
        auto hist_temp = std::make_shared<HistogramImpl>();
        hist_.insert({op_type, std::move(hist_temp)});
      }
      hist_[op_type]->Add(micros);

      if (micros > 20000 && !FLAGS_stats_interval) {
        fprintf(stderr, "long op: %" PRIu64 " micros%30s\r", micros, "");
        fflush(stderr);
      }
      last_op_finish_ = now;
    }

    done_ += num_ops;
    if (done_ >= next_report_) {
      if (!FLAGS_stats_interval) {
        if      (next_report_ < 1000)   next_report_ += 100;
        else if (next_report_ < 5000)   next_report_ += 500;
        else if (next_report_ < 10000)  next_report_ += 1000;
        else if (next_report_ < 50000)  next_report_ += 5000;
        else if (next_report_ < 100000) next_report_ += 10000;
        else if (next_report_ < 500000) next_report_ += 50000;
        else                            next_report_ += 100000;
        fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, "");
      } else {
        uint64_t now = FLAGS_env->NowMicros();
        int64_t usecs_since_last = now - last_report_finish_;

        // Determine whether to print status where interval is either
        // each N operations or each N seconds.

        if (FLAGS_stats_interval_seconds &&
            usecs_since_last < (FLAGS_stats_interval_seconds * 1000000)) {
          // Don't check again for this many operations
          next_report_ += FLAGS_stats_interval;

        } else {

          fprintf(stderr,
                  "%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and "
                  "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
                  FLAGS_env->TimeToString(now/1000000).c_str(),
                  id_,
                  done_ - last_report_done_, done_,
                  (done_ - last_report_done_) /
                  (usecs_since_last / 1000000.0),
                  done_ / ((now - start_) / 1000000.0),
                  (now - last_report_finish_) / 1000000.0,
                  (now - start_) / 1000000.0);

          if (FLAGS_stats_per_interval) {
            std::string stats;

            if (db_with_cfh && db_with_cfh->num_created.load()) {
              for (size_t i = 0; i < db_with_cfh->num_created.load(); ++i) {
                if (db->GetProperty(db_with_cfh->cfh[i], "rocksdb.cfstats",
                                    &stats))
                  fprintf(stderr, "%s\n", stats.c_str());
                if (FLAGS_show_table_properties) {
                  for (int level = 0; level < FLAGS_num_levels; ++level) {
                    if (db->GetProperty(
                            db_with_cfh->cfh[i],
                            "rocksdb.aggregated-table-properties-at-level" +
                                ToString(level),
                            &stats)) {
                      if (stats.find("# entries=0") == std::string::npos) {
                        fprintf(stderr, "Level[%d]: %s\n", level,
                                stats.c_str());
                      }
                    }
                  }
                }
              }
            } else if (db) {
              if (db->GetProperty("rocksdb.stats", &stats)) {
                fprintf(stderr, "%s\n", stats.c_str());
              }
              if (FLAGS_show_table_properties) {
                for (int level = 0; level < FLAGS_num_levels; ++level) {
                  if (db->GetProperty(
                          "rocksdb.aggregated-table-properties-at-level" +
                              ToString(level),
                          &stats)) {
                    if (stats.find("# entries=0") == std::string::npos) {
                      fprintf(stderr, "Level[%d]: %s\n", level, stats.c_str());
                    }
                  }
                }
              }
            }
          }

          next_report_ += FLAGS_stats_interval;
          last_report_finish_ = now;
          last_report_done_ = done_;
        }
      }
      if (id_ == 0 && FLAGS_thread_status_per_interval) {
        PrintThreadStatus();
      }
      fflush(stderr);
    }
  }

  void AddBytes(int64_t n) {
    bytes_ += n;
  }

  void Report(const Slice& name) {
    // Pretend at least one op was done in case we are running a benchmark
    // that does not call FinishedOps().
    if (done_ < 1) done_ = 1;

    std::string extra;
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-6;
      char rate[100];
      snprintf(rate, sizeof(rate), "%6.1f MB/s",
               (bytes_ / 1048576.0) / elapsed);
      extra = rate;
    }
    AppendWithSpace(&extra, message_);
    double elapsed = (finish_ - start_) * 1e-6;
    double throughput = (double)done_/elapsed;

    fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
            name.ToString().c_str(),
            elapsed * 1e6 / done_,
            (long)throughput,
            (extra.empty() ? "" : " "),
            extra.c_str());
    if (FLAGS_histogram) {
      for (auto it = hist_.begin(); it != hist_.end(); ++it) {
        fprintf(stdout, "Microseconds per %s:\n%s\n",
                OperationTypeString[it->first].c_str(),
                it->second->ToString().c_str());
      }
    }
    if (FLAGS_report_file_operations) {
      ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env);
      ReportFileOpCounters* counters = env->counters();
      fprintf(stdout, "Num files opened: %d\n",
              counters->open_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num Read(): %d\n",
              counters->read_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num Append(): %d\n",
              counters->append_counter_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num bytes read: %" PRIu64 "\n",
              counters->bytes_read_.load(std::memory_order_relaxed));
      fprintf(stdout, "Num bytes written: %" PRIu64 "\n",
              counters->bytes_written_.load(std::memory_order_relaxed));
      env->reset();
    }
    fflush(stdout);
  }
};

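// Simple atomic counter standing in for a clock; used by the "timeseries"
// benchmark (see the dispatch in Run()) and by ExpiredTimeFilter, which
// compares timestamps embedded in keys against its current value.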
class TimestampEmulator {
 private:
  std::atomic<uint64_t> timestamp_;

 public:
  TimestampEmulator() : timestamp_(0) {}
  uint64_t Get() const { return timestamp_.load(); }
  void Inc() { timestamp_++; }
};

// State shared by all concurrent executions of the same benchmark.
struct SharedState {
  port::Mutex mu;
  port::CondVar cv;
  int total;
  int perf_level;
  std::shared_ptr<RateLimiter> write_rate_limiter;

  // Each thread goes through the following states:
  //    (1) initializing
  //    (2) waiting for others to be initialized
  //    (3) running
  //    (4) done

  long num_initialized;
  long num_done;
  bool start;

  SharedState() : cv(&mu), perf_level(FLAGS_perf_level) { }
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;             // 0..n-1 when running in n threads
  Random64 rand;         // Has different seeds for different threads
  Stats stats;
  SharedState* shared;

  /* implicit */ ThreadState(int index)
      : tid(index),
        rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {
  }
};

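// Duration bounds a benchmark loop either by wall-clock seconds or by an
// operation count; Done() is polled from the loop body and re-checks the
// clock only about every 1000 ops. GetStage() splits the op range into
// ops_per_stage_-sized stages for benchmarks that vary behavior per stage.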
class Duration {
 public:
  Duration(uint64_t max_seconds, int64_t max_ops, int64_t ops_per_stage = 0) {
    max_seconds_ = max_seconds;
    max_ops_= max_ops;
    ops_per_stage_ = (ops_per_stage > 0) ? ops_per_stage : max_ops;
    ops_ = 0;
    start_at_ = FLAGS_env->NowMicros();
  }

  int64_t GetStage() { return std::min(ops_, max_ops_ - 1) / ops_per_stage_; }

  bool Done(int64_t increment) {
    if (increment <= 0) increment = 1;    // avoid Done(0) and infinite loops
    ops_ += increment;

    if (max_seconds_) {
      // Recheck every ~1000 ops (exact iff increment is a factor of 1000)
      if ((ops_/1000) != ((ops_-increment)/1000)) {
        uint64_t now = FLAGS_env->NowMicros();
        return ((now - start_at_) / 1000000) >= max_seconds_;
      } else {
        return false;
      }
    } else {
      return ops_ > max_ops_;
    }
  }

 private:
  uint64_t max_seconds_;
  int64_t max_ops_;
  int64_t ops_per_stage_;
  int64_t ops_;
  uint64_t start_at_;
};

class Benchmark {
 private:
  std::shared_ptr<Cache> cache_;
  std::shared_ptr<Cache> compressed_cache_;
  std::shared_ptr<const FilterPolicy> filter_policy_;
  const SliceTransform* prefix_extractor_;
  DBWithColumnFamilies db_;
  std::vector<DBWithColumnFamilies> multi_dbs_;
  int64_t num_;
  int value_size_;
  int key_size_;
  int prefix_size_;
  int64_t keys_per_prefix_;
  int64_t entries_per_batch_;
  WriteOptions write_options_;
  Options open_options_;  // keep options around to properly destroy db later
  int64_t reads_;
  int64_t deletes_;
  double read_random_exp_range_;
  int64_t writes_;
  int64_t readwrites_;
  int64_t merge_keys_;
  bool report_file_operations_;
  int cachedev_fd_;

  bool SanityCheck() {
    if (FLAGS_compression_ratio > 1) {
      fprintf(stderr, "compression_ratio should be between 0 and 1\n");
      return false;
    }
    return true;
  }

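  // Compress `input` with the algorithm selected by --compression_type.
  // Shared by the compress/uncompress microbenchmarks and by PrintWarnings(),
  // which uses it to check that the configured compression library is both
  // available and effective.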
  inline bool CompressSlice(const Slice& input, std::string* compressed) {
    bool ok = true;
    switch (FLAGS_compression_type_e) {
      case rocksdb::kSnappyCompression:
        ok = Snappy_Compress(Options().compression_opts, input.data(),
                             input.size(), compressed);
        break;
      case rocksdb::kZlibCompression:
        ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
                           input.size(), compressed);
        break;
      case rocksdb::kBZip2Compression:
        ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
                            input.size(), compressed);
        break;
      case rocksdb::kLZ4Compression:
        ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
                          input.size(), compressed);
        break;
      case rocksdb::kLZ4HCCompression:
        ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
                            input.size(), compressed);
        break;
      case rocksdb::kXpressCompression:
        ok = XPRESS_Compress(input.data(),
          input.size(), compressed);
        break;
      case rocksdb::kZSTDNotFinalCompression:
        ok = ZSTD_Compress(Options().compression_opts, input.data(),
                           input.size(), compressed);
        break;
      default:
        ok = false;
    }
    return ok;
  }

  void PrintHeader() {
    PrintEnvironment();
    fprintf(stdout, "Keys:       %d bytes each\n", FLAGS_key_size);
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    fprintf(stdout, "Entries:    %" PRIu64 "\n", num_);
    fprintf(stdout, "Prefix:    %d bytes\n", FLAGS_prefix_size);
    fprintf(stdout, "Keys per prefix:    %" PRIu64 "\n", keys_per_prefix_);
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
            ((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
             / 1048576.0));
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
            (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
              * num_)
             / 1048576.0));
    fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
            FLAGS_benchmark_write_rate_limit);
    if (FLAGS_enable_numa) {
      fprintf(stderr, "Running in NUMA enabled mode.\n");
#ifndef NUMA
      fprintf(stderr, "NUMA is not defined in the system.\n");
      exit(1);
#else
      if (numa_available() == -1) {
        fprintf(stderr, "NUMA is not supported by the system.\n");
        exit(1);
      }
#endif
    }

    auto compression = CompressionTypeToString(FLAGS_compression_type_e);
    fprintf(stdout, "Compression: %s\n", compression.c_str());

    switch (FLAGS_rep_factory) {
      case kPrefixHash:
        fprintf(stdout, "Memtablerep: prefix_hash\n");
        break;
      case kSkipList:
        fprintf(stdout, "Memtablerep: skip_list\n");
        break;
      case kVectorRep:
        fprintf(stdout, "Memtablerep: vector\n");
        break;
      case kHashLinkedList:
        fprintf(stdout, "Memtablerep: hash_linkedlist\n");
        break;
      case kCuckoo:
        fprintf(stdout, "Memtablerep: cuckoo\n");
        break;
    }
    fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);

    PrintWarnings(compression.c_str());
    fprintf(stdout, "------------------------------------------------\n");
  }

  void PrintWarnings(const char* compression) {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
    if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
      // The test string should not be too small.
      const int len = FLAGS_block_size;
      std::string input_str(len, 'y');
      std::string compressed;
      bool result = CompressSlice(Slice(input_str), &compressed);

      if (!result) {
        fprintf(stdout, "WARNING: %s compression is not enabled\n",
                compression);
      } else if (compressed.size() >= input_str.size()) {
        fprintf(stdout, "WARNING: %s compression is not effective\n",
                compression);
      }
    }
  }

// Currently, the following isn't equivalent to OS_LINUX.
#if defined(__linux)
  static Slice TrimSpace(Slice s) {
    unsigned int start = 0;
    while (start < s.size() && isspace(s[start])) {
      start++;
    }
    unsigned int limit = static_cast<unsigned int>(s.size());
    while (limit > start && isspace(s[limit-1])) {
      limit--;
    }
    return Slice(s.data() + start, limit - start);
  }
#endif

  void PrintEnvironment() {
    fprintf(stderr, "RocksDB:    version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
    time_t now = time(nullptr);
    char buf[52];
    // Lint complains about ctime() usage, so replace it with ctime_r(). The
    // requirement is to provide a buffer which is at least 26 bytes.
    fprintf(stderr, "Date:       %s",
            ctime_r(&now, buf));  // ctime_r() adds newline

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    if (cpuinfo != nullptr) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
        const char* sep = strchr(line, ':');
        if (sep == nullptr) {
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }

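  // Expiry check for the timeseries benchmark's compaction filter. The key is
  // assumed to carry a 64-bit timestamp starting at byte offset 8; a key is
  // considered expired once that timestamp lags the TimestampEmulator counter
  // by more than FLAGS_time_range.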
  static bool KeyExpired(const TimestampEmulator* timestamp_emulator,
                         const Slice& key) {
    const char* pos = key.data();
    pos += 8;
    uint64_t timestamp = 0;
    if (port::kLittleEndian) {
      int bytes_to_fill = 8;
      for (int i = 0; i < bytes_to_fill; ++i) {
        timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
                      << ((bytes_to_fill - i - 1) << 3));
      }
    } else {
      memcpy(&timestamp, pos, sizeof(timestamp));
    }
    return timestamp_emulator->Get() - timestamp > FLAGS_time_range;
  }

  class ExpiredTimeFilter : public CompactionFilter {
   public:
    explicit ExpiredTimeFilter(
        const std::shared_ptr<TimestampEmulator>& timestamp_emulator)
        : timestamp_emulator_(timestamp_emulator) {}
    bool Filter(int level, const Slice& key, const Slice& existing_value,
                std::string* new_value, bool* value_changed) const override {
      return KeyExpired(timestamp_emulator_.get(), key);
    }
    const char* Name() const override { return "ExpiredTimeFilter"; }

   private:
    std::shared_ptr<TimestampEmulator> timestamp_emulator_;
  };

 public:
  Benchmark()
      : cache_(
            FLAGS_cache_size >= 0
                ? (FLAGS_cache_numshardbits >= 1
                       ? NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits)
                       : NewLRUCache(FLAGS_cache_size))
                : nullptr),
        compressed_cache_(FLAGS_compressed_cache_size >= 0
                              ? (FLAGS_cache_numshardbits >= 1
                                     ? NewLRUCache(FLAGS_compressed_cache_size,
                                                   FLAGS_cache_numshardbits)
                                     : NewLRUCache(FLAGS_compressed_cache_size))
                              : nullptr),
        filter_policy_(FLAGS_bloom_bits >= 0
                           ? NewBloomFilterPolicy(FLAGS_bloom_bits,
                                                  FLAGS_use_block_based_filter)
                           : nullptr),
        prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
        num_(FLAGS_num),
        value_size_(FLAGS_value_size),
        key_size_(FLAGS_key_size),
        prefix_size_(FLAGS_prefix_size),
        keys_per_prefix_(FLAGS_keys_per_prefix),
        entries_per_batch_(1),
        reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
        read_random_exp_range_(0.0),
        writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
        readwrites_(
            (FLAGS_writes < 0 && FLAGS_reads < 0)
                ? FLAGS_num
                : ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)),
        merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
        report_file_operations_(FLAGS_report_file_operations),
        cachedev_fd_(-1) {
    // use simcache instead of cache
    if (FLAGS_simcache_size >= 0) {
      if (FLAGS_cache_numshardbits >= 1) {
        cache_ =
            NewSimCache(cache_, FLAGS_simcache_size, FLAGS_cache_numshardbits);
      } else {
        cache_ = NewSimCache(cache_, FLAGS_simcache_size, 0);
      }
    }

    if (report_file_operations_) {
      if (!FLAGS_hdfs.empty()) {
        fprintf(stderr,
                "--hdfs and --report_file_operations cannot be enabled "
                "at the same time");
        exit(1);
      }
      FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default());
    }

    if (FLAGS_prefix_size > FLAGS_key_size) {
      fprintf(stderr, "prefix size is larger than key size\n");
      exit(1);
    }

    std::vector<std::string> files;
    FLAGS_env->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      Options options;
      if (!FLAGS_wal_dir.empty()) {
        options.wal_dir = FLAGS_wal_dir;
      }
      DestroyDB(FLAGS_db, options);
    }
  }

  ~Benchmark() {
    db_.DeleteDBs();
    delete prefix_extractor_;
    if (cache_.get() != nullptr) {
      // this will leak, but we're shutting down so nobody cares
      cache_->DisownData();
    }
    if (FLAGS_disable_flashcache_for_background_threads && cachedev_fd_ != -1) {
      // Dtor for this env should run before cachedev_fd_ is closed
      flashcache_aware_env_ = nullptr;
      close(cachedev_fd_);
    }
  }

  Slice AllocateKey(std::unique_ptr<const char[]>* key_guard) {
    char* data = new char[key_size_];
    const char* const_data = data;
    key_guard->reset(const_data);
    return Slice(key_guard->get(), key_size_);
  }

  // Generate key according to the given specification and random number.
  // The resulting key will have the following format (if keys_per_prefix_
  // is positive), extra trailing bytes are either cut off or padded with '0'.
  // The prefix value is derived from key value.
  //   ----------------------------
  //   | prefix 00000 | key 00000 |
  //   ----------------------------
  // If keys_per_prefix_ is 0, the key is simply a binary representation of
  // random number followed by trailing '0's
  //   ----------------------------
  //   |        key 00000         |
  //   ----------------------------
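  //
  // Illustrative example (values chosen here for exposition only): with
  // key_size_ = 20, prefix_size_ = 4, keys_per_prefix_ = 10 and
  // num_keys = 100, v = 57 yields prefix 57 % (100 / 10) = 7, so the key is
  // a 4-byte encoding of 7, an 8-byte encoding of 57, then 8 trailing '0's.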
  void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
    char* start = const_cast<char*>(key->data());
    char* pos = start;
    if (keys_per_prefix_ > 0) {
      int64_t num_prefix = num_keys / keys_per_prefix_;
      int64_t prefix = v % num_prefix;
      int bytes_to_fill = std::min(prefix_size_, 8);
      if (port::kLittleEndian) {
        for (int i = 0; i < bytes_to_fill; ++i) {
          pos[i] = (prefix >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
        }
      } else {
        memcpy(pos, static_cast<void*>(&prefix), bytes_to_fill);
      }
      if (prefix_size_ > 8) {
        // fill the rest with 0s
        memset(pos + 8, '0', prefix_size_ - 8);
      }
      pos += prefix_size_;
    }

    int bytes_to_fill = std::min(key_size_ - static_cast<int>(pos - start), 8);
    if (port::kLittleEndian) {
      for (int i = 0; i < bytes_to_fill; ++i) {
        pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
      }
    } else {
      memcpy(pos, static_cast<void*>(&v), bytes_to_fill);
    }
    pos += bytes_to_fill;
    if (key_size_ > pos - start) {
      memset(pos, '0', key_size_ - (pos - start));
    }
X
Xing Jin 已提交
2014 2015
  }

2016
  std::string GetDbNameForMultiple(std::string base_name, size_t id) {
2017
    return base_name + ToString(id);
2018 2019
  }

J
jorlow@chromium.org 已提交
2020
  void Run() {
2021 2022 2023
    if (!SanityCheck()) {
      exit(1);
    }
2024
    Open(&open_options_);
2025
    PrintHeader();
2026 2027
    std::stringstream benchmark_stream(FLAGS_benchmarks);
    std::string name;
2028
    std::unique_ptr<ExpiredTimeFilter> filter;
2029
    while (std::getline(benchmark_stream, name, ',')) {
X
Xing Jin 已提交
2030
      // Sanitize parameters
2031
      num_ = FLAGS_num;
2032
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
2033
      writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
Y
Yueh-Hsuan Chiang 已提交
2034
      deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
2035
      value_size_ = FLAGS_value_size;
2036
      key_size_ = FLAGS_key_size;
2037
      entries_per_batch_ = FLAGS_batch_size;
2038
      write_options_ = WriteOptions();
2039
      read_random_exp_range_ = FLAGS_read_random_exp_range;
2040 2041 2042
      if (FLAGS_sync) {
        write_options_.sync = true;
      }
H
heyongqiang 已提交
2043 2044
      write_options_.disableWAL = FLAGS_disable_wal;

2045
      void (Benchmark::*method)(ThreadState*) = nullptr;
A
agiardullo 已提交
2046 2047
      void (Benchmark::*post_process_method)() = nullptr;

2048
      bool fresh_db = false;
2049
      int num_threads = FLAGS_threads;
2050

2051
      if (name == "fillseq") {
2052 2053
        fresh_db = true;
        method = &Benchmark::WriteSeq;
2054
      } else if (name == "fillbatch") {
2055 2056 2057
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
2058
      } else if (name == "fillrandom") {
2059 2060
        fresh_db = true;
        method = &Benchmark::WriteRandom;
2061
      } else if (name == "filluniquerandom") {
2062 2063
        fresh_db = true;
        if (num_threads > 1) {
2064 2065 2066
          fprintf(stderr,
                  "filluniquerandom multithreaded not supported"
                  ", use 1 thread");
2067
          num_threads = 1;
2068 2069
        }
        method = &Benchmark::WriteUniqueRandom;
2070
      } else if (name == "overwrite") {
2071
        method = &Benchmark::WriteRandom;
2072
      } else if (name == "fillsync") {
2073 2074 2075 2076
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
2077
      } else if (name == "fill100K") {
2078 2079 2080 2081
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
2082
      } else if (name == "readseq") {
2083
        method = &Benchmark::ReadSequential;
2084
      } else if (name == "readtocache") {
M
Mark Callaghan 已提交
2085 2086 2087
        method = &Benchmark::ReadSequential;
        num_threads = 1;
        reads_ = num_;
2088
      } else if (name == "readreverse") {
2089
        method = &Benchmark::ReadReverse;
2090
      } else if (name == "readrandom") {
2091
        method = &Benchmark::ReadRandom;
2092
      } else if (name == "readrandomfast") {
L
Lei Jin 已提交
2093
        method = &Benchmark::ReadRandomFast;
2094
      } else if (name == "multireadrandom") {
M
mike@arpaia.co 已提交
2095 2096
        fprintf(stderr, "entries_per_batch = %" PRIi64 "\n",
                entries_per_batch_);
L
Lei Jin 已提交
2097
        method = &Benchmark::MultiReadRandom;
2098
      } else if (name == "readmissing") {
L
Lei Jin 已提交
2099 2100
        ++key_size_;
        method = &Benchmark::ReadRandom;
2101
      } else if (name == "newiterator") {
2102
        method = &Benchmark::IteratorCreation;
2103
      } else if (name == "newiteratorwhilewriting") {
2104 2105
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::IteratorCreationWhileWriting;
2106
      } else if (name == "seekrandom") {
S
Sanjay Ghemawat 已提交
2107
        method = &Benchmark::SeekRandom;
2108
      } else if (name == "seekrandomwhilewriting") {
L
Lei Jin 已提交
2109 2110
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::SeekRandomWhileWriting;
2111
      } else if (name == "seekrandomwhilemerging") {
2112 2113
        num_threads++;  // Add extra thread for merging
        method = &Benchmark::SeekRandomWhileMerging;
2114
      } else if (name == "readrandomsmall") {
2115
        reads_ /= 1000;
2116
        method = &Benchmark::ReadRandom;
2117
      } else if (name == "deleteseq") {
S
Sanjay Ghemawat 已提交
2118
        method = &Benchmark::DeleteSeq;
2119
      } else if (name == "deleterandom") {
S
Sanjay Ghemawat 已提交
2120
        method = &Benchmark::DeleteRandom;
2121
      } else if (name == "readwhilewriting") {
2122 2123
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
2124
      } else if (name == "readwhilemerging") {
M
Mark Callaghan 已提交
2125 2126
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileMerging;
2127
      } else if (name == "readrandomwriterandom") {
2128
        method = &Benchmark::ReadRandomWriteRandom;
2129
      } else if (name == "readrandommergerandom") {
2130 2131
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
2132
                  name.c_str());
L
Lei Jin 已提交
2133
          exit(1);
2134
        }
L
Lei Jin 已提交
2135
        method = &Benchmark::ReadRandomMergeRandom;
2136
      } else if (name == "updaterandom") {
M
Mark Callaghan 已提交
2137
        method = &Benchmark::UpdateRandom;
2138
      } else if (name == "appendrandom") {
D
Deon Nicholas 已提交
2139
        method = &Benchmark::AppendRandom;
2140
      } else if (name == "mergerandom") {
D
Deon Nicholas 已提交
2141 2142
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
2143
                  name.c_str());
L
Lei Jin 已提交
2144
          exit(1);
D
Deon Nicholas 已提交
2145
        }
L
Lei Jin 已提交
2146
        method = &Benchmark::MergeRandom;
2147
      } else if (name == "randomwithverify") {
2148
        method = &Benchmark::RandomWithVerify;
2149
      } else if (name == "fillseekseq") {
T
Tomislav Novak 已提交
2150
        method = &Benchmark::WriteSeqSeekSeq;
2151
      } else if (name == "compact") {
2152
        method = &Benchmark::Compact;
2153
      } else if (name == "crc32c") {
2154
        method = &Benchmark::Crc32c;
2155
      } else if (name == "xxhash") {
I
xxHash  
Igor Canadi 已提交
2156
        method = &Benchmark::xxHash;
2157
      } else if (name == "acquireload") {
2158
        method = &Benchmark::AcquireLoad;
2159
      } else if (name == "compress") {
A
Albert Strasheim 已提交
2160
        method = &Benchmark::Compress;
2161
      } else if (name == "uncompress") {
A
Albert Strasheim 已提交
2162
        method = &Benchmark::Uncompress;
2163
#ifndef ROCKSDB_LITE
2164
      } else if (name == "randomtransaction") {
A
agiardullo 已提交
2165 2166
        method = &Benchmark::RandomTransaction;
        post_process_method = &Benchmark::RandomTransactionVerify;
2167
#endif  // ROCKSDB_LITE
A
Andres Noetzli 已提交
2168 2169 2170
      } else if (name == "randomreplacekeys") {
        fresh_db = true;
        method = &Benchmark::RandomReplaceKeys;
2171 2172 2173 2174 2175 2176 2177 2178 2179
      } else if (name == "timeseries") {
        timestamp_emulator_.reset(new TimestampEmulator());
        if (FLAGS_expire_style == "compaction_filter") {
          filter.reset(new ExpiredTimeFilter(timestamp_emulator_));
          fprintf(stdout, "Compaction filter is used to remove expired data");
          open_options_.compaction_filter = filter.get();
        }
        fresh_db = true;
        method = &Benchmark::TimeSeries;
2180
      } else if (name == "stats") {
2181
        PrintStats("rocksdb.stats");
2182
      } else if (name == "levelstats") {
2183
        PrintStats("rocksdb.levelstats");
2184
      } else if (name == "sstables") {
2185
        PrintStats("rocksdb.sstables");
2186 2187 2188
      } else if (!name.empty()) {  // No error message for empty name
        fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
        exit(1);
2189
      }
2190 2191 2192 2193

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
2194
                  name.c_str());
2195
          method = nullptr;
2196
        } else {
2197
          if (db_.db != nullptr) {
A
agiardullo 已提交
2198
            db_.DeleteDBs();
2199
            DestroyDB(FLAGS_db, open_options_);
2200 2201
          }
          for (size_t i = 0; i < multi_dbs_.size(); i++) {
2202
            delete multi_dbs_[i].db;
2203
            DestroyDB(GetDbNameForMultiple(FLAGS_db, i), open_options_);
2204 2205
          }
          multi_dbs_.clear();
2206
        }
2207
        Open(&open_options_);  // use open_options for the last accessed
2208 2209
      }

2210
      if (method != nullptr) {
2211
        fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
2212
        RunBenchmark(num_threads, name, method);
J
jorlow@chromium.org 已提交
2213
      }
A
agiardullo 已提交
2214 2215 2216
      if (post_process_method != nullptr) {
        (this->*post_process_method)();
      }
J
jorlow@chromium.org 已提交
2217
    }
2218
    if (FLAGS_statistics) {
K
krad 已提交
2219
      fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
2220
    }
I
Islam AbdelRahman 已提交
2221
    if (FLAGS_simcache_size >= 0) {
2222 2223 2224
      fprintf(stdout, "SIMULATOR CACHE STATISTICS:\n%s\n",
              std::dynamic_pointer_cast<SimCache>(cache_)->ToString().c_str());
    }
J
jorlow@chromium.org 已提交
2225 2226
  }

2227
 private:
2228
  std::unique_ptr<Env> flashcache_aware_env_;
2229
  std::shared_ptr<TimestampEmulator> timestamp_emulator_;
2230

  struct ThreadArg {
    Benchmark* bm;
    SharedState* shared;
    ThreadState* thread;
    void (Benchmark::*method)(ThreadState*);
  };

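  // Entry point for every worker thread: register under shared->mu, wait
  // until all threads are initialized and shared->start is set, run the
  // selected benchmark method, then signal completion so RunBenchmark() can
  // finish the run and merge the per-thread Stats.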
  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

2253
    SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
2254
    thread->stats.Start(thread->tid);
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }

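  // Launch n threads running `method`, using the SharedState rendezvous in
  // ThreadBody() so that all threads start simultaneously. Optionally wires
  // up a ReporterAgent and a shared write rate limiter, then merges and
  // reports the per-thread Stats (threads flagged exclude_from_merge_ are
  // skipped by Stats::Merge()).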
  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
2269 2270 2271 2272 2273
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;
2274 2275 2276 2277
    if (FLAGS_benchmark_write_rate_limit > 0) {
      shared.write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }
2278

2279 2280 2281 2282 2283 2284
    std::unique_ptr<ReporterAgent> reporter_agent;
    if (FLAGS_report_interval_seconds > 0) {
      reporter_agent.reset(new ReporterAgent(FLAGS_env, FLAGS_report_file,
                                             FLAGS_report_interval_seconds));
    }

2285
    ThreadArg* arg = new ThreadArg[n];
2286

2287
    for (int i = 0; i < n; i++) {
#ifdef NUMA
      if (FLAGS_enable_numa) {
        // Performs a local allocation of memory to threads in numa node.
        int n_nodes = numa_num_task_nodes();  // Number of nodes in NUMA.
        numa_exit_on_error = 1;
        int numa_node = i % n_nodes;
        bitmask* nodes = numa_allocate_nodemask();
        numa_bitmask_clearall(nodes);
        numa_bitmask_setbit(nodes, numa_node);
        // numa_bind() call binds the process to the node and these
        // properties are passed on to the thread that is created in
        // StartThread method called later in the loop.
        numa_bind(nodes);
        numa_set_strict(1);
        numa_free_nodemask(nodes);
      }
#endif
2305 2306 2307 2308
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      arg[i].thread = new ThreadState(i);
2309
      arg[i].thread->stats.SetReporterAgent(reporter_agent.get());
2310
      arg[i].thread->shared = &shared;
2311
      FLAGS_env->StartThread(ThreadBody, &arg[i]);
2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325
    }

    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

2326 2327 2328 2329
    // Stats for some threads can be excluded.
    Stats merge_stats;
    for (int i = 0; i < n; i++) {
      merge_stats.Merge(arg[i].thread->stats);
2330
    }
2331
    merge_stats.Report(name);

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;
  }

  void Crc32c(ThreadState* thread) {
J
jorlow@chromium.org 已提交
2340
    // Checksum about 500MB of data total
2341 2342
    const int size = 4096;
    const char* label = "(4K per op)";
J
jorlow@chromium.org 已提交
2343
    std::string data(size, 'x');
J
jorlow@chromium.org 已提交
2344 2345 2346 2347
    int64_t bytes = 0;
    uint32_t crc = 0;
    while (bytes < 500 * 1048576) {
      crc = crc32c::Value(data.data(), size);
2348
      thread->stats.FinishedOps(nullptr, nullptr, 1, kCrc);
J
jorlow@chromium.org 已提交
2349 2350 2351 2352 2353
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

2354 2355
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
J
jorlow@chromium.org 已提交
2356 2357
  }

I
xxHash  
Igor Canadi 已提交
2358 2359 2360 2361 2362 2363 2364 2365 2366
  void xxHash(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    unsigned int xxh32 = 0;
    while (bytes < 500 * 1048576) {
      xxh32 = XXH32(data.data(), size, 0);
2367
      thread->stats.FinishedOps(nullptr, nullptr, 1, kHash);
I
xxHash  
Igor Canadi 已提交
2368 2369 2370 2371 2372 2373 2374 2375 2376
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... xxh32=0x%x\r", static_cast<unsigned int>(xxh32));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

2377
  void AcquireLoad(ThreadState* thread) {
2378
    int dummy;
I
Igor Canadi 已提交
2379
    std::atomic<void*> ap(&dummy);
2380
    int count = 0;
2381
    void *ptr = nullptr;
2382
    thread->stats.AddMessage("(each op is 1000 loads)");
2383 2384
    while (count < 100000) {
      for (int i = 0; i < 1000; i++) {
I
Igor Canadi 已提交
2385
        ptr = ap.load(std::memory_order_acquire);
2386 2387
      }
      count++;
2388
      thread->stats.FinishedOps(nullptr, nullptr, 1, kOthers);
2389
    }
2390
    if (ptr == nullptr) exit(1);  // Disable unused variable warning.
2391 2392
  }

A
Albert Strasheim 已提交
2393
  void Compress(ThreadState *thread) {
2394
    RandomGenerator gen;
2395
    Slice input = gen.Generate(FLAGS_block_size);
2396 2397 2398 2399
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;
A
Albert Strasheim 已提交
2400 2401 2402

    // Compress 1G
    while (ok && bytes < int64_t(1) << 30) {
2403
      compressed.clear();
2404
      ok = CompressSlice(input, &compressed);
2405 2406
      produced += compressed.size();
      bytes += input.size();
2407
      thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
2408 2409 2410
    }

    if (!ok) {
A
Albert Strasheim 已提交
2411
      thread->stats.AddMessage("(compression failure)");
2412 2413 2414 2415
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / bytes);
2416 2417
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
2418 2419 2420
    }
  }

A
Albert Strasheim 已提交
2421
  void Uncompress(ThreadState *thread) {
2422
    RandomGenerator gen;
2423
    Slice input = gen.Generate(FLAGS_block_size);
2424
    std::string compressed;
A
Albert Strasheim 已提交
2425

2426
    bool ok = CompressSlice(input, &compressed);
2427
    int64_t bytes = 0;
A
Albert Strasheim 已提交
2428 2429 2430 2431
    int decompress_size;
    while (ok && bytes < 1024 * 1048576) {
      char *uncompressed = nullptr;
      switch (FLAGS_compression_type_e) {
2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444
        case rocksdb::kSnappyCompression: {
          // get size and allocate here to make comparison fair
          size_t ulength = 0;
          if (!Snappy_GetUncompressedLength(compressed.data(),
                                            compressed.size(), &ulength)) {
            ok = false;
            break;
          }
          uncompressed = new char[ulength];
          ok = Snappy_Uncompress(compressed.data(), compressed.size(),
                                 uncompressed);
          break;
        }
A
Albert Strasheim 已提交
2445
      case rocksdb::kZlibCompression:
I
Igor Canadi 已提交
2446
        uncompressed = Zlib_Uncompress(compressed.data(), compressed.size(),
2447
                                       &decompress_size, 2);
A
Albert Strasheim 已提交
2448 2449 2450
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kBZip2Compression:
I
Igor Canadi 已提交
2451
        uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
2452
                                        &decompress_size, 2);
A
Albert Strasheim 已提交
2453 2454 2455
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kLZ4Compression:
I
Igor Canadi 已提交
2456
        uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(),
2457
                                      &decompress_size, 2);
A
Albert Strasheim 已提交
2458 2459 2460
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kLZ4HCCompression:
I
Igor Canadi 已提交
2461
        uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(),
2462
                                      &decompress_size, 2);
A
Albert Strasheim 已提交
2463 2464
        ok = uncompressed != nullptr;
        break;
2465 2466 2467 2468 2469
      case rocksdb::kXpressCompression:
        uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
          &decompress_size);
        ok = uncompressed != nullptr;
        break;
2470 2471 2472 2473 2474
      case rocksdb::kZSTDNotFinalCompression:
        uncompressed = ZSTD_Uncompress(compressed.data(), compressed.size(),
                                       &decompress_size);
        ok = uncompressed != nullptr;
        break;
A
Albert Strasheim 已提交
2475 2476 2477 2478
      default:
        ok = false;
      }
      delete[] uncompressed;
2479
      bytes += input.size();
2480
      thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
2481 2482 2483
    }

    if (!ok) {
A
Albert Strasheim 已提交
2484
      thread->stats.AddMessage("(compression failure)");
2485
    } else {
2486
      thread->stats.AddBytes(bytes);
2487 2488 2489
    }
  }

2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513
  // Returns true if the options is initialized from the specified
  // options file.
  bool InitializeOptionsFromFile(Options* opts) {
#ifndef ROCKSDB_LITE
    printf("Initializing RocksDB Options from the specified file\n");
    DBOptions db_opts;
    std::vector<ColumnFamilyDescriptor> cf_descs;
    if (FLAGS_options_file != "") {
      auto s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), &db_opts,
                                   &cf_descs);
      if (s.ok()) {
        *opts = Options(db_opts, cf_descs[0].options);
        return true;
      }
      fprintf(stderr, "Unable to load options file %s --- %s\n",
              FLAGS_options_file.c_str(), s.ToString().c_str());
      exit(1);
    }
#endif
    return false;
  }

  void InitializeOptionsFromFlags(Options* opts) {
    printf("Initializing RocksDB Options from command-line flags\n");
2514 2515
    Options& options = *opts;

2516
    assert(db_.db == nullptr);
2517

2518
    options.create_missing_column_families = FLAGS_num_column_families > 1;
2519
    options.max_open_files = FLAGS_open_files;
2520
    options.db_write_buffer_size = FLAGS_db_write_buffer_size;
2521
    options.write_buffer_size = FLAGS_write_buffer_size;
2522
    options.max_write_buffer_number = FLAGS_max_write_buffer_number;
2523 2524
    options.min_write_buffer_number_to_merge =
      FLAGS_min_write_buffer_number_to_merge;
2525 2526
    options.max_write_buffer_number_to_maintain =
        FLAGS_max_write_buffer_number_to_maintain;
2527
    options.max_background_compactions = FLAGS_max_background_compactions;
2528
    options.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
2529
    options.max_background_flushes = FLAGS_max_background_flushes;
2530
    options.compaction_style = FLAGS_compaction_style_e;
2531
    options.compaction_pri = FLAGS_compaction_pri_e;
2532
    if (FLAGS_prefix_size != 0) {
2533 2534 2535
      options.prefix_extractor.reset(
          NewFixedPrefixTransform(FLAGS_prefix_size));
    }
2536 2537 2538 2539 2540 2541 2542
    if (FLAGS_use_uint64_comparator) {
      options.comparator = test::Uint64Comparator();
      if (FLAGS_key_size != 8) {
        fprintf(stderr, "Using Uint64 comparator but key size is not 8.\n");
        exit(1);
      }
    }
2543
    options.memtable_huge_page_size = FLAGS_memtable_use_huge_page ? 2048 : 0;
2544
    options.memtable_prefix_bloom_size_ratio = FLAGS_memtable_bloom_size_ratio;
L
Lei Jin 已提交
2545
    options.bloom_locality = FLAGS_bloom_locality;
2546
    options.max_file_opening_threads = FLAGS_file_opening_threads;
2547 2548 2549
    options.new_table_reader_for_compaction_inputs =
        FLAGS_new_table_reader_for_compaction_inputs;
    options.compaction_readahead_size = FLAGS_compaction_readahead_size;
2550
    options.random_access_max_buffer_size = FLAGS_random_access_max_buffer_size;
2551
    options.writable_file_max_buffer_size = FLAGS_writable_file_max_buffer_size;
H
heyongqiang 已提交
2552
    options.disableDataSync = FLAGS_disable_data_sync;
2553
    options.use_fsync = FLAGS_use_fsync;
2554
    options.num_levels = FLAGS_num_levels;
H
heyongqiang 已提交
2555 2556 2557
    options.target_file_size_base = FLAGS_target_file_size_base;
    options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
2558 2559
    options.level_compaction_dynamic_level_bytes =
        FLAGS_level_compaction_dynamic_level_bytes;
H
heyongqiang 已提交
2560 2561
    options.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
2562 2563 2564 2565
    if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash ||
                                     FLAGS_rep_factory == kHashLinkedList)) {
      fprintf(stderr, "prefix_size should be non-zero if PrefixHash or "
                      "HashLinkedList memtablerep is used\n");
J
Jim Paton 已提交
2566 2567 2568 2569
      exit(1);
    }
    switch (FLAGS_rep_factory) {
      case kSkipList:
T
Tomislav Novak 已提交
2570 2571
        options.memtable_factory.reset(new SkipListFactory(
            FLAGS_skip_list_lookahead));
J
Jim Paton 已提交
2572
        break;
S
sdong 已提交
2573 2574 2575 2576 2577
#ifndef ROCKSDB_LITE
      case kPrefixHash:
        options.memtable_factory.reset(
            NewHashSkipListRepFactory(FLAGS_hash_bucket_count));
        break;
2578 2579 2580 2581
      case kHashLinkedList:
        options.memtable_factory.reset(NewHashLinkListRepFactory(
            FLAGS_hash_bucket_count));
        break;
J
Jim Paton 已提交
2582 2583 2584 2585 2586
      case kVectorRep:
        options.memtable_factory.reset(
          new VectorRepFactory
        );
        break;
2587 2588 2589 2590
      case kCuckoo:
        options.memtable_factory.reset(NewHashCuckooRepFactory(
            options.write_buffer_size, FLAGS_key_size + FLAGS_value_size));
        break;
S
sdong 已提交
2591 2592 2593 2594 2595
#else
      default:
        fprintf(stderr, "Only skip list is supported in lite mode\n");
        exit(1);
#endif  // ROCKSDB_LITE
J
Jim Paton 已提交
2596
    }
L
Lei Jin 已提交
2597
    if (FLAGS_use_plain_table) {
S
sdong 已提交
2598
#ifndef ROCKSDB_LITE
2599 2600
      if (FLAGS_rep_factory != kPrefixHash &&
          FLAGS_rep_factory != kHashLinkedList) {
        fprintf(stderr, "Warning: plain table is used with skipList\n");
      }
      if (!FLAGS_mmap_read && !FLAGS_mmap_write) {
        fprintf(stderr, "plain table format requires mmap to operate\n");
        exit(1);
      }

      int bloom_bits_per_key = FLAGS_bloom_bits;
      if (bloom_bits_per_key < 0) {
        bloom_bits_per_key = 0;
      }
S
Stanislau Hlebik 已提交
2612 2613 2614 2615 2616 2617 2618

      PlainTableOptions plain_table_options;
      plain_table_options.user_key_len = FLAGS_key_size;
      plain_table_options.bloom_bits_per_key = bloom_bits_per_key;
      plain_table_options.hash_table_ratio = 0.75;
      options.table_factory = std::shared_ptr<TableFactory>(
          NewPlainTableFactory(plain_table_options));
S
sdong 已提交
2619 2620 2621 2622
#else
      fprintf(stderr, "Plain table is not supported in lite mode\n");
      exit(1);
#endif  // ROCKSDB_LITE
2623
    } else if (FLAGS_use_cuckoo_table) {
S
sdong 已提交
2624
#ifndef ROCKSDB_LITE
2625 2626 2627 2628
      if (FLAGS_cuckoo_hash_ratio > 1 || FLAGS_cuckoo_hash_ratio < 0) {
        fprintf(stderr, "Invalid cuckoo_hash_ratio\n");
        exit(1);
      }
2629 2630 2631
      rocksdb::CuckooTableOptions table_options;
      table_options.hash_table_ratio = FLAGS_cuckoo_hash_ratio;
      table_options.identity_as_first_hash = FLAGS_identity_as_first_hash;
2632
      options.table_factory = std::shared_ptr<TableFactory>(
2633
          NewCuckooTableFactory(table_options));
S
sdong 已提交
2634 2635 2636 2637
#else
      fprintf(stderr, "Cuckoo table is not supported in lite mode\n");
      exit(1);
#endif  // ROCKSDB_LITE
2638 2639 2640
    } else {
      BlockBasedTableOptions block_based_options;
      if (FLAGS_use_hash_search) {
2641 2642 2643 2644 2645
        if (FLAGS_prefix_size == 0) {
          fprintf(stderr,
              "prefix_size not assigned when enable use_hash_search \n");
          exit(1);
        }
2646 2647 2648 2649
        block_based_options.index_type = BlockBasedTableOptions::kHashSearch;
      } else {
        block_based_options.index_type = BlockBasedTableOptions::kBinarySearch;
      }
2650 2651 2652
      if (cache_ == nullptr) {
        block_based_options.no_block_cache = true;
      }
      block_based_options.cache_index_and_filter_blocks =
          FLAGS_cache_index_and_filter_blocks;
      block_based_options.pin_l0_filter_and_index_blocks_in_cache =
          FLAGS_pin_l0_filter_and_index_blocks_in_cache;
      block_based_options.block_cache = cache_;
      block_based_options.block_cache_compressed = compressed_cache_;
      block_based_options.block_size = FLAGS_block_size;
      block_based_options.block_restart_interval = FLAGS_block_restart_interval;
      block_based_options.index_block_restart_interval =
          FLAGS_index_block_restart_interval;
      block_based_options.filter_policy = filter_policy_;
      block_based_options.skip_table_builder_flush =
          FLAGS_skip_table_builder_flush;
      block_based_options.format_version = 2;
      options.table_factory.reset(
          NewBlockBasedTableFactory(block_based_options));
    }
    if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
      if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
          (unsigned int)FLAGS_num_levels) {
        fprintf(stderr, "Insufficient number of fanouts specified %d\n",
                (int)FLAGS_max_bytes_for_level_multiplier_additional_v.size());
        exit(1);
      }
      options.max_bytes_for_level_multiplier_additional =
        FLAGS_max_bytes_for_level_multiplier_additional_v;
    }
    options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
    options.level0_file_num_compaction_trigger =
        FLAGS_level0_file_num_compaction_trigger;
    options.level0_slowdown_writes_trigger =
      FLAGS_level0_slowdown_writes_trigger;
    options.compression = FLAGS_compression_type_e;
    options.compression_opts.level = FLAGS_compression_level;
    options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
    options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
    options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
    options.max_total_wal_size = FLAGS_max_total_wal_size;

    if (FLAGS_min_level_to_compress >= 0) {
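      // Levels [0, min_level_to_compress) are stored uncompressed; deeper
      // levels use the configured compression type.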
      assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
      options.compression_per_level.resize(FLAGS_num_levels);
      for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
        options.compression_per_level[i] = kNoCompression;
      }
      for (int i = FLAGS_min_level_to_compress;
           i < FLAGS_num_levels; i++) {
        options.compression_per_level[i] = FLAGS_compression_type_e;
      }
    }
    options.soft_rate_limit = FLAGS_soft_rate_limit;
    options.hard_rate_limit = FLAGS_hard_rate_limit;
    options.soft_pending_compaction_bytes_limit =
        FLAGS_soft_pending_compaction_bytes_limit;
    options.hard_pending_compaction_bytes_limit =
        FLAGS_hard_pending_compaction_bytes_limit;
    options.delayed_write_rate = FLAGS_delayed_write_rate;
    options.allow_concurrent_memtable_write =
        FLAGS_allow_concurrent_memtable_write;
    options.enable_write_thread_adaptive_yield =
        FLAGS_enable_write_thread_adaptive_yield;
    options.write_thread_max_yield_usec = FLAGS_write_thread_max_yield_usec;
    options.write_thread_slow_yield_usec = FLAGS_write_thread_slow_yield_usec;
    options.rate_limit_delay_max_milliseconds =
      FLAGS_rate_limit_delay_max_milliseconds;
    options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
    options.max_grandparent_overlap_factor =
      FLAGS_max_grandparent_overlap_factor;
    options.disable_auto_compactions = FLAGS_disable_auto_compactions;
    options.source_compaction_factor = FLAGS_source_compaction_factor;
    options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;

    // fill storage options
    options.allow_os_buffer = FLAGS_bufferedio;
    options.allow_mmap_reads = FLAGS_mmap_read;
    options.allow_mmap_writes = FLAGS_mmap_write;
    options.advise_random_on_open = FLAGS_advise_random_on_open;
    options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;
    options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
    options.bytes_per_sync = FLAGS_bytes_per_sync;
    options.wal_bytes_per_sync = FLAGS_wal_bytes_per_sync;

    // merge operator options
    options.merge_operator = MergeOperators::CreateFromStringId(
        FLAGS_merge_operator);
    if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
      fprintf(stderr, "invalid merge operator: %s\n",
              FLAGS_merge_operator.c_str());
      exit(1);
    }
    options.max_successive_merges = FLAGS_max_successive_merges;
    options.report_bg_io_stats = FLAGS_report_bg_io_stats;

    // set universal style compaction configurations, if applicable
    if (FLAGS_universal_size_ratio != 0) {
      options.compaction_options_universal.size_ratio =
        FLAGS_universal_size_ratio;
    }
    if (FLAGS_universal_min_merge_width != 0) {
      options.compaction_options_universal.min_merge_width =
        FLAGS_universal_min_merge_width;
    }
    if (FLAGS_universal_max_merge_width != 0) {
      options.compaction_options_universal.max_merge_width =
        FLAGS_universal_max_merge_width;
    }
    if (FLAGS_universal_max_size_amplification_percent != 0) {
      options.compaction_options_universal.max_size_amplification_percent =
        FLAGS_universal_max_size_amplification_percent;
    }
    if (FLAGS_universal_compression_size_percent != -1) {
      options.compaction_options_universal.compression_size_percent =
        FLAGS_universal_compression_size_percent;
    }
    options.compaction_options_universal.allow_trivial_move =
        FLAGS_universal_allow_trivial_move;
    if (FLAGS_thread_status_per_interval > 0) {
      options.enable_thread_tracking = true;
    }
    if (FLAGS_rate_limiter_bytes_per_sec > 0) {
      options.rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_rate_limiter_bytes_per_sec));
    }

#ifndef ROCKSDB_LITE
    if (FLAGS_readonly && FLAGS_transaction_db) {
      fprintf(stderr, "Cannot use readonly flag with transaction_db\n");
      exit(1);
    }
#endif  // ROCKSDB_LITE

  }

  void InitializeOptionsGeneral(Options* opts) {
    Options& options = *opts;

    options.statistics = dbstats;
    options.wal_dir = FLAGS_wal_dir;
    options.create_if_missing = !FLAGS_use_existing_db;

    if (FLAGS_row_cache_size) {
      if (FLAGS_cache_numshardbits >= 1) {
        options.row_cache =
            NewLRUCache(FLAGS_row_cache_size, FLAGS_cache_numshardbits);
      } else {
        options.row_cache = NewLRUCache(FLAGS_row_cache_size);
      }
    }
    if (FLAGS_enable_io_prio) {
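      // Lower the I/O priority of both the low- and high-priority background
      // thread pools.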
      FLAGS_env->LowerThreadPoolIOPriority(Env::LOW);
      FLAGS_env->LowerThreadPoolIOPriority(Env::HIGH);
    }
    if (FLAGS_disable_flashcache_for_background_threads && cachedev_fd_ == -1) {
      // Avoid creating the env twice when use_existing_db is true
      cachedev_fd_ = open(FLAGS_flashcache_dev.c_str(), O_RDONLY);
      if (cachedev_fd_ < 0) {
        fprintf(stderr, "Open flash device failed\n");
        exit(1);
      }
      flashcache_aware_env_ = NewFlashcacheAwareEnv(FLAGS_env, cachedev_fd_);
      if (flashcache_aware_env_.get() == nullptr) {
        fprintf(stderr, "Failed to open flashcache device at %s\n",
                FLAGS_flashcache_dev.c_str());
        std::abort();
      }
      options.env = flashcache_aware_env_.get();
    } else {
      options.env = FLAGS_env;
    }

    if (FLAGS_num_multi_db <= 1) {
      OpenDb(options, FLAGS_db, &db_);
    } else {
      multi_dbs_.clear();
      multi_dbs_.resize(FLAGS_num_multi_db);
      for (int i = 0; i < FLAGS_num_multi_db; i++) {
        OpenDb(options, GetDbNameForMultiple(FLAGS_db, i), &multi_dbs_[i]);
      }
    }
    options.dump_malloc_stats = FLAGS_dump_malloc_stats;
  }

  void Open(Options* opts) {
    if (!InitializeOptionsFromFile(opts)) {
      InitializeOptionsFromFlags(opts);
    }

    InitializeOptionsGeneral(opts);
  }

  void OpenDb(const Options& options, const std::string& db_name,
      DBWithColumnFamilies* db) {
    Status s;
    // Open with column families if necessary.
    if (FLAGS_num_column_families > 1) {
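      // When --num_hot_column_families is set, only that many column families
      // are created up front; the rest are created later as the workload
      // advances through stages (see DoWrite).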
      size_t num_hot = FLAGS_num_column_families;
      if (FLAGS_num_hot_column_families > 0 &&
          FLAGS_num_hot_column_families < FLAGS_num_column_families) {
        num_hot = FLAGS_num_hot_column_families;
      } else {
        FLAGS_num_hot_column_families = FLAGS_num_column_families;
      }
      std::vector<ColumnFamilyDescriptor> column_families;
      for (size_t i = 0; i < num_hot; i++) {
        column_families.push_back(ColumnFamilyDescriptor(
              ColumnFamilyName(i), ColumnFamilyOptions(options)));
      }
#ifndef ROCKSDB_LITE
      if (FLAGS_readonly) {
        s = DB::OpenForReadOnly(options, db_name, column_families,
            &db->cfh, &db->db);
      } else if (FLAGS_optimistic_transaction_db) {
        s = OptimisticTransactionDB::Open(options, db_name, column_families,
                                          &db->cfh, &db->opt_txn_db);
        if (s.ok()) {
          db->db = db->opt_txn_db->GetBaseDB();
        }
      } else if (FLAGS_transaction_db) {
        TransactionDB* ptr;
        TransactionDBOptions txn_db_options;
        s = TransactionDB::Open(options, txn_db_options, db_name,
                                column_families, &db->cfh, &ptr);
        if (s.ok()) {
          db->db = ptr;
        }
      } else {
        s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
      }
#else
      s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
#endif  // ROCKSDB_LITE
      db->cfh.resize(FLAGS_num_column_families);
      db->num_created = num_hot;
      db->num_hot = num_hot;
#ifndef ROCKSDB_LITE
    } else if (FLAGS_readonly) {
      s = DB::OpenForReadOnly(options, db_name, &db->db);
    } else if (FLAGS_optimistic_transaction_db) {
      s = OptimisticTransactionDB::Open(options, db_name, &db->opt_txn_db);
      if (s.ok()) {
        db->db = db->opt_txn_db->GetBaseDB();
      }
    } else if (FLAGS_transaction_db) {
      TransactionDB* ptr;
      TransactionDBOptions txn_db_options;
      s = TransactionDB::Open(options, txn_db_options, db_name, &ptr);
      if (s.ok()) {
        db->db = ptr;
      }
#endif  // ROCKSDB_LITE
    } else {
      s = DB::Open(options, db_name, &db->db);
    }
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

  enum WriteMode {
    RANDOM, SEQUENTIAL, UNIQUE_RANDOM
  };

  void WriteSeq(ThreadState* thread) {
    DoWrite(thread, SEQUENTIAL);
  }

  void WriteRandom(ThreadState* thread) {
    DoWrite(thread, RANDOM);
  }

  void WriteUniqueRandom(ThreadState* thread) {
    DoWrite(thread, UNIQUE_RANDOM);
  }

  class KeyGenerator {
   public:
    KeyGenerator(Random64* rand, WriteMode mode,
        uint64_t num, uint64_t num_per_set = 64 * 1024)
      : rand_(rand),
        mode_(mode),
        num_(num),
        next_(0) {
      if (mode_ == UNIQUE_RANDOM) {
        // NOTE: if memory consumption of this approach becomes a concern,
        // we can either break it into pieces and only random shuffle a section
        // each time. Alternatively, use a bit map implementation
        // (https://reviews.facebook.net/differential/diff/54627/)
        values_.resize(num_);
        for (uint64_t i = 0; i < num_; ++i) {
          values_[i] = i;
        }
        std::shuffle(
            values_.begin(), values_.end(),
            std::default_random_engine(static_cast<unsigned int>(FLAGS_seed)));
      }
    }

    uint64_t Next() {
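      // SEQUENTIAL hands out increasing indices, RANDOM samples uniformly
      // with replacement, and UNIQUE_RANDOM walks a pre-shuffled permutation
      // so that every key is produced exactly once.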
      switch (mode_) {
        case SEQUENTIAL:
          return next_++;
        case RANDOM:
          return rand_->Next() % num_;
        case UNIQUE_RANDOM:
          return values_[next_++];
      }
      assert(false);
      return std::numeric_limits<uint64_t>::max();
    }

   private:
    Random64* rand_;
    WriteMode mode_;
    const uint64_t num_;
    uint64_t next_;
    std::vector<uint64_t> values_;
  };

  DB* SelectDB(ThreadState* thread) {
    return SelectDBWithCfh(thread)->db;
  }

  DBWithColumnFamilies* SelectDBWithCfh(ThreadState* thread) {
    return SelectDBWithCfh(thread->rand.Next());
  }

  DBWithColumnFamilies* SelectDBWithCfh(uint64_t rand_int) {
    if (db_.db != nullptr) {
      return &db_;
    } else  {
      return &multi_dbs_[rand_int % multi_dbs_.size()];
    }
  }

  void DoWrite(ThreadState* thread, WriteMode write_mode) {
    const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
    const int64_t num_ops = writes_ == 0 ? num_ : writes_;

    size_t num_key_gens = 1;
    if (db_.db == nullptr) {
      num_key_gens = multi_dbs_.size();
    }
    std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
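    // When hot column families are enabled, the run is divided into stages of
    // ops_per_stage operations; a new column family is created whenever the
    // stage advances (see the GetStage() check in the loop below).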
    int64_t max_ops = num_ops * num_key_gens;
    int64_t ops_per_stage = max_ops;
    if (FLAGS_num_column_families > 1 && FLAGS_num_hot_column_families > 0) {
      ops_per_stage = (max_ops - 1) / (FLAGS_num_column_families /
                                       FLAGS_num_hot_column_families) +
                      1;
    }

    Duration duration(test_duration, max_ops, ops_per_stage);
    for (size_t i = 0; i < num_key_gens; i++) {
      key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode, num_,
                                         ops_per_stage));
    }

    if (num_ != FLAGS_num) {
      char msg[100];
      snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    int64_t stage = 0;
    while (!duration.Done(entries_per_batch_)) {
      if (duration.GetStage() != stage) {
        stage = duration.GetStage();
        if (db_.db != nullptr) {
          db_.CreateNewCf(open_options_, stage);
        } else {
          for (auto& db : multi_dbs_) {
            db.CreateNewCf(open_options_, stage);
          }
        }
      }

      size_t id = thread->rand.Next() % num_key_gens;
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
      batch.Clear();

      if (thread->shared->write_rate_limiter.get() != nullptr) {
        thread->shared->write_rate_limiter->Request(
            entries_per_batch_ * (value_size_ + key_size_),
            Env::IO_HIGH);
        // Set time at which last op finished to Now() to hide latency and
        // sleep from rate limiter. Also, do the check once per batch, not
        // once per write.
        thread->stats.ResetLastOpTime();
      }

      for (int64_t j = 0; j < entries_per_batch_; j++) {
        int64_t rand_num = key_gens[id]->Next();
        GenerateKeyFromInt(rand_num, FLAGS_num, &key);
        if (FLAGS_num_column_families <= 1) {
          batch.Put(key, gen.Generate(value_size_));
        } else {
          // We use same rand_num as seed for key and column family so that we
          // can deterministically find the cfh corresponding to a particular
          // key while reading the key.
          batch.Put(db_with_cfh->GetCfh(rand_num), key,
                    gen.Generate(value_size_));
        }
        bytes += value_size_ + key_size_;
      }
      s = db_with_cfh->db->Write(write_options_, &batch);
      thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
                                entries_per_batch_, kWrite);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
    thread->stats.AddBytes(bytes);
  }

  void ReadSequential(ThreadState* thread) {
    if (db_.db != nullptr) {
      ReadSequential(thread, db_.db);
    } else {
      for (const auto& db_with_cfh : multi_dbs_) {
        ReadSequential(thread, db_with_cfh.db);
      }
    }
  }

  void ReadSequential(ThreadState* thread, DB* db) {
    ReadOptions options(FLAGS_verify_checksum, true);
    options.tailing = FLAGS_use_tailing_iterator;

    Iterator* iter = db->NewIterator(options);
    int64_t i = 0;
    int64_t bytes = 0;
    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedOps(nullptr, db, 1, kRead);
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  void ReadReverse(ThreadState* thread) {
    if (db_.db != nullptr) {
      ReadReverse(thread, db_.db);
    } else {
      for (const auto& db_with_cfh : multi_dbs_) {
        ReadReverse(thread, db_with_cfh.db);
      }
    }
  }

  void ReadReverse(ThreadState* thread, DB* db) {
    Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
    int64_t i = 0;
    int64_t bytes = 0;
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedOps(nullptr, db, 1, kRead);
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  void ReadRandomFast(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
    int64_t nonexist = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::string value;
    DB* db = SelectDBWithCfh(thread)->db;

    int64_t pot = 1;
    while (pot < FLAGS_num) {
      pot <<= 1;
    }
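    // pot is now the smallest power of two >= FLAGS_num, so a key index can
    // be drawn with a cheap bit mask; indices at or above FLAGS_num are
    // counted below as intentional lookups of non-existent keys.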

    Duration duration(FLAGS_duration, reads_);
    do {
      for (int i = 0; i < 100; ++i) {
        int64_t key_rand = thread->rand.Next() & (pot - 1);
        GenerateKeyFromInt(key_rand, FLAGS_num, &key);
        ++read;
        auto status = db->Get(options, key, &value);
        if (status.ok()) {
          ++found;
        } else if (!status.IsNotFound()) {
          fprintf(stderr, "Get returned an error: %s\n",
                  status.ToString().c_str());
          abort();
        }
        if (key_rand >= FLAGS_num) {
          ++nonexist;
        }
      }
      thread->stats.FinishedOps(nullptr, db, 100, kRead);
    } while (!duration.Done(100));

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found, "
             "issued %" PRIu64 " non-exist keys)\n",
             found, read, nonexist);

    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

  int64_t GetRandomKey(Random64* rand) {
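    // Returns a key index in [0, FLAGS_num). With a non-zero
    // --read_random_exp_range the index follows an exponential distribution
    // and is then scrambled with a multiplicative hash to avoid locality.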
    uint64_t rand_int = rand->Next();
    int64_t key_rand;
    if (read_random_exp_range_ == 0) {
      key_rand = rand_int % FLAGS_num;
    } else {
      const uint64_t kBigInt = static_cast<uint64_t>(1U) << 62;
      long double order = -static_cast<long double>(rand_int % kBigInt) /
                          static_cast<long double>(kBigInt) *
                          read_random_exp_range_;
      long double exp_ran = std::exp(order);
      uint64_t rand_num =
          static_cast<int64_t>(exp_ran * static_cast<long double>(FLAGS_num));
      // Map to a different number to avoid locality.
      const uint64_t kBigPrime = 0x5bd1e995;
      // Overflow wraps modulo 2^64 and has little impact on the results.
      key_rand = static_cast<int64_t>((rand_num * kBigPrime) % FLAGS_num);
    }
    return key_rand;
  }

  void ReadRandom(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
    int64_t bytes = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::string value;

    Duration duration(FLAGS_duration, reads_);
    while (!duration.Done(1)) {
      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
      // We use same key_rand as seed for key and column family so that we can
      // deterministically find the cfh corresponding to a particular key, as it
      // is done in DoWrite method.
      int64_t key_rand = GetRandomKey(&thread->rand);
      GenerateKeyFromInt(key_rand, FLAGS_num, &key);
      read++;
      Status s;
      if (FLAGS_num_column_families > 1) {
        s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key,
                                 &value);
      } else {
        s = db_with_cfh->db->Get(options, key, &value);
      }
      if (s.ok()) {
        found++;
        bytes += key.size() + value.size();
      } else if (!s.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
        abort();
      }
      thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
             found, read);

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

  // Calls MultiGet over a list of keys from a random distribution.
  // Returns the total number of keys found.
  void MultiReadRandom(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    std::vector<Slice> keys;
    std::vector<std::unique_ptr<const char[]> > key_guards;
    std::vector<std::string> values(entries_per_batch_);
    while (static_cast<int64_t>(keys.size()) < entries_per_batch_) {
      key_guards.push_back(std::unique_ptr<const char[]>());
      keys.push_back(AllocateKey(&key_guards.back()));
    }

    Duration duration(FLAGS_duration, reads_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      for (int64_t i = 0; i < entries_per_batch_; ++i) {
        GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]);
      }
      std::vector<Status> statuses = db->MultiGet(options, keys, &values);
      assert(static_cast<int64_t>(statuses.size()) == entries_per_batch_);

      read += entries_per_batch_;
      for (int64_t i = 0; i < entries_per_batch_; ++i) {
        if (statuses[i].ok()) {
          ++found;
        } else if (!statuses[i].IsNotFound()) {
          fprintf(stderr, "MultiGet returned an error: %s\n",
                  statuses[i].ToString().c_str());
          abort();
        }
      }
      thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
             found, read);
    thread->stats.AddMessage(msg);
  }

  void IteratorCreation(ThreadState* thread) {
    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      Iterator* iter = db->NewIterator(options);
      delete iter;
      thread->stats.FinishedOps(nullptr, db, 1, kOthers);
    }
  }

  void IteratorCreationWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      IteratorCreation(thread);
    } else {
      BGWriter(thread, kWrite);
    }
  }

  void SeekRandom(ThreadState* thread) {
    int64_t read = 0;
    int64_t found = 0;
    int64_t bytes = 0;
    ReadOptions options(FLAGS_verify_checksum, true);
    options.tailing = FLAGS_use_tailing_iterator;

    Iterator* single_iter = nullptr;
    std::vector<Iterator*> multi_iters;
    if (db_.db != nullptr) {
      single_iter = db_.db->NewIterator(options);
    } else {
      for (const auto& db_with_cfh : multi_dbs_) {
        multi_iters.push_back(db_with_cfh.db->NewIterator(options));
      }
    }

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    Duration duration(FLAGS_duration, reads_);
    char value_buffer[256];
    while (!duration.Done(1)) {
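      // Unless tailing iterators are requested, recreate the iterator(s) on
      // every pass so that each seek observes a fresh view of the database.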
      if (!FLAGS_use_tailing_iterator) {
        if (db_.db != nullptr) {
          delete single_iter;
          single_iter = db_.db->NewIterator(options);
        } else {
          for (auto iter : multi_iters) {
            delete iter;
          }
          multi_iters.clear();
          for (const auto& db_with_cfh : multi_dbs_) {
            multi_iters.push_back(db_with_cfh.db->NewIterator(options));
          }
        }
      }
      // Pick an iterator to use
      Iterator* iter_to_use = single_iter;
      if (single_iter == nullptr) {
        iter_to_use = multi_iters[thread->rand.Next() % multi_iters.size()];
      }

      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
      iter_to_use->Seek(key);
      read++;
      if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
        found++;
      }

      for (int j = 0; j < FLAGS_seek_nexts && iter_to_use->Valid(); ++j) {
        // Copy out iterator's value to make sure we read them.
        Slice value = iter_to_use->value();
        memcpy(value_buffer, value.data(),
               std::min(value.size(), sizeof(value_buffer)));
        bytes += iter_to_use->key().size() + iter_to_use->value().size();

        if (!FLAGS_reverse_iterator) {
          iter_to_use->Next();
        } else {
          iter_to_use->Prev();
        }
        assert(iter_to_use->status().ok());
      }

      thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
    }
    delete single_iter;
    for (auto iter : multi_iters) {
      delete iter;
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
             found, read);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

  void SeekRandomWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      SeekRandom(thread);
    } else {
      BGWriter(thread, kWrite);
    }
  }

  void SeekRandomWhileMerging(ThreadState* thread) {
    if (thread->tid > 0) {
      SeekRandom(thread);
    } else {
      BGWriter(thread, kMerge);
    }
  }

  void DoDelete(ThreadState* thread, bool seq) {
    WriteBatch batch;
    Duration duration(seq ? 0 : FLAGS_duration, deletes_);
    int64_t i = 0;
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    while (!duration.Done(entries_per_batch_)) {
      DB* db = SelectDB(thread);
      batch.Clear();
      for (int64_t j = 0; j < entries_per_batch_; ++j) {
        const int64_t k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
        GenerateKeyFromInt(k, FLAGS_num, &key);
        batch.Delete(key);
      }
      auto s = db->Write(write_options_, &batch);
      thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete);
      if (!s.ok()) {
        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        exit(1);
      }
      i += entries_per_batch_;
    }
  }

  void DeleteSeq(ThreadState* thread) {
    DoDelete(thread, true);
  }

  void DeleteRandom(ThreadState* thread) {
    DoDelete(thread, false);
  }

  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      BGWriter(thread, kWrite);
    }
  }

  void ReadWhileMerging(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      BGWriter(thread, kMerge);
    }
  }

  void BGWriter(ThreadState* thread, enum OperationType write_merge) {
    // Special thread that keeps writing until other threads are done.
    RandomGenerator gen;
    int64_t bytes = 0;

    std::unique_ptr<RateLimiter> write_rate_limiter;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }

    // Don't merge stats from this thread with the readers.
    thread->stats.SetExcludeFromMerge();

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    while (true) {
      DB* db = SelectDB(thread);
      {
        MutexLock l(&thread->shared->mu);
        if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
          // Other threads have finished
          break;
        }
      }

      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
      Status s;

      if (write_merge == kWrite) {
        s = db->Put(write_options_, key, gen.Generate(value_size_));
      } else {
        s = db->Merge(write_options_, key, gen.Generate(value_size_));
      }

      if (!s.ok()) {
        fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);

      if (FLAGS_benchmark_write_rate_limit > 0) {
        write_rate_limiter->Request(
            entries_per_batch_ * (value_size_ + key_size_),
            Env::IO_HIGH);
      }
    }
    thread->stats.AddBytes(bytes);
  }

  // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
  // in the DB atomically, i.e. in a single batch. Also refer to GetMany.
  Status PutMany(DB* db, const WriteOptions& writeoptions, const Slice& key,
                 const Slice& value) {
    std::string suffixes[3] = {"2", "1", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Put(keys[i], value);
    }

    s = db->Write(writeoptions, &batch);
    return s;
  }


  // Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V)
  // in the DB atomically, i.e. in a single batch. Also refer to GetMany.
  Status DeleteMany(DB* db, const WriteOptions& writeoptions,
                    const Slice& key) {
    std::string suffixes[3] = {"1", "2", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Delete(keys[i]);
    }

    s = db->Write(writeoptions, &batch);
    return s;
  }

  // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
  // in the same snapshot, and verifies that all the values are identical.
  // ASSUMES that PutMany was used to put (K, V) into the DB.
  Status GetMany(DB* db, const ReadOptions& readoptions, const Slice& key,
                 std::string* value) {
    std::string suffixes[3] = {"0", "1", "2"};
    std::string keys[3];
    Slice key_slices[3];
    std::string values[3];
    ReadOptions readoptionscopy = readoptions;
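    // Pin a snapshot so that all three gets observe the same view of the DB.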
    readoptionscopy.snapshot = db->GetSnapshot();
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      key_slices[i] = keys[i];
      s = db->Get(readoptionscopy, key_slices[i], value);
      if (!s.ok() && !s.IsNotFound()) {
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
        values[i] = "";
        // we continue after error rather than exiting so that we can
        // find more errors if any
      } else if (s.IsNotFound()) {
        values[i] = "";
      } else {
        values[i] = *value;
      }
    }
    db->ReleaseSnapshot(readoptionscopy.snapshot);

    if ((values[0] != values[1]) || (values[1] != values[2])) {
      fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n",
              key.ToString().c_str(), values[0].c_str(), values[1].c_str(),
              values[2].c_str());
      // we continue after error rather than exiting so that we can
      // find more errors if any
    }

    return s;
  }

  // Differs from readrandomwriterandom in the following ways:
  // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
  // (b) Does deletes as well (per FLAGS_deletepercent)
  // (c) In order to achieve high % of 'found' during lookups, and to do
  //     multiple writes (including puts and deletes) it uses upto
  //     FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
  // (d) Does not have a MultiGet option.
  void RandomWithVerify(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int get_weight = 0;
    int put_weight = 0;
    int delete_weight = 0;
    int64_t gets_done = 0;
    int64_t puts_done = 0;
    int64_t deletes_done = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    // the number of iterations is the larger of read_ or write_
    for (int64_t i = 0; i < readwrites_; i++) {
      DB* db = SelectDB(thread);
      if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
        // one batch completed, reinitialize for next batch
        get_weight = FLAGS_readwritepercent;
        delete_weight = FLAGS_deletepercent;
        put_weight = 100 - get_weight - delete_weight;
      }
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_numdistinct,
          FLAGS_numdistinct, &key);
      if (get_weight > 0) {
        // do all the gets first
        Status s = GetMany(db, options, key, &value);
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
        get_weight--;
        gets_done++;
        thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
      } else if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
        Status s = PutMany(db, write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        puts_done++;
        thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
      } else if (delete_weight > 0) {
        Status s = DeleteMany(db, write_options_, key);
        if (!s.ok()) {
          fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
          exit(1);
        }
        delete_weight--;
        deletes_done++;
        thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
      }
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( get:%" PRIu64 " put:%" PRIu64 " del:%" PRIu64 " total:%" \
             PRIu64 " found:%" PRIu64 ")",
             gets_done, puts_done, deletes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  // This is different from ReadWhileWriting because it does not use
  // an extra thread.
  void ReadRandomWriteRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int get_weight = 0;
    int put_weight = 0;
    int64_t reads_done = 0;
    int64_t writes_done = 0;
    Duration duration(FLAGS_duration, readwrites_);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
      if (get_weight == 0 && put_weight == 0) {
        // one batch completed, reinitialize for next batch
        get_weight = FLAGS_readwritepercent;
        put_weight = 100 - get_weight;
      }
      if (get_weight > 0) {
        // do all the gets first
        Status s = db->Get(options, key, &value);
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
        get_weight--;
        reads_done++;
        thread->stats.FinishedOps(nullptr, db, 1, kRead);
      } else  if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
        Status s = db->Put(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        writes_done++;
        thread->stats.FinishedOps(nullptr, db, 1, kWrite);
      }
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
             " total:%" PRIu64 " found:%" PRIu64 ")",
             reads_done, writes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  //
  // Read-modify-write for random keys
  void UpdateRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int64_t bytes = 0;
    Duration duration(FLAGS_duration, readwrites_);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

      auto status = db->Get(options, key, &value);
      if (status.ok()) {
        ++found;
        bytes += key.size() + value.size();
      } else if (!status.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n",
                status.ToString().c_str());
        abort();
      }

      Status s = db->Put(write_options_, key, gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }

  // Read-modify-write for random keys.
  // Each operation causes the key to grow by value_size (simulating an append).
  // Generally used for benchmarking against merges of similar type
  void AppendRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t found = 0;
    int64_t bytes = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);

      auto status = db->Get(options, key, &value);
      if (status.ok()) {
        ++found;
        bytes += key.size() + value.size();
      } else if (!status.IsNotFound()) {
        fprintf(stderr, "Get returned an error: %s\n",
                status.ToString().c_str());
        abort();
      } else {
        // If not existing, then just assume an empty string of data
        value.clear();
      }

      // Update the value (by appending data)
      Slice operand = gen.Generate(value_size_);
      if (value.size() > 0) {
        // Use a delimiter to match the semantics for StringAppendOperator
        value.append(1,',');
      }
      value.append(operand.data(), operand.size());

      // Write back to the database
      Status s = db->Put(write_options_, key, value);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value.size();
      thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")",
            readwrites_, found);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }

  // Read-modify-write for random keys (using MergeOperator)
  // The merge operator to use should be defined by FLAGS_merge_operator
  // Adjust FLAGS_value_size so that the keys are reasonable for this operator
  // Assumes that the merge operator is non-null (i.e.: is well-defined)
  //
  // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
  // to simulate random additions over 64-bit integers using merge.
  //
  // The number of merges on the same key can be controlled by adjusting
  // FLAGS_merge_keys.
  void MergeRandom(ThreadState* thread) {
    RandomGenerator gen;
    int64_t bytes = 0;
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);

      Status s = db->Merge(write_options_, key, gen.Generate(value_size_));

      if (!s.ok()) {
        fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes += key.size() + value_size_;
      thread->stats.FinishedOps(nullptr, db, 1, kMerge);
    }

    // Print some statistics
    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%" PRIu64 ")", readwrites_);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
  }

  // Read and merge random keys. The number of reads and merges is controlled
  // by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
  // keys (and thus also the number of reads and merges on the same key) can be
  // adjusted with FLAGS_merge_keys.
  //
  // As with MergeRandom, the merge operator to use should be defined by
  // FLAGS_merge_operator.
  void ReadRandomMergeRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    int64_t num_hits = 0;
    int64_t num_gets = 0;
    int64_t num_merges = 0;
    size_t max_length = 0;

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    // the number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);
      GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);

      bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;

      if (do_merge) {
        Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
          exit(1);
        }
        num_merges++;
        thread->stats.FinishedOps(nullptr, db, 1, kMerge);
      } else {
        Status s = db->Get(options, key, &value);
        if (value.length() > max_length)
          max_length = value.length();

        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          num_hits++;
        }
        num_gets++;
        thread->stats.FinishedOps(nullptr, db, 1, kRead);
      }
    }

    char msg[100];
    snprintf(msg, sizeof(msg),
             "(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64
             " hits:%" PRIu64 " maxlength:%" ROCKSDB_PRIszt ")",
             num_gets, num_merges, readwrites_, num_hits, max_length);
    thread->stats.AddMessage(msg);
  }

  void WriteSeqSeekSeq(ThreadState* thread) {
    writes_ = FLAGS_num;
    DoWrite(thread, SEQUENTIAL);
    // exclude writes from the ops/sec calculation
    thread->stats.Start(thread->tid);

    DB* db = SelectDB(thread);
    std::unique_ptr<Iterator> iter(
      db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)));

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
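    // For each sequentially written key: seek to it, walk FLAGS_seek_nexts
    // entries (backwards with --reverse_iterator) verifying every key along
    // the way, then seek once more to the last key visited.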
    for (int64_t i = 0; i < FLAGS_num; ++i) {
      GenerateKeyFromInt(i, FLAGS_num, &key);
      iter->Seek(key);
      assert(iter->Valid() && iter->key() == key);
      thread->stats.FinishedOps(nullptr, db, 1, kSeek);

      for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) {
        if (!FLAGS_reverse_iterator) {
          iter->Next();
        } else {
          iter->Prev();
        }
        GenerateKeyFromInt(++i, FLAGS_num, &key);
        assert(iter->Valid() && iter->key() == key);
        thread->stats.FinishedOps(nullptr, db, 1, kSeek);
      }

      iter->Seek(key);
      assert(iter->Valid() && iter->key() == key);
      thread->stats.FinishedOps(nullptr, db, 1, kSeek);
    }
  }

#ifndef ROCKSDB_LITE
  // This benchmark stress tests Transactions.  For a given --duration (or
  // total number of --writes), a Transaction will perform a read-modify-write
  // to increment the value of a key in each of N(--transaction-sets) sets of
  // keys (where each set has --num keys).  If --threads is set, this will be
  // done in parallel.
  //
  // To test transactions, use --transaction_db=true.  Not setting this
  // parameter will run the same benchmark without transactions.
  //
  // RandomTransactionVerify() will then validate the correctness of the results
  // by checking if the sum of all keys in each set is the same.
  void RandomTransaction(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    Duration duration(FLAGS_duration, readwrites_);
    ReadOptions read_options(FLAGS_verify_checksum, true);
    uint16_t num_prefix_ranges = static_cast<uint16_t>(FLAGS_transaction_sets);
    uint64_t transactions_done = 0;

    if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) {
      fprintf(stderr, "invalid value for transaction_sets\n");
      abort();
    }

    TransactionOptions txn_options;
    txn_options.lock_timeout = FLAGS_transaction_lock_timeout;
    txn_options.set_snapshot = FLAGS_transaction_set_snapshot;

    RandomTransactionInserter inserter(&thread->rand, write_options_,
                                       read_options, FLAGS_num,
                                       num_prefix_ranges);

    if (FLAGS_num_multi_db > 1) {
      fprintf(stderr,
              "Cannot run RandomTransaction benchmark with "
              "FLAGS_multi_db > 1.");
      abort();
    }

    while (!duration.Done(1)) {
      bool success;

      // RandomTransactionInserter will attempt to insert a key for each
      // # of FLAGS_transaction_sets
      if (FLAGS_optimistic_transaction_db) {
        success = inserter.OptimisticTransactionDBInsert(db_.opt_txn_db);
      } else if (FLAGS_transaction_db) {
        TransactionDB* txn_db = reinterpret_cast<TransactionDB*>(db_.db);
        success = inserter.TransactionDBInsert(txn_db, txn_options);
      } else {
        success = inserter.DBInsert(db_.db);
      }

      if (!success) {
        fprintf(stderr, "Unexpected error: %s\n",
                inserter.GetLastStatus().ToString().c_str());
        abort();
      }

      thread->stats.FinishedOps(nullptr, db_.db, 1, kOthers);
      transactions_done++;
    }

    char msg[100];
    if (FLAGS_optimistic_transaction_db || FLAGS_transaction_db) {
      snprintf(msg, sizeof(msg),
               "( transactions:%" PRIu64 " aborts:%" PRIu64 ")",
               transactions_done, inserter.GetFailureCount());
    } else {
      snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done);
    }
    thread->stats.AddMessage(msg);

    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

  // Verifies consistency of data after RandomTransaction() has been run.
  // Since each iteration of RandomTransaction() incremented a key in each set
  // by the same value, the sum of the keys in each set should be the same.
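  // (Each committed transaction adds the same delta to exactly one key in
  // every set, so the per-set sums must stay equal; any divergence indicates
  // a lost or partially applied update.)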
  void RandomTransactionVerify() {
    if (!FLAGS_transaction_db && !FLAGS_optimistic_transaction_db) {
      // transactions not used, nothing to verify.
      return;
    }

    Status s =
        RandomTransactionInserter::Verify(
            db_.db, static_cast<uint16_t>(FLAGS_transaction_sets));

    if (s.ok()) {
      fprintf(stdout, "RandomTransactionVerify Success.\n");
    } else {
      fprintf(stdout, "RandomTransactionVerify FAILED!!\n");
    }
  }
#endif  // ROCKSDB_LITE

  // Writes and deletes random keys without overwriting keys.
  //
  // This benchmark is intended to partially replicate the behavior of MyRocks
  // secondary indices: All data is stored in keys and updates happen by
  // deleting the old version of the key and inserting the new version.
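  //
  // A possible invocation (a sketch only; this assumes the benchmark
  // registered for this method is named "randomreplacekeys"):
  //   ./db_bench --benchmarks=randomreplacekeys --numdistinct=5000 \
  //       --use_single_deletes=true --stddev=500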
  void RandomReplaceKeys(ThreadState* thread) {
    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);
    std::vector<uint32_t> counters(FLAGS_numdistinct, 0);
    size_t max_counter = 50;
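    // Each logical key id owns a block of max_counter physical keys; an
    // update deletes key (key_id * max_counter + counter) and re-inserts the
    // key for the next counter value, so no physical key is ever overwritten.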
    RandomGenerator gen;

    Status s;
    DB* db = SelectDB(thread);
    for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
      GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
      s = db->Put(write_options_, key, gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
        exit(1);
      }
    }

    db->GetSnapshot();

    std::default_random_engine generator;
    std::normal_distribution<double> distribution(FLAGS_numdistinct / 2.0,
                                                  FLAGS_stddev);
    Duration duration(FLAGS_duration, FLAGS_num);
    while (!duration.Done(1)) {
      int64_t rnd_id = static_cast<int64_t>(distribution(generator));
      int64_t key_id = std::max(std::min(FLAGS_numdistinct - 1, rnd_id),
                                static_cast<int64_t>(0));
      GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                         &key);
      s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key)
                                   : db->Delete(write_options_, key);
      if (s.ok()) {
        counters[key_id] = (counters[key_id] + 1) % max_counter;
        GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                           &key);
        s = db->Put(write_options_, key, Slice());
      }

      if (!s.ok()) {
        fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
        exit(1);
      }

      thread->stats.FinishedOps(nullptr, db, 1, kOthers);
    }

    char msg[200];
    snprintf(msg, sizeof(msg),
             "use single deletes: %d, "
             "standard deviation: %lf\n",
             FLAGS_use_single_deletes, FLAGS_stddev);
    thread->stats.AddMessage(msg);
  }

  void TimeSeriesReadOrDelete(ThreadState* thread, bool do_deletion) {
    ReadOptions options(FLAGS_verify_checksum, true);
    int64_t read = 0;
    int64_t found = 0;
    int64_t bytes = 0;

    Iterator* iter = nullptr;
    // Only works on a single database.
    assert(db_.db != nullptr);
    iter = db_.db->NewIterator(options);

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    char value_buffer[256];
    while (true) {
      {
        MutexLock l(&thread->shared->mu);
        if (thread->shared->num_done >= 1) {
          // The write thread has finished.
          break;
        }
      }
      // Pick an iterator to use: unless a tailing iterator was requested,
      // recreate the iterator so it observes the latest writes.
      if (!FLAGS_use_tailing_iterator) {
        delete iter;
        iter = db_.db->NewIterator(options);
      }

      int64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
      GenerateKeyFromInt(key_id, FLAGS_num, &key);
      // Reset last 8 bytes to 0
      char* start = const_cast<char*>(key.data());
      start += key.size() - 8;
      memset(start, 0, 8);
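      // With the trailing timestamp bytes zeroed, this is the smallest
      // possible key for the chosen id, so the Seek() below lands on the
      // oldest entry of that series and the prefix scan walks it in
      // timestamp order.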
      ++read;

      bool key_found = false;
      // Seek the prefix
      for (iter->Seek(key); iter->Valid() && iter->key().starts_with(key);
           iter->Next()) {
        key_found = true;
        // Copy out iterator's value to make sure we read them.
        if (do_deletion) {
          bytes += iter->key().size();
          if (KeyExpired(timestamp_emulator_.get(), iter->key())) {
            thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
            db_.db->Delete(write_options_, iter->key());
          } else {
            break;
          }
        } else {
          bytes += iter->key().size() + iter->value().size();
          thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
          Slice value = iter->value();
          memcpy(value_buffer, value.data(),
                 std::min(value.size(), sizeof(value_buffer)));

          assert(iter->status().ok());
        }
      }
      found += key_found;
    }
    delete iter;

    char msg[100];
    snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", found,
             read);
    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(msg);
    if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
      thread->stats.AddMessage(perf_context.ToString());
    }
  }

  void TimeSeriesWrite(ThreadState* thread) {
    // Special thread that keeps writing until other threads are done.
    RandomGenerator gen;
    int64_t bytes = 0;

    // Don't merge stats from this thread with the readers.
    thread->stats.SetExcludeFromMerge();

    std::unique_ptr<RateLimiter> write_rate_limiter;
    if (FLAGS_benchmark_write_rate_limit > 0) {
      write_rate_limiter.reset(
          NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
    }

    std::unique_ptr<const char[]> key_guard;
    Slice key = AllocateKey(&key_guard);

    Duration duration(FLAGS_duration, writes_);
    while (!duration.Done(1)) {
      DB* db = SelectDB(thread);

      uint64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
      // Write key id
      GenerateKeyFromInt(key_id, FLAGS_num, &key);
      // Write timestamp

      char* start = const_cast<char*>(key.data());
      char* pos = start + 8;
      int bytes_to_fill =
          std::min(key_size_ - static_cast<int>(pos - start), 8);
      uint64_t timestamp_value = timestamp_emulator_->Get();
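      // Store the timestamp most-significant-byte first so that, within a
      // key id, keys sort by timestamp; on big-endian hosts a plain memcpy
      // already yields that byte order.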
      if (port::kLittleEndian) {
        for (int i = 0; i < bytes_to_fill; ++i) {
          pos[i] = (timestamp_value >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
        }
      } else {
        memcpy(pos, static_cast<void*>(&timestamp_value), bytes_to_fill);
      }

      timestamp_emulator_->Inc();

      Status s;

      s = db->Put(write_options_, key, gen.Generate(value_size_));

      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      bytes = key.size() + value_size_;
      thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
      thread->stats.AddBytes(bytes);

      if (FLAGS_benchmark_write_rate_limit > 0) {
        write_rate_limiter->Request(
            entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH);
      }
    }
  }

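  // Entry point of the time-series benchmark: thread 0 keeps writing new
  // timestamped entries while the remaining threads read them back or, when
  // --expire_style=delete, delete the entries that have expired.
  //
  // A sketch of an invocation (this assumes the benchmark registered for
  // this method is named "timeseries"):
  //   ./db_bench --benchmarks=timeseries --threads=4 --expire_style=delete \
  //       --num_deletion_threads=1 --key_id_range=100000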
  void TimeSeries(ThreadState* thread) {
    if (thread->tid > 0) {
      bool do_deletion = FLAGS_expire_style == "delete" &&
                         thread->tid <= FLAGS_num_deletion_threads;
      TimeSeriesReadOrDelete(thread, do_deletion);
    } else {
      TimeSeriesWrite(thread);
      thread->stats.Stop();
      thread->stats.Report("timeseries write");
    }
  }

  void Compact(ThreadState* thread) {
    DB* db = SelectDB(thread);
    db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  }

  void PrintStats(const char* key) {
    if (db_.db != nullptr) {
      PrintStats(db_.db, key, false);
    }
    for (const auto& db_with_cfh : multi_dbs_) {
      PrintStats(db_with_cfh.db, key, true);
    }
  }

  void PrintStats(DB* db, const char* key, bool print_header = false) {
    if (print_header) {
      fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str());
    }
    std::string stats;
    if (!db->GetProperty(key, &stats)) {
      stats = "(failed)";
    }
    fprintf(stdout, "\n%s\n", stats.c_str());
  }
};

int db_bench_tool(int argc, char** argv) {
  rocksdb::port::InstallStackTraceHandler();
  static bool initialized = false;
  if (!initialized) {
    SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                    " [OPTIONS]...");
    initialized = true;
  }
  ParseCommandLineFlags(&argc, &argv, true);

  FLAGS_compaction_style_e = (rocksdb::CompactionStyle) FLAGS_compaction_style;
  if (FLAGS_statistics) {
    dbstats = rocksdb::CreateDBStatistics();
  }
  FLAGS_compaction_pri_e = (rocksdb::CompactionPri)FLAGS_compaction_pri;

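  // --max_bytes_for_level_multiplier_additional is a comma-separated list of
  // per-level multipliers (e.g. "1,1,1,1,1,1,1"); parse it into a vector of
  // ints here.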
  std::vector<std::string> fanout = rocksdb::StringSplit(
      FLAGS_max_bytes_for_level_multiplier_additional, ',');
  for (size_t j = 0; j < fanout.size(); j++) {
    FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
#ifndef CYGWIN
        std::stoi(fanout[j]));
#else
        stoi(fanout[j]));
#endif
  }

  FLAGS_compression_type_e =
    StringToCompressionType(FLAGS_compression_type.c_str());

#ifndef ROCKSDB_LITE
  std::unique_ptr<Env> custom_env_guard;
  if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
    fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
    exit(1);
  } else if (!FLAGS_env_uri.empty()) {
    FLAGS_env = NewEnvFromUri(FLAGS_env_uri, &custom_env_guard);
    if (FLAGS_env == nullptr) {
      fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
      exit(1);
    }
  }
#endif  // ROCKSDB_LITE
  if (!FLAGS_hdfs.empty()) {
    FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs);
  }

  if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NORMAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::SEQUENTIAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::WILLNEED;
  else {
    fprintf(stdout, "Unknown compaction fadvice:%s\n",
            FLAGS_compaction_fadvice.c_str());
  }

  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());

  // The number of background threads should be at least as large as the
  // maximum number of concurrent compactions.
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_flushes,
                                  rocksdb::Env::Priority::HIGH);

  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db.empty()) {
    std::string default_db_path;
    rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path;
  }

  if (FLAGS_stats_interval_seconds > 0) {
    // When both are set, the timer is checked every FLAGS_stats_interval ops
    // to see whether FLAGS_stats_interval_seconds have elapsed.
    FLAGS_stats_interval = 1000;
  }

  rocksdb::Benchmark benchmark;
  benchmark.Run();
  return 0;
}
}  // namespace rocksdb
#endif