db_bench.cc 33.8 KB
Newer Older
J
jorlow@chromium.org 已提交
1 2 3 4 5 6 7 8 9
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include "db/db_impl.h"
#include "db/version_set.h"
10
#include "db/db_statistics.h"
11 12 13 14
#include "leveldb/cache.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/write_batch.h"
15
#include "leveldb/statistics.h"
J
jorlow@chromium.org 已提交
16 17
#include "port/port.h"
#include "util/crc32c.h"
J
jorlow@chromium.org 已提交
18
#include "util/histogram.h"
19
#include "util/mutexlock.h"
J
jorlow@chromium.org 已提交
20 21
#include "util/random.h"
#include "util/testutil.h"
22
#include "hdfs/env_hdfs.h"
J
jorlow@chromium.org 已提交
23 24 25

// Comma-separated list of operations to run in the specified order
//   Actual benchmarks:
//      fillseq       -- write N values in sequential key order in async mode
//      fillbatch     -- write N values in sequential key order, 1000 per batch
//      fillrandom    -- write N values in random key order in async mode
//      overwrite     -- overwrite N values in random key order in async mode
//      fillsync      -- write N/1000 values in random key order in sync mode
//      fill100K      -- write N/1000 100K values in random order in async mode
//      deleteseq     -- delete N keys in sequential order
//      deleterandom  -- delete N keys in random order
//      readseq       -- read N times sequentially
//      readreverse   -- read N times in reverse order
//      readrandom    -- read N times in random order
//      readrandomsmall -- read N/1000 times in random order
//      readmissing   -- read N missing keys in random order
//      readhot       -- read N times in random order from 1% section of DB
//      readwhilewriting -- random reads with one extra thread that writes
//      seekrandom    -- N random seeks
//      crc32c        -- repeated crc32c of 4K of data (~500MB total)
//      acquireload   -- 100000 ops of 1000 atomic acquire-loads each
//                       (independent of N)
//      snappycomp    -- repeatedly snappy-compress one block (1GB total)
//      snappyuncomp  -- repeatedly snappy-uncompress one block (1GB total)
//   Meta operations:
//      compact     -- Compact the entire DB
//      stats       -- Print DB stats
//      sstables    -- Print sstable info
//      heapprofile -- Dump a heap profile (if supported by this port)
static const char* FLAGS_benchmarks =
    "fillseq,"
    "fillsync,"
    "fillrandom,"
    "overwrite,"
    "readrandom,"
    "readrandom,"  // Extra run to allow previous compactions to quiesce
    "readseq,"
    "readreverse,"
    "compact,"
    "readrandom,"
    "readseq,"
    "readreverse,"
    "fill100K,"
    "crc32c,"
    "snappycomp,"
    "snappyuncomp,"
    "acquireload,"
    ;

// Number of key/values to place in database
67
static long FLAGS_num = 1000000;
J
jorlow@chromium.org 已提交
68

69
// Number of read operations to do.  If negative, do FLAGS_num reads.
70
static long FLAGS_reads = -1;
71

72 73 74
// Number of concurrent threads to run.
static int FLAGS_threads = 1;

J
jorlow@chromium.org 已提交
75 76 77 78 79
// Size of each value
static int FLAGS_value_size = 100;

// Arrange to generate values that shrink to this fraction of
// their original size after compression
80
static double FLAGS_compression_ratio = 0.5;
J
jorlow@chromium.org 已提交
81 82 83 84 85

// Print histogram of operation timings
static bool FLAGS_histogram = false;

// Number of bytes to buffer in memtable before compacting
86 87 88 89 90
// (initialized to default value by "main")
static int FLAGS_write_buffer_size = 0;

// Number of bytes to use as a cache of uncompressed data.
// Negative means use default settings.
D
Dhruba Borthakur 已提交
91
static long FLAGS_cache_size = -1;
J
jorlow@chromium.org 已提交
92

93 94 95
// Maximum number of files to keep open at the same time (use default if == 0)
static int FLAGS_open_files = 0;

S
Sanjay Ghemawat 已提交
96 97 98 99
// Bloom filter bits per key.
// Negative means use default settings.
static int FLAGS_bloom_bits = -1;

100 101 102 103 104
// If true, do not destroy the existing database.  If you set this
// flag and also specify a benchmark that wants a fresh database, that
// benchmark will fail.
static bool FLAGS_use_existing_db = false;

105
// Use the db with the following name.
H
heyongqiang 已提交
106
static const char* FLAGS_db = NULL;
107

108 109 110 111 112
// Number of shards for the block cache is 2 ** FLAGS_cache_numshardbits.
// Negative means use default settings. This is applied only
// if FLAGS_cache_size is non-negative.
static int FLAGS_cache_numshardbits = -1;

113 114 115
// Verify checksum for every block read from storage
static bool FLAGS_verify_checksum = false;

116 117 118 119
// Database statistics
static bool FLAGS_statistics = false;
static class leveldb::DBStatistics* dbstats = NULL;

120 121 122
// Number of write operations to do.  If negative, do FLAGS_num reads.
static long FLAGS_writes = -1;

