db_bench.cc 92.9 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

10
#include <cstddef>
J
jorlow@chromium.org 已提交
11 12 13
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
14
#include <gflags/gflags.h>
J
jorlow@chromium.org 已提交
15 16
#include "db/db_impl.h"
#include "db/version_set.h"
I
Igor Canadi 已提交
17
#include "rocksdb/statistics.h"
18 19 20 21
#include "rocksdb/options.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
22
#include "rocksdb/memtablerep.h"
23
#include "rocksdb/write_batch.h"
24 25
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
26
#include "rocksdb/statistics.h"
27
#include "rocksdb/perf_context.h"
J
jorlow@chromium.org 已提交
28
#include "port/port.h"
29
#include "util/bit_set.h"
J
jorlow@chromium.org 已提交
30
#include "util/crc32c.h"
J
jorlow@chromium.org 已提交
31
#include "util/histogram.h"
32
#include "util/mutexlock.h"
J
jorlow@chromium.org 已提交
33
#include "util/random.h"
34
#include "util/stack_trace.h"
35
#include "util/string_util.h"
I
Igor Canadi 已提交
36
#include "util/statistics.h"
J
jorlow@chromium.org 已提交
37
#include "util/testutil.h"
38
#include "hdfs/env_hdfs.h"
D
Deon Nicholas 已提交
39
#include "utilities/merge_operators.h"
J
jorlow@chromium.org 已提交
40

T
Tyler Harter 已提交
41

42 43 44 45 46 47 48 49 50 51 52 53 54
// --benchmarks: comma-separated list of benchmark names. Per the help text,
// the operations run in the order listed, so the repeated entries in the
// default list (e.g. "readrandom", "readseq") run again at that point.
DEFINE_string(benchmarks,
              "fillseq,"
              "fillsync,"
              "fillrandom,"
              "overwrite,"
              "readrandom,"
              "readrandom,"
              "readseq,"
              "readreverse,"
              "compact,"
              "readrandom,"
              "readseq,"
              "readtocache,"
              "readreverse,"
              "readwhilewriting,"
              "readrandomwriterandom,"
              "updaterandom,"
              "randomwithverify,"
              "fill100K,"
              "crc32c,"
              "compress,"
              "uncompress,"
              "acquireload,"
              "fillfromstdin,",

              // The trailing "\n" below separates the summary sentence from
              // the "Actual benchmarks:" heading in --help output.
              "Comma-separated list of operations to run in the specified"
              " order\n"
              "Actual benchmarks:\n"
              "\tfillseq       -- write N values in sequential key"
              " order in async mode\n"
              "\tfillrandom    -- write N values in random key order in async"
              " mode\n"
              "\toverwrite     -- overwrite N values in random key order in"
              " async mode\n"
              "\tfillsync      -- write N/100 values in random key order in "
              "sync mode\n"
              "\tfill100K      -- write N/1000 100K values in random order in"
              " async mode\n"
              "\tdeleteseq     -- delete N keys in sequential order\n"
              "\tdeleterandom  -- delete N keys in random order\n"
              "\treadseq       -- read N times sequentially\n"
              "\treadtocache   -- 1 thread reading database sequentially\n"
              "\treadreverse   -- read N times in reverse order\n"
              "\treadrandom    -- read N times in random order\n"
              "\treadmissing   -- read N missing keys in random order\n"
              "\treadhot       -- read N times in random order from 1% section "
              "of DB\n"
              "\treadwhilewriting      -- 1 writer, N threads doing random "
              "reads\n"
              "\treadrandomwriterandom -- N threads doing random-read, "
              "random-write\n"
              "\tprefixscanrandom      -- prefix scan N times in random order\n"
              "\tupdaterandom  -- N threads doing read-modify-write for random "
              "keys\n"
              "\tappendrandom  -- N threads doing read-modify-write with "
              "growing values\n"
              "\tmergerandom   -- same as updaterandom/appendrandom using merge"
              " operator. "
              "Must be used with merge_operator\n"
              "\treadrandommergerandom -- perform N random read-or-merge "
              "operations. Must be used with merge_operator\n"
              "\tnewiterator   -- repeated iterator creation\n"
              "\tseekrandom    -- N random seeks\n"
              "\tcrc32c        -- repeated crc32c of 4K of data\n"
              "\tacquireload   -- load N*1000 times\n"
              "Meta operations:\n"
              "\tcompact     -- Compact the entire DB\n"
              "\tstats       -- Print DB stats\n"
              "\tlevelstats  -- Print the number of files and bytes per level\n"
              "\tsstables    -- Print sstable info\n"
              "\theapprofile -- Dump a heap profile (if supported by this"
              " port)\n");

// ---- Workload size, key selection, seeding and threading flags ----

DEFINE_int64(num, 1000000,
             "Number of key/values to place in database");

DEFINE_int64(numdistinct, 1000,
             "Number of distinct keys to use. Used in RandomWithVerify to "
             "read/write on fewer keys so that gets are more likely to find "
             "the key and puts are more likely to update the same key");

DEFINE_int64(merge_keys, -1,
             "Number of distinct keys to use for MergeRandom and "
             "ReadRandomMergeRandom. If negative, there will be FLAGS_num "
             "keys.");

DEFINE_int64(reads, -1,
             "Number of read operations to do.  If negative, do FLAGS_num "
             "reads.");

DEFINE_int64(read_range, 1,
             "When ==1 reads use ::Get, when >1 reads use an iterator");

DEFINE_bool(use_prefix_blooms, false,
            "Whether to place prefixes in blooms");

DEFINE_bool(use_prefix_api, false,
            "Whether to set ReadOptions.prefix for prefixscanrandom. If true, "
            "use_prefix_blooms must also be true.");

DEFINE_int64(seed, 0,
             "Seed base for random number generators. When 0 it is "
             "deterministic.");

DEFINE_int32(threads, 1,
             "Number of concurrent threads to run.");

DEFINE_int32(duration, 0,
             "Time in seconds for the random-ops tests to run. When 0 then "
             "num & reads determine the test duration");

DEFINE_int32(value_size, 100,
             "Size of each value");
T
Tyler Harter 已提交
147

148

149 150 151 152 153 154 155 156 157 158 159
// The maximum size of a key in bytes. --key_size values up to and including
// this limit are accepted.
static const int kMaxKeySize = 128;

// gflags validator for --key_size.
//
// @param flagname  name of the flag being validated (used in the message)
// @param value     the proposed flag value
// @return true when value <= kMaxKeySize; otherwise prints a diagnostic to
//         stderr and returns false.
static bool ValidateKeySize(const char* flagname, int32_t value) {
  if (value > kMaxKeySize) {
    // The check permits value == kMaxKeySize, so the message must say "<=".
    fprintf(stderr, "Invalid value for --%s: %d, must be <= %d\n",
            flagname, value, kMaxKeySize);
    return false;
  }
  return true;
}
DEFINE_int32(key_size, 16, "size of each key");
160

161 162
DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
              " to this fraction of their original size after compression");
J
jorlow@chromium.org 已提交
163

164
DEFINE_bool(histogram, false, "Print histogram of operation timings");
J
jorlow@chromium.org 已提交
165

166
DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
167
             "Number of bytes to buffer in memtable before compacting");
168

169 170 171 172
DEFINE_int32(max_write_buffer_number,
             rocksdb::Options().max_write_buffer_number,
             "The number of in-memory memtables. Each memtable is of size"
             "write_buffer_size.");
173

174 175 176 177 178 179 180 181 182 183
DEFINE_int32(min_write_buffer_number_to_merge,
             rocksdb::Options().min_write_buffer_number_to_merge,
             "The minimum number of write buffers that will be merged together"
             "before writing to storage. This is cheap because it is an"
             "in-memory merge. If this feature is not enabled, then all these"
             "write buffers are flushed to L0 as separate files and this "
             "increases read amplification because a get request has to check"
             " in all of these files. Also, an in-memory merge may result in"
             " writing less data to storage if there are duplicate records "
             " in each of these individual write buffers.");
184

185 186 187 188
DEFINE_int32(max_background_compactions,
             rocksdb::Options().max_background_compactions,
             "The maximum number of concurrent background compactions"
             " that can occur in parallel.");
189

190 191 192
static rocksdb::CompactionStyle FLAGS_compaction_style_e;
DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
             "style of compaction: level-based vs universal");
193

194 195 196
DEFINE_int32(universal_size_ratio, 0,
             "Percentage flexibility while comparing file size"
             " (for universal compaction only).");
197

198 199
DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files in a"
             " single compaction run (for universal compaction only).");
200

201 202
DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
             " in universal style compaction");
203

204 205
DEFINE_int32(universal_max_size_amplification_percent, 0,
             "The max size amplification for universal style compaction");
206

207 208 209 210
DEFINE_int32(universal_compression_size_percent, -1,
             "The percentage of the database to compress for universal "
             "compaction. -1 means compress everything.");

211 212
DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed"
             "data. Negative means use default settings.");
J
jorlow@chromium.org 已提交
213

214 215
DEFINE_int32(block_size, rocksdb::Options().block_size,
             "Number of bytes in a block.");
216

217 218 219
DEFINE_int64(compressed_cache_size, -1,
             "Number of bytes to use as a cache of compressed data.");

220 221 222
DEFINE_int32(open_files, rocksdb::Options().max_open_files,
             "Maximum number of files to keep open at the same time"
             " (use default if == 0)");
223

224 225
DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
             " use default settings.");
S
Sanjay Ghemawat 已提交
226

227 228 229
DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
            " database.  If you set this flag and also specify a benchmark that"
            " wants a fresh database, that benchmark will fail.");
230

231
DEFINE_string(db, "", "Use the db with the following name.");
232

233 234 235 236 237 238 239 240 241 242 243
// gflags validator for --cache_numshardbits: any value below 20 is accepted;
// 20 or more is reported on stderr and rejected.
static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
  const bool acceptable = value < 20;
  if (!acceptable) {
    fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n",
            flagname, value);
  }
  return acceptable;
}
DEFINE_int32(cache_numshardbits, -1, "Number of shards for the block cache"
             " is 2 ** cache_numshardbits. Negative means use default settings."
             " This is applied only if FLAGS_cache_size is non-negative.");
244

245
DEFINE_int32(cache_remove_scan_count_limit, 32, "");
246

247 248
DEFINE_bool(verify_checksum, false, "Verify checksum for every block read"
            " from storage");
249

250
DEFINE_bool(statistics, false, "Database statistics");
251
static class std::shared_ptr<rocksdb::Statistics> dbstats;
252

253 254
DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
             " --num reads.");
H
heyongqiang 已提交
255

256 257
DEFINE_int32(writes_per_second, 0, "Per-thread rate limit on writes per second."
             " No limit when <= 0. Only for the readwhilewriting test.");
258

259
DEFINE_bool(sync, false, "Sync all writes to disk");
H
heyongqiang 已提交
260

261 262
DEFINE_bool(disable_data_sync, false, "If true, do not wait until data is"
            " synced to disk.");
M
Mark Callaghan 已提交
263

264
DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
M
Mark Callaghan 已提交
265

266
DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
267

268 269
DEFINE_bool(use_snapshot, false, "If true, create a snapshot per query when"
            " randomread benchmark is used");
H
heyongqiang 已提交
270

271 272
DEFINE_bool(get_approx, false, "If true, call GetApproximateSizes per query"
            " when read_range is > 1 and randomread benchmark is used");
H
heyongqiang 已提交
273

274
DEFINE_int32(num_levels, 7, "The total number of levels");
H
heyongqiang 已提交
275

276
DEFINE_int32(target_file_size_base, 2 * 1048576, "Target file size at level-1");
H
heyongqiang 已提交
277

278 279
DEFINE_int32(target_file_size_multiplier, 1,
             "A multiplier to compute target level-N file size (N >= 2)");
280

281
DEFINE_uint64(max_bytes_for_level_base,  10 * 1048576, "Max bytes for level-1");
H
heyongqiang 已提交
282

283 284
DEFINE_int32(max_bytes_for_level_multiplier, 10,
             "A multiplier to compute max bytes for level-N (N >= 2)");
H
heyongqiang 已提交
285

286 287 288
static std::vector<int> FLAGS_max_bytes_for_level_multiplier_additional_v;
DEFINE_string(max_bytes_for_level_multiplier_additional, "",
              "A vector that specifies additional fanout per level");
289

290 291
DEFINE_int32(level0_stop_writes_trigger, 12, "Number of files in level-0"
             " that will trigger put stop.");
292

293 294
DEFINE_int32(level0_slowdown_writes_trigger, 8, "Number of files in level-0"
             " that will slow down writes.");
295

296 297
DEFINE_int32(level0_file_num_compaction_trigger, 4, "Number of files in level-0"
             " when compactions start");
298

