//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// The test uses an array to compare against values written to the database.
// Keys written to the array are in 1:1 correspondence to the actual values in
// the database according to the formula in the function GenerateValue.

// Space is reserved in the array for positions 0 .. FLAGS_max_key - 1, and
// values are randomly written/deleted/read at those positions. During
// verification we compare all the positions in the array. To shorten or
// lengthen the running time, change FLAGS_max_key and FLAGS_ops_per_thread
// (and sometimes FLAGS_threads).
//
// NOTE: if FLAGS_test_batches_snapshots is set, the test behaves
// differently. See the comment of that flag for details.

#ifndef GFLAGS
#include <cstdio>
int main() {
  // Without gflags this tool cannot parse its options, so it only explains
  // what is missing and exits with a failure status.
  fputs("Please install gflags to run rocksdb tools\n", stderr);
  return 1;
}
#else

31 32
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
33 34
#include <stdio.h>
#include <stdlib.h>
35 36
#include <sys/types.h>
#include <chrono>
37
#include <exception>
38 39
#include <thread>

40
#include <gflags/gflags.h>
41 42
#include "db/db_impl.h"
#include "db/version_set.h"
43 44
#include "hdfs/env_hdfs.h"
#include "port/port.h"
45 46
#include "rocksdb/cache.h"
#include "rocksdb/env.h"
47 48
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
49 50 51
#include "rocksdb/statistics.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/write_batch.h"
52
#include "util/coding.h"
53
#include "util/compression.h"
54 55
#include "util/crc32c.h"
#include "util/histogram.h"
56
#include "util/logging.h"
57 58
#include "util/mutexlock.h"
#include "util/random.h"
59
#include "util/string_util.h"
60
#include "util/testutil.h"
D
Deon Nicholas 已提交
61
#include "utilities/merge_operators.h"
62

63 64 65
// gflags entry points used by this tool. GFLAGS names the gflags namespace
// (presumably provided by the build configuration).
using GFLAGS::SetUsageMessage;
using GFLAGS::ParseCommandLineFlags;
using GFLAGS::RegisterFlagValidator;

// One kibibyte; several flag defaults below are expressed as multiples of it.
static const long KB = 1L << 10;

// gflags validator: accepts only values that fit in a uint32_t. Several
// uint64 flags (e.g. --seed) are later narrowed to 32 bits with static_cast.
//
// Returns true when |value| <= UINT32_MAX; otherwise prints a diagnostic
// naming --|flagname| to stderr and returns false.
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
  if (value > std::numeric_limits<uint32_t>::max()) {
    // PRIu64 prints the full 64-bit value on every platform; casting to
    // unsigned long would truncate it where long is only 32 bits wide.
    fprintf(stderr, "Invalid value for --%s: %" PRIu64 ", overflow\n",
            flagname, value);
    return false;
  }
  return true;
}

DEFINE_uint64(seed, 2341234, "Seed for PRNG");
I
Igor Canadi 已提交
81
static const bool FLAGS_seed_dummy __attribute__((unused)) =
82
    RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range);
83

I
Igor Canadi 已提交
84
DEFINE_int64(max_key, 1 * KB* KB,
85
             "Max number of key/values to place in database");
86

I
Igor Canadi 已提交
87 88
DEFINE_int32(column_families, 10, "Number of column families");

89
DEFINE_bool(test_batches_snapshots, false,
A
Andres Noetzli 已提交
90
            "If set, the test uses MultiGet(), MultiPut() and MultiDelete()"
91 92 93 94 95 96 97 98 99
            " which read/write/delete multiple keys in a batch. In this mode,"
            " we do not verify db content by comparing the content with the "
            "pre-allocated array. Instead, we do partial verification inside"
            " MultiGet() by checking various values in a batch. Benefit of"
            " this mode:\n"
            "\t(a) No need to acquire mutexes during writes (less cache "
            "flushes in multi-core leading to speed up)\n"
            "\t(b) No long validation at the end (more speed up)\n"
            "\t(c) Test snapshot and atomicity of batch writes");
100

101
DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
102

103 104 105 106
DEFINE_int32(ttl, -1,
             "Opens the db with this ttl value if this is not -1. "
             "Carefully specify a large value such that verifications on "
             "deleted values don't fail");
107

108 109
DEFINE_int32(value_size_mult, 8,
             "Size of value will be this number times rand_int(1,3) bytes");
110

111
DEFINE_bool(verify_before_write, false, "Verify before write");
112

113
DEFINE_bool(histogram, false, "Print histogram of operation timings");
114

115 116
DEFINE_bool(destroy_db_initially, true,
            "Destroys the database dir before start if this is true");
117

118 119 120 121
DEFINE_bool(verbose, false, "Verbose");

DEFINE_bool(progress_reports, true,
            "If true, db_stress will report number of finished operations");
122

123 124 125
DEFINE_uint64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
              "Number of bytes to buffer in all memtables before compacting");

126 127
DEFINE_int32(write_buffer_size,
             static_cast<int32_t>(rocksdb::Options().write_buffer_size),
128
             "Number of bytes to buffer in memtable before compacting");
129

130 131 132 133
DEFINE_int32(max_write_buffer_number,
             rocksdb::Options().max_write_buffer_number,
             "The number of in-memory memtables. "
             "Each memtable is of size FLAGS_write_buffer_size.");
134

135 136 137 138 139 140 141 142 143 144
DEFINE_int32(min_write_buffer_number_to_merge,
             rocksdb::Options().min_write_buffer_number_to_merge,
             "The minimum number of write buffers that will be merged together "
             "before writing to storage. This is cheap because it is an "
             "in-memory merge. If this feature is not enabled, then all these "
             "write buffers are flushed to L0 as separate files and this "
             "increases read amplification because a get request has to check "
             "in all of these files. Also, an in-memory merge may result in "
             "writing less data to storage if there are duplicate records in"
             " each of these individual write buffers.");
145

146 147 148 149 150 151 152 153 154 155 156 157 158 159
DEFINE_int32(max_write_buffer_number_to_maintain,
             rocksdb::Options().max_write_buffer_number_to_maintain,
             "The total maximum number of write buffers to maintain in memory "
             "including copies of buffers that have already been flushed. "
             "Unlike max_write_buffer_number, this parameter does not affect "
             "flushing. This controls the minimum amount of write history "
             "that will be available in memory for conflict checking when "
             "Transactions are used. If this value is too low, some "
             "transactions may fail at commit time due to not being able to "
             "determine whether there were any write conflicts. Setting this "
             "value to 0 will cause write buffers to be freed immediately "
             "after they are flushed.  If this value is set to -1, "
             "'max_write_buffer_number' will be used.");

160 161 162
// ---------------------------------------------------------------------------
// File, cache and compaction configuration
// ---------------------------------------------------------------------------
DEFINE_int32(open_files, rocksdb::Options().max_open_files,
             "Maximum number of files to keep open at the same time "
             "(use default if == 0)");

DEFINE_int64(compressed_cache_size, -1,
             "Number of bytes to use as a cache of compressed data."
             " Negative means use default settings.");

DEFINE_int32(compaction_style, rocksdb::Options().compaction_style, "");

// Level-0 triggers.
DEFINE_int32(level0_file_num_compaction_trigger,
             rocksdb::Options().level0_file_num_compaction_trigger,
             "Level0 compaction start trigger");

DEFINE_int32(level0_slowdown_writes_trigger,
             rocksdb::Options().level0_slowdown_writes_trigger,
             "Number of files in level-0 that will slow down writes");

DEFINE_int32(level0_stop_writes_trigger,
             rocksdb::Options().level0_stop_writes_trigger,
             "Number of files in level-0 that will trigger put stop.");

DEFINE_int32(block_size,
             static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
             "Number of bytes in a block.");

// Background work.
DEFINE_int32(max_background_compactions,
             rocksdb::Options().max_background_compactions,
             "The maximum number of concurrent background compactions "
             "that can occur in parallel.");

DEFINE_int32(compaction_thread_pool_adjust_interval, 0,
             "The interval (in milliseconds) to adjust compaction thread pool "
             "size. Don't change it periodically if the value is 0.");

DEFINE_int32(compaction_thread_pool_variations, 2,
             "Range of background thread pool size variations when adjusted "
             "periodically.");

DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes,
             "The maximum number of concurrent background flushes "
             "that can occur in parallel.");

// Universal compaction tuning.
DEFINE_int32(universal_size_ratio, 0, "The ratio of file sizes that trigger"
             " compaction in universal style");

DEFINE_int32(universal_min_merge_width, 0, "The minimum number of files to "
             "compact in universal style compaction");

DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact"
             " in universal style compaction");

DEFINE_int32(universal_max_size_amplification_percent, 0,
             "The max size amplification for universal style compaction");

// Randomized disruptions.
DEFINE_int32(clear_column_family_one_in, 1000000,
             "With a chance of 1/N, delete a column family and then recreate "
             "it again. If N == 0, never drop/create column families. "
             "When test_batches_snapshots is true, this flag has no effect");

DEFINE_int32(set_options_one_in, 0,
             "With a chance of 1/N, change some random options");

DEFINE_int32(set_in_place_one_in, 0,
             "With a chance of 1/N, toggle in place support option");

DEFINE_int64(cache_size, 2LL * KB * KB * KB,
             "Number of bytes to use as a cache of uncompressed data.");

DEFINE_uint64(subcompactions, 1,
              "Maximum number of subcompactions to divide L0-L1 compactions "
              "into.");
static const bool FLAGS_subcompactions_dummy __attribute__((unused)) =
    RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);

// gflags validator: accepts zero and positive values. Despite the name, 0 is
// allowed, matching the ">=0" wording of the diagnostic. Rejected values are
// reported on stderr.
static bool ValidateInt32Positive(const char* flagname, int32_t value) {
  const bool non_negative = value >= 0;
  if (!non_negative) {
    fprintf(stderr, "Invalid value for --%s: %d, must be >=0\n",
            flagname, value);
  }
  return non_negative;
}
DEFINE_int32(reopen, 10, "Number of times database reopens");
I
Igor Canadi 已提交
244
static const bool FLAGS_reopen_dummy __attribute__((unused)) =
245
    RegisterFlagValidator(&FLAGS_reopen, &ValidateInt32Positive);
246

247 248
DEFINE_int32(bloom_bits, 10, "Bloom filter bits per key. "
             "Negative means use default settings.");
249

250 251 252
DEFINE_bool(use_block_based_filter, false, "use block based filter"
              "instead of full filter for block based table");

253
DEFINE_string(db, "", "Use the db with the following name.");
254

255 256
DEFINE_bool(verify_checksum, false,
            "Verify checksum for every block read from storage");
257

258 259
DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
            "Allow reads to occur via mmap-ing files");
260

261 262 263
// Database statistics
static std::shared_ptr<rocksdb::Statistics> dbstats;
DEFINE_bool(statistics, false, "Create database statistics");
264

265
DEFINE_bool(sync, false, "Sync all writes to disk");
266

267 268
DEFINE_bool(disable_data_sync, false,
            "If true, do not wait until data is synced to disk.");
A
amayank 已提交
269

270
DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync");
271