H
heyongqiang 已提交
123 124
// These default values might change if the hardcoded

125 126 127
// Sync all writes to disk
static bool FLAGS_sync = false;

H
heyongqiang 已提交
128 129 130
// If true, do not wait until data is synced to disk.
static bool FLAGS_disable_data_sync = false;

131 132 133
// If true, issue fsync instead of fdatasync
static bool FLAGS_use_fsync = false;

H
heyongqiang 已提交
134 135 136
// If true, do not write WAL for write.
static bool FLAGS_disable_wal = false;

H
heyongqiang 已提交
137 138 139 140 141 142 143 144 145 146 147 148
// Target level-0 file size for compaction
static int FLAGS_target_file_size_base = 2 * 1048576;

// A multiplier to compute targe level-N file size
static int FLAGS_target_file_size_multiplier = 1;

// Max bytes for level-0
static int FLAGS_max_bytes_for_level_base = 10 * 1048576;

// A multiplier to compute max bytes for level-N
static int FLAGS_max_bytes_for_level_multiplier = 10;

H
heyongqiang 已提交
149 150 151
// Number of files in level-0 that will trigger put stop.
static int FLAGS_level0_stop_writes_trigger = 12;

152 153
// Number of files in level-0 that will slow down writes.
static int FLAGS_level0_slowdown_writes_trigger = 8;
H
heyongqiang 已提交
154

155 156 157
// posix or hdfs environment
static leveldb::Env* FLAGS_env = leveldb::Env::Default();

158 159
extern bool useOsBuffer;

J
jorlow@chromium.org 已提交
160 161 162
namespace leveldb {

namespace {
163 164

// Helper for quickly generating random data.
J
jorlow@chromium.org 已提交
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
class RandomGenerator {
 private:
  std::string data_;
  int pos_;

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
    while (data_.size() < 1048576) {
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

  Slice Generate(int len) {
    if (pos_ + len > data_.size()) {
      pos_ = 0;
      assert(len < data_.size());
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
};
195 196 197 198 199 200 201 202 203 204 205 206 207

// Returns the sub-slice of "s" with leading and trailing whitespace removed.
// The result aliases the bytes of "s"; nothing is copied.
static Slice TrimSpace(Slice s) {
  size_t start = 0;
  // isspace() requires a value representable as unsigned char (or EOF);
  // passing a negative char (e.g. a non-ASCII byte) is undefined behavior.
  while (start < s.size() &&
         isspace(static_cast<unsigned char>(s[start]))) {
    start++;
  }
  size_t limit = s.size();
  while (limit > start &&
         isspace(static_cast<unsigned char>(s[limit - 1]))) {
    limit--;
  }
  return Slice(s.data() + start, limit - start);
}

// Appends "msg" to "*str", separating it from any existing text with a
// single space. An empty "msg" leaves "*str" unchanged.
static void AppendWithSpace(std::string* str, Slice msg) {
  if (!msg.empty()) {
    if (!str->empty()) str->push_back(' ');
    str->append(msg.data(), msg.size());
  }
}

class Stats {
 private:
  double start_;
  double finish_;
  double seconds_;
221
  long done_;
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
  int next_report_;
  int64_t bytes_;
  double last_op_finish_;
  Histogram hist_;
  std::string message_;

 public:
  Stats() { Start(); }

  void Start() {
    next_report_ = 100;
    last_op_finish_ = start_;
    hist_.Clear();
    done_ = 0;
    bytes_ = 0;
    seconds_ = 0;
238
    start_ = FLAGS_env->NowMicros();
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
    finish_ = start_;
    message_.clear();
  }

  void Merge(const Stats& other) {
    hist_.Merge(other.hist_);
    done_ += other.done_;
    bytes_ += other.bytes_;
    seconds_ += other.seconds_;
    if (other.start_ < start_) start_ = other.start_;
    if (other.finish_ > finish_) finish_ = other.finish_;

    // Just keep the messages from one thread
    if (message_.empty()) message_ = other.message_;
  }

  void Stop() {
256
    finish_ = FLAGS_env->NowMicros();
257 258 259 260 261 262 263 264 265
    seconds_ = (finish_ - start_) * 1e-6;
  }

  void AddMessage(Slice msg) {
    AppendWithSpace(&message_, msg);
  }

  void FinishedSingleOp() {
    if (FLAGS_histogram) {
266
      double now = FLAGS_env->NowMicros();
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
      double micros = now - last_op_finish_;
      hist_.Add(micros);
      if (micros > 20000) {
        fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
        fflush(stderr);
      }
      last_op_finish_ = now;
    }

    done_++;
    if (done_ >= next_report_) {
      if      (next_report_ < 1000)   next_report_ += 100;
      else if (next_report_ < 5000)   next_report_ += 500;
      else if (next_report_ < 10000)  next_report_ += 1000;
      else if (next_report_ < 50000)  next_report_ += 5000;
      else if (next_report_ < 100000) next_report_ += 10000;
      else if (next_report_ < 500000) next_report_ += 50000;
      else                            next_report_ += 100000;
285
      fprintf(stderr, "... finished %ld ops%30s\r", done_, "");
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
      fflush(stderr);
    }
  }

  void AddBytes(int64_t n) {
    bytes_ += n;
  }

  void Report(const Slice& name) {
    // Pretend at least one op was done in case we are running a benchmark
    // that does not call FinishedSingleOp().
    if (done_ < 1) done_ = 1;

    std::string extra;
    if (bytes_ > 0) {
      // Rate is computed on actual elapsed time, not the sum of per-thread
      // elapsed times.
      double elapsed = (finish_ - start_) * 1e-6;
      char rate[100];
      snprintf(rate, sizeof(rate), "%6.1f MB/s",
               (bytes_ / 1048576.0) / elapsed);
      extra = rate;
    }
    AppendWithSpace(&extra, message_);
310 311
    double elapsed = (finish_ - start_) * 1e-6;
    double throughput = (double)done_/elapsed;
312

D
Dhruba Borthakur 已提交
313
    fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n",
314 315
            name.ToString().c_str(),
            seconds_ * 1e6 / done_,
D
Dhruba Borthakur 已提交
316
            (long)throughput,
317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
            (extra.empty() ? "" : " "),
            extra.c_str());
    if (FLAGS_histogram) {
      fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
    }
    fflush(stdout);
  }
};

// State shared by all concurrent executions of the same benchmark.
// Once threads are running, num_initialized, num_done and start are read
// and written under mu; cv is used to wait for them to change.
struct SharedState {
  port::Mutex mu;
  port::CondVar cv;
  int total;  // Number of threads taking part

  // Each thread goes through the following states:
  //    (1) initializing
  //    (2) waiting for others to be initialized
  //    (3) running
  //    (4) done

  long num_initialized;
  long num_done;
  bool start;

  // mu is declared before cv, so it is constructed first as cv(&mu) needs.
  // The counters start in a well-defined state rather than uninitialized.
  SharedState()
      : cv(&mu), total(0), num_initialized(0), num_done(0), start(false) { }
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;             // 0..n-1 when running in n threads
  Random rand;         // Has different seeds for different threads
  Stats stats;
350
  SharedState* shared;
351 352 353 354 355 356 357

  ThreadState(int index)
      : tid(index),
        rand(1000 + index) {
  }
};

H
Hans Wennborg 已提交
358
}  // namespace
J
jorlow@chromium.org 已提交
359 360 361 362

class Benchmark {
 private:
  Cache* cache_;
S
Sanjay Ghemawat 已提交
363
  const FilterPolicy* filter_policy_;
J
jorlow@chromium.org 已提交
364
  DB* db_;
365
  long num_;
366 367 368
  int value_size_;
  int entries_per_batch_;
  WriteOptions write_options_;
369
  long reads_;
370
  long writes_;
J
jorlow@chromium.org 已提交
371 372
  int heap_counter_;

373 374 375 376 377 378 379
  void PrintHeader() {
    const int kKeySize = 16;
    PrintEnvironment();
    fprintf(stdout, "Keys:       %d bytes each\n", kKeySize);
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
            FLAGS_value_size,
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
380
    fprintf(stdout, "Entries:    %ld\n", num_);
381
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
382 383
            ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
             / 1048576.0));
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
            (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
             / 1048576.0));
    PrintWarnings();
    fprintf(stdout, "------------------------------------------------\n");
  }