299 300 301 302 303 304 305 306 307 308 309 310 311
// gflags validator for percentage flags: only values strictly between 0 and
// 100 are accepted; anything else is reported on stderr and rejected.
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  const bool in_open_range = value > 0 && value < 100;
  if (!in_open_range) {
    fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n",
            flagname, value);
  }
  return in_open_range;
}
// ---- Read/write/merge/delete mix and compaction housekeeping flags ----

DEFINE_int32(readwritepercent, 90,
             "Ratio of reads to reads/writes (expressed as percentage) for the "
             "ReadRandomWriteRandom workload. The default value 90 means 90% "
             "operations out of all reads and writes operations are reads. In "
             "other words, 9 gets for every 1 put.");

DEFINE_int32(mergereadpercent, 70,
             "Ratio of merges to merges&reads (expressed as percentage) for "
             "the ReadRandomMergeRandom workload. The default value 70 means "
             "70% out of all read and merge operations are merges. In other "
             "words, 7 merges for every 3 gets.");

DEFINE_int32(deletepercent, 2,
             "Percentage of deletes out of reads/writes/deletes (used in "
             "RandomWithVerify only). RandomWithVerify calculates "
             "writepercent as (100 - FLAGS_readwritepercent - "
             "deletepercent), so deletepercent must be smaller than "
             "(100 - FLAGS_readwritepercent)");

// NOTE(review): declared as int32 with a bool-style default (false == 0).
DEFINE_int32(disable_seek_compaction, false,
             "Option to disable compaction triggered by read.");

DEFINE_uint64(delete_obsolete_files_period_micros, 0,
              "Option to delete obsolete files periodically. 0 means that "
              "obsolete files are deleted after every compaction run.");

enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "none"))
    return rocksdb::kNoCompression;
  else if (!strcasecmp(ctype, "snappy"))
    return rocksdb::kSnappyCompression;
  else if (!strcasecmp(ctype, "zlib"))
    return rocksdb::kZlibCompression;
  else if (!strcasecmp(ctype, "bzip2"))
    return rocksdb::kBZip2Compression;
A
Albert Strasheim 已提交
341 342 343 344
  else if (!strcasecmp(ctype, "lz4"))
    return rocksdb::kLZ4Compression;
  else if (!strcasecmp(ctype, "lz4hc"))
    return rocksdb::kLZ4HCCompression;
345 346 347 348 349 350 351

  fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
  return rocksdb::kSnappyCompression; //default value
}
DEFINE_string(compression_type, "snappy",
              "Algorithm to use to compress the database");
static enum rocksdb::CompressionType FLAGS_compression_type_e =
352
    rocksdb::kSnappyCompression;
353

354 355 356 357 358 359 360 361 362 363 364 365 366
// Range-checked by ValidateCompressionLevel (-1 through 9).
DEFINE_int32(compression_level, -1,
             "Compression level. For zlib this should be -1 for the default "
             "level, or between 0 and 9.");

// gflags validator for --compression_level: accepts -1 through 9 inclusive;
// other values are reported on stderr and rejected.
static bool ValidateCompressionLevel(const char* flagname, int32_t value) {
  if (value >= -1 && value <= 9) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d, must be between -1 and 9\n",
          flagname, value);
  return false;
}

I
Igor Canadi 已提交
367 368 369
static const bool FLAGS_compression_level_dummy __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_compression_level,
                                  &ValidateCompressionLevel);
370

371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
             " from this level. Levels with number < min_level_to_compress are"
             " not compressed. Otherwise, apply compression_type to "
             "all levels.");

// gflags validator for --table_cache_numshardbits: accepts 1 through 20
// inclusive; other values are reported on stderr and rejected.
static bool ValidateTableCacheNumshardbits(const char* flagname,
                                           int32_t value) {
  const bool within_bounds = value > 0 && value <= 20;
  if (!within_bounds) {
    fprintf(stderr, "Invalid value for --%s: %d, must be  0 < val <= 20\n",
            flagname, value);
  }
  return within_bounds;
}
DEFINE_int32(table_cache_numshardbits, 4, "");
386

387
DEFINE_string(hdfs, "", "Name of hdfs environment");
388
// posix or hdfs environment
389
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
390

391 392
DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
             "this is greater than zero. When 0 the interval grows over time.");
393

394 395
DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
             " this is greater than 0.");
396

397 398
DEFINE_int32(perf_level, 0, "Level of perf collection");

399 400 401 402 403 404 405 406 407 408
// gflags validator for the rate-limit flags: rejects values more than a tiny
// tolerance (1e-10) below zero; everything else is accepted.
static bool ValidateRateLimit(const char* flagname, double value) {
  static constexpr double kTolerance = 1e-10;
  const bool is_negative = value < -kTolerance;
  if (is_negative) {
    fprintf(stderr, "Invalid value for --%s: %12.6f, must be >= 0.0\n",
            flagname, value);
  }
  return !is_negative;
}
DEFINE_double(soft_rate_limit, 0.0, "");
J
Jim Paton 已提交
409

410 411 412
DEFINE_double(hard_rate_limit, 0.0, "When not equal to 0 this make threads "
              "sleep at each stats reporting interval until the compaction"
              " score for all levels is less than or equal to this value.");
413

414 415 416
DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
             "When hard_rate_limit is set then this is the max time a put will"
             " be stalled.");
417

418 419 420
DEFINE_int32(max_grandparent_overlap_factor, 10, "Control maximum bytes of "
             "overlaps in grandparent (i.e., level+2) before we stop building a"
             " single file in a level->level+1 compaction.");
421

422
DEFINE_bool(readonly, false, "Run read only benchmarks.");
H
heyongqiang 已提交
423

424
DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");
425

426 427 428
DEFINE_int32(source_compaction_factor, 1, "Cap the size of data in level-K for"
             " a compaction run that compacts Level-K with Level-(K+1) (for"
             " K >= 1)");
429

430 431 432
DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
              " in MB.");
433

434 435
DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer,
            "Allow buffered io using OS buffers");
436

437 438
DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
            "Allow reads to occur via mmap-ing files");
439

440 441
DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes,
            "Allow writes to occur via mmap-ing files");
442

443 444
DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
            "Advise random access on table file open");
445

446 447 448
DEFINE_string(compaction_fadvice, "NORMAL",
              "Access pattern advice when a file is compacted");
static auto FLAGS_compaction_fadvice_e =
449
  rocksdb::Options().access_hint_on_compaction_start;
450

451 452
DEFINE_bool(use_multiget, false,
            "Use multiget to access a series of keys instead of get");
453

I
Igor Canadi 已提交
454 455 456
DEFINE_bool(use_tailing_iterator, false,
            "Use tailing iterator to access a series of keys instead of get");

457 458 459
DEFINE_int64(keys_per_multiget, 90, "If use_multiget is true, determines number"
             " of keys to group per call Arbitrary default is good because it"
             " agrees with readwritepercent");
460 461

// TODO: Apply this flag to generic Get calls too. Currently only with Multiget
462 463 464 465 466 467 468 469 470 471 472 473 474
DEFINE_bool(warn_missing_keys, true, "Print a message to user when a key is"
            " missing in a Get/MultiGet call");

DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
            "Use adaptive mutex");

DEFINE_uint64(bytes_per_sync,  rocksdb::Options().bytes_per_sync,
              "Allows OS to incrementally sync files to disk while they are"
              " being written, in the background. Issue one request for every"
              " bytes_per_sync written. 0 turns it off.");
DEFINE_bool(filter_deletes, false, " On true, deletes use bloom-filter and drop"
            " the delete if key not present");

475 476 477
DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
             " operations on a key in the memtable");

478 479 480 481 482 483 484 485
// gflags validator for --prefix_size.
//
// @param flagname  name of the flag being validated (used in the message)
// @param value     the proposed prefix size
// @return true when 0 <= value < 2000000000; otherwise prints a diagnostic to
//         stderr and returns false.
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  if (value < 0 || value >= 2000000000) {
    // The upper bound is exclusive, so the message states "<" (not "<=").
    fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <2000000000\n",
            flagname, value);
    return false;
  }
  return true;
}
L
Lei Jin 已提交
486 487
// Range-checked by ValidatePrefixSize.
DEFINE_int32(prefix_size, 0,
             "control the prefix size for HashSkipList and plain table");
J
Jim Paton 已提交
488 489 490 491 492 493

// Memtable representations known to StringToRepFactory().
enum RepFactory {
  kSkipList,
  kPrefixHash,
  kVectorRep
};

// Maps a memtable representation name (compared case-insensitively) to a
// RepFactory. Accepted names: "skip_list", "prefix_hash", "vector". Any other
// name is reported on stdout and kSkipList is returned.
enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  static const struct {
    const char* name;
    RepFactory factory;
  } kKnownReps[] = {
    {"skip_list", kSkipList},
    {"prefix_hash", kPrefixHash},
    {"vector", kVectorRep},
  };

  for (const auto& rep : kKnownReps) {
    if (strcasecmp(ctype, rep.name) == 0) {
      return rep.factory;
    }
  }

  fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
  return kSkipList;
}
J
Jim Paton 已提交
507
static enum RepFactory FLAGS_rep_factory;
508
DEFINE_string(memtablerep, "skip_list", "");
L
Lei Jin 已提交
509 510
DEFINE_bool(use_plain_table, false, "if use plain table "
            "instead of block-based table format");
J
Jim Paton 已提交
511

512 513 514 515
DEFINE_string(merge_operator, "", "The merge operator to use with the database."
              "If a new merge operator is specified, be sure to use fresh"
              " database The possible merge operators are defined in"
              " utilities/merge_operators.h");
D
Deon Nicholas 已提交
516

K
kailiu 已提交
517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
// Attach the range validators to their flags. Each registration runs as part
// of the static initialization of a `_dummy` bool that is otherwise unused.
static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_soft_rate_limit, ValidateRateLimit);

static const bool FLAGS_hard_rate_limit_dummy __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_hard_rate_limit, ValidateRateLimit);

static const bool FLAGS_prefix_size_dummy __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_prefix_size, ValidatePrefixSize);

static const bool FLAGS_key_size_dummy __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_key_size, ValidateKeySize);

static const bool FLAGS_cache_numshardbits_dummy __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_cache_numshardbits,
                                  ValidateCacheNumshardbits);

static const bool FLAGS_readwritepercent_dummy __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_readwritepercent,
                                  ValidateInt32Percent);

static const bool FLAGS_deletepercent_dummy __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_deletepercent,
                                  ValidateInt32Percent);

static const bool FLAGS_table_cache_numshardbits_dummy
    __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
                                  ValidateTableCacheNumshardbits);