272 273 274
DEFINE_int32(kill_random_test, 0,
             "If non-zero, kill at various points in source code with "
             "probability 1/this");
I
Igor Canadi 已提交
275
static const bool FLAGS_kill_random_test_dummy __attribute__((unused)) =
276
    RegisterFlagValidator(&FLAGS_kill_random_test, &ValidateInt32Positive);
277
extern int rocksdb_kill_odds;
278

279
DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
280

281 282
DEFINE_int32(target_file_size_base, 64 * KB,
             "Target level-1 file size for compaction");
283

284
DEFINE_int32(target_file_size_multiplier, 1,
A
Andres Noetzli 已提交
285
             "A multiplier to compute target level-N file size (N >= 2)");
286

287
DEFINE_uint64(max_bytes_for_level_base, 256 * KB, "Max bytes for level-1");
288

289 290
DEFINE_int32(max_bytes_for_level_multiplier, 2,
             "A multiplier to compute max bytes for level-N (N >= 2)");
291

292 293 294 295 296 297 298 299 300 301
// gflags validator: accepts percentages in the closed range [0, 100].
// Out-of-range values are reported on stderr.
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  const bool in_range = (0 <= value) && (value <= 100);
  if (!in_range) {
    fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n",
            flagname, value);
  }
  return in_range;
}
DEFINE_int32(readpercent, 10,
             "Ratio of reads to total workload (expressed as a percentage)");
I
Igor Canadi 已提交
302
static const bool FLAGS_readpercent_dummy __attribute__((unused)) =
303
    RegisterFlagValidator(&FLAGS_readpercent, &ValidateInt32Percent);
304 305 306 307

DEFINE_int32(prefixpercent, 20,
             "Ratio of prefix iterators to total workload (expressed as a"
             " percentage)");
I
Igor Canadi 已提交
308
static const bool FLAGS_prefixpercent_dummy __attribute__((unused)) =
309
    RegisterFlagValidator(&FLAGS_prefixpercent, &ValidateInt32Percent);
310 311

DEFINE_int32(writepercent, 45,
A
Andres Noetzli 已提交
312
             "Ratio of writes to total workload (expressed as a percentage)");
I
Igor Canadi 已提交
313
static const bool FLAGS_writepercent_dummy __attribute__((unused)) =
314
    RegisterFlagValidator(&FLAGS_writepercent, &ValidateInt32Percent);
315 316 317

DEFINE_int32(delpercent, 15,
             "Ratio of deletes to total workload (expressed as a percentage)");
I
Igor Canadi 已提交
318
static const bool FLAGS_delpercent_dummy __attribute__((unused)) =
319
    RegisterFlagValidator(&FLAGS_delpercent, &ValidateInt32Percent);
320 321 322

DEFINE_int32(iterpercent, 10, "Ratio of iterations to total workload"
             " (expressed as a percentage)");
I
Igor Canadi 已提交
323
static const bool FLAGS_iterpercent_dummy __attribute__((unused)) =
324
    RegisterFlagValidator(&FLAGS_iterpercent, &ValidateInt32Percent);
325 326

DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run");
I
Igor Canadi 已提交
327
static const bool FLAGS_num_iterations_dummy __attribute__((unused)) =
328
    RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range);
329

I
Igor Canadi 已提交
330
namespace {
331 332 333 334 335 336 337 338 339 340 341
enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "none"))
    return rocksdb::kNoCompression;
  else if (!strcasecmp(ctype, "snappy"))
    return rocksdb::kSnappyCompression;
  else if (!strcasecmp(ctype, "zlib"))
    return rocksdb::kZlibCompression;
  else if (!strcasecmp(ctype, "bzip2"))
    return rocksdb::kBZip2Compression;
A
Albert Strasheim 已提交
342 343 344 345
  else if (!strcasecmp(ctype, "lz4"))
    return rocksdb::kLZ4Compression;
  else if (!strcasecmp(ctype, "lz4hc"))
    return rocksdb::kLZ4HCCompression;
346 347
  else if (!strcasecmp(ctype, "zstd"))
    return rocksdb::kZSTDNotFinalCompression;
348 349 350 351

  fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
  return rocksdb::kSnappyCompression; //default value
}
I
Igor Canadi 已提交
352 353
}  // namespace

354 355 356
DEFINE_string(compression_type, "snappy",
              "Algorithm to use to compress the database");
static enum rocksdb::CompressionType FLAGS_compression_type_e =
357
    rocksdb::kSnappyCompression;
358

359
DEFINE_string(hdfs, "", "Name of hdfs environment");
360
// posix or hdfs environment
361
static rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
362

I
Igor Canadi 已提交
363 364
DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
static const bool FLAGS_ops_per_thread_dummy __attribute__((unused)) =
365
    RegisterFlagValidator(&FLAGS_ops_per_thread, &ValidateUint32Range);
366

367
DEFINE_uint64(log2_keys_per_lock, 2, "Log2 of number of keys per lock");
I
Igor Canadi 已提交
368
static const bool FLAGS_log2_keys_per_lock_dummy __attribute__((unused)) =
369
    RegisterFlagValidator(&FLAGS_log2_keys_per_lock, &ValidateUint32Range);
370

371 372
DEFINE_bool(filter_deletes, false, "On true, deletes use KeyMayExist to drop"
            " the delete if key not present");
373

374 375
DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");

J
Jim Paton 已提交
376 377
// Memtable representations; StringToRepFactory() maps the names
// "skip_list", "prefix_hash" and "vector" onto these values.
enum RepFactory {
  kSkipList = 0,
  kHashSkipList = 1,
  kVectorRep = 2,
};

namespace {
383 384 385 386 387 388
enum RepFactory StringToRepFactory(const char* ctype) {
  assert(ctype);

  if (!strcasecmp(ctype, "skip_list"))
    return kSkipList;
  else if (!strcasecmp(ctype, "prefix_hash"))
I
Igor Canadi 已提交
389
    return kHashSkipList;
390 391 392 393 394 395
  else if (!strcasecmp(ctype, "vector"))
    return kVectorRep;

  fprintf(stdout, "Cannot parse memreptable %s\n", ctype);
  return kSkipList;
}
I
Igor Canadi 已提交
396 397
}  // namespace

J
Jim Paton 已提交
398
static enum RepFactory FLAGS_rep_factory;
L
Lei Jin 已提交
399
DEFINE_string(memtablerep, "prefix_hash", "");
J
Jim Paton 已提交
400

401
// gflags validator for --prefix_size: accepts values in [0, 8]. The upper
// bound presumably matches the 8-byte keys built by Key().
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
  const bool in_range = !(value < 0 || value > 8);
  if (in_range) {
    return true;
  }
  fprintf(stderr, "Invalid value for --%s: %d. 0 <= PrefixSize <= 8\n",
          flagname, value);
  return false;
}

DEFINE_int32(prefix_size, 7, "Control the prefix size for HashSkipListRep");
L
Lei Jin 已提交
410
static const bool FLAGS_prefix_size_dummy __attribute__((unused)) =
411
    RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
412 413 414

DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge "
            "that behaves like a Put");
J
Jim Paton 已提交
415

D
Deon Nicholas 已提交
416

417
namespace rocksdb {
418

419 420 421 422 423 424 425 426 427 428 429 430 431
// Converts a key index into an 8-byte big-endian string, most significant
// byte first, so byte-wise comparison of two keys matches the numeric order
// of non-negative indices.
//
// The value is always encoded as 64 bits. The previous version assumed
// sizeof(long) == 8 (asserting it) and produced a 4-byte key otherwise.
static std::string Key(long val) {
  const uint64_t bits = static_cast<uint64_t>(val);
  std::string key(sizeof(bits), '\0');
  for (size_t i = 0; i < sizeof(bits); ++i) {
    const unsigned shift = static_cast<unsigned>(8 * (sizeof(bits) - 1 - i));
    key[i] = static_cast<char>((bits >> shift) & 0xff);
  }
  return key;
}

// Renders the bytes of |str| as "0x" followed by two upper-case hex digits
// per byte, e.g. the bytes {0x01, 0xAB} become "0x01AB".
static std::string StringToHex(const std::string& str) {
  static const char kHexDigits[] = "0123456789ABCDEF";
  std::string hex = "0x";
  for (unsigned char byte : str) {
    hex.push_back(kHexDigits[byte >> 4]);
    hex.push_back(kHexDigits[byte & 0x0f]);
  }
  return hex;
}

class StressTest;
namespace {

class Stats {
 private:
  double start_;
  double finish_;
  double seconds_;
  long done_;
452 453
  long gets_;
  long prefixes_;
454
  long writes_;
A
amayank 已提交
455
  long deletes_;
456
  long iterator_size_sums_;
457
  long founds_;
458
  long iterations_;
459
  long errors_;
460 461 462
  int next_report_;
  size_t bytes_;
  double last_op_finish_;
463
  HistogramImpl hist_;
464 465 466 467 468 469 470 471

 public:
  Stats() { }

  void Start() {
    next_report_ = 100;
    hist_.Clear();
    done_ = 0;
472 473
    gets_ = 0;
    prefixes_ = 0;
474
    writes_ = 0;
A
amayank 已提交
475
    deletes_ = 0;
476
    iterator_size_sums_ = 0;
477
    founds_ = 0;
478
    iterations_ = 0;
479
    errors_ = 0;
480 481 482 483 484 485 486 487 488 489
    bytes_ = 0;
    seconds_ = 0;
    start_ = FLAGS_env->NowMicros();
    last_op_finish_ = start_;
    finish_ = start_;
  }

  void Merge(const Stats& other) {
    hist_.Merge(other.hist_);
    done_ += other.done_;
490 491
    gets_ += other.gets_;
    prefixes_ += other.prefixes_;
492
    writes_ += other.writes_;
A
amayank 已提交
493
    deletes_ += other.deletes_;
494
    iterator_size_sums_ += other.iterator_size_sums_;
495
    founds_ += other.founds_;
496
    iterations_ += other.iterations_;
497
    errors_ += other.errors_;
498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;
  }