  // Warns about build settings that make benchmarks slower than they should
  // be, and probes whether Snappy compression is available and effective.
  void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    fprintf(stdout,
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
            );
#endif
#ifndef NDEBUG
    fprintf(stdout,
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif

    // Compress a trivially compressible string: a failed call means snappy
    // is not compiled in; no size reduction means it is not doing its job.
    const char probe[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
    std::string compressed;
    const bool compressed_ok =
        port::Snappy_Compress(probe, sizeof(probe), &compressed);
    if (!compressed_ok) {
      fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
    } else if (compressed.size() >= sizeof(probe)) {
      fprintf(stdout, "WARNING: Snappy compression is not effective\n");
    }
  }

  // Prints the LevelDB version to stderr and, on Linux, the current date
  // plus the CPU model, CPU count and cache size read from /proc/cpuinfo.
  void PrintEnvironment() {
    fprintf(stderr, "LevelDB:    version %d.%d\n",
            kMajorVersion, kMinorVersion);

#if defined(__linux)
    time_t now = time(NULL);
    fprintf(stderr, "Date:       %s", ctime(&now));  // ctime() adds newline

    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
    if (cpuinfo != NULL) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != NULL) {
        const char* sep = strchr(line, ':');
        if (sep == NULL) {
          continue;
        }
        // Take everything before ':' and let TrimSpace() drop any padding,
        // instead of unconditionally chopping the character before ':'
        // (which would truncate a key not followed by whitespace).
        Slice key = TrimSpace(Slice(line, sep - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      fclose(cpuinfo);
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
    }
#endif
  }

  // When FLAGS_statistics is set, prints the file open/close/error counters
  // collected in dbstats (which must then be non-NULL).
  // NOTE(review): "%ld" assumes the DBStatistics getters return long.
  void PrintStatistics() {
    if (!FLAGS_statistics) {
      return;
    }
    fprintf(stdout, "File opened:%ld closed:%ld errors:%ld\n",
            dbstats->getNumFileOpens(),
            dbstats->getNumFileCloses(),
            dbstats->getNumFileErrors());
  }

 public:
  // Creates the block cache and bloom filter policy requested by the flags,
  // deletes stale "heap-*" files from FLAGS_db (presumably left by earlier
  // heapprofile runs), and destroys any existing database there unless
  // FLAGS_use_existing_db is set. The DB itself is opened later by Run().
  Benchmark()
  : cache_(FLAGS_cache_size >= 0 ?
           (FLAGS_cache_numshardbits >= 1 ?
            NewLRUCache(FLAGS_cache_size, FLAGS_cache_numshardbits) :
            NewLRUCache(FLAGS_cache_size)) : NULL),
    filter_policy_(FLAGS_bloom_bits >= 0
                   ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                   : NULL),
    db_(NULL),
    num_(FLAGS_num),
    value_size_(FLAGS_value_size),
    entries_per_batch_(1),
    reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
    writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
    heap_counter_(0) {
    // Best effort cleanup: the directory may not exist yet, so the statuses
    // of GetChildren()/DeleteFile()/DestroyDB() are deliberately ignored.
    std::vector<std::string> files;
    FLAGS_env->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      if (Slice(files[i]).starts_with("heap-")) {
        FLAGS_env->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      DestroyDB(FLAGS_db, Options());
    }
  }

  // Releases everything this object owns. The DB is closed first because it
  // was opened with cache_ and filter_policy_ in its Options; the remaining
  // two are independent of each other.
  ~Benchmark() {
    delete db_;
    delete filter_policy_;
    delete cache_;
  }

  void Run() {
491 492
    PrintHeader();
    Open();
J
jorlow@chromium.org 已提交
493 494 495 496 497 498 499 500 501 502 503 504 505

    const char* benchmarks = FLAGS_benchmarks;
    while (benchmarks != NULL) {
      const char* sep = strchr(benchmarks, ',');
      Slice name;
      if (sep == NULL) {
        name = benchmarks;
        benchmarks = NULL;
      } else {
        name = Slice(benchmarks, sep - benchmarks);
        benchmarks = sep + 1;
      }

506 507
      // Reset parameters that may be overriddden bwlow
      num_ = FLAGS_num;
508
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
509
      writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
510 511 512
      value_size_ = FLAGS_value_size;
      entries_per_batch_ = 1;
      write_options_ = WriteOptions();
513 514 515
      if (FLAGS_sync) {
        write_options_.sync = true;
      }
516

H
heyongqiang 已提交
517 518
      write_options_.disableWAL = FLAGS_disable_wal;

519 520
      void (Benchmark::*method)(ThreadState*) = NULL;
      bool fresh_db = false;
521
      int num_threads = FLAGS_threads;
522 523

      if (name == Slice("fillseq")) {
524 525
        fresh_db = true;
        method = &Benchmark::WriteSeq;
526
      } else if (name == Slice("fillbatch")) {
527 528 529
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
530
      } else if (name == Slice("fillrandom")) {
531 532
        fresh_db = true;
        method = &Benchmark::WriteRandom;
533
      } else if (name == Slice("overwrite")) {
534 535
        fresh_db = false;
        method = &Benchmark::WriteRandom;
536
      } else if (name == Slice("fillsync")) {
537 538 539 540
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
541
      } else if (name == Slice("fill100K")) {
542 543 544 545
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
J
jorlow@chromium.org 已提交
546
      } else if (name == Slice("readseq")) {
547
        method = &Benchmark::ReadSequential;
J
jorlow@chromium.org 已提交
548
      } else if (name == Slice("readreverse")) {
549
        method = &Benchmark::ReadReverse;
J
jorlow@chromium.org 已提交
550
      } else if (name == Slice("readrandom")) {
551
        method = &Benchmark::ReadRandom;
S
Sanjay Ghemawat 已提交
552 553 554 555
      } else if (name == Slice("readmissing")) {
        method = &Benchmark::ReadMissing;
      } else if (name == Slice("seekrandom")) {
        method = &Benchmark::SeekRandom;
556
      } else if (name == Slice("readhot")) {
557
        method = &Benchmark::ReadHot;
558
      } else if (name == Slice("readrandomsmall")) {
559
        reads_ /= 1000;
560
        method = &Benchmark::ReadRandom;
S
Sanjay Ghemawat 已提交
561 562 563 564
      } else if (name == Slice("deleteseq")) {
        method = &Benchmark::DeleteSeq;
      } else if (name == Slice("deleterandom")) {
        method = &Benchmark::DeleteRandom;
565 566 567
      } else if (name == Slice("readwhilewriting")) {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
J
jorlow@chromium.org 已提交
568
      } else if (name == Slice("compact")) {
569
        method = &Benchmark::Compact;
J
jorlow@chromium.org 已提交
570
      } else if (name == Slice("crc32c")) {
571
        method = &Benchmark::Crc32c;
572
      } else if (name == Slice("acquireload")) {
573
        method = &Benchmark::AcquireLoad;
574
      } else if (name == Slice("snappycomp")) {
575
        method = &Benchmark::SnappyCompress;
576
      } else if (name == Slice("snappyuncomp")) {
577
        method = &Benchmark::SnappyUncompress;
J
jorlow@chromium.org 已提交
578 579
      } else if (name == Slice("heapprofile")) {
        HeapProfile();
580
      } else if (name == Slice("stats")) {
S
Sanjay Ghemawat 已提交
581 582 583
        PrintStats("leveldb.stats");
      } else if (name == Slice("sstables")) {
        PrintStats("leveldb.sstables");
J
jorlow@chromium.org 已提交
584
      } else {
585 586 587 588
        if (name != Slice()) {  // No error message for empty name
          fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
        }
      }
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                  name.ToString().c_str());
          method = NULL;
        } else {
          delete db_;
          db_ = NULL;
          DestroyDB(FLAGS_db, Options());
          Open();
        }
      }

      if (method != NULL) {
604
        RunBenchmark(num_threads, name, method);
J
jorlow@chromium.org 已提交
605 606
      }
    }
607
    PrintStatistics();
J
jorlow@chromium.org 已提交
608 609
  }