546
namespace rocksdb {
J
jorlow@chromium.org 已提交
547

548
// Helper for quickly generating random data.
J
jorlow@chromium.org 已提交
549 550 551
class RandomGenerator {
 private:
  std::string data_;
552
  unsigned int pos_;
J
jorlow@chromium.org 已提交
553 554 555 556 557 558 559 560

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
561
    while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
J
jorlow@chromium.org 已提交
562 563 564 565 566 567 568 569
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

570
  Slice Generate(unsigned int len) {
J
jorlow@chromium.org 已提交
571 572 573 574 575 576 577
    if (pos_ + len > data_.size()) {
      pos_ = 0;
      assert(len < data_.size());
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
578
};
X
Xing Jin 已提交
579

580 581 582 583 584 585 586 587 588 589
// Appends `msg` to `*str`, putting a single space in front of it when `*str`
// already has content. An empty `msg` leaves `*str` untouched.
static void AppendWithSpace(std::string* str, Slice msg) {
  if (!msg.empty()) {
    if (!str->empty()) {
      *str += ' ';
    }
    str->append(msg.data(), msg.size());
  }
}

class Stats {
 private:
590
  int id_;
591 592 593
  double start_;
  double finish_;
  double seconds_;
594 595 596
  long long done_;
  long long last_report_done_;
  long long next_report_;
597 598
  int64_t bytes_;
  double last_op_finish_;
599
  double last_report_finish_;
600
  HistogramImpl hist_;
601
  std::string message_;
602
  bool exclude_from_merge_;
603 604

 public:
605
  Stats() { Start(-1); }
606

607 608 609
  void Start(int id) {
    id_ = id;
    next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
610 611 612
    last_op_finish_ = start_;
    hist_.Clear();
    done_ = 0;
613
    last_report_done_ = 0;
614 615
    bytes_ = 0;
    seconds_ = 0;
616
    start_ = FLAGS_env->NowMicros();
617
    finish_ = start_;
618
    last_report_finish_ = start_;
619
    message_.clear();
620 621
    // When set, stats from this thread won't be merged with others.
    exclude_from_merge_ = false;
622 623 624
  }

  void Merge(const Stats& other) {
625 626 627
    if (other.exclude_from_merge_)
      return;

628 629 630 631 632 633 634 635 636 637 638 639
    hist_.Merge(other.hist_);
    done_ += other.done_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.empty()) message_ = other.message_;
  }

  void Stop() {
640
    finish_ = FLAGS_env->NowMicros();
641 642 643 644 645 646 647
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void AddMessage(Slice msg) {
    AppendWithSpace(&message_, msg);
  }

648
  void SetId(int id) { id_ = id; }
649
  void SetExcludeFromMerge() { exclude_from_merge_ = true; }
650

M
Mark Callaghan 已提交
651
  void FinishedSingleOp(DB* db) {
652
    if (FLAGS_histogram) {
653
      double now = FLAGS_env->NowMicros();
654 655
      double micros = now - last_op_finish_;
      hist_.Add(micros);
656
      if (micros > 20000 && !FLAGS_stats_interval) {
657 658 659 660 661 662 663 664
        fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
        fflush(stderr);
      }
      last_op_finish_ = now;
    }

    done_++;
    if (done_ >= next_report_) {
665 666 667 668 669 670 671 672
      if (!FLAGS_stats_interval) {
        if      (next_report_ < 1000)   next_report_ += 100;
        else if (next_report_ < 5000)   next_report_ += 500;
        else if (next_report_ < 10000)  next_report_ += 1000;
        else if (next_report_ < 50000)  next_report_ += 5000;
        else if (next_report_ < 100000) next_report_ += 10000;
        else if (next_report_ < 500000) next_report_ += 50000;
        else                            next_report_ += 100000;
673
        fprintf(stderr, "... finished %lld ops%30s\r", done_, "");
674 675 676 677
        fflush(stderr);
      } else {
        double now = FLAGS_env->NowMicros();
        fprintf(stderr,
678 679
                "%s ... thread %d: (%lld,%lld) ops and "
                "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
680
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(),
681
                id_,
M
Mark Callaghan 已提交
682
                done_ - last_report_done_, done_,
683
                (done_ - last_report_done_) /
M
Mark Callaghan 已提交
684 685 686 687
                ((now - last_report_finish_) / 1000000.0),
                done_ / ((now - start_) / 1000000.0),
                (now - last_report_finish_) / 1000000.0,
                (now - start_) / 1000000.0);
M
Mark Callaghan 已提交
688

689 690
        if (FLAGS_stats_per_interval) {
          std::string stats;
691
          if (db && db->GetProperty("rocksdb.stats", &stats))
692
            fprintf(stderr, "%s\n", stats.c_str());
693
        }
M
Mark Callaghan 已提交
694

695 696 697 698 699
        fflush(stderr);
        next_report_ += FLAGS_stats_interval;
        last_report_finish_ = now;
        last_report_done_ = done_;
      }
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722
    }
  }

  void AddBytes(int64_t n) {
    bytes_ += n;
  }

  void Report(const Slice& name) {
    // Pretend at least one op was done in case we are running a benchmark
    // that does not call FinishedSingleOp().
    if (done_ < 1) done_ = 1;

    std::string extra;
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-6;
      char rate[100];
      snprintf(rate, sizeof(rate), "%6.1f MB/s",
               (bytes_ / 1048576.0) / elapsed);
      extra = rate;
    }
    AppendWithSpace(&extra, message_);
723 724
    double elapsed = (finish_ - start_) * 1e-6;
    double throughput = (double)done_/elapsed;
725

D
Dhruba Borthakur 已提交
726
    fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
727
            name.ToString().c_str(),
728
            elapsed * 1e6 / done_,
D
Dhruba Borthakur 已提交
729
            (long)throughput,
730 731 732 733 734 735 736 737 738 739 740 741 742 743
            (extra.empty() ? "" : " "),
            extra.c_str());
    if (FLAGS_histogram) {
      fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
    }
    fflush(stdout);
  }
};

// State shared by all concurrent executions of the same benchmark.
struct SharedState {
  port::Mutex mu;
  port::CondVar cv;
  int total;
744
  int perf_level;
745 746 747 748 749 750 751

  // Each thread goes through the following states:
  //    (1) initializing
  //    (2) waiting for others to be initialized
  //    (3) running
  //    (4) done

752 753
  long num_initialized;
  long num_done;
754 755
  bool start;

756
  SharedState() : cv(&mu), perf_level(FLAGS_perf_level) { }
757 758 759 760 761
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;             // 0..n-1 when running in n threads
762
  Random64 rand;         // Has different seeds for different threads
763
  Stats stats;
764
  SharedState* shared;
765

A
Abhishek Kona 已提交
766
  /* implicit */ ThreadState(int index)
767
      : tid(index),
768
        rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {
769 770 771
  }
};

M
Mark Callaghan 已提交
772 773
class Duration {
 public:
774
  Duration(int max_seconds, long long max_ops) {
M
Mark Callaghan 已提交
775 776 777 778 779 780 781
    max_seconds_ = max_seconds;
    max_ops_= max_ops;
    ops_ = 0;
    start_at_ = FLAGS_env->NowMicros();
  }

  bool Done(int increment) {
782
    if (increment <= 0) increment = 1;    // avoid Done(0) and infinite loops
M
Mark Callaghan 已提交
783 784 785
    ops_ += increment;

    if (max_seconds_) {
786 787
      // Recheck every appx 1000 ops (exact iff increment is factor of 1000)
      if ((ops_/1000) != ((ops_-increment)/1000)) {
M
Mark Callaghan 已提交
788 789 790 791 792 793 794 795 796 797 798 799
        double now = FLAGS_env->NowMicros();
        return ((now - start_at_) / 1000000.0) >= max_seconds_;
      } else {
        return false;
      }
    } else {
      return ops_ > max_ops_;
    }
  }

 private:
  int max_seconds_;
800 801
  long long max_ops_;
  long long ops_;
M
Mark Callaghan 已提交
802 803 804
  double start_at_;
};

J
jorlow@chromium.org 已提交
805 806
class Benchmark {
 private:
807
  shared_ptr<Cache> cache_;
808
  shared_ptr<Cache> compressed_cache_;
S
Sanjay Ghemawat 已提交
809
  const FilterPolicy* filter_policy_;
T
Tyler Harter 已提交
810
  const SliceTransform* prefix_extractor_;
J
jorlow@chromium.org 已提交
811
  DB* db_;
812
  long long num_;
813
  int value_size_;
814
  int key_size_;
815 816
  int entries_per_batch_;
  WriteOptions write_options_;
817 818 819
  long long reads_;
  long long writes_;
  long long readwrites_;
820
  long long merge_keys_;
J
jorlow@chromium.org 已提交
821
  int heap_counter_;
822
  char keyFormat_[100]; // will contain the format of key. e.g "%016d"
823 824
  void PrintHeader() {
    PrintEnvironment();
825
    fprintf(stdout, "Keys:       %d bytes each\n", FLAGS_key_size);
826 827 828
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
829
    fprintf(stdout, "Entries:    %lld\n", num_);
830
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
831
            ((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
832
             / 1048576.0));
833
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
834 835
            (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
              * num_)
836
             / 1048576.0));
837
    fprintf(stdout, "Write rate limit: %d\n", FLAGS_writes_per_second);
838
    switch (FLAGS_compression_type_e) {
839
      case rocksdb::kNoCompression:
840 841
        fprintf(stdout, "Compression: none\n");
        break;
842
      case rocksdb::kSnappyCompression:
843 844
        fprintf(stdout, "Compression: snappy\n");
        break;
845
      case rocksdb::kZlibCompression:
846 847
        fprintf(stdout, "Compression: zlib\n");
        break;
848
      case rocksdb::kBZip2Compression:
849 850
        fprintf(stdout, "Compression: bzip2\n");
        break;
A
Albert Strasheim 已提交
851 852 853 854 855 856 857
      case rocksdb::kLZ4Compression:
        fprintf(stdout, "Compression: lz4\n");
        break;
      case rocksdb::kLZ4HCCompression:
        fprintf(stdout, "Compression: lz4hc\n");
        break;
      }
858

J
Jim Paton 已提交
859 860 861 862 863 864 865 866 867 868 869
    switch (FLAGS_rep_factory) {
      case kPrefixHash:
        fprintf(stdout, "Memtablerep: prefix_hash\n");
        break;
      case kSkipList:
        fprintf(stdout, "Memtablerep: skip_list\n");
        break;
      case kVectorRep:
        fprintf(stdout, "Memtablerep: vector\n");
        break;
    }
870
    fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);
J
Jim Paton 已提交
871

872 873 874 875 876 877 878 879 880 881 882 883 884 885
    PrintWarnings();
    fprintf(stdout, "------------------------------------------------\n");
  }

  void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
886
    if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
887 888 889 890
      // The test string should not be too small.
      const int len = FLAGS_block_size;
      char* text = (char*) malloc(len+1);
      bool result = true;
891
      const char* name = nullptr;
892 893 894 895
      std::string compressed;

      memset(text, (int) 'y', len);
      text[len] = '\0';
896
      switch (FLAGS_compression_type_e) {
897
        case kSnappyCompression:
898 899
          result = port::Snappy_Compress(Options().compression_opts, text,
                                         strlen(text), &compressed);
900 901 902
          name = "Snappy";
          break;
        case kZlibCompression:
903 904
          result = port::Zlib_Compress(Options().compression_opts, text,
                                       strlen(text), &compressed);
905 906 907
          name = "Zlib";
          break;
        case kBZip2Compression:
908 909
          result = port::BZip2_Compress(Options().compression_opts, text,
                                        strlen(text), &compressed);
910 911
          name = "BZip2";
          break;
A
Albert Strasheim 已提交
912 913 914 915 916 917 918 919 920 921
        case kLZ4Compression:
          result = port::LZ4_Compress(Options().compression_opts, text,
                                      strlen(text), &compressed);
          name = "LZ4";
          break;
        case kLZ4HCCompression:
          result = port::LZ4HC_Compress(Options().compression_opts, text,
                                        strlen(text), &compressed);
          name = "LZ4HC";
          break;
922 923 924
        case kNoCompression:
          assert(false); // cannot happen
          break;
925 926 927 928 929 930 931 932 933
      }

      if (!result) {
        fprintf(stdout, "WARNING: %s compression is not enabled\n", name);
      } else if (name && compressed.size() >= strlen(text)) {
        fprintf(stdout, "WARNING: %s compression is not effective\n", name);
      }

      free(text);
934
    }
935 936
  }

K
kailiu 已提交
937 938 939 940 941 942 943 944 945 946 947 948 949 950 951
// Currently the following isn't equivalent to OS_LINUX.
#if defined(__linux)
  // Returns a view of `s` without leading and trailing whitespace; the
  // result points into the same bytes as `s`.
  static Slice TrimSpace(Slice s) {
    size_t start = 0;
    // isspace() requires a value representable as unsigned char (or EOF);
    // passing a plain char with the high bit set is undefined behavior.
    while (start < s.size() &&
           isspace(static_cast<unsigned char>(s[start]))) {
      start++;
    }
    size_t limit = s.size();
    while (limit > start &&
           isspace(static_cast<unsigned char>(s[limit - 1]))) {
      limit--;
    }
    return Slice(s.data() + start, limit - start);
  }
#endif