  void Stop() {
    finish_ = FLAGS_env->NowMicros();
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void FinishedSingleOp() {
    if (FLAGS_histogram) {
      double now = FLAGS_env->NowMicros();
      double micros = now - last_op_finish_;
      hist_.Add(micros);
      if (micros > 20000) {
515
        fprintf(stdout, "long op: %.1f micros%30s\r", micros, "");
516 517 518 519
      }
      last_op_finish_ = now;
    }

520
      done_++;
521
    if (FLAGS_progress_reports) {
522 523 524 525 526 527 528 529 530 531
      if (done_ >= next_report_) {
        if      (next_report_ < 1000)   next_report_ += 100;
        else if (next_report_ < 5000)   next_report_ += 500;
        else if (next_report_ < 10000)  next_report_ += 1000;
        else if (next_report_ < 50000)  next_report_ += 5000;
        else if (next_report_ < 100000) next_report_ += 10000;
        else if (next_report_ < 500000) next_report_ += 50000;
        else                            next_report_ += 100000;
        fprintf(stdout, "... finished %ld ops%30s\r", done_, "");
      }
532 533 534
    }
  }

535 536 537 538 539
  void AddBytesForWrites(int nwrites, size_t nbytes) {
    writes_ += nwrites;
    bytes_ += nbytes;
  }

540 541 542
  void AddGets(int ngets, int nfounds) {
    founds_ += nfounds;
    gets_ += ngets;
543 544
  }

545 546 547 548 549
  void AddPrefixes(int nprefixes, int count) {
    prefixes_ += nprefixes;
    iterator_size_sums_ += count;
  }

550 551 552 553
  void AddIterations(int n) {
    iterations_ += n;
  }

554 555 556 557
  void AddDeletes(int n) {
    deletes_ += n;
  }

558 559
  void AddErrors(int n) {
    errors_ += n;
A
amayank 已提交
560 561
  }

562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578
  void Report(const char* name) {
    std::string extra;
    if (bytes_ < 1 || done_ < 1) {
      fprintf(stderr, "No writes or ops?\n");
      return;
    }

    double elapsed = (finish_ - start_) * 1e-6;
    double bytes_mb = bytes_ / 1048576.0;
    double rate = bytes_mb / elapsed;
    double throughput = (double)done_/elapsed;

    fprintf(stdout, "%-12s: ", name);
    fprintf(stdout, "%.3f micros/op %ld ops/sec\n",
            seconds_ * 1e6 / done_, (long)throughput);
    fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n",
            "", bytes_mb, rate, (100*writes_)/done_, done_);
579
    fprintf(stdout, "%-12s: Wrote %ld times\n", "", writes_);
A
amayank 已提交
580
    fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_);
X
Xing Jin 已提交
581 582
    fprintf(stdout, "%-12s: %ld read and %ld found the key\n", "",
            gets_, founds_);
583 584 585
    fprintf(stdout, "%-12s: Prefix scanned %ld times\n", "", prefixes_);
    fprintf(stdout, "%-12s: Iterator size sum is %ld\n", "",
            iterator_size_sums_);
586
    fprintf(stdout, "%-12s: Iterated %ld times\n", "", iterations_);
587
    fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_);
588 589 590 591 592 593 594 595 596 597 598

    if (FLAGS_histogram) {
      fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
    }
    fflush(stdout);
  }
};

// State shared by all concurrent executions of the same benchmark.
class SharedState {
 public:
599
  static const uint32_t SENTINEL;
600

601 602
  explicit SharedState(StressTest* stress_test)
      : cv_(&mu_),
603
        seed_(static_cast<uint32_t>(FLAGS_seed)),
604
        max_key_(FLAGS_max_key),
605
        log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
606 607 608 609 610 611 612
        num_threads_(FLAGS_threads),
        num_initialized_(0),
        num_populated_(0),
        vote_reopen_(0),
        num_done_(0),
        start_(false),
        start_verify_(false),
613 614
        should_stop_bg_thread_(false),
        bg_thread_finished_(false),
615 616
        stress_test_(stress_test),
        verification_failure_(false) {
617 618 619 620
    if (FLAGS_test_batches_snapshots) {
      fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
      return;
    }
I
Igor Canadi 已提交
621 622 623 624
    values_.resize(FLAGS_column_families);

    for (int i = 0; i < FLAGS_column_families; ++i) {
      values_[i] = std::vector<uint32_t>(max_key_, SENTINEL);
625
    }
626

627 628
    long num_locks = (max_key_ >> log2_keys_per_lock_);
    if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
I
Igor Canadi 已提交
629 630 631 632
      num_locks++;
    }
    fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
    key_locks_.resize(FLAGS_column_families);
D
Dmitri Smirnov 已提交
633

I
Igor Canadi 已提交
634
    for (int i = 0; i < FLAGS_column_families; ++i) {
D
Dmitri Smirnov 已提交
635 636 637 638
      key_locks_[i].resize(num_locks);
      for (auto& ptr : key_locks_[i]) {
        ptr.reset(new port::Mutex);
      }
639 640 641
    }
  }

I
Igor Canadi 已提交
642
  ~SharedState() {}
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667

  port::Mutex* GetMutex() {
    return &mu_;
  }

  port::CondVar* GetCondVar() {
    return &cv_;
  }

  StressTest* GetStressTest() const {
    return stress_test_;
  }

  long GetMaxKey() const {
    return max_key_;
  }

  uint32_t GetNumThreads() const {
    return num_threads_;
  }

  void IncInitialized() {
    num_initialized_++;
  }

A
amayank 已提交
668
  void IncOperated() {
669 670 671 672 673 674 675
    num_populated_++;
  }

  void IncDone() {
    num_done_++;
  }

676 677 678 679
  void IncVotedReopen() {
    vote_reopen_ = (vote_reopen_ + 1) % num_threads_;
  }

680 681 682 683
  bool AllInitialized() const {
    return num_initialized_ >= num_threads_;
  }

A
amayank 已提交
684
  bool AllOperated() const {
685 686 687 688 689 690 691
    return num_populated_ >= num_threads_;
  }

  bool AllDone() const {
    return num_done_ >= num_threads_;
  }

692 693 694 695
  bool AllVotedReopen() {
    return (vote_reopen_ == 0);
  }

696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
  void SetStart() {
    start_ = true;
  }

  void SetStartVerify() {
    start_verify_ = true;
  }

  bool Started() const {
    return start_;
  }

  bool VerifyStarted() const {
    return start_verify_;
  }

I
Igor Canadi 已提交
712 713 714 715
  void SetVerificationFailure() { verification_failure_.store(true); }

  bool HasVerificationFailedYet() { return verification_failure_.load(); }

I
Igor Canadi 已提交
716
  port::Mutex* GetMutexForKey(int cf, long key) {
D
Dmitri Smirnov 已提交
717
    return key_locks_[cf][key >> log2_keys_per_lock_].get();
718 719
  }

I
Igor Canadi 已提交
720 721
  void LockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
D
Dmitri Smirnov 已提交
722
      mutex->Lock();
I
Igor Canadi 已提交
723
    }
724 725
  }

I
Igor Canadi 已提交
726 727
  void UnlockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
D
Dmitri Smirnov 已提交
728
      mutex->Unlock();
I
Igor Canadi 已提交
729
    }
730 731
  }

I
Igor Canadi 已提交
732 733
  void ClearColumnFamily(int cf) {
    std::fill(values_[cf].begin(), values_[cf].end(), SENTINEL);
A
amayank 已提交
734 735
  }

I
Igor Canadi 已提交
736 737
  void Put(int cf, long key, uint32_t value_base) {
    values_[cf][key] = value_base;
738 739
  }

I
Igor Canadi 已提交
740 741 742 743 744 745
  uint32_t Get(int cf, long key) const { return values_[cf][key]; }

  void Delete(int cf, long key) { values_[cf][key] = SENTINEL; }

  uint32_t GetSeed() const { return seed_; }

746 747 748 749 750 751 752 753
  void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }

  bool ShoudStopBgThread() { return should_stop_bg_thread_; }

  void SetBgThreadFinish() { bg_thread_finished_ = true; }

  bool BgThreadFinished() const { return bg_thread_finished_; }

754 755 756 757 758 759 760 761 762
 private:
  port::Mutex mu_;
  port::CondVar cv_;
  const uint32_t seed_;
  const long max_key_;
  const uint32_t log2_keys_per_lock_;
  const int num_threads_;
  long num_initialized_;
  long num_populated_;
763
  long vote_reopen_;
764 765 766
  long num_done_;
  bool start_;
  bool start_verify_;
767 768
  bool should_stop_bg_thread_;
  bool bg_thread_finished_;
769
  StressTest* stress_test_;
I
Igor Canadi 已提交
770
  std::atomic<bool> verification_failure_;
771

I
Igor Canadi 已提交
772
  std::vector<std::vector<uint32_t>> values_;
D
Dmitri Smirnov 已提交
773 774
  // Has to make it owned by a smart ptr as port::Mutex is not copyable
  // and storing it in the container may require copying depending on the impl.
S
sdong 已提交
775
  std::vector<std::vector<std::unique_ptr<port::Mutex> > > key_locks_;
776 777
};

778 779
const uint32_t SharedState::SENTINEL = 0xffffffff;

780 781 782 783 784 785 786
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid; // 0..n-1
  Random rand;  // Has different seeds for different threads
  SharedState* shared;
  Stats stats;

I
Igor Canadi 已提交
787
  ThreadState(uint32_t index, SharedState* _shared)
I
Igor Canadi 已提交
788
      : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
789 790
};

791 792 793 794
class DbStressListener : public EventListener {
 public:
  DbStressListener(
      const std::string& db_name,
Y
Yueh-Hsuan Chiang 已提交
795
      const std::vector<DbPath>& db_paths) :
796 797 798 799 800 801
      db_name_(db_name),
      db_paths_(db_paths),
      rand_(301) {}
  virtual ~DbStressListener() {}
#ifndef ROCKSDB_LITE
  virtual void OnFlushCompleted(
802
      DB* db, const FlushJobInfo& info) override {
803 804
    assert(db);
    assert(db->GetName() == db_name_);
805 806
    assert(IsValidColumnFamilyName(info.cf_name));
    VerifyFilePath(info.file_path);
807 808 809 810 811 812
    // pretending doing some work here
    std::this_thread::sleep_for(
        std::chrono::microseconds(rand_.Uniform(5000)));
  }

  virtual void OnCompactionCompleted(
813
      DB *db, const CompactionJobInfo& ci) override {
814 815 816 817 818 819 820 821 822 823 824 825 826 827 828
    assert(db);
    assert(db->GetName() == db_name_);
    assert(IsValidColumnFamilyName(ci.cf_name));
    assert(ci.input_files.size() + ci.output_files.size() > 0U);
    for (const auto& file_path : ci.input_files) {
      VerifyFilePath(file_path);
    }
    for (const auto& file_path : ci.output_files) {
      VerifyFilePath(file_path);
    }
    // pretending doing some work here
    std::this_thread::sleep_for(
        std::chrono::microseconds(rand_.Uniform(5000)));
  }

829 830 831 832 833 834 835 836 837 838 839 840
  virtual void OnTableFileCreated(
      const TableFileCreationInfo& info) override {
    assert(info.db_name == db_name_);
    assert(IsValidColumnFamilyName(info.cf_name));
    VerifyFilePath(info.file_path);
    assert(info.file_size > 0);
    assert(info.job_id > 0);
    assert(info.table_properties.data_size > 0);
    assert(info.table_properties.raw_key_size > 0);
    assert(info.table_properties.num_entries > 0);
  }

841 842 843 844 845
 protected:
  bool IsValidColumnFamilyName(const std::string& cf_name) const {
    if (cf_name == kDefaultColumnFamilyName) {
      return true;
    }
Y
Yueh-Hsuan Chiang 已提交
846 847 848 849
    // The column family names in the stress tests are numbers.
    for (size_t i = 0; i < cf_name.size(); ++i) {
      if (cf_name[i] < '0' || cf_name[i] > '9') {
        return false;
850 851
      }
    }
Y
Yueh-Hsuan Chiang 已提交
852
    return true;
853 854 855
  }