610
 private:
  // Everything ThreadBody() needs to run one benchmark thread.
  struct ThreadArg {
    Benchmark* bm;                            // Object to invoke method on
    void (Benchmark::*method)(ThreadState*);  // Benchmark to run
    SharedState* shared;                      // State shared by all threads
    ThreadState* thread;                      // This thread's own state
  };

  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

    thread->stats.Start();
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }

646 647
  // Runs "method" on n threads that are released simultaneously, waits for
  // all of them to finish, then merges their stats into a single report
  // labelled "name". Requires n >= 1.
  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared;
    shared.total = n;
    shared.num_initialized = 0;
    shared.num_done = 0;
    shared.start = false;

    ThreadArg* args = new ThreadArg[n];
    for (int t = 0; t < n; t++) {
      ThreadArg& a = args[t];
      a.bm = this;
      a.method = method;
      a.shared = &shared;
      a.thread = new ThreadState(t);
      a.thread->shared = &shared;
      FLAGS_env->StartThread(ThreadBody, &a);
    }

    shared.mu.Lock();
    // Wait until every thread has checked in, then release them together.
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }
    shared.start = true;
    shared.cv.SignalAll();
    // Wait until every thread has finished its run.
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    Stats& combined = args[0].thread->stats;
    for (int t = 1; t < n; t++) {
      combined.Merge(args[t].thread->stats);
    }
    combined.Report(name);

    for (int t = 0; t < n; t++) {
      delete args[t].thread;
    }
    delete[] args;
  }

  // Repeatedly checksums the same 4KB buffer until about 500MB have been
  // processed; each checksum counts as one op.
  void Crc32c(ThreadState* thread) {
    const int size = 4096;
    const char* label = "(4K per op)";
    const std::string data(size, 'x');
    uint32_t crc = 0;
    int64_t bytes = 0;
    for (; bytes < 500 * 1048576; bytes += size) {
      crc = crc32c::Value(data.data(), size);
      thread->stats.FinishedSingleOp();
    }
    // Print so result is not dead
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));

    thread->stats.AddBytes(bytes);
    thread->stats.AddMessage(label);
  }

  // Times AtomicPointer::Acquire_Load(): 100000 ops of 1000 loads each,
  // regardless of FLAGS_num.
  void AcquireLoad(ThreadState* thread) {
    int dummy;
    port::AtomicPointer ap(&dummy);
    void* ptr = NULL;
    thread->stats.AddMessage("(each op is 1000 loads)");
    for (int op = 0; op < 100000; op++) {
      for (int i = 0; i < 1000; i++) {
        ptr = ap.Acquire_Load();
      }
      thread->stats.FinishedSingleOp();
    }
    if (ptr == NULL) exit(1); // Disable unused variable warning.
  }

  // Snappy-compresses the same block_size chunk of generated data until 1GB
  // of input has been consumed, reporting output size as a percentage of
  // input size.
  void SnappyCompress(ThreadState* thread) {
    RandomGenerator gen;
    const Slice input = gen.Generate(Options().block_size);
    std::string compressed;
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }

    if (ok) {
      char buf[100];
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
               (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    } else {
      thread->stats.AddMessage("(snappy failure)");
    }
  }

  // Snappy-compresses one block_size chunk of generated data once, then
  // uncompresses it repeatedly until 1GB of uncompressed output has been
  // produced. Reports "(snappy failure)" if either step fails.
  void SnappyUncompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    std::string compressed;
    bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
    int64_t bytes = 0;
    char* uncompressed = new char[input.size()];
    while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
      ok =  port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                    uncompressed);
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }
    delete[] uncompressed;

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }

  void Open() {
    assert(db_ == NULL);
    Options options;
772
    options.create_if_missing = !FLAGS_use_existing_db;
773 774
    options.block_cache = cache_;
    options.write_buffer_size = FLAGS_write_buffer_size;
S
Sanjay Ghemawat 已提交
775
    options.filter_policy = filter_policy_;
776 777
    options.max_open_files = FLAGS_open_files;
    options.statistics = dbstats;
778
    options.env = FLAGS_env;
H
heyongqiang 已提交
779
    options.disableDataSync = FLAGS_disable_data_sync;
780
    options.use_fsync = FLAGS_use_fsync;
H
heyongqiang 已提交
781 782 783 784 785
    options.target_file_size_base = FLAGS_target_file_size_base;
    options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
    options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
    options.max_bytes_for_level_multiplier =
        FLAGS_max_bytes_for_level_multiplier;
H
heyongqiang 已提交
786 787 788
    options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
    options.level0_slowdown_writes_trigger =
      FLAGS_level0_slowdown_writes_trigger;
789
    Status s = DB::Open(options, FLAGS_db, &db_);
790 791 792 793 794 795
    if (!s.ok()) {
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

796 797 798
  // Writes keys 0, 1, 2, ... in ascending order via DoWrite().
  void WriteSeq(ThreadState* thread) {
    const bool sequential = true;
    DoWrite(thread, sequential);
  }

  // Writes uniformly random keys from [0, FLAGS_num) via DoWrite().
  void WriteRandom(ThreadState* thread) {
    const bool sequential = false;
    DoWrite(thread, sequential);
  }

  // Issues writes_ Put() operations in batches of up to entries_per_batch_,
  // using write_options_. Keys are 16-digit zero-padded integers, ascending
  // from 0 when "seq" is true and uniform in [0, FLAGS_num) otherwise;
  // values are value_size_ bytes from a RandomGenerator. Exits the process
  // on the first write error.
  void DoWrite(ThreadState* thread, bool seq) {
    // The loop below is bounded by writes_, so that is the op count to
    // report when it differs from the default of FLAGS_num.
    if (writes_ != FLAGS_num) {
      char msg[100];
      snprintf(msg, sizeof(msg), "(%ld ops)", writes_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;
    for (long i = 0; i < writes_; i += entries_per_batch_) {
      batch.Clear();
      // Stop exactly at writes_ even if it is not a multiple of the batch
      // size, rather than overshooting in the last batch.
      for (long j = 0; j < entries_per_batch_ && i + j < writes_; j++) {
        const int k = seq ? static_cast<int>(i + j)
                          : static_cast<int>(thread->rand.Next() % FLAGS_num);
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        batch.Put(key, gen.Generate(value_size_));
        bytes += value_size_ + strlen(key);
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
    thread->stats.AddBytes(bytes);
  }

  // Scans forward from the first key, visiting at most reads_ entries, and
  // records the key+value bytes seen.
  void ReadSequential(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
    int64_t bytes = 0;
    long visited = 0;
    iter->SeekToFirst();
    while (visited < reads_ && iter->Valid()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp();
      ++visited;
      iter->Next();
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  // Scans backward from the last key, visiting at most reads_ entries, and
  // records the key+value bytes seen.
  void ReadReverse(ThreadState* thread) {
    Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true));
    int64_t bytes = 0;
    long visited = 0;
    iter->SeekToLast();
    while (visited < reads_ && iter->Valid()) {
      bytes += iter->key().size() + iter->value().size();
      thread->stats.FinishedSingleOp();
      ++visited;
      iter->Prev();
    }
    delete iter;
    thread->stats.AddBytes(bytes);
  }

  // Performs reads_ point lookups of uniformly random keys in [0, FLAGS_num)
  // and reports how many of them were found.
  void ReadRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    std::string value;
    long found = 0;
    for (long i = 0; i < reads_; i++) {
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      if (db_->Get(options, key, &value).ok()) {
        found++;
      }
      thread->stats.FinishedSingleOp();
    }
    // Report hits out of the lookups actually issued (reads_), not num_;
    // they differ for readrandomsmall or when FLAGS_reads is set.
    char msg[100];
    snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, reads_);
    thread->stats.AddMessage(msg);
  }

  // Performs reads_ lookups of keys the write benchmarks never produce
  // (a zero-padded number followed by '.'), measuring the cost of a miss.
  void ReadMissing(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    std::string value;
    char key[100];
    for (long n = 0; n < reads_; ++n) {
      snprintf(key, sizeof(key), "%016d.",
               static_cast<int>(thread->rand.Next() % FLAGS_num));
      db_->Get(options, key, &value);
      thread->stats.FinishedSingleOp();
    }
  }

  // Issues reads_ point lookups confined to the lowest ~1% of the key space
  // (ceil(FLAGS_num / 100) distinct keys), exercising a small hot set.
  void ReadHot(ThreadState* thread) {
    ReadOptions opts(FLAGS_verify_checksum, true);
    std::string scratch;
    const long hot_range = (FLAGS_num + 99) / 100;
    char key[100];
    for (long n = 0; n < reads_; ++n) {
      const int k = thread->rand.Next() % hot_range;
      snprintf(key, sizeof(key), "%016d", k);
      db_->Get(opts, key, &scratch);
      thread->stats.FinishedSingleOp();
    }
  }

S
Sanjay Ghemawat 已提交
903
  // Performs reads_ random seeks. Each seek uses a freshly created iterator,
  // positioned with Seek() on a key drawn from [0, FLAGS_num). It counts how
  // often the iterator lands exactly on the requested key.
  void SeekRandom(ThreadState* thread) {
    ReadOptions options(FLAGS_verify_checksum, true);
    long found = 0;
    for (long i = 0; i < reads_; i++) {
      Iterator* iter = db_->NewIterator(options);
      char key[100];
      const int k = thread->rand.Next() % FLAGS_num;
      snprintf(key, sizeof(key), "%016d", k);
      iter->Seek(key);
      if (iter->Valid() && iter->key() == key) found++;
      delete iter;
      thread->stats.FinishedSingleOp();
    }
    char msg[100];
    // Report hits against the number of seeks performed (reads_), not the
    // key-space size (num_). Cast so the argument always matches %ld.
    snprintf(msg, sizeof(msg), "(%ld of %ld found)", found,
             static_cast<long>(reads_));
    thread->stats.AddMessage(msg);
  }

  // Deletes keys in batches of entries_per_batch_. If seq is true, the keys
  // are consecutive indices; otherwise each key is drawn from
  // [0, FLAGS_num). Any write failure is fatal.
  void DoDelete(ThreadState* thread, bool seq) {
    // No RandomGenerator here: deletes need no values, and constructing one
    // inside the timed benchmark body only adds noise to the measurement.
    WriteBatch batch;
    Status s;
    // num_ is printed with %ld elsewhere (a long), so index with a long to
    // avoid signed overflow for very large --num values.
    for (long i = 0; i < num_; i += entries_per_batch_) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const int k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
        char key[100];
        snprintf(key, sizeof(key), "%016d", k);
        batch.Delete(key);
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        fprintf(stderr, "del error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }
  }

  // Benchmark entry point: delete keys in ascending index order.
  void DeleteSeq(ThreadState* thread) {
    const bool sequential = true;
    DoDelete(thread, sequential);
  }

  // Benchmark entry point: delete randomly chosen keys.
  void DeleteRandom(ThreadState* thread) {
    const bool sequential = false;
    DoDelete(thread, sequential);
  }

951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980
  // Mixed workload. Every thread with tid > 0 runs ReadRandom. Thread 0
  // instead keeps issuing random Puts until the other threads have finished.
  // It then restarts its own stats, so the writes are not counted.
  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
      return;
    }

    RandomGenerator generator;
    for (;;) {
      bool readers_done;
      {
        // num_done / num_initialized are read under the shared mutex.
        MutexLock guard(&thread->shared->mu);
        readers_done =
            thread->shared->num_done + 1 >= thread->shared->num_initialized;
      }
      if (readers_done) {
        break;
      }

      const int k = thread->rand.Next() % FLAGS_num;
      char key[100];
      snprintf(key, sizeof(key), "%016d", k);
      Status s = db_->Put(write_options_, key, generator.Generate(value_size_));
      if (!s.ok()) {
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        exit(1);
      }
    }

    // Discard the writer's elapsed time and ops from the reported stats.
    thread->stats.Start();
  }

981
  // Compacts the entire key range: NULL bounds mean "from the first key" and
  // "through the last key". The thread argument is unused.
  void Compact(ThreadState* thread) {
    const Slice* range_begin = NULL;
    const Slice* range_end = NULL;
    db_->CompactRange(range_begin, range_end);
  }

S
Sanjay Ghemawat 已提交
985
  void PrintStats(const char* key) {
986
    std::string stats;
S
Sanjay Ghemawat 已提交
987
    if (!db_->GetProperty(key, &stats)) {
988
      stats = "(failed)";
989
    }
990
    fprintf(stdout, "\n%s\n", stats.c_str());
991 992
  }

J
jorlow@chromium.org 已提交
993 994 995 996 997 998
  static void WriteToFile(void* arg, const char* buf, int n) {
    reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
  }

  void HeapProfile() {
    char fname[100];
999
    snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
J
jorlow@chromium.org 已提交
1000
    WritableFile* file;
1001
    Status s = FLAGS_env->NewWritableFile(fname, &file);
J
jorlow@chromium.org 已提交
1002
    if (!s.ok()) {
1003
      fprintf(stderr, "%s\n", s.ToString().c_str());
J
jorlow@chromium.org 已提交
1004 1005 1006 1007 1008
      return;
    }
    bool ok = port::GetHeapProfile(WriteToFile, file);
    delete file;
    if (!ok) {
1009
      fprintf(stderr, "heap profiling not supported\n");
1010
      FLAGS_env->DeleteFile(fname);
J
jorlow@chromium.org 已提交
1011 1012 1013 1014
    }
  }
};

H
Hans Wennborg 已提交
1015
}  // namespace leveldb
J
jorlow@chromium.org 已提交
1016 1017

int main(int argc, char** argv) {
1018
  FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
1019
  FLAGS_open_files = leveldb::Options().max_open_files;
H
heyongqiang 已提交
1020
  std::string default_db_path;
1021

J
jorlow@chromium.org 已提交
1022 1023 1024
  for (int i = 1; i < argc; i++) {
    double d;
    int n;
1025
    long l;
J
jorlow@chromium.org 已提交
1026
    char junk;
1027
    char hdfsname[2048];
J
jorlow@chromium.org 已提交
1028 1029 1030 1031 1032 1033 1034
    if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
      FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
    } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
      FLAGS_compression_ratio = d;
    } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_histogram = n;
1035 1036 1037
    } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_use_existing_db = n;