952 953 954 955 956
  void PrintEnvironment() {
    fprintf(stderr, "LevelDB:    version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
957
    time_t now = time(nullptr);
958 959 960
    fprintf(stderr, "Date:       %s", ctime(&now));  // ctime() adds newline

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
961
    if (cpuinfo != nullptr) {
962 963 964 965
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
966
      while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
967
        const char* sep = strchr(line, ':');
968
        if (sep == nullptr) {
969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }

J
jorlow@chromium.org 已提交
987
 public:
988
  Benchmark()
989 990
  : cache_(FLAGS_cache_size >= 0 ?
           (FLAGS_cache_numshardbits >= 1 ?
991 992
            NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits,
                        FLAGS_cache_remove_scan_count_limit) :
993
            NewLRUCache(FLAGS_cache_size)) : nullptr),
994 995 996 997
    compressed_cache_(FLAGS_compressed_cache_size >= 0 ?
           (FLAGS_cache_numshardbits >= 1 ?
            NewLRUCache(FLAGS_compressed_cache_size, FLAGS_cache_numshardbits) :
            NewLRUCache(FLAGS_compressed_cache_size)) : nullptr),
S
Sanjay Ghemawat 已提交
998 999
    filter_policy_(FLAGS_bloom_bits >= 0
                   ? NewBloomFilterPolicy(FLAGS_bloom_bits)
1000
                   : nullptr),
L
Lei Jin 已提交
1001 1002
    prefix_extractor_(NewFixedPrefixTransform(FLAGS_use_plain_table ?
                      FLAGS_prefix_size : FLAGS_key_size-1)),
1003
    db_(nullptr),
1004
    num_(FLAGS_num),
1005
    value_size_(FLAGS_value_size),
1006
    key_size_(FLAGS_key_size),
1007
    entries_per_batch_(1),
1008
    reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
1009
    writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
1010
    readwrites_((FLAGS_writes < 0  && FLAGS_reads < 0)? FLAGS_num :
1011
                ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)
1012
               ),
1013
    merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
1014
    heap_counter_(0) {
J
jorlow@chromium.org 已提交
1015
    std::vector<std::string> files;
1016
    FLAGS_env->GetChildren(FLAGS_db, &files);
1017
    for (unsigned int i = 0; i < files.size(); i++) {
J
jorlow@chromium.org 已提交
1018
      if (Slice(files[i]).starts_with("heap-")) {
1019
        FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
J
jorlow@chromium.org 已提交
1020 1021
      }
    }
1022
    if (!FLAGS_use_existing_db) {
1023
      DestroyDB(FLAGS_db, Options());
1024
    }
J
jorlow@chromium.org 已提交
1025 1026 1027 1028
  }

  // Closes the database first (it may still use the filter policy and prefix
  // extractor), then frees the raw objects allocated in the constructor.
  // The caches are shared_ptrs and release themselves.
  ~Benchmark() {
    delete db_;
    delete filter_policy_;
    delete prefix_extractor_;
  }

1033
  // Writes into `str` the printf format used to render integer keys:
  // zero-padded to `keySize` digits and followed by a "%s" suffix slot,
  // e.g. keySize == 16 yields "%016lld%s". `str` must hold at least
  // 19 bytes, which covers any int keySize plus the terminator.
  void ConstructStrFormatForKey(char* str, int keySize) {
    sprintf(str, "%%0%dlld%%s", keySize);
  }

1040
  // Formats `v` with the current keyFormat_ and appends `suffix`, returning
  // a new NUL-terminated buffer of kMaxKeySize + 1 bytes; output that does
  // not fit is truncated by snprintf.
  unique_ptr<char []> GenerateKeyFromInt(long long v, const char* suffix = "") {
    unique_ptr<char []> keyInStr(new char[kMaxKeySize + 1]);
    snprintf(keyInStr.get(), kMaxKeySize + 1, keyFormat_, v, suffix);
    return keyInStr;
  }

J
jorlow@chromium.org 已提交
1046
  void Run() {
1047 1048
    PrintHeader();
    Open();
1049
    const char* benchmarks = FLAGS_benchmarks.c_str();
1050
    while (benchmarks != nullptr) {
J
jorlow@chromium.org 已提交
1051 1052
      const char* sep = strchr(benchmarks, ',');
      Slice name;
1053
      if (sep == nullptr) {
J
jorlow@chromium.org 已提交
1054
        name = benchmarks;
1055
        benchmarks = nullptr;
J
jorlow@chromium.org 已提交
1056 1057 1058 1059 1060
      } else {
        name = Slice(benchmarks, sep - benchmarks);
        benchmarks = sep + 1;
      }

X
Xing Jin 已提交
1061
      // Sanitize parameters
1062
      num_ = FLAGS_num;
1063
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
1064
      writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
1065
      value_size_ = FLAGS_value_size;
1066 1067
      key_size_ = FLAGS_key_size;
      ConstructStrFormatForKey(keyFormat_, key_size_);
1068 1069
      entries_per_batch_ = 1;
      write_options_ = WriteOptions();
1070 1071 1072
      if (FLAGS_sync) {
        write_options_.sync = true;
      }
H
heyongqiang 已提交
1073 1074
      write_options_.disableWAL = FLAGS_disable_wal;

1075
      void (Benchmark::*method)(ThreadState*) = nullptr;
1076
      bool fresh_db = false;
1077
      int num_threads = FLAGS_threads;
1078 1079

      if (name == Slice("fillseq")) {
1080 1081
        fresh_db = true;
        method = &Benchmark::WriteSeq;
1082
      } else if (name == Slice("fillbatch")) {
1083 1084 1085
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
1086
      } else if (name == Slice("fillrandom")) {
1087 1088
        fresh_db = true;
        method = &Benchmark::WriteRandom;
1089 1090 1091
      } else if (name == Slice("fillfromstdin")) {
        fresh_db = true;
        method = &Benchmark::WriteFromStdin;
1092 1093 1094 1095 1096 1097 1098 1099
      } else if (name == Slice("filluniquerandom")) {
        fresh_db = true;
        if (num_threads > 1) {
          fprintf(stderr, "filluniquerandom multithreaded not supported"
                           " set --threads=1");
          exit(1);
        }
        method = &Benchmark::WriteUniqueRandom;
1100
      } else if (name == Slice("overwrite")) {
1101 1102
        fresh_db = false;
        method = &Benchmark::WriteRandom;
1103
      } else if (name == Slice("fillsync")) {
1104 1105 1106 1107
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
1108
      } else if (name == Slice("fill100K")) {
1109 1110 1111 1112
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
J
jorlow@chromium.org 已提交
1113
      } else if (name == Slice("readseq")) {
1114
        method = &Benchmark::ReadSequential;
M
Mark Callaghan 已提交
1115 1116 1117 1118
      } else if (name == Slice("readtocache")) {
        method = &Benchmark::ReadSequential;
        num_threads = 1;
        reads_ = num_;
J
jorlow@chromium.org 已提交
1119
      } else if (name == Slice("readreverse")) {
1120
        method = &Benchmark::ReadReverse;
J
jorlow@chromium.org 已提交
1121
      } else if (name == Slice("readrandom")) {
1122
        method = &Benchmark::ReadRandom;
S
Sanjay Ghemawat 已提交
1123 1124
      } else if (name == Slice("readmissing")) {
        method = &Benchmark::ReadMissing;
1125 1126
      } else if (name == Slice("newiterator")) {
        method = &Benchmark::IteratorCreation;
S
Sanjay Ghemawat 已提交
1127 1128
      } else if (name == Slice("seekrandom")) {
        method = &Benchmark::SeekRandom;
1129
      } else if (name == Slice("readhot")) {
1130
        method = &Benchmark::ReadHot;
1131
      } else if (name == Slice("readrandomsmall")) {
1132
        reads_ /= 1000;
1133
        method = &Benchmark::ReadRandom;
T
Tyler Harter 已提交
1134 1135
      } else if (name == Slice("prefixscanrandom")) {
        method = &Benchmark::PrefixScanRandom;
S
Sanjay Ghemawat 已提交
1136 1137 1138 1139
      } else if (name == Slice("deleteseq")) {
        method = &Benchmark::DeleteSeq;
      } else if (name == Slice("deleterandom")) {
        method = &Benchmark::DeleteRandom;
1140 1141 1142
      } else if (name == Slice("readwhilewriting")) {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
1143 1144
      } else if (name == Slice("readrandomwriterandom")) {
        method = &Benchmark::ReadRandomWriteRandom;
1145 1146 1147 1148 1149 1150 1151 1152
      } else if (name == Slice("readrandommergerandom")) {
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
                  name.ToString().c_str());
          method = nullptr;
        } else {
          method = &Benchmark::ReadRandomMergeRandom;
        }
M
Mark Callaghan 已提交
1153 1154
      } else if (name == Slice("updaterandom")) {
        method = &Benchmark::UpdateRandom;
D
Deon Nicholas 已提交
1155 1156 1157 1158 1159 1160 1161 1162 1163 1164
      } else if (name == Slice("appendrandom")) {
        method = &Benchmark::AppendRandom;
      } else if (name == Slice("mergerandom")) {
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
                  name.ToString().c_str());
          method = nullptr;
        } else {
          method = &Benchmark::MergeRandom;
        }
1165 1166
      } else if (name == Slice("randomwithverify")) {
        method = &Benchmark::RandomWithVerify;
J
jorlow@chromium.org 已提交
1167
      } else if (name == Slice("compact")) {
1168
        method = &Benchmark::Compact;
J
jorlow@chromium.org 已提交
1169
      } else if (name == Slice("crc32c")) {
1170
        method = &Benchmark::Crc32c;
1171
      } else if (name == Slice("acquireload")) {
1172
        method = &Benchmark::AcquireLoad;
A
Albert Strasheim 已提交
1173 1174 1175 1176
      } else if (name == Slice("compress")) {
        method = &Benchmark::Compress;
      } else if (name == Slice("uncompress")) {
        method = &Benchmark::Uncompress;
J
jorlow@chromium.org 已提交
1177 1178
      } else if (name == Slice("heapprofile")) {
        HeapProfile();
1179
      } else if (name == Slice("stats")) {
1180
        PrintStats("rocksdb.stats");
1181
      } else if (name == Slice("levelstats")) {
1182
        PrintStats("rocksdb.levelstats");
S
Sanjay Ghemawat 已提交
1183
      } else if (name == Slice("sstables")) {
1184
        PrintStats("rocksdb.sstables");
J
jorlow@chromium.org 已提交
1185
      } else {
1186 1187 1188 1189
        if (name != Slice()) {  // No error message for empty name
          fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
        }
      }
1190 1191 1192 1193 1194

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                  name.ToString().c_str());
1195
          method = nullptr;
1196 1197
        } else {
          delete db_;
1198
          db_ = nullptr;
1199 1200 1201 1202 1203
          DestroyDB(FLAGS_db, Options());
          Open();
        }
      }

1204
      if (method != nullptr) {
1205
        fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
1206
        RunBenchmark(num_threads, name, method);
J
jorlow@chromium.org 已提交
1207 1208
      }
    }
1209 1210 1211
    if (FLAGS_statistics) {
     fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
    }
J
jorlow@chromium.org 已提交
1212 1213
  }

1214
 private:
1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236
  // Arguments handed to ThreadBody() for one worker thread. Members default
  // to null so a ThreadArg never holds indeterminate pointers.
  struct ThreadArg {
    Benchmark* bm = nullptr;
    SharedState* shared = nullptr;
    ThreadState* thread = nullptr;
    void (Benchmark::*method)(ThreadState*) = nullptr;
  };

  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

1237
    SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
1238
    thread->stats.Start(thread->tid);
1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }

1251 1252
  // Runs `method` on `n` threads that start together, waits until all have
  // finished, then merges their per-thread stats and reports them as `name`.
  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;

    // The containers own the per-thread objects, so nothing leaks on any
    // exit path. `arg` is sized once and never resized, and each ThreadState
    // lives on the heap, so the addresses given to workers stay valid.
    std::vector<ThreadArg> arg(n);
    std::vector<std::unique_ptr<ThreadState>> states;
    for (int i = 0; i < n; i++) {
      states.emplace_back(new ThreadState(i));
      ThreadState* state = states.back().get();
      state->shared = &shared;
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      arg[i].thread = state;
      FLAGS_env->StartThread(ThreadBody, &arg[i]);
    }

    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    // Combine the stats of every thread into a single report.
    Stats merge_stats;
    for (const auto& state : states) {
      merge_stats.Merge(state->stats);
    }
    merge_stats.Report(name);
  }

  // Measures crc32c::Value() throughput: checksums a 4KB buffer repeatedly
  // until about 500MB have been processed; each checksum counts as one op.
  void Crc32c(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    uint32_t crc = 0;
    while (bytes < 500 * 1048576) {
      crc = crc32c::Value(data.data(), size);
      thread->stats.FinishedSingleOp(nullptr);
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

1313
  // Measures port::AtomicPointer::Acquire_Load(): 100000 ops, each made of
  // 1000 loads.
  void AcquireLoad(ThreadState* thread) {
    int dummy;
    port::AtomicPointer ap(&dummy);
    int count = 0;
    void *ptr = nullptr;
    thread->stats.AddMessage("(each op is 1000 loads)");
    while (count < 100000) {
      for (int i = 0; i < 1000; i++) {
        ptr = ap.Acquire_Load();
      }
      count++;
      thread->stats.FinishedSingleOp(nullptr);
    }
    // `ap` points at `dummy`, so this never fires; it only consumes `ptr`.
    if (ptr == nullptr) exit(1); // Disable unused variable warning.
  }

A
Albert Strasheim 已提交
1329
  // Measures compression throughput. It compresses one Options().block_size
  // block from RandomGenerator with --compression_type, repeatedly, until
  // 1GB of input has been processed. Reports the output/input size ratio, or
  // a failure message if the compressor is unavailable.
  void Compress(ThreadState *thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;

    // Compress 1G
    while (ok && bytes < int64_t(1) << 30) {
      switch (FLAGS_compression_type_e) {
      case rocksdb::kSnappyCompression:
        ok = port::Snappy_Compress(Options().compression_opts, input.data(),
                                   input.size(), &compressed);
        break;
      case rocksdb::kZlibCompression:
        ok = port::Zlib_Compress(Options().compression_opts, input.data(),
                                 input.size(), &compressed);
        break;
      case rocksdb::kBZip2Compression:
        ok = port::BZip2_Compress(Options().compression_opts, input.data(),
                                  input.size(), &compressed);
        break;
      case rocksdb::kLZ4Compression:
        ok = port::LZ4_Compress(Options().compression_opts, input.data(),
                                input.size(), &compressed);
        break;
      case rocksdb::kLZ4HCCompression:
        ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
                                  input.size(), &compressed);
        break;
      default:
        ok = false;
      }
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedSingleOp(nullptr);
    }

    if (!ok) {
      thread->stats.AddMessage("(compression failure)");
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    }
  }

A
Albert Strasheim 已提交
1379
  // Measures decompression throughput. It compresses one Options().block_size
  // block once with --compression_type, then decompresses it repeatedly
  // until 1GB of uncompressed output has been counted. Each decompression
  // allocates and frees its output buffer.
  void Uncompress(ThreadState *thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    std::string compressed;

    bool ok;
    switch (FLAGS_compression_type_e) {
    case rocksdb::kSnappyCompression:
      ok = port::Snappy_Compress(Options().compression_opts, input.data(),
                                 input.size(), &compressed);
      break;
    case rocksdb::kZlibCompression:
      ok = port::Zlib_Compress(Options().compression_opts, input.data(),
                               input.size(), &compressed);
      break;
    case rocksdb::kBZip2Compression:
      ok = port::BZip2_Compress(Options().compression_opts, input.data(),
                                input.size(), &compressed);
      break;
    case rocksdb::kLZ4Compression:
      ok = port::LZ4_Compress(Options().compression_opts, input.data(),
                              input.size(), &compressed);
      break;
    case rocksdb::kLZ4HCCompression:
      ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
                                input.size(), &compressed);
      break;
    default:
      ok = false;
    }

    int64_t bytes = 0;
    int decompress_size = 0;  // out-parameter of the *_Uncompress helpers
    while (ok && bytes < 1024 * 1048576) {
      char *uncompressed = nullptr;
      switch (FLAGS_compression_type_e) {
      case rocksdb::kSnappyCompression:
        // allocate here to make comparison fair
        uncompressed = new char[input.size()];
        ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                     uncompressed);
        break;
      case rocksdb::kZlibCompression:
        uncompressed = port::Zlib_Uncompress(
            compressed.data(), compressed.size(), &decompress_size);
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kBZip2Compression:
        uncompressed = port::BZip2_Uncompress(
            compressed.data(), compressed.size(), &decompress_size);
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kLZ4Compression:
        uncompressed = port::LZ4_Uncompress(
            compressed.data(), compressed.size(), &decompress_size);
        ok = uncompressed != nullptr;
        break;
      case rocksdb::kLZ4HCCompression:
        // LZ4HC output is decoded with the regular LZ4 decoder.
        uncompressed = port::LZ4_Uncompress(
            compressed.data(), compressed.size(), &decompress_size);
        ok = uncompressed != nullptr;
        break;
      default:
        ok = false;
      }
      delete[] uncompressed;
      bytes += input.size();
      thread->stats.FinishedSingleOp(nullptr);
    }

    if (!ok) {
      thread->stats.AddMessage("(compression failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }

1456
  void Open() {
1457
    assert(db_ == nullptr);
1458
    Options options;
1459
    options.create_if_missing = !FLAGS_use_existing_db;
1460
    options.block_cache = cache_;
1461
    options.block_cache_compressed = compressed_cache_;
1462
    if (cache_ == nullptr) {
1463 1464
      options.no_block_cache = true;
    }
1465
    options.write_buffer_size = FLAGS_write_buffer_size;
1466
    options.max_write_buffer_number = FLAGS_max_write_buffer_number;
1467 1468
    options.min_write_buffer_number_to_merge =
      FLAGS_min_write_buffer_number_to_merge;
1469
    options.max_background_compactions = FLAGS_max_background_compactions;
1470
    options.compaction_style = FLAGS_compaction_style_e;
1471
    options.block_size = FLAGS_block_size;
S
Sanjay Ghemawat 已提交
1472
    options.filter_policy = filter_policy_;
L
Lei Jin 已提交
1473 1474 1475
    options.prefix_extractor =
      (FLAGS_use_plain_table || FLAGS_use_prefix_blooms) ? prefix_extractor_
                                                         : nullptr;
1476 1477
    options.max_open_files = FLAGS_open_files;
    options.statistics = dbstats;
1478
    options.env = FLAGS_env;
H
heyongqiang 已提交
1479
    options.disableDataSync = FLAGS_disable_data_sync;
1480
    options.use_fsync = FLAGS_use_fsync;
1481
    options.num_levels = FLAGS_num_levels;
H
heyongqiang 已提交
1482 1483 1484 1485 1486
    options.target_file_size_base = FLAGS_target_file_size_base;
    options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
    options.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
1487
    options.filter_deletes = FLAGS_filter_deletes;
J
Jim Paton 已提交
1488
    if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) {
L
Lei Jin 已提交
1489 1490
      fprintf(stderr, "prefix_size should be non-zero iff memtablerep "
                      "== prefix_hash\n");
J
Jim Paton 已提交
1491 1492 1493 1494
      exit(1);
    }
    switch (FLAGS_rep_factory) {
      case kPrefixHash:
I
Igor Canadi 已提交
1495 1496
        options.memtable_factory.reset(NewHashSkipListRepFactory(
            NewFixedPrefixTransform(FLAGS_prefix_size)));
J
Jim Paton 已提交
1497 1498 1499 1500 1501 1502 1503 1504 1505 1506
        break;
      case kSkipList:
        // no need to do anything
        break;
      case kVectorRep:
        options.memtable_factory.reset(
          new VectorRepFactory
        );
        break;
    }
L
Lei Jin 已提交
1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522
    if (FLAGS_use_plain_table) {
      if (FLAGS_rep_factory != kPrefixHash) {
        fprintf(stderr, "Waring: plain table is used with skipList\n");
      }
      if (!FLAGS_mmap_read && !FLAGS_mmap_write) {
        fprintf(stderr, "plain table format requires mmap to operate\n");
        exit(1);
      }

      int bloom_bits_per_key = FLAGS_bloom_bits;
      if (bloom_bits_per_key < 0) {
        bloom_bits_per_key = 0;
      }
      options.table_factory = std::shared_ptr<TableFactory>(
          NewPlainTableFactory(FLAGS_key_size, bloom_bits_per_key, 0.75));
    }
1523 1524
    if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
      if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
1525 1526
          (unsigned int)FLAGS_num_levels) {
        fprintf(stderr, "Insufficient number of fanouts specified %d\n",
1527
                (int)FLAGS_max_bytes_for_level_multiplier_additional_v.size());
1528 1529 1530
        exit(1);
      }
      options.max_bytes_for_level_multiplier_additional =
1531
        FLAGS_max_bytes_for_level_multiplier_additional_v;
1532
    }
H
heyongqiang 已提交
1533
    options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
M
Mark Callaghan 已提交
1534
    options.level0_file_num_compaction_trigger =
1535
        FLAGS_level0_file_num_compaction_trigger;
H
heyongqiang 已提交
1536 1537
    options.level0_slowdown_writes_trigger =
      FLAGS_level0_slowdown_writes_trigger;
1538
    options.compression = FLAGS_compression_type_e;
1539
    options.compression_opts.level = FLAGS_compression_level;
1540 1541
    options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
    options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
1542 1543
    if (FLAGS_min_level_to_compress >= 0) {
      assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
1544
      options.compression_per_level.resize(FLAGS_num_levels);
1545
      for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
1546 1547
        options.compression_per_level[i] = kNoCompression;
      }
1548
      for (int i = FLAGS_min_level_to_compress;
1549
           i < FLAGS_num_levels; i++) {
1550
        options.compression_per_level[i] = FLAGS_compression_type_e;
1551 1552
      }
    }
1553
    options.disable_seek_compaction = FLAGS_disable_seek_compaction;
1554 1555
    options.delete_obsolete_files_period_micros =
      FLAGS_delete_obsolete_files_period_micros;
J
Jim Paton 已提交
1556 1557 1558 1559
    options.soft_rate_limit = FLAGS_soft_rate_limit;
    options.hard_rate_limit = FLAGS_hard_rate_limit;
    options.rate_limit_delay_max_milliseconds =
      FLAGS_rate_limit_delay_max_milliseconds;
1560
    options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
A
Abhishek Kona 已提交
1561
    options.max_grandparent_overlap_factor =
1562
      FLAGS_max_grandparent_overlap_factor;
1563
    options.disable_auto_compactions = FLAGS_disable_auto_compactions;
1564
    options.source_compaction_factor = FLAGS_source_compaction_factor;
1565 1566

    // fill storage options
1567 1568 1569
    options.allow_os_buffer = FLAGS_bufferedio;
    options.allow_mmap_reads = FLAGS_mmap_read;
    options.allow_mmap_writes = FLAGS_mmap_write;
1570
    options.advise_random_on_open = FLAGS_advise_random_on_open;
1571
    options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;
H
Haobo Xu 已提交
1572
    options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
H
Haobo Xu 已提交
1573
    options.bytes_per_sync = FLAGS_bytes_per_sync;
H
Haobo Xu 已提交
1574

D
Deon Nicholas 已提交
1575
    // merge operator options
1576 1577 1578
    options.merge_operator = MergeOperators::CreateFromStringId(
        FLAGS_merge_operator);
    if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
D
Deon Nicholas 已提交
1579 1580 1581 1582
      fprintf(stderr, "invalid merge operator: %s\n",
              FLAGS_merge_operator.c_str());
      exit(1);
    }
1583
    options.max_successive_merges = FLAGS_max_successive_merges;
D
Deon Nicholas 已提交
1584

1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601
    // set universal style compaction configurations, if applicable
    if (FLAGS_universal_size_ratio != 0) {
      options.compaction_options_universal.size_ratio =
        FLAGS_universal_size_ratio;
    }
    if (FLAGS_universal_min_merge_width != 0) {
      options.compaction_options_universal.min_merge_width =
        FLAGS_universal_min_merge_width;
    }
    if (FLAGS_universal_max_merge_width != 0) {
      options.compaction_options_universal.max_merge_width =
        FLAGS_universal_max_merge_width;
    }
    if (FLAGS_universal_max_size_amplification_percent != 0) {
      options.compaction_options_universal.max_size_amplification_percent =
        FLAGS_universal_max_size_amplification_percent;
    }
1602 1603 1604 1605
    if (FLAGS_universal_compression_size_percent != -1) {
      options.compaction_options_universal.compression_size_percent =
        FLAGS_universal_compression_size_percent;
    }
1606

H
heyongqiang 已提交
1607
    Status s;
1608
    if(FLAGS_readonly) {
H
heyongqiang 已提交
1609 1610 1611 1612
      s = DB::OpenForReadOnly(options, FLAGS_db, &db_);
    } else {
      s = DB::Open(options, FLAGS_db, &db_);
    }
1613 1614 1615 1616
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
1617
    if (FLAGS_min_level_to_compress >= 0) {
1618
      options.compression_per_level.clear();
1619
    }
1620 1621
  }

1622 1623 1624 1625
  // How DoWrite() chooses the key for each write. Enumerator order is
  // significant (RANDOM=0, SEQUENTIAL=1, UNIQUE_RANDOM=2) and is preserved.
  enum WriteMode { RANDOM, SEQUENTIAL, UNIQUE_RANDOM };

1626
  // Writes keys in ascending order starting from key 0 (DoWrite's
  // SEQUENTIAL mode); FLAGS_duration is not applied in this mode.
  void WriteSeq(ThreadState* thread) {
    const WriteMode mode = SEQUENTIAL;
    DoWrite(thread, mode);
  }
1629

1630
  // Writes pseudo-random keys from [0, FLAGS_num), repeats allowed
  // (DoWrite's RANDOM mode, the only mode that honors FLAGS_duration).
  void WriteRandom(ThreadState* thread) {
    const WriteMode mode = RANDOM;
    DoWrite(thread, mode);
  }

1634 1635 1636 1637
  // Writes pseudo-random keys while avoiding keys this call already wrote
  // (DoWrite's UNIQUE_RANDOM mode, which tracks used keys in a BitSet).
  void WriteUniqueRandom(ThreadState* thread) {
    const WriteMode mode = UNIQUE_RANDOM;
    DoWrite(thread, mode);
  }

