//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <cstddef>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <gflags/gflags.h>
#include "db/db_impl.h"
#include "db/version_set.h"
#include "rocksdb/statistics.h"
#include "rocksdb/options.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "port/port.h"
#include "util/bit_set.h"
#include "util/crc32c.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stack_trace.h"
#include "util/string_util.h"
#include "util/statistics.h"
#include "util/testutil.h"
#include "hdfs/env_hdfs.h"
#include "utilities/merge_operators.h"


DEFINE_string(benchmarks,

              "fillseq,"
              "fillsync,"
              "fillrandom,"
              "overwrite,"
              "readrandom,"
              "readrandom,"
              "readseq,"
              "readreverse,"
              "compact,"
              "readrandom,"
              "readseq,"
              "readtocache,"
              "readreverse,"
              "readwhilewriting,"
              "readrandomwriterandom,"
              "updaterandom,"
              "randomwithverify,"
              "fill100K,"
              "crc32c,"
              "snappycomp,"
              "snappyuncomp,"
              "acquireload,"
              "fillfromstdin,",

              "Comma-separated list of operations to run in the specified order"
              "Actual benchmarks:\n"
              "\tfillseq       -- write N values in sequential key"
              " order in async mode\n"
              "\tfillrandom    -- write N values in random key order in async"
              " mode\n"
              "\toverwrite     -- overwrite N values in random key order in"
              " async mode\n"
              "\tfillsync      -- write N/100 values in random key order in "
              "sync mode\n"
              "\tfill100K      -- write N/1000 100K values in random order in"
              " async mode\n"
              "\tdeleteseq     -- delete N keys in sequential order\n"
              "\tdeleterandom  -- delete N keys in random order\n"
              "\treadseq       -- read N times sequentially\n"
              "\treadtocache   -- 1 thread reading database sequentially\n"
              "\treadreverse   -- read N times in reverse order\n"
              "\treadrandom    -- read N times in random order\n"
              "\treadmissing   -- read N missing keys in random order\n"
              "\treadhot       -- read N times in random order from 1% section "
              "of DB\n"
              "\treadwhilewriting      -- 1 writer, N threads doing random "
              "reads\n"
              "\treadrandomwriterandom -- N threads doing random-read, "
              "random-write\n"
              "\tprefixscanrandom      -- prefix scan N times in random order\n"
              "\tupdaterandom  -- N threads doing read-modify-write for random "
              "keys\n"
              "\tappendrandom  -- N threads doing read-modify-write with "
              "growing values\n"
              "\tmergerandom   -- same as updaterandom/appendrandom using merge"
              " operator. "
              "Must be used with merge_operator\n"
              "\treadrandommergerandom -- perform N random read-or-merge "
              "operations. Must be used with merge_operator\n"
              "\tseekrandom    -- N random seeks\n"
              "\tcrc32c        -- repeated crc32c of 4K of data\n"
              "\tacquireload   -- load N*1000 times\n"
              "Meta operations:\n"
              "\tcompact     -- Compact the entire DB\n"
              "\tstats       -- Print DB stats\n"
              "\tlevelstats  -- Print the number of files and bytes per level\n"
              "\tsstables    -- Print sstable info\n"
              "\theapprofile -- Dump a heap profile (if supported by this"
              " port)\n");

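// Example invocation (hypothetical; flags shown only for illustration):
//   ./db_bench --benchmarks=fillseq,readrandom --num=1000000 --value_size=100
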
DEFINE_int64(num, 1000000, "Number of key/values to place in database");

DEFINE_int64(numdistinct, 1000,
             "Number of distinct keys to use. Used in RandomWithVerify to "
             "read/write on fewer keys so that gets are more likely to find the"
             " key and puts are more likely to update the same key");

DEFINE_int64(merge_keys, -1,
             "Number of distinct keys to use for MergeRandom and "
             "ReadRandomMergeRandom. "
             "If negative, there will be FLAGS_num keys.");

DEFINE_int64(reads, -1, "Number of read operations to do.  "
             "If negative, do FLAGS_num reads.");

DEFINE_int64(read_range, 1, "When ==1 reads use ::Get, when >1 reads use"
             " an iterator");

DEFINE_bool(use_prefix_blooms, false, "Whether to place prefixes in blooms");

DEFINE_bool(use_prefix_api, false, "Whether to set ReadOptions.prefix for"
            " prefixscanrandom. If true, use_prefix_blooms must also be true.");

DEFINE_int64(seed, 0, "Seed base for random number generators. "
             "When 0 it is deterministic.");

DEFINE_int32(threads, 1, "Number of concurrent threads to run.");

DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
             " When 0 then num & reads determine the test duration");

DEFINE_int32(value_size, 100, "Size of each value");


// the maximum size of key in bytes
static const int kMaxKeySize = 128;
static bool ValidateKeySize(const char* flagname, int32_t value) {
  if (value > kMaxKeySize) {
    fprintf(stderr, "Invalid value for --%s: %d, must be <= %d\n",
            flagname, value, kMaxKeySize);
    return false;
  }
  return true;
}
DEFINE_int32(key_size, 16, "size of each key");

DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink"
              " to this fraction of their original size after compression");

DEFINE_bool(histogram, false, "Print histogram of operation timings");

DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
             "Number of bytes to buffer in memtable before compacting");

DEFINE_int32(max_write_buffer_number,
             rocksdb::Options().max_write_buffer_number,
             "The number of in-memory memtables. Each memtable is of size "
             "write_buffer_size.");

DEFINE_int32(min_write_buffer_number_to_merge,
             rocksdb::Options().min_write_buffer_number_to_merge,
             "The minimum number of write buffers that will be merged together "
             "before writing to storage. This is cheap because it is an "
             "in-memory merge. If this feature is not enabled, then all these "
             "write buffers are flushed to L0 as separate files and this "
             "increases read amplification because a get request has to check"
             " in all of these files. Also, an in-memory merge may result in"
             " writing less data to storage if there are duplicate records "
             " in each of these individual write buffers.");

DEFINE_int32(max_background_compactions,
             rocksdb::Options().max_background_compactions,
             "The maximum number of concurrent background compactions"
             " that can occur in parallel.");

static rocksdb::CompactionStyle FLAGS_compaction_style_e;
DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style,
             "style of compaction: level-based vs universal");

DEFINE_int32(universal_size_ratio, 0,
             "Percentage flexibility while comparing file size"
             " (for universal compaction only).");

DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files in a"
             " single compaction run (for universal compaction only).");

DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
             " in universal style compaction");

DEFINE_int32(universal_max_size_amplification_percent, 0,
             "The max size amplification for universal style compaction");

DEFINE_int32(universal_compression_size_percent, -1,
             "The percentage of the database to compress for universal "
             "compaction. -1 means compress everything.");

DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed "
             "data. Negative means use default settings.");

DEFINE_int32(block_size, rocksdb::Options().block_size,
             "Number of bytes in a block.");

DEFINE_int64(compressed_cache_size, -1,
             "Number of bytes to use as a cache of compressed data.");

DEFINE_int32(open_files, rocksdb::Options().max_open_files,
             "Maximum number of files to keep open at the same time"
             " (use default if == 0)");

DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means"
             " use default settings.");

DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing"
            " database.  If you set this flag and also specify a benchmark that"
            " wants a fresh database, that benchmark will fail.");

DEFINE_string(db, "", "Use the db with the following name.");

static bool ValidateCacheNumshardbits(const char* flagname, int32_t value) {
  if (value >= 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be < 20\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(cache_numshardbits, -1, "Number of shards for the block cache"
             " is 2 ** cache_numshardbits. Negative means use default settings."
             " This is applied only if FLAGS_cache_size is non-negative.");

DEFINE_int32(cache_remove_scan_count_limit, 32, "");

DEFINE_bool(verify_checksum, false, "Verify checksum for every block read"
            " from storage");

DEFINE_bool(statistics, false, "Database statistics");
static class std::shared_ptr<rocksdb::Statistics> dbstats;

DEFINE_int64(writes, -1, "Number of write operations to do. If negative, do"
             " --num writes.");

DEFINE_int32(writes_per_second, 0, "Per-thread rate limit on writes per second."
             " No limit when <= 0. Only for the readwhilewriting test.");

DEFINE_bool(sync, false, "Sync all writes to disk");

DEFINE_bool(disable_data_sync, false, "If true, do not wait until data is"
            " synced to disk.");

DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");

DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");

DEFINE_bool(use_snapshot, false, "If true, create a snapshot per query when"
            " randomread benchmark is used");

DEFINE_bool(get_approx, false, "If true, call GetApproximateSizes per query"
            " when read_range is > 1 and randomread benchmark is used");

DEFINE_int32(num_levels, 7, "The total number of levels");

DEFINE_int32(target_file_size_base, 2 * 1048576, "Target file size at level-1");

DEFINE_int32(target_file_size_multiplier, 1,
             "A multiplier to compute target level-N file size (N >= 2)");

DEFINE_uint64(max_bytes_for_level_base,  10 * 1048576, "Max bytes for level-1");

DEFINE_int32(max_bytes_for_level_multiplier, 10,
             "A multiplier to compute max bytes for level-N (N >= 2)");

static std::vector<int> FLAGS_max_bytes_for_level_multiplier_additional_v;
DEFINE_string(max_bytes_for_level_multiplier_additional, "",
              "A vector that specifies additional fanout per level");

DEFINE_int32(level0_stop_writes_trigger, 12, "Number of files in level-0"
             " that will trigger put stop.");

DEFINE_int32(level0_slowdown_writes_trigger, 8, "Number of files in level-0"
             " that will slow down writes.");

DEFINE_int32(level0_file_num_compaction_trigger, 4, "Number of files in level-0"
             " when compactions start");

static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value <= 0 || value>=100) {
    fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
             " as percentage) for the ReadRandomWriteRandom workload. The "
             "default value 90 means 90% operations out of all reads and writes"
             " operations are reads. In other words, 9 gets for every 1 put.");

DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
             " as percentage) for the ReadRandomMergeRandom workload. The"
             " default value 70 means 70% out of all read and merge operations"
             " are merges. In other words, 7 merges for every 3 gets.");

DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
             "deletes (used in RandomWithVerify only). RandomWithVerify "
             "calculates writepercent as (100 - FLAGS_readwritepercent - "
             "deletepercent), so deletepercent must be smaller than (100 - "
             "FLAGS_readwritepercent)");

DEFINE_int32(disable_seek_compaction, false, "Option to disable compaction"
             " triggered by read.");

DEFINE_uint64(delete_obsolete_files_period_micros, 0, "Option to delete "
              "obsolete files periodically. 0 means that obsolete files are"
              " deleted after every compaction run.");

enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "none"))
    return rocksdb::kNoCompression;
  else if (!strcasecmp(ctype, "snappy"))
    return rocksdb::kSnappyCompression;
  else if (!strcasecmp(ctype, "zlib"))
    return rocksdb::kZlibCompression;
  else if (!strcasecmp(ctype, "bzip2"))
    return rocksdb::kBZip2Compression;

  fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
  return rocksdb::kSnappyCompression; //default value
}
DEFINE_string(compression_type, "snappy",
              "Algorithm to use to compress the database");
static enum rocksdb::CompressionType FLAGS_compression_type_e =
    rocksdb::kSnappyCompression;

DEFINE_int32(compression_level, -1,
             "Compression level. For zlib this should be -1 for the "
             "default level, or between 0 and 9.");

static bool ValidateCompressionLevel(const char* flagname, int32_t value) {
  if (value < -1 || value > 9) {
    fprintf(stderr, "Invalid value for --%s: %d, must be between -1 and 9\n",
            flagname, value);
    return false;
  }
  return true;
}

static const bool FLAGS_compression_level_dummy __attribute__((unused)) =
    google::RegisterFlagValidator(&FLAGS_compression_level,
                                  &ValidateCompressionLevel);

DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
             " from this level. Levels with number < min_level_to_compress are"
             " not compressed. Otherwise, apply compression_type to "
             "all levels.");

static bool ValidateTableCacheNumshardbits(const char* flagname,
                                           int32_t value) {
  if (0 >= value || value > 20) {
    fprintf(stderr, "Invalid value for --%s: %d, must be  0 < val <= 20\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(table_cache_numshardbits, 4, "");

DEFINE_string(hdfs, "", "Name of hdfs environment");
// posix or hdfs environment
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();

DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
             "this is greater than zero. When 0 the interval grows over time.");

DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
             " this is greater than 0.");

static bool ValidateRateLimit(const char* flagname, double value) {
  static constexpr double EPSILON = 1e-10;
  if ( value < -EPSILON ) {
    fprintf(stderr, "Invalid value for --%s: %12.6f, must be >= 0.0\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_double(soft_rate_limit, 0.0, "");

DEFINE_double(hard_rate_limit, 0.0, "When not equal to 0 this makes threads "
              "sleep at each stats reporting interval until the compaction"
              " score for all levels is less than or equal to this value.");

DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
             "When hard_rate_limit is set then this is the max time a put will"
             " be stalled.");

DEFINE_int32(max_grandparent_overlap_factor, 10, "Control maximum bytes of "
             "overlaps in grandparent (i.e., level+2) before we stop building a"
             " single file in a level->level+1 compaction.");

DEFINE_bool(readonly, false, "Run read only benchmarks.");

DEFINE_bool(disable_auto_compactions, false, "Do not auto trigger compactions");

DEFINE_int32(source_compaction_factor, 1, "Cap the size of data in level-K for"
             " a compaction run that compacts Level-K with Level-(K+1) (for"
             " K >= 1)");

DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds.");
DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
              " in MB.");

DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer,
            "Allow buffered io using OS buffers");

DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
            "Allow reads to occur via mmap-ing files");

DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes,
            "Allow writes to occur via mmap-ing files");

DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
            "Advise random access on table file open");

DEFINE_string(compaction_fadvice, "NORMAL",
              "Access pattern advice when a file is compacted");
static auto FLAGS_compaction_fadvice_e =
  rocksdb::Options().access_hint_on_compaction_start;

DEFINE_bool(use_multiget, false,
            "Use multiget to access a series of keys instead of get");

DEFINE_int64(keys_per_multiget, 90, "If use_multiget is true, determines number"
             " of keys to group per call. Arbitrary default is good because it"
             " agrees with readwritepercent");

// TODO: Apply this flag to generic Get calls too. Currently only with Multiget
DEFINE_bool(warn_missing_keys, true, "Print a message to user when a key is"
            " missing in a Get/MultiGet call");

DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
            "Use adaptive mutex");

DEFINE_uint64(bytes_per_sync,  rocksdb::Options().bytes_per_sync,
              "Allows OS to incrementally sync files to disk while they are"
              " being written, in the background. Issue one request for every"
              " bytes_per_sync written. 0 turns it off.");
DEFINE_bool(filter_deletes, false, "If true, deletes use the bloom filter and drop"
            " the delete if key not present");

DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
             " operations on a key in the memtable");

static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  if (value < 0 || value>=2000000000) {
    fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n",
            flagname, value);
    return false;
  }
  return true;
}
DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipList");

enum RepFactory {
  kSkipList,
  kPrefixHash,
  kVectorRep
};
enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "skip_list"))
    return kSkipList;
  else if (!strcasecmp(ctype, "prefix_hash"))
    return kPrefixHash;
  else if (!strcasecmp(ctype, "vector"))
    return kVectorRep;

  fprintf(stdout, "Cannot parse memtablerep %s\n", ctype);
  return kSkipList;
}
static enum RepFactory FLAGS_rep_factory;
DEFINE_string(memtablerep, "skip_list", "");
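// For example, --memtablerep=prefix_hash selects the hash-skiplist memtable;
// Open() below then requires a non-zero --prefix_size.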

DEFINE_string(merge_operator, "", "The merge operator to use with the database. "
              "If a new merge operator is specified, be sure to use a fresh"
              " database. The possible merge operators are defined in"
              " utilities/merge_operators.h");

static const bool FLAGS_soft_rate_limit_dummy __attribute__((unused)) =
  google::RegisterFlagValidator(&FLAGS_soft_rate_limit,
                                &ValidateRateLimit);

static const bool FLAGS_hard_rate_limit_dummy __attribute__((unused)) =
  google::RegisterFlagValidator(&FLAGS_hard_rate_limit, &ValidateRateLimit);

static const bool FLAGS_prefix_size_dummy __attribute__((unused)) =
  google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);

static const bool FLAGS_key_size_dummy __attribute__((unused)) =
  google::RegisterFlagValidator(&FLAGS_key_size, &ValidateKeySize);

static const bool FLAGS_cache_numshardbits_dummy __attribute__((unused)) =
  google::RegisterFlagValidator(&FLAGS_cache_numshardbits,
                                &ValidateCacheNumshardbits);

static const bool FLAGS_readwritepercent_dummy __attribute__((unused)) =
  google::RegisterFlagValidator(&FLAGS_readwritepercent,
                                &ValidateInt32Percent);

static const bool FLAGS_deletepercent_dummy __attribute__((unused)) =
  google::RegisterFlagValidator(&FLAGS_deletepercent,
                                &ValidateInt32Percent);
static const bool
  FLAGS_table_cache_numshardbits_dummy __attribute__((unused)) =
  google::RegisterFlagValidator(&FLAGS_table_cache_numshardbits,
                                &ValidateTableCacheNumshardbits);
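// Note: google::RegisterFlagValidator hooks each validator into gflags so it
// runs whenever the corresponding flag is set; the unused dummy bools above
// exist only to force that registration during static initialization.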

namespace rocksdb {

// Helper for quickly generating random data.
class RandomGenerator {
 private:
  std::string data_;
  unsigned int pos_;

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
    while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

  Slice Generate(unsigned int len) {
    if (pos_ + len > data_.size()) {
      pos_ = 0;
      assert(len < data_.size());
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
};
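
// Usage sketch (illustrative): a write benchmark typically creates one
// RandomGenerator per thread and calls gen.Generate(value_size_) for each op,
// reusing the pre-built compressible buffer instead of allocating fresh data.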

static void AppendWithSpace(std::string* str, Slice msg) {
  if (msg.empty()) return;
  if (!str->empty()) {
    str->push_back(' ');
  }
  str->append(msg.data(), msg.size());
}

class Stats {
 private:
  int id_;
  double start_;
  double finish_;
  double seconds_;
  long long done_;
  long long last_report_done_;
  long long next_report_;
  int64_t bytes_;
  double last_op_finish_;
  double last_report_finish_;
  HistogramImpl hist_;
  std::string message_;
  bool exclude_from_merge_;

 public:
  Stats() { Start(-1); }

  void Start(int id) {
    id_ = id;
    next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
    last_op_finish_ = start_;
    hist_.Clear();
    done_ = 0;
    last_report_done_ = 0;
    bytes_ = 0;
    seconds_ = 0;
    start_ = FLAGS_env->NowMicros();
    finish_ = start_;
    last_report_finish_ = start_;
    message_.clear();
    // When set, stats from this thread won't be merged with others.
    exclude_from_merge_ = false;
  }

  void Merge(const Stats& other) {
    if (other.exclude_from_merge_)
      return;

    hist_.Merge(other.hist_);
    done_ += other.done_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.empty()) message_ = other.message_;
  }

  void Stop() {
    finish_ = FLAGS_env->NowMicros();
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void AddMessage(Slice msg) {
    AppendWithSpace(&message_, msg);
  }

  void SetId(int id) { id_ = id; }
  void SetExcludeFromMerge() { exclude_from_merge_ = true; }

  void FinishedSingleOp(DB* db) {
    if (FLAGS_histogram) {
      double now = FLAGS_env->NowMicros();
      double micros = now - last_op_finish_;
      hist_.Add(micros);
      if (micros > 20000 && !FLAGS_stats_interval) {
        fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
        fflush(stderr);
      }
      last_op_finish_ = now;
    }

    done_++;
    if (done_ >= next_report_) {
      if (!FLAGS_stats_interval) {
        if      (next_report_ < 1000)   next_report_ += 100;
        else if (next_report_ < 5000)   next_report_ += 500;
        else if (next_report_ < 10000)  next_report_ += 1000;
        else if (next_report_ < 50000)  next_report_ += 5000;
        else if (next_report_ < 100000) next_report_ += 10000;
        else if (next_report_ < 500000) next_report_ += 50000;
        else                            next_report_ += 100000;
        fprintf(stderr, "... finished %lld ops%30s\r", done_, "");
        fflush(stderr);
      } else {
        double now = FLAGS_env->NowMicros();
        fprintf(stderr,
                "%s ... thread %d: (%lld,%lld) ops and "
                "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(),
                id_,
                done_ - last_report_done_, done_,
                (done_ - last_report_done_) /
                ((now - last_report_finish_) / 1000000.0),
                done_ / ((now - start_) / 1000000.0),
                (now - last_report_finish_) / 1000000.0,
                (now - start_) / 1000000.0);

        if (FLAGS_stats_per_interval) {
          std::string stats;
          if (db && db->GetProperty("rocksdb.stats", &stats))
            fprintf(stderr, "%s\n", stats.c_str());
        }

        fflush(stderr);
        next_report_ += FLAGS_stats_interval;
        last_report_finish_ = now;
        last_report_done_ = done_;
      }
    }
  }

  void AddBytes(int64_t n) {
    bytes_ += n;
  }

  void Report(const Slice& name) {
    // Pretend at least one op was done in case we are running a benchmark
    // that does not call FinishedSingleOp().
    if (done_ < 1) done_ = 1;

    std::string extra;
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-6;
      char rate[100];
      snprintf(rate, sizeof(rate), "%6.1f MB/s",
               (bytes_ / 1048576.0) / elapsed);
      extra = rate;
    }
    AppendWithSpace(&extra, message_);
    double elapsed = (finish_ - start_) * 1e-6;
    double throughput = (double)done_/elapsed;

    fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
            name.ToString().c_str(),
            elapsed * 1e6 / done_,
            (long)throughput,
            (extra.empty() ? "" : " "),
            extra.c_str());
    if (FLAGS_histogram) {
      fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
    }
    fflush(stdout);
  }
};

// State shared by all concurrent executions of the same benchmark.
struct SharedState {
  port::Mutex mu;
  port::CondVar cv;
  int total;

  // Each thread goes through the following states:
  //    (1) initializing
  //    (2) waiting for others to be initialized
  //    (3) running
  //    (4) done

  long num_initialized;
  long num_done;
  bool start;

  SharedState() : cv(&mu) { }
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;             // 0..n-1 when running in n threads
  Random64 rand;         // Has different seeds for different threads
  Stats stats;
  SharedState* shared;

  /* implicit */ ThreadState(int index)
      : tid(index),
        rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {
  }
};

class Duration {
 public:
  Duration(int max_seconds, long long max_ops) {
    max_seconds_ = max_seconds;
    max_ops_= max_ops;
    ops_ = 0;
    start_at_ = FLAGS_env->NowMicros();
  }

  bool Done(int increment) {
    if (increment <= 0) increment = 1;    // avoid Done(0) and infinite loops
    ops_ += increment;

    if (max_seconds_) {
      // Recheck every appx 1000 ops (exact iff increment is factor of 1000)
      if ((ops_/1000) != ((ops_-increment)/1000)) {
        double now = FLAGS_env->NowMicros();
        return ((now - start_at_) / 1000000.0) >= max_seconds_;
      } else {
        return false;
      }
    } else {
      return ops_ > max_ops_;
    }
  }

 private:
  int max_seconds_;
  long long max_ops_;
  long long ops_;
  double start_at_;
};
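
// Usage sketch (illustrative): a benchmark loop is typically bounded as
//   Duration duration(FLAGS_duration, reads_);
//   while (!duration.Done(1)) { /* one operation */ }
// so it stops after --duration seconds, or after the op budget when that is 0.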

class Benchmark {
 private:
  shared_ptr<Cache> cache_;
  shared_ptr<Cache> compressed_cache_;
  const FilterPolicy* filter_policy_;
  const SliceTransform* prefix_extractor_;
  DB* db_;
  long long num_;
  int value_size_;
  int key_size_;
  int entries_per_batch_;
  WriteOptions write_options_;
  long long reads_;
  long long writes_;
  long long readwrites_;
  long long merge_keys_;
  int heap_counter_;
  char keyFormat_[100]; // will contain the format of key, e.g. "%016lld%s"
  void PrintHeader() {
    PrintEnvironment();
    fprintf(stdout, "Keys:       %d bytes each\n", FLAGS_key_size);
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
    fprintf(stdout, "Entries:    %lld\n", num_);
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
            ((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
             / 1048576.0));
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
            (((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
              * num_)
             / 1048576.0));
    fprintf(stdout, "Write rate limit: %d\n", FLAGS_writes_per_second);
    switch (FLAGS_compression_type_e) {
      case rocksdb::kNoCompression:
        fprintf(stdout, "Compression: none\n");
        break;
      case rocksdb::kSnappyCompression:
        fprintf(stdout, "Compression: snappy\n");
        break;
      case rocksdb::kZlibCompression:
        fprintf(stdout, "Compression: zlib\n");
        break;
      case rocksdb::kBZip2Compression:
        fprintf(stdout, "Compression: bzip2\n");
        break;
    }

    switch (FLAGS_rep_factory) {
      case kPrefixHash:
        fprintf(stdout, "Memtablerep: prefix_hash\n");
        break;
      case kSkipList:
        fprintf(stdout, "Memtablerep: skip_list\n");
        break;
      case kVectorRep:
        fprintf(stdout, "Memtablerep: vector\n");
        break;
    }

    PrintWarnings();
    fprintf(stdout, "------------------------------------------------\n");
  }

  void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif
    if (FLAGS_compression_type_e != rocksdb::kNoCompression) {
      // The test string should not be too small.
      const int len = FLAGS_block_size;
      char* text = (char*) malloc(len+1);
      bool result = true;
      const char* name = nullptr;
      std::string compressed;

      memset(text, (int) 'y', len);
      text[len] = '\0';
      switch (FLAGS_compression_type_e) {
        case kSnappyCompression:
          result = port::Snappy_Compress(Options().compression_opts, text,
                                         strlen(text), &compressed);
          name = "Snappy";
          break;
        case kZlibCompression:
          result = port::Zlib_Compress(Options().compression_opts, text,
                                       strlen(text), &compressed);
          name = "Zlib";
          break;
        case kBZip2Compression:
          result = port::BZip2_Compress(Options().compression_opts, text,
                                        strlen(text), &compressed);
          name = "BZip2";
          break;
        case kNoCompression:
          assert(false); // cannot happen
          break;
      }

      if (!result) {
        fprintf(stdout, "WARNING: %s compression is not enabled\n", name);
      } else if (name && compressed.size() >= strlen(text)) {
        fprintf(stdout, "WARNING: %s compression is not effective\n", name);
      }

      free(text);
    }
  }

// Currently the following isn't equivalent to OS_LINUX.
#if defined(__linux)
  static Slice TrimSpace(Slice s) {
    unsigned int start = 0;
    while (start < s.size() && isspace(s[start])) {
      start++;
    }
    unsigned int limit = s.size();
    while (limit > start && isspace(s[limit-1])) {
      limit--;
    }
    return Slice(s.data() + start, limit - start);
  }
#endif

  void PrintEnvironment() {
    fprintf(stderr, "LevelDB:    version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
    time_t now = time(nullptr);
    fprintf(stderr, "Date:       %s", ctime(&now));  // ctime() adds newline

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    if (cpuinfo != nullptr) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
        const char* sep = strchr(line, ':');
        if (sep == nullptr) {
          continue;
        }
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }

 public:
  Benchmark()
  : cache_(FLAGS_cache_size >= 0 ?
           (FLAGS_cache_numshardbits >= 1 ?
            NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits,
                        FLAGS_cache_remove_scan_count_limit) :
            NewLRUCache(FLAGS_cache_size)) : nullptr),
    compressed_cache_(FLAGS_compressed_cache_size >= 0 ?
           (FLAGS_cache_numshardbits >= 1 ?
            NewLRUCache(FLAGS_compressed_cache_size, FLAGS_cache_numshardbits) :
            NewLRUCache(FLAGS_compressed_cache_size)) : nullptr),
    filter_policy_(FLAGS_bloom_bits >= 0
                   ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                   : nullptr),
    prefix_extractor_(NewFixedPrefixTransform(FLAGS_key_size-1)),
    db_(nullptr),
    num_(FLAGS_num),
    value_size_(FLAGS_value_size),
    key_size_(FLAGS_key_size),
    entries_per_batch_(1),
    reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
    writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
    readwrites_((FLAGS_writes < 0  && FLAGS_reads < 0)? FLAGS_num :
                ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)
               ),
    merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
    heap_counter_(0) {
    std::vector<std::string> files;
    FLAGS_env->GetChildren(FLAGS_db, &files);
    for (unsigned int i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      DestroyDB(FLAGS_db, Options());
    }
  }

  ~Benchmark() {
    delete db_;
    delete filter_policy_;
    delete prefix_extractor_;
  }

  // This function constructs the format string for keys, e.g. "%016lld%s".
  void ConstructStrFormatForKey(char* str, int keySize) {
    str[0] = '%';
    str[1] = '0';
    sprintf(str+2, "%dlld%s", keySize, "%s");
  }
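  // e.g. ConstructStrFormatForKey(keyFormat_, 16) produces "%016lld%s".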

  unique_ptr<char []> GenerateKeyFromInt(long long v, const char* suffix = "") {
    unique_ptr<char []> keyInStr(new char[kMaxKeySize + 1]);
    snprintf(keyInStr.get(), kMaxKeySize + 1, keyFormat_, v, suffix);
    return keyInStr;
  }
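  // e.g. with --key_size=16, GenerateKeyFromInt(42) yields "0000000000000042"
  // (plus whatever suffix, if any, is passed as the second argument).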

  void Run() {
    PrintHeader();
    Open();
    const char* benchmarks = FLAGS_benchmarks.c_str();
    while (benchmarks != nullptr) {
      const char* sep = strchr(benchmarks, ',');
      Slice name;
      if (sep == nullptr) {
        name = benchmarks;
        benchmarks = nullptr;
      } else {
        name = Slice(benchmarks, sep - benchmarks);
        benchmarks = sep + 1;
      }

      // Sanitize parameters
      num_ = FLAGS_num;
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
      writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
      value_size_ = FLAGS_value_size;
      key_size_ = FLAGS_key_size;
      ConstructStrFormatForKey(keyFormat_, key_size_);
      entries_per_batch_ = 1;
      write_options_ = WriteOptions();
      if (FLAGS_sync) {
        write_options_.sync = true;
      }
      write_options_.disableWAL = FLAGS_disable_wal;

      void (Benchmark::*method)(ThreadState*) = nullptr;
      bool fresh_db = false;
      int num_threads = FLAGS_threads;

      if (name == Slice("fillseq")) {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillbatch")) {
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillrandom")) {
        fresh_db = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fillfromstdin")) {
        fresh_db = true;
        method = &Benchmark::WriteFromStdin;
      } else if (name == Slice("filluniquerandom")) {
        fresh_db = true;
        if (num_threads > 1) {
          fprintf(stderr, "filluniquerandom multithreaded not supported,"
                           " set --threads=1");
          exit(1);
        }
        method = &Benchmark::WriteUniqueRandom;
      } else if (name == Slice("overwrite")) {
        fresh_db = false;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fillsync")) {
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fill100K")) {
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("readseq")) {
        method = &Benchmark::ReadSequential;
      } else if (name == Slice("readtocache")) {
        method = &Benchmark::ReadSequential;
        num_threads = 1;
        reads_ = num_;
      } else if (name == Slice("readreverse")) {
        method = &Benchmark::ReadReverse;
      } else if (name == Slice("readrandom")) {
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("readmissing")) {
        method = &Benchmark::ReadMissing;
      } else if (name == Slice("seekrandom")) {
        method = &Benchmark::SeekRandom;
      } else if (name == Slice("readhot")) {
        method = &Benchmark::ReadHot;
      } else if (name == Slice("readrandomsmall")) {
        reads_ /= 1000;
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("prefixscanrandom")) {
        method = &Benchmark::PrefixScanRandom;
      } else if (name == Slice("deleteseq")) {
        method = &Benchmark::DeleteSeq;
      } else if (name == Slice("deleterandom")) {
        method = &Benchmark::DeleteRandom;
      } else if (name == Slice("readwhilewriting")) {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
      } else if (name == Slice("readrandomwriterandom")) {
        method = &Benchmark::ReadRandomWriteRandom;
      } else if (name == Slice("readrandommergerandom")) {
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
                  name.ToString().c_str());
          method = nullptr;
        } else {
          method = &Benchmark::ReadRandomMergeRandom;
        }
      } else if (name == Slice("updaterandom")) {
        method = &Benchmark::UpdateRandom;
      } else if (name == Slice("appendrandom")) {
        method = &Benchmark::AppendRandom;
      } else if (name == Slice("mergerandom")) {
        if (FLAGS_merge_operator.empty()) {
          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
                  name.ToString().c_str());
          method = nullptr;
        } else {
          method = &Benchmark::MergeRandom;
        }
      } else if (name == Slice("randomwithverify")) {
        method = &Benchmark::RandomWithVerify;
      } else if (name == Slice("compact")) {
        method = &Benchmark::Compact;
      } else if (name == Slice("crc32c")) {
        method = &Benchmark::Crc32c;
      } else if (name == Slice("acquireload")) {
        method = &Benchmark::AcquireLoad;
      } else if (name == Slice("snappycomp")) {
        method = &Benchmark::SnappyCompress;
      } else if (name == Slice("snappyuncomp")) {
        method = &Benchmark::SnappyUncompress;
      } else if (name == Slice("heapprofile")) {
        HeapProfile();
      } else if (name == Slice("stats")) {
        PrintStats("rocksdb.stats");
      } else if (name == Slice("levelstats")) {
        PrintStats("rocksdb.levelstats");
      } else if (name == Slice("sstables")) {
        PrintStats("rocksdb.sstables");
      } else {
        if (name != Slice()) {  // No error message for empty name
          fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
        }
      }

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                  name.ToString().c_str());
          method = nullptr;
        } else {
          delete db_;
          db_ = nullptr;
          DestroyDB(FLAGS_db, Options());
          Open();
        }
      }

      if (method != nullptr) {
        fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
        RunBenchmark(num_threads, name, method);
      }
    }
    if (FLAGS_statistics) {
     fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
    }
  }

 private:
  struct ThreadArg {
    Benchmark* bm;
    SharedState* shared;
    ThreadState* thread;
    void (Benchmark::*method)(ThreadState*);
  };

  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

    thread->stats.Start(thread->tid);
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }

  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;

    ThreadArg* arg = new ThreadArg[n];
    for (int i = 0; i < n; i++) {
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      arg[i].thread = new ThreadState(i);
      arg[i].thread->shared = &shared;
      FLAGS_env->StartThread(ThreadBody, &arg[i]);
    }

    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    // Stats for some threads can be excluded.
    Stats merge_stats;
    for (int i = 0; i < n; i++) {
      merge_stats.Merge(arg[i].thread->stats);
    }
    merge_stats.Report(name);

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;
  }

  void Crc32c(ThreadState* thread) {
    // Checksum about 500MB of data total
    const int size = 4096;
    const char* label = "(4K per op)";
    std::string data(size, 'x');
    int64_t bytes = 0;
    uint32_t crc = 0;
    while (bytes < 500 * 1048576) {
      crc = crc32c::Value(data.data(), size);
      thread->stats.FinishedSingleOp(nullptr);
      bytes += size;
    }
    // Print so result is not dead
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

  void AcquireLoad(ThreadState* thread) {
    int dummy;
    port::AtomicPointer ap(&dummy);
    int count = 0;
    void *ptr = nullptr;
    thread->stats.AddMessage("(each op is 1000 loads)");
    while (count < 100000) {
      for (int i = 0; i < 1000; i++) {
        ptr = ap.Acquire_Load();
      }
      count++;
      thread->stats.FinishedSingleOp(nullptr);
    }
    if (ptr == nullptr) exit(1); // Disable unused variable warning.
  }

  void SnappyCompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;
    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
      ok = port::Snappy_Compress(Options().compression_opts, input.data(),
                                 input.size(), &compressed);
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedSingleOp(nullptr);
    }

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    }
  }

  void SnappyUncompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    std::string compressed;
    bool ok = port::Snappy_Compress(Options().compression_opts, input.data(),
                                    input.size(), &compressed);
    int64_t bytes = 0;
    char* uncompressed = new char[input.size()];
    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
      ok =  port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                    uncompressed);
      bytes += input.size();
      thread->stats.FinishedSingleOp(nullptr);
    }
    delete[] uncompressed;

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }

  void Open() {
    assert(db_ == nullptr);
    Options options;
    options.create_if_missing = !FLAGS_use_existing_db;
    options.block_cache = cache_;
    options.block_cache_compressed = compressed_cache_;
    if (cache_ == nullptr) {
      options.no_block_cache = true;
    }
    options.write_buffer_size = FLAGS_write_buffer_size;
    options.max_write_buffer_number = FLAGS_max_write_buffer_number;
    options.min_write_buffer_number_to_merge =
      FLAGS_min_write_buffer_number_to_merge;
    options.max_background_compactions = FLAGS_max_background_compactions;
    options.compaction_style = FLAGS_compaction_style_e;
    options.block_size = FLAGS_block_size;
    options.filter_policy = filter_policy_;
    options.prefix_extractor = FLAGS_use_prefix_blooms ? prefix_extractor_
                                                       : nullptr;
    options.max_open_files = FLAGS_open_files;
    options.statistics = dbstats;
    options.env = FLAGS_env;
    options.disableDataSync = FLAGS_disable_data_sync;
    options.use_fsync = FLAGS_use_fsync;
    options.num_levels = FLAGS_num_levels;
    options.target_file_size_base = FLAGS_target_file_size_base;
    options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
    options.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
    options.filter_deletes = FLAGS_filter_deletes;
    if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) {
      fprintf(stderr,
            "prefix_size should be non-zero iff memtablerep == prefix_hash\n");
      exit(1);
    }
    switch (FLAGS_rep_factory) {
      case kPrefixHash:
        options.memtable_factory.reset(NewHashSkipListRepFactory(
            NewFixedPrefixTransform(FLAGS_prefix_size)));
        break;
      case kSkipList:
        // no need to do anything
        break;
      case kVectorRep:
        options.memtable_factory.reset(
          new VectorRepFactory
        );
        break;
    }
    if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
      if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
          (unsigned int)FLAGS_num_levels) {
        fprintf(stderr, "Insufficient number of fanouts specified %d\n",
                (int)FLAGS_max_bytes_for_level_multiplier_additional_v.size());
        exit(1);
      }
      options.max_bytes_for_level_multiplier_additional =
        FLAGS_max_bytes_for_level_multiplier_additional_v;
    }
    options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
    options.level0_file_num_compaction_trigger =
        FLAGS_level0_file_num_compaction_trigger;
    options.level0_slowdown_writes_trigger =
      FLAGS_level0_slowdown_writes_trigger;
    options.compression = FLAGS_compression_type_e;
    options.compression_opts.level = FLAGS_compression_level;
    options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
    options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
    if (FLAGS_min_level_to_compress >= 0) {
      assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
      options.compression_per_level.resize(FLAGS_num_levels);
      for (int i = 0; i < FLAGS_min_level_to_compress; i++) {
        options.compression_per_level[i] = kNoCompression;
      }
      for (int i = FLAGS_min_level_to_compress;
           i < FLAGS_num_levels; i++) {
        options.compression_per_level[i] = FLAGS_compression_type_e;
      }
    }
    options.disable_seek_compaction = FLAGS_disable_seek_compaction;
    options.delete_obsolete_files_period_micros =
      FLAGS_delete_obsolete_files_period_micros;
    options.soft_rate_limit = FLAGS_soft_rate_limit;
    options.hard_rate_limit = FLAGS_hard_rate_limit;
    options.rate_limit_delay_max_milliseconds =
      FLAGS_rate_limit_delay_max_milliseconds;
    options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;
    options.max_grandparent_overlap_factor =
      FLAGS_max_grandparent_overlap_factor;
    options.disable_auto_compactions = FLAGS_disable_auto_compactions;
    options.source_compaction_factor = FLAGS_source_compaction_factor;

    // fill storage options
    options.allow_os_buffer = FLAGS_bufferedio;
    options.allow_mmap_reads = FLAGS_mmap_read;
    options.allow_mmap_writes = FLAGS_mmap_write;
    options.advise_random_on_open = FLAGS_advise_random_on_open;
    options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;
    options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
    options.bytes_per_sync = FLAGS_bytes_per_sync;

    // merge operator options
    options.merge_operator = MergeOperators::CreateFromStringId(
        FLAGS_merge_operator);
    if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
      fprintf(stderr, "invalid merge operator: %s\n",
              FLAGS_merge_operator.c_str());
      exit(1);
    }
    options.max_successive_merges = FLAGS_max_successive_merges;

    // set universal style compaction configurations, if applicable
    if (FLAGS_universal_size_ratio != 0) {
      options.compaction_options_universal.size_ratio =
        FLAGS_universal_size_ratio;
    }
    if (FLAGS_universal_min_merge_width != 0) {
      options.compaction_options_universal.min_merge_width =
        FLAGS_universal_min_merge_width;
    }
    if (FLAGS_universal_max_merge_width != 0) {
      options.compaction_options_universal.max_merge_width =
        FLAGS_universal_max_merge_width;
    }
    if (FLAGS_universal_max_size_amplification_percent != 0) {
      options.compaction_options_universal.max_size_amplification_percent =
        FLAGS_universal_max_size_amplification_percent;
    }
    if (FLAGS_universal_compression_size_percent != -1) {
      options.compaction_options_universal.compression_size_percent =
        FLAGS_universal_compression_size_percent;
    }

    Status s;
    if (FLAGS_readonly) {
      s = DB::OpenForReadOnly(options, FLAGS_db, &db_);
    } else {
      s = DB::Open(options, FLAGS_db, &db_);
    }
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
    if (FLAGS_min_level_to_compress >= 0) {
      options.compression_per_level.clear();
    }
  }

  enum WriteMode {
    RANDOM, SEQUENTIAL, UNIQUE_RANDOM
  };

  void WriteSeq(ThreadState* thread) {
    DoWrite(thread, SEQUENTIAL);
  }

  void WriteRandom(ThreadState* thread) {
    DoWrite(thread, RANDOM);
  }

  void WriteUniqueRandom(ThreadState* thread) {
    DoWrite(thread, UNIQUE_RANDOM);
  }

  void writeOrFail(WriteBatch& batch) {
    Status s = db_->Write(write_options_, &batch);
    if (!s.ok()) {
      fprintf(stderr, "put error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

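  // Loads TAB-separated key/value lines from stdin, writing them in batches.
  // Lines without a TAB separator or a trailing newline are skipped.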
  void WriteFromStdin(ThreadState* thread) {
    size_t count = 0;
    WriteBatch batch;
    const size_t bufferLen = 32 << 20;
    unique_ptr<char[]> line = unique_ptr<char[]>(new char[bufferLen]);
    char* linep = line.get();
    const int batchSize = 100 << 10;
    const char columnSeparator = '\t';
    const char lineSeparator = '\n';

    while (fgets(linep, bufferLen, stdin) != nullptr) {
      ++count;
      char* tab = std::find(linep, linep + bufferLen, columnSeparator);
      if (tab == linep + bufferLen) {
        fprintf(stderr, "[Error] No Key delimiter TAB at line %zu\n", count);
        continue;
      }
      Slice key(linep, tab - linep);
      tab++;
      char* endLine = std::find(tab, linep + bufferLen, lineSeparator);
      if (endLine  == linep + bufferLen) {
        fprintf(stderr, "[Error] No ENTER at end of line # %zu\n", count);
        continue;
      }
      Slice value(tab, endLine - tab);
      thread->stats.FinishedSingleOp(db_);
      thread->stats.AddBytes(endLine - linep - 1);

      if (batch.Count() < batchSize) {
        batch.Put(key, value);
        continue;
      }
      writeOrFail(batch);
      batch.Clear();
    }
    if (batch.Count() > 0) {
      writeOrFail(batch);
    }
  }

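  // Writes values using the requested key ordering: SEQUENTIAL, RANDOM, or
  // UNIQUE_RANDOM (random order, but each key is written at most once,
  // tracked via a BitSet).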
  void DoWrite(ThreadState* thread, WriteMode write_mode) {
    const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
    const int num_ops = writes_ == 0 ? num_ : writes_ ;
    Duration duration(test_duration, num_ops);
    unique_ptr<BitSet> bit_set;

    if (write_mode == UNIQUE_RANDOM) {
      bit_set.reset(new BitSet(num_ops));
    }

    if (num_ != FLAGS_num) {
      char msg[100];
      snprintf(msg, sizeof(msg), "(%lld ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;
    int i = 0;
    while (!duration.Done(entries_per_batch_)) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        long long k = 0;
        switch (write_mode) {
          case SEQUENTIAL:
            k = i + j;
            break;
          case RANDOM:
            k = thread->rand.Next() % FLAGS_num;
            break;
          case UNIQUE_RANDOM:
            {
              const long long t = thread->rand.Next() % FLAGS_num;
              if (!bit_set->test(t)) {
                // best case
                k = t;
              } else {
                bool found = false;
                // look forward
                for (size_t i = t + 1; i < bit_set->size(); ++i) {
                  if (!bit_set->test(i)) {
                    found = true;
                    k = i;
                    break;
                  }
                }
                if (!found) {
                  for (size_t i = t; i-- > 0;) {
                    if (!bit_set->test(i)) {
                      found = true;
                      k = i;
                      break;
                    }
                  }
                }
              }
              bit_set->set(k);
              break;
            }
        }
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        batch.Put(key.get(), gen.Generate(value_size_));
        bytes += value_size_ + strlen(key.get());
        thread->stats.FinishedSingleOp(db_);
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      i += entries_per_batch_;
    }
    thread->stats.AddBytes(bytes);
  }

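  // Reads up to reads_ entries by scanning forward from the first key.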
  void ReadSequential(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
    long long i = 0;
    int64_t bytes = 0;
    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp(db_);
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

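  // Reads up to reads_ entries by scanning backward from the last key.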
  void ReadReverse(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
    long long i = 0;
    int64_t bytes = 0;
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp(db_);
      ++i;
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  // Calls MultiGet over a list of keys from a random distribution.
  // Returns the total number of keys found.
  long MultiGetRandom(ReadOptions& options, int num_keys,
                     Random64& rand, long long range, const char* suffix) {
    assert(num_keys > 0);
    std::vector<Slice> keys(num_keys);
    std::vector<std::string> values(num_keys);
    std::vector<unique_ptr<char []> > gen_keys(num_keys);

    int i;
    long long k;

    // Fill the keys vector
    for(i=0; i<num_keys; ++i) {
      k = rand.Next() % range;
      gen_keys[i] = GenerateKeyFromInt(k,suffix);
      keys[i] = gen_keys[i].get();
    }

    if (FLAGS_use_snapshot) {
      options.snapshot = db_->GetSnapshot();
    }

    // Apply the operation
    std::vector<Status> statuses = db_->MultiGet(options, keys, &values);
    assert((long)statuses.size() == num_keys);
    assert((long)keys.size() == num_keys);  // Should always be the case.
    assert((long)values.size() == num_keys);

    if (FLAGS_use_snapshot) {
      db_->ReleaseSnapshot(options.snapshot);
      options.snapshot = nullptr;
    }

    // Count number found
    long found = 0;
    for(i=0; i<num_keys; ++i) {
      if (statuses[i].ok()){
        ++found;
      } else if (FLAGS_warn_missing_keys == true) {
        // Key not found, or error.
        fprintf(stderr, "get error: %s\n", statuses[i].ToString().c_str());
      }
    }

    return found;
  }

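  // Point-lookups of random keys. Uses MultiGet groups when --use_multiget is
  // set, and short range scans when --read_range is two or more.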
  void ReadRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    Duration duration(FLAGS_duration, reads_);

    long long found = 0;

    if (FLAGS_use_multiget) {   // MultiGet
      const long& kpg = FLAGS_keys_per_multiget;  // keys per multiget group
      long keys_left = reads_;

      // Recalculate number of keys per group, and call MultiGet until done
      long num_keys;
      while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
        found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, "");
        thread->stats.FinishedSingleOp(db_);
        keys_left -= num_keys;
      }
    } else {    // Regular case: do one Get at a time
      Iterator* iter = db_->NewIterator(options);
      std::string value;
      while (!duration.Done(1)) {
        const long long k = thread->rand.Next() % FLAGS_num;
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        if (FLAGS_use_snapshot) {
          options.snapshot = db_->GetSnapshot();
        }

        if (FLAGS_read_range < 2) {
          if (db_->Get(options, key.get(), &value).ok()) {
            found++;
          }
        } else {
          Slice skey(key.get());
          int count = 1;

          if (FLAGS_get_approx) {
            unique_ptr<char []> key2 =
                GenerateKeyFromInt(k + (int) FLAGS_read_range);
            Slice skey2(key2.get());
            Range range(skey, skey2);
            uint64_t sizes;
            db_->GetApproximateSizes(&range, 1, &sizes);
          }

          for (iter->Seek(skey);
               iter->Valid() && count <= FLAGS_read_range;
               ++count, iter->Next()) {
            found++;
          }
        }

        if (FLAGS_use_snapshot) {
          db_->ReleaseSnapshot(options.snapshot);
          options.snapshot = nullptr;
        }

        thread->stats.FinishedSingleOp(db_);
      }

      delete iter;
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
    thread->stats.AddMessage(msg);
  }

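  // Seeks to a random key and scans forward while entries still share its
  // prefix, optionally restricting the iterator via the prefix API.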
  void PrefixScanRandom(ThreadState* thread) {
    if (FLAGS_use_prefix_api) {
      assert(FLAGS_use_prefix_blooms);
      assert(FLAGS_bloom_bits >= 1);
    }

    ReadOptions options(FLAGS_verify_checksum, true);
    Duration duration(FLAGS_duration, reads_);

    long long found = 0;

    while (!duration.Done(1)) {
      std::string value;
      const int k = thread->rand.Next() % FLAGS_num;
      unique_ptr<char []> key = GenerateKeyFromInt(k);
      Slice skey(key.get());
      Slice prefix = prefix_extractor_->Transform(skey);
      options.prefix = FLAGS_use_prefix_api ? &prefix : nullptr;

      Iterator* iter = db_->NewIterator(options);
      for (iter->Seek(skey);
           iter->Valid() && iter->key().starts_with(prefix);
           iter->Next()) {
        found++;
      }
      delete iter;

      thread->stats.FinishedSingleOp(db_);
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
    thread->stats.AddMessage(msg);
  }

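  // Looks up keys that cannot exist (a "." suffix is appended), so every
  // Get/MultiGet is expected to return NotFound.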
  void ReadMissing(ThreadState* thread) {
    FLAGS_warn_missing_keys = false;    // Never warn about missing keys

    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);

    if (FLAGS_use_multiget) {
      const long& kpg = FLAGS_keys_per_multiget;  // keys per multiget group
      long keys_left = reads_;

      // Recalculate number of keys per group, and call MultiGet until done
      long num_keys;
      long found;
      while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
        found = MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, ".");

        // We should not find any key since the key we try to get has a
        // different suffix
        if (found) {
          assert(false);
        }

        thread->stats.FinishedSingleOp(db_);
        keys_left -= num_keys;
      }
    } else {  // Regular case (not MultiGet)
      std::string value;
      Status s;
      while (!duration.Done(1)) {
        const long long k = thread->rand.Next() % FLAGS_num;
        unique_ptr<char []> key = GenerateKeyFromInt(k, ".");
        s = db_->Get(options, key.get(), &value);
        assert(!s.ok() && s.IsNotFound());
        thread->stats.FinishedSingleOp(db_);
      }
    }
  }

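  // Reads random keys drawn from a "hot" range covering ~1% of the key space.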
  void ReadHot(ThreadState* thread) {
    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);
    const long long range = (FLAGS_num + 99) / 100;
    long long found = 0;

    if (FLAGS_use_multiget) {
      const long long kpg = FLAGS_keys_per_multiget;  // keys per multiget group
      long long keys_left = reads_;

      // Recalculate number of keys per group, and call MultiGet until done
      long num_keys;
      while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
        found += MultiGetRandom(options, num_keys, thread->rand, range, "");
        thread->stats.FinishedSingleOp(db_);
        keys_left -= num_keys;
      }
    } else {
      std::string value;
      while (!duration.Done(1)) {
        const long long k = thread->rand.Next() % range;
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        if (db_->Get(options, key.get(), &value).ok()){
          ++found;
        }
        thread->stats.FinishedSingleOp(db_);
      }
    }

    char msg[100];
    snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_);
    thread->stats.AddMessage(msg);
  }

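  // Creates a fresh iterator per operation and seeks to a random key,
  // counting exact matches.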
  void SeekRandom(ThreadState* thread) {
    Duration duration(FLAGS_duration, reads_);
    ReadOptions options(FLAGS_verify_checksum, true);
    std::string value;
    long long found = 0;
    while (!duration.Done(1)) {
      Iterator* iter = db_->NewIterator(options);
      const long long k = thread->rand.Next() % FLAGS_num;
      unique_ptr<char []> key = GenerateKeyFromInt(k);
      iter->Seek(key.get());
      if (iter->Valid() && iter->key() == key.get()) found++;
      delete iter;
      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, num_);
    thread->stats.AddMessage(msg);
  }

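  // Deletes keys in sequential or random order, entries_per_batch_ at a time.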
  void DoDelete(ThreadState* thread, bool seq) {
    WriteBatch batch;
    Status s;
    Duration duration(seq ? 0 : FLAGS_duration, num_);
    long i = 0;
    while (!duration.Done(entries_per_batch_)) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const long long k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        batch.Delete(key.get());
        thread->stats.FinishedSingleOp(db_);
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        exit(1);
      }
      ++i;
    }
  }

  void DeleteSeq(ThreadState* thread) {
    DoDelete(thread, true);
  }

  void DeleteRandom(ThreadState* thread) {
    DoDelete(thread, false);
  }

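  // Thread 0 keeps writing random keys (optionally limited by
  // --writes_per_second) while the remaining threads run ReadRandom; the
  // writer's stats are excluded from the merged report.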
  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      // Special thread that keeps writing until other threads are done.
      RandomGenerator gen;
      double last = FLAGS_env->NowMicros();
      int writes_per_second_by_10 = 0;
      int num_writes = 0;

      // --writes_per_second rate limit is enforced per 100 milliseconds
      // intervals to avoid a burst of writes at the start of each second.

      if (FLAGS_writes_per_second > 0)
        writes_per_second_by_10 = FLAGS_writes_per_second / 10;

      // Don't merge stats from this thread with the readers.
      thread->stats.SetExcludeFromMerge();

      while (true) {
        {
          MutexLock l(&thread->shared->mu);
          if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
            // Other threads have finished
            break;
          }
        }

        const long long k = thread->rand.Next() % FLAGS_num;
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        Status s = db_->Put(write_options_, key.get(),
                            gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        thread->stats.FinishedSingleOp(db_);

        ++num_writes;
        if (writes_per_second_by_10 && num_writes >= writes_per_second_by_10) {
          double now = FLAGS_env->NowMicros();
          double usecs_since_last = now - last;

          num_writes = 0;
          last = now;

          if (usecs_since_last < 100000.0) {
            FLAGS_env->SleepForMicroseconds(100000.0 - usecs_since_last);
            last = FLAGS_env->NowMicros();
          }
        }
      }
    }
  }

  // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
  // in the DB atomically, i.e. in a single batch. Also refer to GetMany.
  Status PutMany(const WriteOptions& writeoptions,
                  const Slice& key, const Slice& value) {
    std::string suffixes[3] = {"2", "1", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Put(keys[i], value);
    }

    s = db_->Write(writeoptions, &batch);
    return s;
  }


  // Given a key K, this deletes the keys K+"0", K+"1" and K+"2"
  // in the DB atomically, i.e. in a single batch. Also refer to GetMany.
  Status DeleteMany(const WriteOptions& writeoptions,
                  const Slice& key) {
    std::string suffixes[3] = {"1", "2", "0"};
    std::string keys[3];

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      batch.Delete(keys[i]);
    }

    s = db_->Write(writeoptions, &batch);
    return s;
  }

  // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2"
  // in the same snapshot, and verifies that all the values are identical.
  // ASSUMES that PutMany was used to put (K, V) into the DB.
  Status GetMany(const ReadOptions& readoptions,
                  const Slice& key, std::string* value) {
    std::string suffixes[3] = {"0", "1", "2"};
    std::string keys[3];
    Slice key_slices[3];
    std::string values[3];
    ReadOptions readoptionscopy = readoptions;
    readoptionscopy.snapshot = db_->GetSnapshot();
    Status s;
    for (int i = 0; i < 3; i++) {
      keys[i] = key.ToString() + suffixes[i];
      key_slices[i] = keys[i];
      s = db_->Get(readoptionscopy, key_slices[i], value);
      if (!s.ok() && !s.IsNotFound()) {
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
        values[i] = "";
        // we continue after error rather than exiting so that we can
        // find more errors if any
      } else if (s.IsNotFound()) {
        values[i] = "";
      } else {
        values[i] = *value;
      }
    }
    db_->ReleaseSnapshot(readoptionscopy.snapshot);

    if ((values[0] != values[1]) || (values[1] != values[2])) {
      fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n",
              key.ToString().c_str(), values[0].c_str(), values[1].c_str(),
              values[2].c_str());
      // we continue after error rather than exiting so that we can
      // find more errors if any
    }

    return s;
  }

  // Differs from readrandomwriterandom in the following ways:
  // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs.
  // (b) Does deletes as well (per FLAGS_deletepercent)
  // (c) In order to achieve high % of 'found' during lookups, and to do
  //     multiple writes (including puts and deletes) it uses up to
  //     FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys.
  // (d) Does not have a MultiGet option.
  void RandomWithVerify(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    long long found = 0;
    int get_weight = 0;
    int put_weight = 0;
    int delete_weight = 0;
    long long gets_done = 0;
    long long puts_done = 0;
    long long deletes_done = 0;

    // the number of iterations is the larger of read_ or write_
    for (long long i = 0; i < readwrites_; i++) {
      const long long k = thread->rand.Next() % (FLAGS_numdistinct);
      unique_ptr<char []> key = GenerateKeyFromInt(k);
      if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
        // one batch completed, reinitialize for next batch
        get_weight = FLAGS_readwritepercent;
        delete_weight = FLAGS_deletepercent;
        put_weight = 100 - get_weight - delete_weight;
      }
      if (get_weight > 0) {
        // do all the gets first
        Status s = GetMany(options, key.get(), &value);
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "getmany error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }
        get_weight--;
        gets_done++;
      } else if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
        Status s = PutMany(write_options_, key.get(),
                           gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        puts_done++;
      } else if (delete_weight > 0) {
        Status s = DeleteMany(write_options_, key.get());
        if (!s.ok()) {
          fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str());
          exit(1);
        }
        delete_weight--;
        deletes_done++;
      }

      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( get:%lld put:%lld del:%lld total:%lld found:%lld)",
             gets_done, puts_done, deletes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  // This is different from ReadWhileWriting because it does not use
  // an extra thread.
  void ReadRandomWriteRandom(ThreadState* thread) {
    if (FLAGS_use_multiget){
      // Separate function for multiget (for ease of reading)
      ReadRandomWriteRandomMultiGet(thread);
      return;
    }

    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    long long found = 0;
    int get_weight = 0;
    int put_weight = 0;
    long long reads_done = 0;
    long long writes_done = 0;
    Duration duration(FLAGS_duration, readwrites_);

    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      const long long k = thread->rand.Next() % FLAGS_num;
      unique_ptr<char []> key = GenerateKeyFromInt(k);
      if (get_weight == 0 && put_weight == 0) {
        // one batch completed, reinitialize for next batch
        get_weight = FLAGS_readwritepercent;
        put_weight = 100 - get_weight;
      }
      if (get_weight > 0) {

        if (FLAGS_use_snapshot) {
          options.snapshot = db_->GetSnapshot();
        }

        if (FLAGS_get_approx) {
          char key2[100];
          snprintf(key2, sizeof(key2), "%016lld", k + 1);
          Slice skey2(key2);
          Slice skey(key2);
          Range range(skey, skey2);
          uint64_t sizes;
          db_->GetApproximateSizes(&range, 1, &sizes);
        }

        // do all the gets first
        Status s = db_->Get(options, key.get(), &value);
        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          found++;
        }

        get_weight--;
        reads_done++;

        if (FLAGS_use_snapshot) {
          db_->ReleaseSnapshot(options.snapshot);
        }

      } else  if (put_weight > 0) {
        // then do all the corresponding number of puts
        // for all the gets we have done earlier
        Status s = db_->Put(write_options_, key.get(),
                            gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        put_weight--;
        writes_done++;
      }
      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( reads:%lld writes:%lld total:%lld found:%lld)",
             reads_done, writes_done, readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  // ReadRandomWriteRandom (with multiget)
  // Does FLAGS_keys_per_multiget reads (per multiget), followed by some puts.
  // FLAGS_readwritepercent will specify the ratio of gets to puts.
  // e.g.: If FLAGS_keys_per_multiget == 100 and FLAGS_readwritepercent == 75
  // Then each block will do 100 multigets and 33 puts
  // So there are 133 operations in-total: 100 of them (75%) are gets, and 33
  // of them (25%) are puts.
  void ReadRandomWriteRandomMultiGet(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;

    // For multiget
    const long& kpg = FLAGS_keys_per_multiget;  // keys per multiget group

    long keys_left = readwrites_;  // number of keys still left to read
    long num_keys;                  // number of keys to read in current group
    long num_put_keys;              // number of keys to put in current group

    long found = 0;
    long reads_done = 0;
    long writes_done = 0;
    long multigets_done = 0;

    // the number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while(true) {
      // Read num_keys keys, then write num_put_keys keys.
      // The ratio of num_keys to num_put_keys is always FLAGS_readwritepercent
      // And num_keys is set to be FLAGS_keys_per_multiget (kpg)
      // num_put_keys is calculated accordingly (to maintain the ratio)
      // Note: On the final iteration, num_keys and num_put_keys will be smaller
      num_keys = std::min(keys_left*(FLAGS_readwritepercent + 99)/100, kpg);
      num_put_keys = num_keys * (100-FLAGS_readwritepercent)
                     / FLAGS_readwritepercent;

      // This will break the loop when duration is complete
      if (duration.Done(num_keys + num_put_keys)) {
        break;
      }

      // A quick check to make sure our formula doesn't break on edge cases
      assert(num_keys >= 1);
      assert(num_keys + num_put_keys <= keys_left);

      // Apply the MultiGet operations
      found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, "");
      ++multigets_done;
      reads_done+=num_keys;
      thread->stats.FinishedSingleOp(db_);

      // Now do the puts
      int i;
      long long k;
      for(i=0; i<num_put_keys; ++i) {
        k = thread->rand.Next() % FLAGS_num;
        unique_ptr<char []> key = GenerateKeyFromInt(k);
        Status s = db_->Put(write_options_, key.get(),
                            gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          exit(1);
        }
        writes_done++;
        thread->stats.FinishedSingleOp(db_);
      }

      keys_left -= (num_keys + num_put_keys);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( reads:%ld writes:%ld total:%lld multiget_ops:%ld found:%ld)",
             reads_done, writes_done, readwrites_, multigets_done, found);
    thread->stats.AddMessage(msg);
  }

  //
  // Read-modify-write for random keys
  void UpdateRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    long long found = 0;
    Duration duration(FLAGS_duration, readwrites_);

    // the number of iterations is the larger of read_ or write_
    while (!duration.Done(1)) {
      const long long k = thread->rand.Next() % FLAGS_num;
      unique_ptr<char []> key = GenerateKeyFromInt(k);

      if (FLAGS_use_snapshot) {
        options.snapshot = db_->GetSnapshot();
      }

      if (FLAGS_get_approx) {
        char key2[100];
        snprintf(key2, sizeof(key2), "%016lld", k + 1);
        Slice skey2(key2);
        Slice skey(key2);
        Range range(skey, skey2);
        uint64_t sizes;
        db_->GetApproximateSizes(&range, 1, &sizes);
      }

      if (db_->Get(options, key.get(), &value).ok()) {
        found++;
      }

      if (FLAGS_use_snapshot) {
        db_->ReleaseSnapshot(options.snapshot);
      }

      Status s = db_->Put(write_options_, key.get(), gen.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "( updates:%lld found:%lld)", readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  // Read-modify-write for random keys.
  // Each operation causes the key grow by value_size (simulating an append).
  // Generally used for benchmarking against merges of similar type
  void AppendRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    long found = 0;

    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      const long long k = thread->rand.Next() % FLAGS_num;
      unique_ptr<char []> key = GenerateKeyFromInt(k);

      if (FLAGS_use_snapshot) {
        options.snapshot = db_->GetSnapshot();
      }

      if (FLAGS_get_approx) {
        char key2[100];
        snprintf(key2, sizeof(key2), "%016lld", k + 1);
        Slice skey2(key2);
        Slice skey(key2);
        Range range(skey, skey2);
        uint64_t sizes;
        db_->GetApproximateSizes(&range, 1, &sizes);
      }

      // Get the existing value
      if (db_->Get(options, key.get(), &value).ok()) {
        found++;
      } else {
        // If not existing, then just assume an empty string of data
        value.clear();
      }

      if (FLAGS_use_snapshot) {
        db_->ReleaseSnapshot(options.snapshot);
      }

      // Update the value (by appending data)
      Slice operand = gen.Generate(value_size_);
      if (value.size() > 0) {
        // Use a delimiter to match the semantics for StringAppendOperator
        value.append(1,',');
      }
      value.append(operand.data(), operand.size());

      // Write back to the database
      Status s = db_->Put(write_options_, key.get(), value);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%lld found:%ld)", readwrites_, found);
    thread->stats.AddMessage(msg);
  }

  // Read-modify-write for random keys (using MergeOperator)
  // The merge operator to use should be defined by FLAGS_merge_operator
  // Adjust FLAGS_value_size so that the keys are reasonable for this operator
  // Assumes that the merge operator is non-null (i.e.: is well-defined)
  //
  // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
  // to simulate random additions over 64-bit integers using merge.
  //
  // The number of merges on the same key can be controlled by adjusting
  // FLAGS_merge_keys.
  void MergeRandom(ThreadState* thread) {
    RandomGenerator gen;

    // The number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);
    while (!duration.Done(1)) {
      const long long k = thread->rand.Next() % merge_keys_;
      unique_ptr<char []> key = GenerateKeyFromInt(k);

      Status s = db_->Merge(write_options_, key.get(),
                            gen.Generate(value_size_));

      if (!s.ok()) {
        fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
        exit(1);
      }
      thread->stats.FinishedSingleOp(db_);
    }

    // Print some statistics
    char msg[100];
    snprintf(msg, sizeof(msg), "( updates:%lld)", readwrites_);
    thread->stats.AddMessage(msg);
  }

  // Read and merge random keys. The amount of reads and merges are controlled
  // by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
  // keys (and thus also the number of reads and merges on the same key) can be
  // adjusted with FLAGS_merge_keys.
  //
  // As with MergeRandom, the merge operator to use should be defined by
  // FLAGS_merge_operator.
  void ReadRandomMergeRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    RandomGenerator gen;
    std::string value;
    long long num_hits = 0;
    long long num_gets = 0;
    long long num_merges = 0;
    size_t max_length = 0;

    // the number of iterations is the larger of read_ or write_
    Duration duration(FLAGS_duration, readwrites_);

    while (!duration.Done(1)) {
      const long long k = thread->rand.Next() % merge_keys_;
      unique_ptr<char []> key = GenerateKeyFromInt(k);

      bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;

      if (do_merge) {
        Status s = db_->Merge(write_options_, key.get(),
                              gen.Generate(value_size_));
        if (!s.ok()) {
          fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
          exit(1);
        }

        num_merges++;

      } else {
        Status s = db_->Get(options, key.get(), &value);
        if (value.length() > max_length)
          max_length = value.length();

        if (!s.ok() && !s.IsNotFound()) {
          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
          // we continue after error rather than exiting so that we can
          // find more errors if any
        } else if (!s.IsNotFound()) {
          num_hits++;
        }

        num_gets++;

      }

      thread->stats.FinishedSingleOp(db_);
    }
    char msg[100];
    snprintf(msg, sizeof(msg),
             "(reads:%lld merges:%lld total:%lld hits:%lld maxlength:%zu)",
             num_gets, num_merges, readwrites_, num_hits, max_length);
    thread->stats.AddMessage(msg);
  }


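  // Compacts the entire key range.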
  void Compact(ThreadState* thread) {
    db_->CompactRange(nullptr, nullptr);
  }

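  // Prints the value of the given DB property to stdout, or "(failed)" if the
  // property is not available.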
  void PrintStats(const char* key) {
    std::string stats;
    if (!db_->GetProperty(key, &stats)) {
      stats = "(failed)";
    }
    fprintf(stdout, "\n%s\n", stats.c_str());
  }

  static void WriteToFile(void* arg, const char* buf, int n) {
    reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
  }

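  // Dumps a heap profile to <db>/heap-NNNN when heap profiling is supported
  // by the build.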
  void HeapProfile() {
    char fname[100];
    EnvOptions soptions;
    snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db.c_str(),
             ++heap_counter_);
    unique_ptr<WritableFile> file;
    Status s = FLAGS_env->NewWritableFile(fname, &file, soptions);
    if (!s.ok()) {
      fprintf(stderr, "%s\n", s.ToString().c_str());
      return;
    }
    bool ok = port::GetHeapProfile(WriteToFile, file.get());
    if (!ok) {
      fprintf(stderr, "heap profiling not supported\n");
      FLAGS_env->DeleteFile(fname);
    }
  }
};

}  // namespace rocksdb

int main(int argc, char** argv) {
  rocksdb::InstallStackTraceHandler();
  google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                          " [OPTIONS]...");
  google::ParseCommandLineFlags(&argc, &argv, true);

  FLAGS_compaction_style_e = (rocksdb::CompactionStyle) FLAGS_compaction_style;
  if (FLAGS_statistics) {
    dbstats = rocksdb::CreateDBStatistics();
  }

  std::vector<std::string> fanout =
    rocksdb::stringSplit(FLAGS_max_bytes_for_level_multiplier_additional, ',');
  for (unsigned int j= 0; j < fanout.size(); j++) {
    FLAGS_max_bytes_for_level_multiplier_additional_v.push_back(
      std::stoi(fanout[j]));
  }

  FLAGS_compression_type_e =
    StringToCompressionType(FLAGS_compression_type.c_str());

  if (!FLAGS_hdfs.empty()) {
    FLAGS_env  = new rocksdb::HdfsEnv(FLAGS_hdfs);
  }

  if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NONE;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::NORMAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "SEQUENTIAL"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::SEQUENTIAL;
  else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "WILLNEED"))
    FLAGS_compaction_fadvice_e = rocksdb::Options::WILLNEED;
  else {
    fprintf(stdout, "Unknown compaction fadvice:%s\n",
            FLAGS_compaction_fadvice.c_str());
  }

  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());

  // The number of background threads should be at least the maximum number
  // of concurrent compactions.
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db.empty()) {
    std::string default_db_path;
    rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
    default_db_path += "/dbbench";
    FLAGS_db = default_db_path;
  }

  rocksdb::Benchmark benchmark;
  benchmark.Run();
  return 0;
}