1038 1039
    } else if (sscanf(argv[i], "--num=%ld%c", &l, &junk) == 1) {
      FLAGS_num = l;
1040 1041
    } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
      FLAGS_reads = n;
1042 1043
    } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
      FLAGS_threads = n;
J
jorlow@chromium.org 已提交
1044 1045 1046 1047
    } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
      FLAGS_value_size = n;
    } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
      FLAGS_write_buffer_size = n;
D
Dhruba Borthakur 已提交
1048
    } else if (sscanf(argv[i], "--cache_size=%ld%c", &n, &junk) == 1) {
1049
      FLAGS_cache_size = n;
1050 1051
    } else if (sscanf(argv[i], "--cache_numshardbits=%d%c", &n, &junk) == 1) {
      FLAGS_cache_numshardbits = n;
S
Sanjay Ghemawat 已提交
1052 1053
    } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
      FLAGS_bloom_bits = n;
1054 1055
    } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
      FLAGS_open_files = n;
1056 1057
    } else if (strncmp(argv[i], "--db=", 5) == 0) {
      FLAGS_db = argv[i] + 5;
1058 1059 1060
    } else if (sscanf(argv[i], "--verify_checksum=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_verify_checksum = n;
1061 1062 1063
    } else if (sscanf(argv[i], "--bufferedio=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      useOsBuffer = n;
1064 1065 1066 1067 1068 1069
    } else if (sscanf(argv[i], "--statistics=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      if (n == 1) {
        dbstats = new leveldb::DBStatistics();
        FLAGS_statistics = true;
      }
1070 1071 1072 1073 1074
    } else if (sscanf(argv[i], "--writes=%d%c", &n, &junk) == 1) {
      FLAGS_writes = n;
    } else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 &&
               (n == 0 || n == 1)) {
      FLAGS_sync = n;
H
heyongqiang 已提交
1075 1076 1077
    } else if (sscanf(argv[i], "--disable_data_sync=%d%c", &n, &junk) == 1 &&
        (n == 0 || n == 1)) {
      FLAGS_disable_data_sync = n;
1078 1079 1080
    } else if (sscanf(argv[i], "--use_fsync=%d%c", &n, &junk) == 1 &&
        (n == 0 || n == 1)) {
      FLAGS_use_fsync = n;
H
heyongqiang 已提交
1081
    } else if (sscanf(argv[i], "--disable_wal=%d%c", &n, &junk) == 1 &&
1082
        (n == 0 || n == 1)) {
H
heyongqiang 已提交
1083
      FLAGS_disable_wal = n;
1084 1085
    } else if (sscanf(argv[i], "--hdfs=%s", &hdfsname) == 1) {
      FLAGS_env  = new leveldb::HdfsEnv(hdfsname);
H
heyongqiang 已提交
1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097
    } else if (sscanf(argv[i], "--target_file_size_base=%d%c",
        &n, &junk) == 1) {
      FLAGS_target_file_size_base = n;
    } else if ( sscanf(argv[i], "--target_file_size_multiplier=%d%c",
        &n, &junk) == 1) {
      FLAGS_target_file_size_multiplier = n;
    } else if (
        sscanf(argv[i], "--max_bytes_for_level_base=%d%c", &n, &junk) == 1) {
      FLAGS_max_bytes_for_level_base = n;
    } else if (sscanf(argv[i], "--max_bytes_for_level_multiplier=%d%c",
        &n, &junk) == 1) {
      FLAGS_max_bytes_for_level_multiplier = n;
H
heyongqiang 已提交
1098 1099 1100 1101 1102 1103
    } else if (sscanf(argv[i],"--level0_stop_writes_trigger=%d%c",
        &n, &junk) == 1) {
      FLAGS_level0_stop_writes_trigger = n;
    } else if (sscanf(argv[i],"--level0_slowdown_writes_trigger=%d%c",
        &n, &junk) == 1) {
      FLAGS_level0_slowdown_writes_trigger = n;
1104
    } else {
J
jorlow@chromium.org 已提交
1105 1106 1107 1108 1109
      fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
      exit(1);
    }
  }

H
heyongqiang 已提交
1110 1111 1112 1113 1114 1115 1116
  // Choose a location for the test database if none given with --db=<path>
  if (FLAGS_db == NULL) {
      leveldb::Env::Default()->GetTestDirectory(&default_db_path);
      default_db_path += "/dbbench";
      FLAGS_db = default_db_path.c_str();
  }

J
jorlow@chromium.org 已提交
1117 1118 1119 1120
  leveldb::Benchmark benchmark;
  benchmark.Run();
  return 0;
}