1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659
  // Applies |batch| to the database using the benchmark's write options.
  // Any failure is reported on stderr and terminates the process.
  void writeOrFail(WriteBatch& batch) {
    const Status status = db_->Write(write_options_, &batch);
    if (status.ok()) {
      return;
    }
    fprintf(stderr, "put error: %s\n", status.ToString().c_str());
    exit(1);
  }

  // Bulk-loads "key<TAB>value<NEWLINE>" records read from stdin.
  //
  // Lines without a TAB, or without a terminating newline (e.g. longer than
  // the 32MB read buffer, or a final line lacking '\n'), are reported on
  // stderr with their 1-based line number and skipped. Valid records are
  // accumulated into a WriteBatch that is flushed every |batchSize| records
  // and once more at EOF; any write failure terminates the process.
  void WriteFromStdin(ThreadState* thread) {
    size_t count = 0;
    WriteBatch batch;
    const size_t bufferLen = 32 << 20;
    unique_ptr<char[]> line = unique_ptr<char[]>(new char[bufferLen]);
    char* linep = line.get();
    const int batchSize = 100 << 10;
    const char columnSeparator = '\t';
    const char lineSeparator = '\n';

    while (fgets(linep, bufferLen, stdin) != nullptr) {
      ++count;
      // Only search the bytes fgets() just produced: everything after the
      // terminating NUL is stale data from earlier lines or uninitialized.
      char* const lineEnd = linep + strlen(linep);
      char* tab = std::find(linep, lineEnd, columnSeparator);
      if (tab == lineEnd) {
        fprintf(stderr, "[Error] No Key delimiter TAB at line %zu\n", count);
        continue;
      }
      Slice key(linep, tab - linep);
      tab++;
      char* endLine = std::find(tab, lineEnd, lineSeparator);
      if (endLine == lineEnd) {
        fprintf(stderr, "[Error] No ENTER at end of line # %zu\n", count);
        continue;
      }
      Slice value(tab, endLine - tab);
      thread->stats.FinishedSingleOp(db_);
      // key bytes + value bytes (the TAB is excluded).
      thread->stats.AddBytes(endLine - linep - 1);

      // Add the record first and flush once the batch is full, so no record
      // is lost at a batch boundary. WriteBatch copies key/value, so reusing
      // the line buffer afterwards is safe.
      batch.Put(key, value);
      if (batch.Count() >= batchSize) {
        writeOrFail(batch);
        batch.Clear();
      }
    }
    if (batch.Count() > 0) {
      writeOrFail(batch);
    }
  }

1686 1687 1688 1689 1690 1691 1692 1693 1694
  // Core write loop shared by WriteSeq / WriteRandom / WriteUniqueRandom.
  //
  // Writes batches of |entries_per_batch_| key/value pairs (values are
  // |value_size_| bytes from RandomGenerator) until |duration| is done.
  // Keys are chosen per |write_mode|:
  //   SEQUENTIAL    - 0, 1, 2, ... in order;
  //   RANDOM        - rand % FLAGS_num, repeats allowed; the only mode that
  //                   honors FLAGS_duration;
  //   UNIQUE_RANDOM - rand % FLAGS_num, but if that key was already written
  //                   the nearest unused slot (forward, then backward) is
  //                   taken instead.
  // Any write failure terminates the process.
  void DoWrite(ThreadState* thread, WriteMode write_mode) {
    const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
    const int num_ops = writes_ == 0 ? num_ : writes_ ;
    Duration duration(test_duration, num_ops);
    unique_ptr<BitSet> bit_set;

    if (write_mode == UNIQUE_RANDOM) {
      // Candidate keys are drawn from [0, FLAGS_num), so the set must cover
      // that whole range even when fewer ops than FLAGS_num are run
      // (e.g. --writes < --num); otherwise test()/set() index past its end.
      bit_set.reset(new BitSet(std::max<long long>(num_ops, FLAGS_num)));
    }

    if (num_ != FLAGS_num) {
      char msg[100];
      snprintf(msg, sizeof(msg), "(%lld ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;
    int i = 0;  // index of the first key in the current batch (SEQUENTIAL)
    while (!duration.Done(entries_per_batch_)) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        long long k = 0;
        switch (write_mode) {
          case SEQUENTIAL:
            k = i + j;
            break;
          case RANDOM:
            k = thread->rand.Next() % FLAGS_num;
            break;
          case UNIQUE_RANDOM: {
            const long long t = thread->rand.Next() % FLAGS_num;
            if (!bit_set->test(t)) {
              // best case
              k = t;
            } else {
              bool found = false;
              // look forward
              for (size_t pos = t + 1; pos < bit_set->size(); ++pos) {
                if (!bit_set->test(pos)) {
                  found = true;
                  k = pos;
                  break;
                }
              }
              if (!found) {
                // then look backward
                for (size_t pos = t; pos-- > 0;) {
                  if (!bit_set->test(pos)) {
                    found = true;
                    k = pos;
                    break;
                  }
                }
              }
              // NOTE: if every slot is already used, k stays 0 and key 0 is
              // rewritten.
            }
            bit_set->set(k);
            break;
          }
        }
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        batch.Put(key.get(), gen.Generate(value_size_));
        bytes += value_size_ + strlen(key.get());
        thread->stats.FinishedSingleOp(db_);
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      i += entries_per_batch_;
    }
    thread->stats.AddBytes(bytes);
  }

1763
  // Forward scan from the first key. Stops after |reads_| entries or at the
  // end of the DB, counting one op per entry and key+value bytes read.
  void ReadSequential(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
    int64_t bytes = 0;
    iter->SeekToFirst();
    for (long long visited = 0; visited < reads_ && iter->Valid(); ++visited) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp(db_);
      iter->Next();
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

1776
  // Backward scan from the last key. Stops after |reads_| entries or at the
  // beginning of the DB, counting one op per entry and key+value bytes read.
  void ReadReverse(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
    int64_t bytes = 0;
    long long visited = 0;
    iter->SeekToLast();
    while (visited < reads_ && iter->Valid()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp(db_);
      ++visited;
      iter->Prev();
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

1789 1790 1791
  // Calls MultiGet over a list of keys from a random distribution.
  // Returns the total number of keys found.
  long MultiGetRandom(ReadOptions& options, int num_keys,
1792
                     Random64& rand, long long range, const char* suffix) {
1793 1794 1795 1796 1797 1798
    assert(num_keys > 0);
    std::vector<Slice> keys(num_keys);
    std::vector<std::string> values(num_keys);
    std::vector<unique_ptr<char []> > gen_keys(num_keys);

    int i;
1799
    long long k;
1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836

    // Fill the keys vector
    for(i=0; i<num_keys; ++i) {
      k = rand.Next() % range;
      gen_keys[i] = GenerateKeyFromInt(k,suffix);
      keys[i] = gen_keys[i].get();
    }

    if (FLAGS_use_snapshot) {
      options.snapshot = db_->GetSnapshot();
    }

    // Apply the operation
    std::vector<Status> statuses = db_->MultiGet(options, keys, &values);
    assert((long)statuses.size() == num_keys);
    assert((long)keys.size() == num_keys);  // Should always be the case.
    assert((long)values.size() == num_keys);

    if (FLAGS_use_snapshot) {
      db_->ReleaseSnapshot(options.snapshot);
      options.snapshot = nullptr;
    }

    // Count number found
    long found = 0;
    for(i=0; i<num_keys; ++i) {
      if (statuses[i].ok()){
        ++found;
      } else if (FLAGS_warn_missing_keys == true) {
        // Key not found, or error.
        fprintf(stderr, "get error: %s\n", statuses[i].ToString().c_str());
      }
    }

    return found;
  }

1837
  // Random read benchmark over keys in [0, FLAGS_num).
  //
  // Modes:
  //   FLAGS_use_multiget         - MultiGetRandom() in groups of up to
  //                                FLAGS_keys_per_multiget keys;
  //   FLAGS_use_tailing_iterator - Seek() on one tailing iterator, counting
  //                                exact-key matches;
  //   otherwise                  - one Get() per op or, when
  //                                FLAGS_read_range >= 2, a Seek() that walks
  //                                up to FLAGS_read_range entries (every
  //                                entry visited counts as found).
  // Adds "(<found> of <reads_> found)" to the thread's stats.
  void ReadRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    Duration duration(FLAGS_duration, reads_);

    long long found = 0;

    if (FLAGS_use_multiget) {   // MultiGet
      const long& kpg = FLAGS_keys_per_multiget;  // keys per multiget group
      long keys_left = reads_;

      // Recalculate number of keys per group, and call MultiGet until done
      long num_keys;
      while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
        found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, "");
        thread->stats.FinishedSingleOp(db_);
        keys_left -= num_keys;
      }
    } else if (FLAGS_use_tailing_iterator) {  // use tailing iterator for gets
      options.tailing = true;
      Iterator* iter = db_->NewIterator(options);
      while (!duration.Done(1)) {
        const long long k = thread->rand.Next() % FLAGS_num;
        unique_ptr<char[]> key = GenerateKeyFromInt(k);

        iter->Seek(key.get());
        if (iter->Valid() && iter->key().compare(Slice(key.get())) == 0) {
          ++found;
        }

        thread->stats.FinishedSingleOp(db_);
      }
      delete iter;
    } else {    // Regular case. Do one "get" at a time Get
      // The iterator is only used for range reads. Don't keep one open for
      // the whole run when every op is a point Get(): an open iterator holds
      // on to the DB state it was created from.
      Iterator* iter =
          FLAGS_read_range < 2 ? nullptr : db_->NewIterator(options);
      std::string value;
      while (!duration.Done(1)) {
        const long long k = thread->rand.Next() % FLAGS_num;
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        if (FLAGS_use_snapshot) {
          options.snapshot = db_->GetSnapshot();
        }

        if (FLAGS_read_range < 2) {
          if (db_->Get(options, key.get(), &value).ok()) {
            found++;
          }
        } else {
          Slice skey(key.get());
          int count = 1;

          if (FLAGS_get_approx) {
            unique_ptr<char []> key2 =
                GenerateKeyFromInt(k + (int) FLAGS_read_range);
            Slice skey2(key2.get());
            Range range(skey, skey2);
            uint64_t sizes;
            db_->GetApproximateSizes(&range, 1, &sizes);
          }

          for (iter->Seek(skey);
               iter->Valid() && count <= FLAGS_read_range;
               ++count, iter->Next()) {
            found++;
          }
        }

        if (FLAGS_use_snapshot) {
          db_->ReleaseSnapshot(options.snapshot);
          options.snapshot = nullptr;
        }

        thread->stats.FinishedSingleOp(db_);
      }

      delete iter;  // no-op when no iterator was created
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
    thread->stats.AddMessage(msg);
  }

T
Tyler Harter 已提交
1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953
  // For each op: pick a random key in [0, FLAGS_num), Seek() to it, and count
  // every following entry that shares its prefix
  // (prefix_extractor_->Transform). With FLAGS_use_prefix_api the prefix is
  // also passed via ReadOptions::prefix; debug builds then assert that
  // prefix blooms are enabled with bloom_bits >= 1.
  // Adds "(<found> of <reads_> found)" to the thread's stats.
  void PrefixScanRandom(ThreadState* thread) {
    if (FLAGS_use_prefix_api) {
      assert(FLAGS_use_prefix_blooms);
      assert(FLAGS_bloom_bits >= 1);
    }

    ReadOptions options(FLAGS_verify_checksum, true);
    Duration duration(FLAGS_duration, reads_);

    long long found = 0;

    while (!duration.Done(1)) {
      // long long like the other benchmarks: an int would truncate the key
      // index once FLAGS_num exceeds INT_MAX.
      const long long k = thread->rand.Next() % FLAGS_num;
      unique_ptr<char []> key = GenerateKeyFromInt(k);
      Slice skey(key.get());
      Slice prefix = prefix_extractor_->Transform(skey);
      options.prefix = FLAGS_use_prefix_api ? &prefix : nullptr;

      Iterator* iter = db_->NewIterator(options);
      for (iter->Seek(skey);
           iter->Valid() && iter->key().starts_with(prefix);
           iter->Next()) {
        found++;
      }
      delete iter;

      thread->stats.FinishedSingleOp(db_);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
    thread->stats.AddMessage(msg);
  }

X
Xing Jin 已提交
1954
  // Probes keys generated with a "." suffix, which differ from the keys the
  // write benchmarks store, so every lookup is expected to miss (checked by
  // assert in debug builds). Note: this permanently sets the global
  // FLAGS_warn_missing_keys to false for the rest of the process.
  void ReadMissing(ThreadState* thread) {
    FLAGS_warn_missing_keys = false;    // Never warn about missing keys

    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);

    if (!FLAGS_use_multiget) {
      // Regular case (not MultiGet): one Get() per op.
      std::string value;
      Status s;
      while (!duration.Done(1)) {
        const long long k = thread->rand.Next() % FLAGS_num;
        unique_ptr<char []> key = GenerateKeyFromInt(k, ".");
        s = db_->Get(options, key.get(), &value);
        assert(!s.ok() && s.IsNotFound());
        thread->stats.FinishedSingleOp(db_);
      }
      return;
    }

    const long& kpg = FLAGS_keys_per_multiget;  // keys per multiget group
    long keys_left = reads_;
    long num_keys;
    long found;
    // Each round re-derives the group size, then runs one MultiGet.
    while (num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
      found = MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, ".");
      // Nothing may be found: the probed keys carry a different suffix.
      if (found) {
        assert(false);
      }
      thread->stats.FinishedSingleOp(db_);
      keys_left -= num_keys;
    }
  }

1992
  // Random reads restricted to a "hot" key set: the lowest
  // ceil(FLAGS_num / 100) key indices. Uses MultiGetRandom() in groups of up
  // to FLAGS_keys_per_multiget keys when FLAGS_use_multiget is set, one
  // Get() per op otherwise. Adds "(<found> of <reads_> found)" to stats.
  void ReadHot(ThreadState* thread) {
    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);
    const long long hot_keys = (FLAGS_num + 99) / 100;
    long long found = 0;

    if (!FLAGS_use_multiget) {
      std::string value;
      while (!duration.Done(1)) {
        const long long k = thread->rand.Next() % hot_keys;
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        if (db_->Get(options, key.get(), &value).ok()) {
          ++found;
        }
        thread->stats.FinishedSingleOp(db_);
      }
    } else {
      const long long kpg = FLAGS_keys_per_multiget;  // keys per multiget group
      long long keys_left = reads_;
      long num_keys;
      // Each round re-derives the group size, then runs one MultiGet.
      while (num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
        found += MultiGetRandom(options, num_keys, thread->rand, hot_keys, "");
        thread->stats.FinishedSingleOp(db_);
        keys_left -= num_keys;
      }
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
    thread->stats.AddMessage(msg);
  }

2026 2027 2028 2029 2030 2031 2032 2033 2034 2035
  // Measures iterator setup/teardown: each op creates an iterator and
  // destroys it immediately without positioning it.
  void IteratorCreation(ThreadState* thread) {
    Duration duration(FLAGS_duration, reads_);
    const ReadOptions options(FLAGS_verify_checksum, true);
    while (!duration.Done(1)) {
      delete db_->NewIterator(options);
      thread->stats.FinishedSingleOp(db_);
    }
  }

S
Sanjay Ghemawat 已提交
2036
  // Each op creates a fresh iterator, Seek()s to a random key in
  // [0, FLAGS_num) and counts the op as found if it lands exactly on that
  // key. Adds "(<found> of <reads_> found)" to the thread's stats.
  void SeekRandom(ThreadState* thread) {
    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);
    long long found = 0;
    while (!duration.Done(1)) {
      Iterator* iter = db_->NewIterator(options);
      const long long k = thread->rand.Next() % FLAGS_num;
      unique_ptr<char []> key = GenerateKeyFromInt(k);
      iter->Seek(key.get());
      if (iter->Valid() && iter->key() == key.get()) found++;
      delete iter;
      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    // The number of seeks is bounded by reads_ (see |duration|), so report
    // against reads_, as the other read benchmarks do.
    snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
    thread->stats.AddMessage(msg);
  }

  // Deletes keys in batches of |entries_per_batch_|: sequentially
  // (0, 1, 2, ...) when |seq| is true, otherwise at random from
  // [0, FLAGS_num). FLAGS_duration applies only to the random variant.
  // Any write failure terminates the process.
  void DoDelete(ThreadState* thread, bool seq) {
    WriteBatch batch;
    Status s;
    Duration duration(seq ? 0 : FLAGS_duration, num_);
    long i = 0;  // first key of the current batch (sequential mode)
    while (!duration.Done(entries_per_batch_)) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const long long k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        batch.Delete(key.get());
        thread->stats.FinishedSingleOp(db_);
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        exit(1);
      }
      // Advance past every key this batch covered (as DoWrite does) so that
      // consecutive sequential batches delete disjoint key ranges.
      i += entries_per_batch_;
    }
  }

  // Deletes keys 0, 1, 2, ... in order (DoDelete's sequential mode).
  void DeleteSeq(ThreadState* thread) {
    const bool sequential = true;
    DoDelete(thread, sequential);
  }

  // Deletes pseudo-random keys from [0, FLAGS_num) (DoDelete's random mode).
  void DeleteRandom(ThreadState* thread) {
    const bool sequential = false;
    DoDelete(thread, sequential);
  }