  void VerifyFileDir(const std::string& file_dir) {
856
#ifndef NDEBUG
857 858 859 860 861 862 863 864 865
    if (db_name_ == file_dir) {
      return;
    }
    for (const auto& db_path : db_paths_) {
      if (db_path.path == file_dir) {
        return;
      }
    }
    assert(false);
866
#endif  // !NDEBUG
867 868 869
  }

  void VerifyFileName(const std::string& file_name) {
870
#ifndef NDEBUG
871 872 873 874 875
    uint64_t file_number;
    FileType file_type;
    bool result = ParseFileName(file_name, &file_number, &file_type);
    assert(result);
    assert(file_type == kTableFile);
876
#endif  // !NDEBUG
877 878 879
  }

  // Debug-only check of a path reported to the listener. The directory part
  // (everything before the last '/'), when non-empty, must be a known DB
  // directory (VerifyFileDir). The final component must be a table file name
  // (VerifyFileName). Compiles to an empty function when NDEBUG is defined.
  void VerifyFilePath(const std::string& file_path) {
#ifndef NDEBUG
    const size_t pos = file_path.find_last_of('/');
    if (pos == std::string::npos) {
      // No directory component: the whole path is the file name.
      VerifyFileName(file_path);
      return;
    }
    // For a path directly under "/" the directory part is empty; there is no
    // configured directory to compare it with, so only the name is checked.
    if (pos > 0) {
      VerifyFileDir(file_path.substr(0, pos));
    }
    // Skip the separator itself so VerifyFileName receives just the name
    // (substr(pos) would hand it "/<name>").
    VerifyFileName(file_path.substr(pos + 1));
#endif  // !NDEBUG
  }
#endif  // !ROCKSDB_LITE

 private:
  std::string db_name_;
  std::vector<DbPath> db_paths_;
  Random rand_;
};

900 901 902 903 904 905
}  // namespace

class StressTest {
 public:
  StressTest()
      : cache_(NewLRUCache(FLAGS_cache_size)),
I
Igor Canadi 已提交
906 907 908
        compressed_cache_(FLAGS_compressed_cache_size >= 0
                              ? NewLRUCache(FLAGS_compressed_cache_size)
                              : nullptr),
909
        filter_policy_(FLAGS_bloom_bits >= 0
910 911 912 913
                   ? FLAGS_use_block_based_filter
                     ? NewBloomFilterPolicy(FLAGS_bloom_bits, true)
                     : NewBloomFilterPolicy(FLAGS_bloom_bits, false)
                   : nullptr),
914
        db_(nullptr),
915
        new_column_family_name_(1),
916
        num_times_reopened_(0) {
917 918 919 920 921
    if (FLAGS_destroy_db_initially) {
      std::vector<std::string> files;
      FLAGS_env->GetChildren(FLAGS_db, &files);
      for (unsigned int i = 0; i < files.size(); i++) {
        if (Slice(files[i]).starts_with("heap-")) {
922
          FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]);
923
        }
924
      }
925
      DestroyDB(FLAGS_db, Options());
926 927 928 929
    }
  }

  // Releases every column family handle first, then the DB itself.
  ~StressTest() {
    for (auto* handle : column_families_) {
      delete handle;
    }
    column_families_.clear();
    delete db_;
  }

L
Lei Jin 已提交
937 938 939 940
  bool BuildOptionsTable() {
    if (FLAGS_set_options_one_in <= 0) {
      return true;
    }
D
Dmitri Smirnov 已提交
941

S
sdong 已提交
942 943 944
    std::unordered_map<std::string, std::vector<std::string> > options_tbl = {
        {"write_buffer_size",
         {ToString(FLAGS_write_buffer_size),
945
          ToString(FLAGS_write_buffer_size * 2),
S
sdong 已提交
946 947 948
          ToString(FLAGS_write_buffer_size * 4)}},
        {"max_write_buffer_number",
         {ToString(FLAGS_max_write_buffer_number),
949
          ToString(FLAGS_max_write_buffer_number * 2),
S
sdong 已提交
950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024
          ToString(FLAGS_max_write_buffer_number * 4)}},
        {"arena_block_size",
         {
             ToString(Options().arena_block_size),
             ToString(FLAGS_write_buffer_size / 4),
             ToString(FLAGS_write_buffer_size / 8),
         }},
        {"memtable_prefix_bloom_bits", {"0", "8", "10"}},
        {"memtable_prefix_bloom_probes", {"4", "5", "6"}},
        {"memtable_prefix_bloom_huge_page_tlb_size",
         {"0", ToString(2 * 1024 * 1024)}},
        {"max_successive_merges", {"0", "2", "4"}},
        {"filter_deletes", {"0", "1"}},
        {"inplace_update_num_locks", {"100", "200", "300"}},
        // TODO(ljin): enable test for this option
        // {"disable_auto_compactions", {"100", "200", "300"}},
        {"soft_rate_limit", {"0", "0.5", "0.9"}},
        {"hard_rate_limit", {"0", "1.1", "2.0"}},
        {"level0_file_num_compaction_trigger",
         {
             ToString(FLAGS_level0_file_num_compaction_trigger),
             ToString(FLAGS_level0_file_num_compaction_trigger + 2),
             ToString(FLAGS_level0_file_num_compaction_trigger + 4),
         }},
        {"level0_slowdown_writes_trigger",
         {
             ToString(FLAGS_level0_slowdown_writes_trigger),
             ToString(FLAGS_level0_slowdown_writes_trigger + 2),
             ToString(FLAGS_level0_slowdown_writes_trigger + 4),
         }},
        {"level0_stop_writes_trigger",
         {
             ToString(FLAGS_level0_stop_writes_trigger),
             ToString(FLAGS_level0_stop_writes_trigger + 2),
             ToString(FLAGS_level0_stop_writes_trigger + 4),
         }},
        {"max_grandparent_overlap_factor",
         {
             ToString(Options().max_grandparent_overlap_factor - 5),
             ToString(Options().max_grandparent_overlap_factor),
             ToString(Options().max_grandparent_overlap_factor + 5),
         }},
        {"expanded_compaction_factor",
         {
             ToString(Options().expanded_compaction_factor - 5),
             ToString(Options().expanded_compaction_factor),
             ToString(Options().expanded_compaction_factor + 5),
         }},
        {"source_compaction_factor",
         {
             ToString(Options().source_compaction_factor),
             ToString(Options().source_compaction_factor * 2),
             ToString(Options().source_compaction_factor * 4),
         }},
        {"target_file_size_base",
         {
             ToString(FLAGS_target_file_size_base),
             ToString(FLAGS_target_file_size_base * 2),
             ToString(FLAGS_target_file_size_base * 4),
         }},
        {"target_file_size_multiplier",
         {
             ToString(FLAGS_target_file_size_multiplier), "1", "2",
         }},
        {"max_bytes_for_level_base",
         {
             ToString(FLAGS_max_bytes_for_level_base / 2),
             ToString(FLAGS_max_bytes_for_level_base),
             ToString(FLAGS_max_bytes_for_level_base * 2),
         }},
        {"max_bytes_for_level_multiplier",
         {
             ToString(FLAGS_max_bytes_for_level_multiplier), "1", "2",
         }},
        {"max_sequential_skip_in_iterations", {"4", "8", "12"}},
L
Lei Jin 已提交
1025
    };
D
Dmitri Smirnov 已提交
1026 1027 1028

    options_table_ = std::move(options_tbl);

L
Lei Jin 已提交
1029 1030 1031 1032 1033 1034
    for (const auto& iter : options_table_) {
      options_index_.push_back(iter.first);
    }
    return true;
  }

I
Igor Canadi 已提交
1035
  bool Run() {
1036
    PrintEnv();
L
Lei Jin 已提交
1037
    BuildOptionsTable();
1038 1039 1040 1041 1042 1043 1044 1045 1046
    Open();
    SharedState shared(this);
    uint32_t n = shared.GetNumThreads();

    std::vector<ThreadState*> threads(n);
    for (uint32_t i = 0; i < n; i++) {
      threads[i] = new ThreadState(i, &shared);
      FLAGS_env->StartThread(ThreadBody, threads[i]);
    }
1047 1048 1049 1050 1051
    ThreadState bg_thread(0, &shared);
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread);
    }

1052
    // Each thread goes through the following states:
A
amayank 已提交
1053 1054
    // initializing -> wait for others to init -> read/populate/depopulate
    // wait for others to operate -> verify -> done