2085 2086 2087 2088 2089 2090
  // Threads with tid > 0 run ReadRandom(). Thread 0 keeps Put()ting random
  // keys from [0, FLAGS_num) until every other thread is done, optionally
  // throttled by FLAGS_writes_per_second; its stats are excluded from the
  // merged results. A write failure terminates the process.
  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      // Special thread that keeps writing until other threads are done.
      RandomGenerator gen;
      double last = FLAGS_env->NowMicros();
      int num_writes = 0;

      // --writes_per_second is enforced over short windows to avoid a burst
      // of writes at the start of each second: normally 100ms windows of
      // writes_per_second / 10 writes. For rates below 10/s that quota would
      // round down to 0 and silently disable the limit, so those rates are
      // enforced over 1s windows instead.
      double window_usecs = 100000.0;
      int writes_per_window = 0;
      if (FLAGS_writes_per_second >= 10) {
        writes_per_window = FLAGS_writes_per_second / 10;
      } else if (FLAGS_writes_per_second > 0) {
        writes_per_window = FLAGS_writes_per_second;
        window_usecs = 1000000.0;
      }

      // Don't merge stats from this thread with the readers.
      thread->stats.SetExcludeFromMerge();

      while (true) {
        {
          MutexLock l(&thread->shared->mu);
          if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
            // Other threads have finished
            break;
          }
        }

        const long long k = thread->rand.Next() % FLAGS_num;
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        Status s = db_->Put(write_options_, key.get(),
                            gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        thread->stats.FinishedSingleOp(db_);

        ++num_writes;
        if (writes_per_window && num_writes >= writes_per_window) {
          double now = FLAGS_env->NowMicros();
          double usecs_since_last = now - last;

          num_writes = 0;
          last = now;

          // Quota reached early: sleep out the rest of the window.
          if (usecs_since_last < window_usecs) {
            FLAGS_env->SleepForMicroseconds(
                static_cast<int>(window_usecs - usecs_since_last));
            last = FLAGS_env->NowMicros();
          }
        }
      }
    }
  }

2140
  // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
2141 2142
  // in DB atomically i.e in a single batch. Also refer GetMany.
  Status PutMany(const WriteOptions& writeoptions,
2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159
                  const Slice& key, const Slice& value) {
    std::string suffixes[3] = {"2", "1", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Put(keys[i], value);
    }

    s = db_->Write(writeoptions, &batch);
    return s;
  }


  // Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V)
2160 2161
  // in DB atomically i.e in a single batch. Also refer GetMany.
  Status DeleteMany(const WriteOptions& writeoptions,
2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178
                  const Slice& key) {
    std::string suffixes[3] = {"1", "2", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Delete(keys[i]);
    }

    s = db_->Write(writeoptions, &batch);
    return s;
  }

  // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
  // in the same snapshot, and verifies that all the values are identical.
2179 2180
  // ASSUMES that PutMany was used to put (K, V) into the DB.
  Status GetMany(const ReadOptions& readoptions,
2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217
                  const Slice& key, std::string* value) {
    std::string suffixes[3] = {"0", "1", "2"};
    std::string keys[3];
    Slice key_slices[3];
    std::string values[3];
    ReadOptions readoptionscopy = readoptions;
    readoptionscopy.snapshot = db_->GetSnapshot();
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      key_slices[i] = keys[i];
      s = db_->Get(readoptionscopy, key_slices[i], value);
      if (!s.ok() && !s.IsNotFound()) {
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
        values[i] = "";
        // we continue after error rather than exiting so that we can
        // find more errors if any
      } else if (s.IsNotFound()) {
        values[i] = "";
      } else {
        values[i] = *value;
      }
    }
    db_->ReleaseSnapshot(readoptionscopy.snapshot);

    if ((values[0] != values[1]) || (values[1] != values[2])) {
      fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n",
              key.ToString().c_str(), values[0].c_str(), values[1].c_str(),
              values[2].c_str());
      // we continue after error rather than exiting so that we can
      // find more errors if any
    }

    return s;
  }

  // Differs from readrandomwriterandom in the following ways:
2218
  // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
2219 2220 2221 2222
  // (b) Does deletes as well (per FLAGS_deletepercent)
  // (c) In order to achieve high % of 'found' during lookups, and to do
  //     multiple writes (including puts and deletes) it uses upto
  //     FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
2223
  // (d) Does not have a MultiGet option.
2224 2225 2226 2227
  void RandomWithVerify(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
2228
    long long found = 0;
2229 2230 2231
    int get_weight = 0;
    int put_weight = 0;
    int delete_weight = 0;
2232 2233 2234
    long long gets_done = 0;
    long long puts_done = 0;
    long long deletes_done = 0;
2235

2236
    // the number of iterations is the larger of read_ or write_
2237 2238
    for (long long i = 0; i < readwrites_; i++) {
      const long long k = thread->rand.Next() % (FLAGS_numdistinct);
2239
      unique_ptr<char []> key = GenerateKeyFromInt(k);
2240
      if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
2241
        // one batch completed, reinitialize for next batch
2242 2243 2244 2245 2246 2247
        get_weight = FLAGS_readwritepercent;
        delete_weight = FLAGS_deletepercent;
        put_weight = 100 - get_weight - delete_weight;
      }
      if (get_weight > 0) {
        // do all the gets first
2248
        Status s = GetMany(options, key.get(), &value);
2249
        if (!s.ok() && !s.IsNotFound()) {
2250
          fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
2251 2252 2253 2254 2255 2256 2257 2258 2259 2260
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
        get_weight--;
        gets_done++;
      } else if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
2261 2262
        Status s = PutMany(write_options_, key.get(),
                           gen.Generate(value_size_));
2263
        if (!s.ok()) {
2264
          fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
2265 2266 2267 2268 2269
          exit(1);
        }
        put_weight--;
        puts_done++;
      } else if (delete_weight > 0) {
2270
        Status s = DeleteMany(write_options_, key.get());
2271
        if (!s.ok()) {
2272
          fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
2273 2274 2275 2276 2277 2278 2279 2280 2281
          exit(1);
        }
        delete_weight--;
        deletes_done++;
      }

      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
2282 2283
    snprintf(msg, sizeof(msg),
             "( get:%lld put:%lld del:%lld total:%lld found:%lld)",
2284 2285 2286 2287
             gets_done, puts_done, deletes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }

X
Xing Jin 已提交
2288
  // This is different from ReadWhileWriting because it does not use
2289
  // an extra thread.
2290
  void ReadRandomWriteRandom(ThreadState* thread) {
2291 2292 2293 2294 2295 2296
    if (FLAGS_use_multiget){
      // Separate function for multiget (for ease of reading)
      ReadRandomWriteRandomMultiGet(thread);
      return;
    }

2297 2298 2299
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
2300
    long long found = 0;
2301 2302
    int get_weight = 0;
    int put_weight = 0;
2303 2304
    long long reads_done = 0;
    long long writes_done = 0;
M
Mark Callaghan 已提交
2305 2306
    Duration duration(FLAGS_duration, readwrites_);

2307
    // the number of iterations is the larger of read_ or write_
M
Mark Callaghan 已提交
2308
    while (!duration.Done(1)) {
2309
      const long long k = thread->rand.Next() % FLAGS_num;
2310
      unique_ptr<char []> key = GenerateKeyFromInt(k);
2311
      if (get_weight == 0 && put_weight == 0) {
X
Xing Jin 已提交
2312
        // one batch completed, reinitialize for next batch
2313 2314 2315 2316
        get_weight = FLAGS_readwritepercent;
        put_weight = 100 - get_weight;
      }
      if (get_weight > 0) {
M
Mark Callaghan 已提交
2317 2318 2319 2320 2321 2322 2323

        if (FLAGS_use_snapshot) {
          options.snapshot = db_->GetSnapshot();
        }

        if (FLAGS_get_approx) {
          char key2[100];
2324
          snprintf(key2, sizeof(key2), "%016lld", k + 1);
M
Mark Callaghan 已提交
2325 2326 2327 2328 2329 2330 2331
          Slice skey2(key2);
          Slice skey(key2);
          Range range(skey, skey2);
          uint64_t sizes;
          db_->GetApproximateSizes(&range, 1, &sizes);
        }

2332
        // do all the gets first
2333
        Status s = db_->Get(options, key.get(), &value);
2334 2335 2336 2337 2338 2339 2340 2341
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }

2342 2343
        get_weight--;
        reads_done++;
M
Mark Callaghan 已提交
2344 2345 2346 2347 2348

        if (FLAGS_use_snapshot) {
          db_->ReleaseSnapshot(options.snapshot);
        }

2349 2350 2351
      } else  if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
2352 2353
        Status s = db_->Put(write_options_, key.get(),
                            gen.Generate(value_size_));
2354 2355 2356 2357 2358 2359 2360
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        writes_done++;
      }
M
Mark Callaghan 已提交
2361
      thread->stats.FinishedSingleOp(db_);
2362 2363
    }
    char msg[100];
2364 2365
    snprintf(msg, sizeof(msg),
             "( reads:%lld writes:%lld total:%lld found:%lld)",
2366
             reads_done, writes_done, readwrites_, found);
2367 2368 2369
    thread->stats.AddMessage(msg);
  }

2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414
  // ReadRandomWriteRandom (with multiget)
  // Works in groups: one MultiGet of up to FLAGS_keys_per_multiget keys,
  // followed by some puts. FLAGS_readwritepercent specifies the ratio of gets
  // to puts.
  // e.g.: If FLAGS_keys_per_multiget == 100 and FLAGS_readwritepercent == 75
  // then each group does one multiget of 100 keys and 33 puts.
  // So there are 133 operations in total: 100 of them (75%) are gets, and 33
  // of them (25%) are puts.
  // The final group is shrunk so that no more than readwrites_ keys are
  // processed; the run ends when the duration elapses or the keys run out.
  // FLAGS_readwritepercent is clamped to [0, 100]: 0 gives put-only groups,
  // 100 gives read-only groups.
  void ReadRandomWriteRandomMultiGet(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;

    // Keys per multiget group; at least 1 so every group makes progress.
    const long kpg = std::max<long>(FLAGS_keys_per_multiget, 1);
    // Percentage of each group's keys that are read, clamped so the put count
    // below can neither go negative nor divide by zero.
    const long read_pct =
        std::min<long>(std::max<long>(FLAGS_readwritepercent, 0), 100);

    long keys_left = readwrites_;  // number of keys still left to process
    long num_keys;                 // number of keys to read in current group
    long num_put_keys;             // number of keys to put in current group

    long found = 0;
    long reads_done = 0;
    long writes_done = 0;
    long multigets_done = 0;

    // the number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (keys_left > 0) {
      if (read_pct == 0) {
        // Put-only workload: no reads, groups of up to kpg puts.
        num_keys = 0;
        num_put_keys = std::min(keys_left, kpg);
      } else {
        // num_keys = ceil(keys_left * read_pct / 100), capped at kpg. The
        // "+ 99" rounds up so the last group still reads at least one key.
        const long long wanted =
            (static_cast<long long>(keys_left) * read_pct + 99) / 100;
        num_keys = static_cast<long>(std::min<long long>(wanted, kpg));
        // Keep num_keys : num_put_keys at read_pct : (100 - read_pct), but
        // never let rounding push the group past keys_left.
        num_put_keys = std::min(num_keys * (100 - read_pct) / read_pct,
                                keys_left - num_keys);
      }

      // This will break the loop when duration is complete
      if (duration.Done(num_keys + num_put_keys)) {
        break;
      }

      // A quick check to make sure our formula doesn't break on edge cases
      assert(num_keys + num_put_keys >= 1);
      assert(num_keys + num_put_keys <= keys_left);

      // Apply the MultiGet operations
      if (num_keys > 0) {
        found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, "");
        ++multigets_done;
        reads_done += num_keys;
        thread->stats.FinishedSingleOp(db_);
      }

      // Now do the puts
      for (long i = 0; i < num_put_keys; ++i) {
        const long long k = thread->rand.Next() % FLAGS_num;
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        Status s = db_->Put(write_options_, key.get(),
                            gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        writes_done++;
        thread->stats.FinishedSingleOp(db_);
      }

      keys_left -= (num_keys + num_put_keys);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( reads:%ld writes:%ld total:%lld multiget_ops:%ld found:%ld)",
             reads_done, writes_done, readwrites_, multigets_done, found);
    thread->stats.AddMessage(msg);
  }

M
Mark Callaghan 已提交
2445 2446 2447 2448 2449 2450
  // Read-modify-write for random keys: each operation reads a random key and
  // then overwrites it with a fresh random value of value_size_ bytes.
  // Optionally takes a snapshot around the read (FLAGS_use_snapshot) and
  // queries an approximate size before it (FLAGS_get_approx).
  void UpdateRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    long long found = 0;
    Duration duration(FLAGS_duration, readwrites_);

    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      const long long k = thread->rand.Next() % FLAGS_num;
      unique_ptr<char []> key = GenerateKeyFromInt(k);

      if (FLAGS_use_snapshot) {
        options.snapshot = db_->GetSnapshot();
      }

      if (FLAGS_get_approx) {
        // Query the approximate size of [key(k), key(k + 1)). The start must
        // be the current key: building both endpoints from key2 would always
        // describe an empty range. (Assumes GenerateKeyFromInt uses the same
        // "%016lld" layout as key2 -- TODO confirm.)
        char key2[100];
        snprintf(key2, sizeof(key2), "%016lld", k + 1);
        Slice skey(key.get());
        Slice skey2(key2);
        Range range(skey, skey2);
        uint64_t sizes;
        db_->GetApproximateSizes(&range, 1, &sizes);
      }

      if (db_->Get(options, key.get(), &value).ok()) {
        found++;
      }

      if (FLAGS_use_snapshot) {
        db_->ReleaseSnapshot(options.snapshot);
        options.snapshot = nullptr;  // don't keep a dangling snapshot pointer
      }

      Status s = db_->Put(write_options_, key.get(), gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( updates:%lld found:%lld)", readwrites_, found);
    thread->stats.AddMessage(msg);
  }

D
Deon Nicholas 已提交
2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505
  // Read-modify-write for random keys.
  // Each operation makes the value stored under a random key grow by
  // value_size_ bytes (plus a ',' delimiter when the existing value is
  // non-empty), simulating an append. Generally used for benchmarking against
  // merges of similar type (StringAppendOperator semantics).
  void AppendRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    long found = 0;

    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      const long long k = thread->rand.Next() % FLAGS_num;
      unique_ptr<char []> key = GenerateKeyFromInt(k);

      if (FLAGS_use_snapshot) {
        options.snapshot = db_->GetSnapshot();
      }

      if (FLAGS_get_approx) {
        // Query the approximate size of [key(k), key(k + 1)). The start must
        // be the current key: building both endpoints from key2 would always
        // describe an empty range. (Assumes GenerateKeyFromInt uses the same
        // "%016lld" layout as key2 -- TODO confirm.)
        char key2[100];
        snprintf(key2, sizeof(key2), "%016lld", k + 1);
        Slice skey(key.get());
        Slice skey2(key2);
        Range range(skey, skey2);
        uint64_t sizes;
        db_->GetApproximateSizes(&range, 1, &sizes);
      }

      // Get the existing value
      if (db_->Get(options, key.get(), &value).ok()) {
        found++;
      } else {
        // If not existing, then just assume an empty string of data
        value.clear();
      }

      if (FLAGS_use_snapshot) {
        db_->ReleaseSnapshot(options.snapshot);
        options.snapshot = nullptr;  // don't keep a dangling snapshot pointer
      }

      // Update the value (by appending data)
      Slice operand = gen.Generate(value_size_);
      if (!value.empty()) {
        // Use a delimiter to match the semantics for StringAppendOperator
        value.append(1, ',');
      }
      value.append(operand.data(), operand.size());

      // Write back to the database
      Status s = db_->Put(write_options_, key.get(), value);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%lld found:%ld)", readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  // Read-modify-write for random keys (using MergeOperator)
  // The merge operator to use should be defined by FLAGS_merge_operator
  // Adjust FLAGS_value_size so that the keys are reasonable for this operator
  // Assumes that the merge operator is non-null (i.e.: is well-defined)
  //
  // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
  // to simulate random additions over 64-bit integers using merge.
2563 2564 2565
  //
  // The number of merges on the same key can be controlled by adjusting
  // FLAGS_merge_keys.
D
Deon Nicholas 已提交
2566 2567 2568 2569 2570 2571
  void MergeRandom(ThreadState* thread) {
    RandomGenerator gen;

    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
2572
      const long long k = thread->rand.Next() % merge_keys_;
D
Deon Nicholas 已提交
2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586
      unique_ptr<char []> key = GenerateKeyFromInt(k);

      Status s = db_->Merge(write_options_, key.get(),
                            gen.Generate(value_size_));

      if (!s.ok()) {
        fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
        exit(1);
      }
      thread->stats.FinishedSingleOp(db_);
    }

    // Print some statistics
    char msg[100];
2587
    snprintf(msg, sizeof(msg), "( updates:%lld)", readwrites_);
D
Deon Nicholas 已提交
2588 2589 2590
    thread->stats.AddMessage(msg);
  }

2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652
  // Read and merge random keys. The amount of reads and merges are controlled
  // by adjusting FLAGS_num and FLAGS_mergereadpercent: each operation is a
  // Merge with probability FLAGS_mergereadpercent/100, otherwise a Get. The
  // number of distinct keys (and thus also the number of reads and merges on
  // the same key) can be adjusted with FLAGS_merge_keys.
  //
  // As with MergeRandom, the merge operator to use should be defined by
  // FLAGS_merge_operator.
  void ReadRandomMergeRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    long long num_hits = 0;
    long long num_gets = 0;
    long long num_merges = 0;
    size_t max_length = 0;  // longest value returned by a successful Get

    // the number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);

    while (!duration.Done(1)) {
      const long long k = thread->rand.Next() % merge_keys_;
      unique_ptr<char []> key = GenerateKeyFromInt(k);

      bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;

      if (do_merge) {
        Status s = db_->Merge(write_options_, key.get(),
                              gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
          exit(1);
        }

        num_merges++;

      } else {
        Status s = db_->Get(options, key.get(), &value);

        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          num_hits++;
          // Only a successful Get defines `value`: on NotFound it still holds
          // the previous result, and on error its contents are unspecified.
          max_length = std::max(max_length, value.length());
        }

        num_gets++;
      }

      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "(reads:%lld merges:%lld total:%lld hits:%lld maxlength:%zu)",
             num_gets, num_merges, readwrites_, num_hits, max_length);
    thread->stats.AddMessage(msg);
  }


2653
  // Compacts the whole database: a null begin/end bound means the range is
  // unbounded on that side. `thread` is unused.
  void Compact(ThreadState* thread) {
    const Slice* begin = nullptr;
    const Slice* end = nullptr;
    db_->CompactRange(begin, end);
  }

S
Sanjay Ghemawat 已提交
2657
  void PrintStats(const char* key) {
2658
    std::string stats;
S
Sanjay Ghemawat 已提交
2659
    if (!db_->GetProperty(key, &stats)) {
2660
      stats = "(failed)";
2661
    }
2662
    fprintf(stdout, "\n%s\n", stats.c_str());
2663 2664
  }

J
jorlow@chromium.org 已提交
2665 2666 2667 2668 2669 2670
  static void WriteToFile(void* arg, const char* buf, int n) {
    reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
  }

  void HeapProfile() {
    char fname[100];
H
Haobo Xu 已提交
2671
    EnvOptions soptions;
2672 2673
    snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db.c_str(),
             ++heap_counter_);
2674
    unique_ptr<WritableFile> file;
2675
    Status s = FLAGS_env->NewWritableFile(fname, &file, soptions);
J
jorlow@chromium.org 已提交
2676
    if (!s.ok()) {
2677
      fprintf(stderr, "%s\n", s.ToString().c_str());
J
jorlow@chromium.org 已提交
2678 2679
      return;
    }
2680
    bool ok = port::GetHeapProfile(WriteToFile, file.get());
J
jorlow@chromium.org 已提交
2681
    if (!ok) {
2682
      fprintf(stderr, "heap profiling not supported\n");
2683
      FLAGS_env->DeleteFile(fname);
J
jorlow@chromium.org 已提交
2684 2685 2686 2687
    }
  }
};

2688
}  // namespace rocksdb
J
jorlow@chromium.org 已提交
2689 2690

int main(int argc, char** argv) {
2691
  rocksdb::InstallStackTraceHandler();
2692 2693 2694
  google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                          " [OPTIONS]...");
  google::ParseCommandLineFlags(&argc, &argv, true);
2695

2696 2697 2698
  FLAGS_compaction_style_e = (rocksdb::CompactionStyle) FLAGS_compaction_style;
  if (FLAGS_statistics) {
    dbstats = rocksdb::CreateDBStatistics();
J
jorlow@chromium.org 已提交
2699 2700
  }

2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729
  std::vector<std::string> fanout =
    rocksdb::stringSplit(FLAGS_max_bytes_for_level_multiplier_additional, ',');
  for (unsigned int j= 0; j < fanout.size(); j++) {
    FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
      std::stoi(fanout[j]));
  }

  FLAGS_compression_type_e =
    StringToCompressionType(FLAGS_compression_type.c_str());

  if (!FLAGS_hdfs.empty()) {
    FLAGS_env  = new rocksdb::HdfsEnv(FLAGS_hdfs);
  }

  if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NORMAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::SEQUENTIAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::WILLNEED;
  else {
    fprintf(stdout, "Unknown compaction fadvice:%s\n",
            FLAGS_compaction_fadvice.c_str());
  }

  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());

2730 2731 2732
  // The number of background threads should be at least as much the
  // max number of concurrent compactions.
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
H
heyongqiang 已提交
2733
  // Choose a location for the test database if none given with --db=<path>
2734 2735 2736 2737 2738
  if (FLAGS_db.empty()) {
    std::string default_db_path;
    rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path;
H
heyongqiang 已提交
2739 2740
  }

2741
  rocksdb::Benchmark benchmark;
J
jorlow@chromium.org 已提交
2742 2743 2744
  benchmark.Run();
  return 0;
}