1055 1056 1057 1058 1059 1060 1061

    {
      MutexLock l(shared.GetMutex());
      while (!shared.AllInitialized()) {
        shared.GetCondVar()->Wait();
      }

1062 1063 1064
      double now = FLAGS_env->NowMicros();
      fprintf(stdout, "%s Starting database operations\n",
              FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
A
amayank 已提交
1065

1066 1067
      shared.SetStart();
      shared.GetCondVar()->SignalAll();
A
amayank 已提交
1068
      while (!shared.AllOperated()) {
1069 1070 1071
        shared.GetCondVar()->Wait();
      }

1072
      now = FLAGS_env->NowMicros();
1073 1074 1075 1076 1077 1078 1079
      if (FLAGS_test_batches_snapshots) {
        fprintf(stdout, "%s Limited verification already done during gets\n",
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
      } else {
        fprintf(stdout, "%s Starting verification\n",
                FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
      }
1080

1081 1082 1083 1084 1085 1086 1087
      shared.SetStartVerify();
      shared.GetCondVar()->SignalAll();
      while (!shared.AllDone()) {
        shared.GetCondVar()->Wait();
      }
    }

1088
    for (unsigned int i = 1; i < n; i++) {
1089 1090 1091 1092
      threads[0]->stats.Merge(threads[i]->stats);
    }
    threads[0]->stats.Report("Stress Test");

1093
    for (unsigned int i = 0; i < n; i++) {
1094
      delete threads[i];
1095
      threads[i] = nullptr;
1096
    }
1097
    double now = FLAGS_env->NowMicros();
1098 1099 1100 1101
    if (!FLAGS_test_batches_snapshots) {
      fprintf(stdout, "%s Verification successful\n",
              FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
    }
1102
    PrintStatistics();
I
Igor Canadi 已提交
1103

1104 1105 1106 1107 1108 1109 1110 1111
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      MutexLock l(shared.GetMutex());
      shared.SetShouldStopBgThread();
      while (!shared.BgThreadFinished()) {
        shared.GetCondVar()->Wait();
      }
    }

I
Igor Canadi 已提交
1112 1113 1114 1115 1116
    if (shared.HasVerificationFailedYet()) {
      printf("Verification failed :(\n");
      return false;
    }
    return true;
1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134
  }

 private:

  static void ThreadBody(void* v) {
    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
    SharedState* shared = thread->shared;

    {
      MutexLock l(shared->GetMutex());
      shared->IncInitialized();
      if (shared->AllInitialized()) {
        shared->GetCondVar()->SignalAll();
      }
      while (!shared->Started()) {
        shared->GetCondVar()->Wait();
      }
    }
A
amayank 已提交
1135
    thread->shared->GetStressTest()->OperateDb(thread);
1136 1137 1138

    {
      MutexLock l(shared->GetMutex());
A
amayank 已提交
1139 1140
      shared->IncOperated();
      if (shared->AllOperated()) {
1141 1142 1143 1144 1145 1146 1147
        shared->GetCondVar()->SignalAll();
      }
      while (!shared->VerifyStarted()) {
        shared->GetCondVar()->Wait();
      }
    }

1148
    if (!FLAGS_test_batches_snapshots) {
M
Mayank Agarwal 已提交
1149
      thread->shared->GetStressTest()->VerifyDb(thread);
1150
    }
1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161

    {
      MutexLock l(shared->GetMutex());
      shared->IncDone();
      if (shared->AllDone()) {
        shared->GetCondVar()->SignalAll();
      }
    }

  }

1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177
  static void PoolSizeChangeThread(void* v) {
    assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
    ThreadState* thread = reinterpret_cast<ThreadState*>(v);
    SharedState* shared = thread->shared;

    while (true) {
      {
        MutexLock l(shared->GetMutex());
        if (shared->ShoudStopBgThread()) {
          shared->SetBgThreadFinish();
          shared->GetCondVar()->SignalAll();
          return;
        }
      }

      auto thread_pool_size_base = FLAGS_max_background_compactions;
I
Igor Canadi 已提交
1178
      auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations;
1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193
      int new_thread_pool_size =
          thread_pool_size_base - thread_pool_size_var +
          thread->rand.Next() % (thread_pool_size_var * 2 + 1);
      if (new_thread_pool_size < 1) {
        new_thread_pool_size = 1;
      }
      FLAGS_env->SetBackgroundThreads(new_thread_pool_size);
      // Sleep up to 3 seconds
      FLAGS_env->SleepForMicroseconds(
          thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
              1000 +
          1);
    }
  }

1194 1195 1196
  // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
  // ("9"+K, "9"+V) in DB atomically i.e in a single batch.
  // Also refer MultiGet.
I
Igor Canadi 已提交
1197 1198 1199
  Status MultiPut(ThreadState* thread, const WriteOptions& writeoptions,
                  ColumnFamilyHandle* column_family, const Slice& key,
                  const Slice& value, size_t sz) {
1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210
    std::string keys[10] = {"9", "8", "7", "6", "5",
                            "4", "3", "2", "1", "0"};
    std::string values[10] = {"9", "8", "7", "6", "5",
                              "4", "3", "2", "1", "0"};
    Slice value_slices[10];
    WriteBatch batch;
    Status s;
    for (int i = 0; i < 10; i++) {
      keys[i] += key.ToString();
      values[i] += value.ToString();
      value_slices[i] = values[i];
1211
      if (FLAGS_use_merge) {
I
Igor Canadi 已提交
1212
        batch.Merge(column_family, keys[i], value_slices[i]);
D
Deon Nicholas 已提交
1213
      } else {
I
Igor Canadi 已提交
1214
        batch.Put(column_family, keys[i], value_slices[i]);
D
Deon Nicholas 已提交
1215
      }
1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231
    }

    s = db_->Write(writeoptions, &batch);
    if (!s.ok()) {
      fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    } else {
      // we did 10 writes each of size sz + 1
      thread->stats.AddBytesForWrites(10, (sz + 1) * 10);
    }

    return s;
  }

  // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
  // in DB atomically i.e in a single batch. Also refer MultiGet.
I
Igor Canadi 已提交
1232 1233
  Status MultiDelete(ThreadState* thread, const WriteOptions& writeoptions,
                     ColumnFamilyHandle* column_family, const Slice& key) {
1234 1235 1236 1237 1238 1239 1240
    std::string keys[10] = {"9", "7", "5", "3", "1",
                            "8", "6", "4", "2", "0"};

    WriteBatch batch;
    Status s;
    for (int i = 0; i < 10; i++) {
      keys[i] += key.ToString();
I
Igor Canadi 已提交
1241
      batch.Delete(column_family, keys[i]);
1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258
    }

    s = db_->Write(writeoptions, &batch);
    if (!s.ok()) {
      fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str());
      thread->stats.AddErrors(1);
    } else {
      thread->stats.AddDeletes(10);
    }

    return s;
  }

  // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
  // in the same snapshot, and verifies that all the values are of the form
  // "0"+V, "1"+V,..."9"+V.
  // ASSUMES that MultiPut was used to put (K, V) into the DB.
I
Igor Canadi 已提交
1259 1260 1261
  Status MultiGet(ThreadState* thread, const ReadOptions& readoptions,
                  ColumnFamilyHandle* column_family, const Slice& key,
                  std::string* value) {
1262 1263 1264 1265 1266 1267 1268 1269 1270
    std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
    Slice key_slices[10];
    std::string values[10];
    ReadOptions readoptionscopy = readoptions;
    readoptionscopy.snapshot = db_->GetSnapshot();
    Status s;
    for (int i = 0; i < 10; i++) {
      keys[i] += key.ToString();
      key_slices[i] = keys[i];
I
Igor Canadi 已提交
1271
      s = db_->Get(readoptionscopy, column_family, key_slices[i], value);
1272 1273 1274 1275 1276 1277 1278 1279
      if (!s.ok() && !s.IsNotFound()) {
        fprintf(stderr, "get error: %s\n", s.ToString().c_str());
        values[i] = "";
        thread->stats.AddErrors(1);
        // we continue after error rather than exiting so that we can
        // find more errors if any
      } else if (s.IsNotFound()) {
        values[i] = "";
1280
        thread->stats.AddGets(1, 0);
1281 1282 1283 1284 1285 1286
      } else {
        values[i] = *value;

        char expected_prefix = (keys[i])[0];
        char actual_prefix = (values[i])[0];
        if (actual_prefix != expected_prefix) {
1287
          fprintf(stderr, "error expected prefix = %c actual = %c\n",
1288 1289 1290
                  expected_prefix, actual_prefix);
        }
        (values[i])[0] = ' '; // blank out the differing character
1291
        thread->stats.AddGets(1, 1);
1292 1293 1294 1295 1296 1297 1298
      }
    }
    db_->ReleaseSnapshot(readoptionscopy.snapshot);

    // Now that we retrieved all values, check that they all match
    for (int i = 1; i < 10; i++) {
      if (values[i] != values[0]) {
1299
        fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
1300 1301
                key.ToString(true).c_str(), StringToHex(values[0]).c_str(),
                StringToHex(values[i]).c_str());
1302 1303 1304 1305 1306 1307 1308 1309
      // we continue after error rather than exiting so that we can
      // find more errors if any
      }
    }

    return s;
  }

L
Lei Jin 已提交
1310 1311 1312 1313 1314
  // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
  // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
  // of the key. Each of these 10 scans returns a series of values;
  // each series should be the same length, and it is verified for each
  // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V.
1315
  // ASSUMES that MultiPut was used to put (K, V)
I
Igor Canadi 已提交
1316 1317
  Status MultiPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
                         ColumnFamilyHandle* column_family,
L
Lei Jin 已提交
1318
                         const Slice& key) {
1319 1320 1321 1322 1323 1324 1325 1326
    std::string prefixes[10] = {"0", "1", "2", "3", "4",
                                "5", "6", "7", "8", "9"};
    Slice prefix_slices[10];
    ReadOptions readoptionscopy[10];
    const Snapshot* snapshot = db_->GetSnapshot();
    Iterator* iters[10];
    Status s = Status::OK();
    for (int i = 0; i < 10; i++) {
L
Lei Jin 已提交
1327 1328 1329
      prefixes[i] += key.ToString();
      prefixes[i].resize(FLAGS_prefix_size);
      prefix_slices[i] = Slice(prefixes[i]);
1330 1331
      readoptionscopy[i] = readoptions;
      readoptionscopy[i].snapshot = snapshot;
I
Igor Canadi 已提交
1332
      iters[i] = db_->NewIterator(readoptionscopy[i], column_family);
I
Igor Canadi 已提交
1333
      iters[i]->Seek(prefix_slices[i]);
1334 1335 1336
    }

    int count = 0;
1337
    while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
1338 1339 1340 1341 1342
      count++;
      std::string values[10];
      // get list of all values for this iteration
      for (int i = 0; i < 10; i++) {
        // no iterator should finish before the first one
1343 1344
        assert(iters[i]->Valid() &&
               iters[i]->key().starts_with(prefix_slices[i]));
1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358
        values[i] = iters[i]->value().ToString();

        char expected_first = (prefixes[i])[0];
        char actual_first = (values[i])[0];

        if (actual_first != expected_first) {
          fprintf(stderr, "error expected first = %c actual = %c\n",
                  expected_first, actual_first);
        }
        (values[i])[0] = ' '; // blank out the differing character
      }
      // make sure all values are equivalent
      for (int i = 0; i < 10; i++) {
        if (values[i] != values[0]) {
1359 1360 1361
          fprintf(stderr, "error : %d, inconsistent values for prefix %s: %s, %s\n",
                  i, prefixes[i].c_str(), StringToHex(values[0]).c_str(),
                  StringToHex(values[i]).c_str());
1362 1363 1364 1365 1366 1367 1368 1369 1370 1371
          // we continue after error rather than exiting so that we can
          // find more errors if any
        }
        iters[i]->Next();
      }
    }

    // cleanup iterators and snapshot
    for (int i = 0; i < 10; i++) {
      // if the first iterator finished, they should have all finished
1372 1373
      assert(!iters[i]->Valid() ||
             !iters[i]->key().starts_with(prefix_slices[i]));
1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387
      assert(iters[i]->status().ok());
      delete iters[i];
    }
    db_->ReleaseSnapshot(snapshot);

    if (s.ok()) {
      thread->stats.AddPrefixes(1, count);
    } else {
      thread->stats.AddErrors(1);
    }

    return s;
  }

1388 1389
  // Given a key K, this creates an iterator which scans to K and then
  // does a random sequence of Next/Prev operations.
I
Igor Canadi 已提交
1390 1391
  Status MultiIterate(ThreadState* thread, const ReadOptions& readoptions,
                      ColumnFamilyHandle* column_family, const Slice& key) {
1392 1393 1394 1395
    Status s;
    const Snapshot* snapshot = db_->GetSnapshot();
    ReadOptions readoptionscopy = readoptions;
    readoptionscopy.snapshot = snapshot;
I
Igor Canadi 已提交
1396
    unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy, column_family));
1397 1398

    iter->Seek(key);
1399
    for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417
      if (thread->rand.OneIn(2)) {
        iter->Next();
      } else {
        iter->Prev();
      }
    }

    if (s.ok()) {
      thread->stats.AddIterations(1);
    } else {
      thread->stats.AddErrors(1);
    }

    db_->ReleaseSnapshot(snapshot);

    return s;
  }

1418
  // Picks a random option name from options_index_ and a random candidate
  // index into its options_table_ entry. The chosen value is applied with
  // DB::SetOptions to a random column family, whose status is returned.
  // Two groups of options are always changed together using the same
  // candidate index:
  //   - soft_rate_limit and hard_rate_limit;
  //   - the three level-0 triggers.
  //
  // Every worker thread calls this concurrently, so the shared options_table_
  // is only read through at(). The standard treats at() as const for
  // data-race purposes; unordered_map::operator[] gets no such treatment.
  // at() also throws for an unknown name instead of inserting an empty
  // candidate list, which would otherwise end in a modulo by zero.
  Status SetOptions(ThreadState* thread) {
    assert(FLAGS_set_options_one_in > 0);
    const std::string& name =
        options_index_[thread->rand.Next() % options_index_.size()];
    const size_t value_idx =
        thread->rand.Next() % options_table_.at(name).size();

    std::unordered_map<std::string, std::string> opts;
    // Copies the candidate at value_idx for `option` into opts.
    auto take = [&](const std::string& option) {
      opts[option] = options_table_.at(option)[value_idx];
    };
    if (name == "soft_rate_limit" || name == "hard_rate_limit") {
      take("soft_rate_limit");
      take("hard_rate_limit");
    } else if (name == "level0_file_num_compaction_trigger" ||
               name == "level0_slowdown_writes_trigger" ||
               name == "level0_stop_writes_trigger") {
      take("level0_file_num_compaction_trigger");
      take("level0_slowdown_writes_trigger");
      take("level0_stop_writes_trigger");
    } else {
      take(name);
    }

    int rand_cf_idx = thread->rand.Next() % FLAGS_column_families;
    auto cfh = column_families_[rand_cf_idx];
    return db_->SetOptions(cfh, opts);
  }

A
amayank 已提交
1445
  // Operation phase of one worker thread. It performs up to
  // FLAGS_ops_per_thread operations, stopping early once a verification
  // failure has been recorded. Each operation targets a random key in a
  // random column family and is one of: read, prefix scan, write, delete or
  // iterate. The mix is set by FLAGS_readpercent, FLAGS_prefixpercent,
  // FLAGS_writepercent and FLAGS_delpercent; the remainder is iterations.
  //
  // Other things a worker may do along the way:
  //   - join a barrier where the last arriving thread reopens the DB;
  //   - change options through SetOptions();
  //   - toggle options_.inplace_update_support;
  //   - drop and recreate a non-default column family.
  //
  // With FLAGS_test_batches_snapshots the batched Multi* helpers are used.
  // Otherwise each operation holds the key's lock from SharedState, and
  // writes and deletes also update the expected state kept there.
  void OperateDb(ThreadState* thread) {
    ReadOptions read_opts(FLAGS_verify_checksum, true);
    WriteOptions write_opts;
    char value[100];
    long max_key = thread->shared->GetMaxKey();
    std::string from_db;
    if (FLAGS_sync) {
      write_opts.sync = true;
    }
    write_opts.disableWAL = FLAGS_disable_wal;
    const int prefixBound = (int)FLAGS_readpercent + (int)FLAGS_prefixpercent;
    const int writeBound = prefixBound + (int)FLAGS_writepercent;
    const int delBound = writeBound + (int)FLAGS_delpercent;

    // Number of operations between two coordinated reopens. The quotient is
    // zero when FLAGS_ops_per_thread < FLAGS_reopen + 1, and the divisor
    // itself is unusable when FLAGS_reopen + 1 <= 0. Using either as a
    // modulus would divide by zero, so a zero interval disables reopening.
    const uint64_t reopen_interval =
        FLAGS_reopen + 1 > 0 ? FLAGS_ops_per_thread / (FLAGS_reopen + 1) : 0;

    thread->stats.Start();
    for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
      if (thread->shared->HasVerificationFailedYet()) {
        break;
      }
      if (i != 0 && reopen_interval != 0 && i % reopen_interval == 0) {
        thread->stats.FinishedSingleOp();
        MutexLock l(thread->shared->GetMutex());
        thread->shared->IncVotedReopen();
        if (thread->shared->AllVotedReopen()) {
          thread->shared->GetStressTest()->Reopen();
          thread->shared->GetCondVar()->SignalAll();
        } else {
          thread->shared->GetCondVar()->Wait();
        }
        // Commenting this out as we don't want to reset stats on each open.
        // thread->stats.Start();
      }

      // Change Options
      if (FLAGS_set_options_one_in > 0 &&
          thread->rand.OneIn(FLAGS_set_options_one_in)) {
        SetOptions(thread);
      }

      if (FLAGS_set_in_place_one_in > 0 &&
          thread->rand.OneIn(FLAGS_set_in_place_one_in)) {
        // Toggle in-place update support. `x ^= x` always yields false, so
        // the toggle must be an explicit negation. options_ is only consulted
        // when a column family is (re)created below or, presumably, on
        // Reopen().
        // NOTE(review): confirm in-place updates are acceptable alongside the
        // snapshot-based checks of FLAGS_test_batches_snapshots.
        options_.inplace_update_support = !options_.inplace_update_support;
      }

      if (!FLAGS_test_batches_snapshots &&
          FLAGS_clear_column_family_one_in != 0 && FLAGS_column_families > 1) {
        if (thread->rand.OneIn(FLAGS_clear_column_family_one_in)) {
          // drop column family and then create it again (can't drop default)
          int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
          std::string new_name =
              ToString(new_column_family_name_.fetch_add(1));
          {
            MutexLock l(thread->shared->GetMutex());
            fprintf(
                stdout,
                "[CF %d] Dropping and recreating column family. new name: %s\n",
                cf, new_name.c_str());
          }
          thread->shared->LockColumnFamily(cf);
          Status s __attribute__((unused));
          s = db_->DropColumnFamily(column_families_[cf]);
          delete column_families_[cf];
          if (!s.ok()) {
            fprintf(stderr, "dropping column family error: %s\n",
                    s.ToString().c_str());
            std::terminate();
          }
          s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
                                      &column_families_[cf]);
          column_family_names_[cf] = new_name;
          thread->shared->ClearColumnFamily(cf);
          if (!s.ok()) {
            fprintf(stderr, "creating column family error: %s\n",
                    s.ToString().c_str());
            std::terminate();
          }
          thread->shared->UnlockColumnFamily(cf);
        }
      }

      long rand_key = thread->rand.Next() % max_key;
      int rand_column_family = thread->rand.Next() % FLAGS_column_families;
      std::string keystr = Key(rand_key);
      Slice key = keystr;
      int prob_op = thread->rand.Uniform(100);
      std::unique_ptr<MutexLock> l;
      if (!FLAGS_test_batches_snapshots) {
        l.reset(new MutexLock(
            thread->shared->GetMutexForKey(rand_column_family, rand_key)));
      }
      auto column_family = column_families_[rand_column_family];

      if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
        // OPERATION read
        if (!FLAGS_test_batches_snapshots) {
          Status s = db_->Get(read_opts, column_family, key, &from_db);
          if (s.ok()) {
            // found case
            thread->stats.AddGets(1, 1);
          } else if (s.IsNotFound()) {
            // not found case
            thread->stats.AddGets(1, 0);
          } else {
            // errors case
            thread->stats.AddErrors(1);
          }
        } else {
          MultiGet(thread, read_opts, column_family, key, &from_db);
        }
      } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) {
        // OPERATION prefix scan
        // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
        // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
        // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same
        // prefix
        if (!FLAGS_test_batches_snapshots) {
          Slice prefix = Slice(key.data(), FLAGS_prefix_size);
          std::unique_ptr<Iterator> iter(
              db_->NewIterator(read_opts, column_family));
          int64_t count = 0;
          for (iter->Seek(prefix);
               iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
            ++count;
          }
          assert(count <=
                 (static_cast<int64_t>(1) << ((8 - FLAGS_prefix_size) * 8)));
          if (iter->status().ok()) {
            thread->stats.AddPrefixes(1, static_cast<int>(count));
          } else {
            thread->stats.AddErrors(1);
          }
        } else {
          MultiPrefixScan(thread, read_opts, column_family, key);
        }
      } else if (prefixBound <= prob_op && prob_op < writeBound) {
        // OPERATION write
        uint32_t value_base = thread->rand.Next();
        size_t sz = GenerateValue(value_base, value, sizeof(value));
        Slice v(value, sz);
        if (!FLAGS_test_batches_snapshots) {
          if (FLAGS_verify_before_write) {
            // `key` already holds Key(rand_key); no need to re-encode it.
            Status s = db_->Get(read_opts, column_family, key, &from_db);
            if (!VerifyValue(rand_column_family, rand_key, read_opts,
                             thread->shared, from_db, s, true)) {
              break;
            }
          }
          thread->shared->Put(rand_column_family, rand_key, value_base);
          Status s;
          if (FLAGS_use_merge) {
            s = db_->Merge(write_opts, column_family, key, v);
          } else {
            s = db_->Put(write_opts, column_family, key, v);
          }
          if (!s.ok()) {
            fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
            std::terminate();
          }
          thread->stats.AddBytesForWrites(1, sz);
        } else {
          MultiPut(thread, write_opts, column_family, key, v, sz);
        }
        PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key),
                      value, sz);
      } else if (writeBound <= prob_op && prob_op < delBound) {
        // OPERATION delete
        if (!FLAGS_test_batches_snapshots) {
          thread->shared->Delete(rand_column_family, rand_key);
          Status s = db_->Delete(write_opts, column_family, key);
          thread->stats.AddDeletes(1);
          if (!s.ok()) {
            fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
            std::terminate();
          }
        } else {
          MultiDelete(thread, write_opts, column_family, key);
        }
      } else {
        // OPERATION iterate
        MultiIterate(thread, read_opts, column_family, key);
      }
      thread->stats.FinishedSingleOp();
    }

    thread->stats.Stop();
  }

M
Mayank Agarwal 已提交
1637
  // Checks every key assigned to this thread, in every column family, against
  // the expected state kept in SharedState.
  //
  // Keys [0, max_key) are divided evenly among the threads; the last thread
  // also takes the remainder left by the integer division. For each column
  // family a coin flip decides whether the range is read with one forward
  // iterator or with point Gets. Work stops as soon as any thread has
  // recorded a verification failure.
  void VerifyDb(ThreadState* thread) const {
    ReadOptions options(FLAGS_verify_checksum, true);
    auto shared = thread->shared;
    // Plain locals rather than function-local statics: a static would freeze
    // the values computed on the first call (and adds a thread-safe-init
    // guard), so any later call with a different SharedState would silently
    // verify a stale key range.
    const long max_key = shared->GetMaxKey();
    const long keys_per_thread = max_key / shared->GetNumThreads();
    long start = keys_per_thread * thread->tid;
    long end = start + keys_per_thread;
    if (thread->tid == shared->GetNumThreads() - 1) {
      end = max_key;
    }

    // The iterator is re-seeked at every multiple of
    // 2^(8 * (8 - FLAGS_prefix_size)), which is where a FLAGS_prefix_size-byte
    // key prefix changes if Key() yields 8-byte big-endian keys (assumption:
    // Key() is defined elsewhere). The shift is only evaluated for sizes 1..7:
    // shifting a 64-bit value by 64 (prefix_size == 0, which Open() requires
    // for non-hash memtables) or by a negative amount is undefined behaviour.
    // A period of 0 means there is a single (empty) prefix, so no periodic
    // reseek is needed after the initial Seek.
    int64_t reseek_period = 0;
    if (FLAGS_prefix_size >= 8) {
      reseek_period = 1;
    } else if (FLAGS_prefix_size > 0) {
      reseek_period = static_cast<int64_t>(1) << (8 * (8 - FLAGS_prefix_size));
    }

    for (size_t cf = 0; cf < column_families_.size(); ++cf) {
      if (shared->HasVerificationFailedYet()) {
        break;
      }
      if (!thread->rand.OneIn(2)) {
        // Use an iterator to verify this range.
        std::unique_ptr<Iterator> iter(
            db_->NewIterator(options, column_families_[cf]));
        iter->Seek(Key(start));
        for (long i = start; i < end; i++) {
          if (shared->HasVerificationFailedYet()) {
            break;
          }
          // TODO(ljin): update "long" to uint64_t
          // Reseek when the prefix changes.
          if (reseek_period > 0 && i % reseek_period == 0) {
            iter->Seek(Key(i));
          }

          std::string from_db;
          std::string keystr = Key(i);
          Slice k = keystr;
          Status s = iter->status();
          if (iter->Valid()) {
            const int cmp = iter->key().compare(k);
            if (cmp > 0) {
              // The iterator is already past k, so k is not in the DB.
              s = Status::NotFound(Slice());
            } else if (cmp == 0) {
              from_db = iter->value().ToString();
              iter->Next();
            } else {
              // The iterator sits on a key that sorts before k, i.e. a key
              // this scan did not expect to see.
              VerificationAbort(shared, "An out of range key was found",
                                static_cast<int>(cf), i);
            }
          } else {
            // The iterator found no value for the key in question, so do not
            // move to the next item in the iterator.
            s = Status::NotFound(Slice());
          }
          VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
                      true);
          if (!from_db.empty()) {
            PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
                          from_db.data(), from_db.length());
          }
        }
      } else {
        // Use point Gets to verify this range.
        for (long i = start; i < end; i++) {
          if (shared->HasVerificationFailedYet()) {
            break;
          }
          std::string from_db;
          std::string keystr = Key(i);
          Slice k = keystr;
          Status s = db_->Get(options, column_families_[cf], k, &from_db);
          VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
                      true);
          if (!from_db.empty()) {
            PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
                          from_db.data(), from_db.length());
          }
        }
      }
    }
  }

I
Igor Canadi 已提交
1713 1714 1715 1716 1717
  void VerificationAbort(SharedState* shared, std::string msg, int cf,
                         long key) const {
    printf("Verification failed for column family %d key %ld: %s\n", cf, key,
           msg.c_str());
    shared->SetVerificationFailure();
1718 1719
  }

I
Igor Canadi 已提交
1720 1721
  bool VerifyValue(int cf, long key, const ReadOptions& opts,
                   SharedState* shared, const std::string& value_from_db,
I
Igor Canadi 已提交
1722
                   Status s, bool strict = false) const {
I
Igor Canadi 已提交
1723 1724 1725
    if (shared->HasVerificationFailedYet()) {
      return false;
    }
1726
    // compare value_from_db with the value in the shared state
A
amayank 已提交
1727
    char value[100];
I
Igor Canadi 已提交
1728
    uint32_t value_base = shared->Get(cf, key);
1729
    if (value_base == SharedState::SENTINEL && !strict) {
I
Igor Canadi 已提交
1730
      return true;
1731 1732
    }

1733
    if (s.ok()) {
1734
      if (value_base == SharedState::SENTINEL) {
I
Igor Canadi 已提交
1735 1736
        VerificationAbort(shared, "Unexpected value found", cf, key);
        return false;
1737
      }
A
amayank 已提交
1738
      size_t sz = GenerateValue(value_base, value, sizeof(value));
1739
      if (value_from_db.length() != sz) {
I
Igor Canadi 已提交
1740 1741
        VerificationAbort(shared, "Length of value read is not equal", cf, key);
        return false;
1742
      }
1743
      if (memcmp(value_from_db.data(), value, sz) != 0) {
I
Igor Canadi 已提交
1744 1745 1746
        VerificationAbort(shared, "Contents of value read don't match", cf,
                          key);
        return false;
1747 1748 1749
      }
    } else {
      if (value_base != SharedState::SENTINEL) {
I
Igor Canadi 已提交
1750 1751
        VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
        return false;
1752 1753
      }
    }
I
Igor Canadi 已提交
1754
    return true;
1755 1756
  }

I
Igor Canadi 已提交
1757 1758 1759 1760 1761 1762 1763
  // When --verbose is set, prints "[CF <cf>] <key> ==> (<sz>) " to stdout,
  // followed by the `sz` bytes of `value` as two-digit uppercase hex and a
  // newline. Does nothing otherwise.
  static void PrintKeyValue(int cf, uint32_t key, const char* value,
                            size_t sz) {
    if (!FLAGS_verbose) {
      return;
    }
    fprintf(stdout, "[CF %d] %u ==> (%u) ", cf, key,
            static_cast<unsigned int>(sz));
    for (size_t i = 0; i < sz; i++) {
      // Convert through unsigned char: where plain char is signed, a byte
      // >= 0x80 would otherwise be sign-extended and printed as "FFFFFF80".
      // Pad to two digits so byte boundaries in the dump are unambiguous.
      fprintf(stdout, "%02X",
              static_cast<unsigned int>(static_cast<unsigned char>(value[i])));
    }
    fprintf(stdout, "\n");
  }

  // Writes the deterministic value for seed `rand` into `v` and returns its
  // length, which excludes the NUL terminator written right after it.
  //
  // Length: ((rand % 3) + 1) * FLAGS_value_size_mult.
  // Layout: the first sizeof(uint32_t) bytes hold `rand` in native byte order;
  // every following byte i is (char)(rand ^ i).
  // `max_sz` is the capacity of `v` and must be strictly greater than the
  // length, to leave room for the terminator.
  static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) {
    size_t value_sz = ((rand % 3) + 1) * FLAGS_value_size_mult;
    // Strict '<': v[value_sz] is written below, so value_sz == max_sz would
    // write one byte past the end of the buffer.
    assert(value_sz < max_sz && value_sz >= sizeof(uint32_t));
    // memcpy instead of *(uint32_t*)v: `v` is a char buffer with no alignment
    // guarantee, and storing through the cast pointer breaks strict aliasing.
    memcpy(v, &rand, sizeof(rand));
    for (size_t i = sizeof(uint32_t); i < value_sz; i++) {
      v[i] = static_cast<char>(rand ^ i);
    }
    v[value_sz] = '\0';
    return value_sz;  // the size of the value set.
  }

  void PrintEnv() const {
I
Igor Canadi 已提交
1781 1782 1783 1784 1785 1786 1787
    fprintf(stdout, "RocksDB version     : %d.%d\n", kMajorVersion,
            kMinorVersion);
    fprintf(stdout, "Column families     : %d\n", FLAGS_column_families);
    if (!FLAGS_test_batches_snapshots) {
      fprintf(stdout, "Clear CFs one in    : %d\n",
              FLAGS_clear_column_family_one_in);
    }
1788
    fprintf(stdout, "Number of threads   : %d\n", FLAGS_threads);
K
kailiu 已提交
1789 1790 1791
    fprintf(stdout,
            "Ops per thread      : %lu\n",
            (unsigned long)FLAGS_ops_per_thread);
1792 1793 1794 1795 1796
    std::string ttl_state("unused");
    if (FLAGS_ttl > 0) {
      ttl_state = NumberToString(FLAGS_ttl);
    }
    fprintf(stdout, "Time to live(sec)   : %s\n", ttl_state.c_str());
K
Kai Liu 已提交
1797 1798 1799 1800 1801
    fprintf(stdout, "Read percentage     : %d%%\n", FLAGS_readpercent);
    fprintf(stdout, "Prefix percentage   : %d%%\n", FLAGS_prefixpercent);
    fprintf(stdout, "Write percentage    : %d%%\n", FLAGS_writepercent);
    fprintf(stdout, "Delete percentage   : %d%%\n", FLAGS_delpercent);
    fprintf(stdout, "Iterate percentage  : %d%%\n", FLAGS_iterpercent);
1802 1803
    fprintf(stdout, "DB-write-buffer-size: %" PRIu64 "\n",
        FLAGS_db_write_buffer_size);
1804
    fprintf(stdout, "Write-buffer-size   : %d\n", FLAGS_write_buffer_size);
K
kailiu 已提交
1805 1806 1807 1808 1809 1810
    fprintf(stdout,
            "Iterations          : %lu\n",
            (unsigned long)FLAGS_num_iterations);
    fprintf(stdout,
            "Max key             : %lu\n",
            (unsigned long)FLAGS_max_key);
X
Xing Jin 已提交
1811 1812
    fprintf(stdout, "Ratio #ops/#keys    : %f\n",
            (1.0 * FLAGS_ops_per_thread * FLAGS_threads)/FLAGS_max_key);
1813
    fprintf(stdout, "Num times DB reopens: %d\n", FLAGS_reopen);
1814 1815
    fprintf(stdout, "Batches/snapshots   : %d\n",
            FLAGS_test_batches_snapshots);
1816
    fprintf(stdout, "Deletes use filter  : %d\n",
1817
            FLAGS_filter_deletes);
1818 1819
    fprintf(stdout, "Do update in place  : %d\n",
            FLAGS_in_place_update);
1820
    fprintf(stdout, "Num keys per lock   : %d\n",
1821 1822
            1 << FLAGS_log2_keys_per_lock);

1823 1824
    std::string compression = CompressionTypeToString(FLAGS_compression_type_e);
    fprintf(stdout, "Compression         : %s\n", compression.c_str());
J
Jim Paton 已提交
1825 1826 1827 1828 1829 1830

    const char* memtablerep = "";
    switch (FLAGS_rep_factory) {
      case kSkipList:
        memtablerep = "skip_list";
        break;
I
Igor Canadi 已提交
1831
      case kHashSkipList:
J
Jim Paton 已提交
1832 1833 1834 1835 1836 1837 1838 1839 1840
        memtablerep = "prefix_hash";
        break;
      case kVectorRep:
        memtablerep = "vector";
        break;
    }

    fprintf(stdout, "Memtablerep         : %s\n", memtablerep);

1841 1842 1843 1844
    fprintf(stdout, "------------------------------------------------\n");
  }

  void Open() {
1845
    assert(db_ == nullptr);
1846 1847 1848 1849
    BlockBasedTableOptions block_based_options;
    block_based_options.block_cache = cache_;
    block_based_options.block_cache_compressed = compressed_cache_;
    block_based_options.block_size = FLAGS_block_size;
1850
    block_based_options.format_version = 2;
1851 1852 1853
    block_based_options.filter_policy = filter_policy_;
    options_.table_factory.reset(
        NewBlockBasedTableFactory(block_based_options));
1854
    options_.db_write_buffer_size = FLAGS_db_write_buffer_size;
I
Igor Canadi 已提交
1855 1856 1857 1858
    options_.write_buffer_size = FLAGS_write_buffer_size;
    options_.max_write_buffer_number = FLAGS_max_write_buffer_number;
    options_.min_write_buffer_number_to_merge =
        FLAGS_min_write_buffer_number_to_merge;
1859 1860
    options_.max_write_buffer_number_to_maintain =
        FLAGS_max_write_buffer_number_to_maintain;
I
Igor Canadi 已提交
1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871
    options_.max_background_compactions = FLAGS_max_background_compactions;
    options_.max_background_flushes = FLAGS_max_background_flushes;
    options_.compaction_style =
        static_cast<rocksdb::CompactionStyle>(FLAGS_compaction_style);
    options_.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size));
    options_.max_open_files = FLAGS_open_files;
    options_.statistics = dbstats;
    options_.env = FLAGS_env;
    options_.disableDataSync = FLAGS_disable_data_sync;
    options_.use_fsync = FLAGS_use_fsync;
    options_.allow_mmap_reads = FLAGS_mmap_read;
1872
    rocksdb_kill_odds = FLAGS_kill_random_test;
I
Igor Canadi 已提交
1873 1874 1875 1876
    options_.target_file_size_base = FLAGS_target_file_size_base;
    options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
    options_.max_bytes_for_level_multiplier =
1877
        FLAGS_max_bytes_for_level_multiplier;
I
Igor Canadi 已提交
1878 1879 1880 1881 1882 1883 1884 1885 1886
    options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
    options_.level0_slowdown_writes_trigger =
        FLAGS_level0_slowdown_writes_trigger;
    options_.level0_file_num_compaction_trigger =
        FLAGS_level0_file_num_compaction_trigger;
    options_.compression = FLAGS_compression_type_e;
    options_.create_if_missing = true;
    options_.max_manifest_file_size = 10 * 1024;
    options_.filter_deletes = FLAGS_filter_deletes;
1887
    options_.inplace_update_support = FLAGS_in_place_update;
1888
    options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
I
Igor Canadi 已提交
1889
    if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) {
J
Jim Paton 已提交
1890 1891 1892 1893 1894 1895 1896 1897
      fprintf(stderr,
            "prefix_size should be non-zero iff memtablerep == prefix_hash\n");
      exit(1);
    }
    switch (FLAGS_rep_factory) {
      case kSkipList:
        // no need to do anything
        break;
S
sdong 已提交
1898 1899 1900 1901
#ifndef ROCKSDB_LITE
      case kHashSkipList:
        options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
        break;
J
Jim Paton 已提交
1902
      case kVectorRep:
I
Igor Canadi 已提交
1903
        options_.memtable_factory.reset(new VectorRepFactory());
J
Jim Paton 已提交
1904
        break;
S
sdong 已提交
1905 1906 1907 1908 1909 1910
#else
      default:
        fprintf(stderr,
                "RocksdbLite only supports skip list mem table. Skip "
                "--rep_factory\n");
#endif  // ROCKSDB_LITE
J
Jim Paton 已提交
1911
    }
S
sdong 已提交
1912

1913
    if (FLAGS_use_merge) {
I
Igor Canadi 已提交
1914
      options_.merge_operator = MergeOperators::CreatePutOperator();
D
Deon Nicholas 已提交
1915 1916
    }

1917 1918
    // set universal style compaction configurations, if applicable
    if (FLAGS_universal_size_ratio != 0) {
I
Igor Canadi 已提交
1919 1920
      options_.compaction_options_universal.size_ratio =
          FLAGS_universal_size_ratio;
1921 1922
    }
    if (FLAGS_universal_min_merge_width != 0) {
I
Igor Canadi 已提交
1923 1924
      options_.compaction_options_universal.min_merge_width =
          FLAGS_universal_min_merge_width;
1925 1926
    }
    if (FLAGS_universal_max_merge_width != 0) {
I
Igor Canadi 已提交
1927 1928
      options_.compaction_options_universal.max_merge_width =
          FLAGS_universal_max_merge_width;
1929 1930
    }
    if (FLAGS_universal_max_size_amplification_percent != 0) {
I
Igor Canadi 已提交
1931 1932
      options_.compaction_options_universal.max_size_amplification_percent =
          FLAGS_universal_max_size_amplification_percent;
1933 1934
    }

1935
    fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
1936

1937 1938
    Status s;
    if (FLAGS_ttl == -1) {
I
Igor Canadi 已提交
1939 1940 1941 1942 1943 1944 1945
      std::vector<std::string> existing_column_families;
      s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db,
                                 &existing_column_families);  // ignore errors
      if (!s.ok()) {
        // DB doesn't exist
        assert(existing_column_families.empty());
        assert(column_family_names_.empty());
1946
        column_family_names_.push_back(kDefaultColumnFamilyName);
I
Igor Canadi 已提交
1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973
      } else if (column_family_names_.empty()) {
        // this is the first call to the function Open()
        column_family_names_ = existing_column_families;
      } else {
        // this is a reopen. just assert that existing column_family_names are
        // equivalent to what we remember
        auto sorted_cfn = column_family_names_;
        sort(sorted_cfn.begin(), sorted_cfn.end());
        sort(existing_column_families.begin(), existing_column_families.end());
        if (sorted_cfn != existing_column_families) {
          fprintf(stderr,
                  "Expected column families differ from the existing:\n");
          printf("Expected: {");
          for (auto cf : sorted_cfn) {
            printf("%s ", cf.c_str());
          }
          printf("}\n");
          printf("Existing: {");
          for (auto cf : existing_column_families) {
            printf("%s ", cf.c_str());
          }
          printf("}\n");
        }
        assert(sorted_cfn == existing_column_families);
      }
      std::vector<ColumnFamilyDescriptor> cf_descriptors;
      for (auto name : column_family_names_) {
1974
        if (name != kDefaultColumnFamilyName) {
I
Igor Canadi 已提交
1975 1976 1977 1978 1979
          new_column_family_name_ =
              std::max(new_column_family_name_.load(), std::stoi(name) + 1);
        }
        cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
      }
I
Igor Canadi 已提交
1980
      while (cf_descriptors.size() < (size_t)FLAGS_column_families) {
1981
        std::string name = ToString(new_column_family_name_.load());
I
Igor Canadi 已提交
1982 1983 1984 1985
        new_column_family_name_++;
        cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_));
        column_family_names_.push_back(name);
      }
1986 1987
      options_.listeners.clear();
      options_.listeners.emplace_back(
Y
Yueh-Hsuan Chiang 已提交
1988
          new DbStressListener(FLAGS_db, options_.db_paths));
I
Igor Canadi 已提交
1989
      options_.create_missing_column_families = true;
I
Igor Canadi 已提交
1990 1991 1992 1993
      s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,
                   &column_families_, &db_);
      assert(!s.ok() || column_families_.size() ==
                            static_cast<size_t>(FLAGS_column_families));
1994
    } else {
S
sdong 已提交
1995
#ifndef ROCKSDB_LITE
I
Igor Canadi 已提交
1996 1997 1998
      DBWithTTL* db_with_ttl;
      s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
      db_ = db_with_ttl;
S
sdong 已提交
1999 2000 2001 2002
#else
      fprintf(stderr, "TTL is not supported in RocksDBLite\n");
      exit(1);
#endif
2003
    }
2004 2005 2006 2007 2008 2009
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

2010
  void Reopen() {
I
Igor Canadi 已提交
2011 2012
    for (auto cf : column_families_) {
      delete cf;
2013
    }
I
Igor Canadi 已提交
2014 2015
    column_families_.clear();
    delete db_;
2016
    db_ = nullptr;
2017 2018 2019 2020 2021 2022

    num_times_reopened_++;
    double now = FLAGS_env->NowMicros();
    fprintf(stdout, "%s Reopening database for the %dth time\n",
            FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(),
            num_times_reopened_);
2023 2024 2025
    Open();
  }

2026 2027
  // Dumps the collected DB statistics to stdout. Does nothing when statistics
  // collection was not enabled (dbstats is empty).
  void PrintStatistics() {
    if (!dbstats) {
      return;
    }
    fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
  }

 private:
2033 2034 2035
  std::shared_ptr<Cache> cache_;
  std::shared_ptr<Cache> compressed_cache_;
  std::shared_ptr<const FilterPolicy> filter_policy_;
2036
  DB* db_;
I
Igor Canadi 已提交
2037 2038 2039 2040
  Options options_;
  std::vector<ColumnFamilyHandle*> column_families_;
  std::vector<std::string> column_family_names_;
  std::atomic<int> new_column_family_name_;
2041
  int num_times_reopened_;
L
Lei Jin 已提交
2042 2043
  std::unordered_map<std::string, std::vector<std::string>> options_table_;
  std::vector<std::string> options_index_;
2044 2045
};

2046
}  // namespace rocksdb
2047 2048

int main(int argc, char** argv) {
2049 2050 2051
  SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
                  " [OPTIONS]...");
  ParseCommandLineFlags(&argc, &argv, true);
2052 2053 2054 2055 2056 2057 2058 2059

  if (FLAGS_statistics) {
    dbstats = rocksdb::CreateDBStatistics();
  }
  FLAGS_compression_type_e =
    StringToCompressionType(FLAGS_compression_type.c_str());
  if (!FLAGS_hdfs.empty()) {
    FLAGS_env  = new rocksdb::HdfsEnv(FLAGS_hdfs);
2060
  }
2061
  FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
2062

2063 2064 2065 2066
  // The number of background threads should be at least as much the
  // max number of concurrent compactions.
  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);

L
Lei Jin 已提交
2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078
  if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size <= 0) {
    fprintf(stderr,
            "Error: prefixpercent is non-zero while prefix_size is "
            "not positive!\n");
    exit(1);
  }
  if (FLAGS_test_batches_snapshots && FLAGS_prefix_size <= 0) {
    fprintf(stderr,
            "Error: please specify prefix_size for "
            "test_batches_snapshots test!\n");
    exit(1);
  }
2079
  if ((FLAGS_readpercent + FLAGS_prefixpercent +
2080 2081 2082
       FLAGS_writepercent + FLAGS_delpercent + FLAGS_iterpercent) != 100) {
      fprintf(stderr,
              "Error: Read+Prefix+Write+Delete+Iterate percents != 100!\n");
A
amayank 已提交
2083 2084
      exit(1);
  }
2085 2086 2087 2088
  if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) {
      fprintf(stderr, "Error: Db cannot reopen safely with disable_wal set!\n");
      exit(1);
  }
2089
  if ((unsigned)FLAGS_reopen >= FLAGS_ops_per_thread) {
K
kailiu 已提交
2090 2091 2092 2093 2094
      fprintf(stderr,
              "Error: #DB-reopens should be < ops_per_thread\n"
              "Provided reopens = %d and ops_per_thread = %lu\n",
              FLAGS_reopen,
              (unsigned long)FLAGS_ops_per_thread);
2095 2096 2097
      exit(1);
  }

2098
  // Choose a location for the test database if none given with --db=<path>
2099 2100
  if (FLAGS_db.empty()) {
      std::string default_db_path;
2101
      rocksdb::Env::Default()->GetTestDirectory(&default_db_path);
2102
      default_db_path += "/dbstress";
2103
      FLAGS_db = default_db_path;
2104 2105
  }

2106
  rocksdb::StressTest stress;
I
Igor Canadi 已提交
2107 2108 2109 2110 2111
  if (stress.Run()) {
    return 0;
  } else {
    return 1;
  }
2112
}
2113 2114

#endif  // GFLAGS