From 4985a9f73b9fb8a0323fbbb06222ae1f758a6b1d Mon Sep 17 00:00:00 2001 From: Deon Nicholas Date: Wed, 12 Jun 2013 12:42:21 -0700 Subject: [PATCH] [Rocksdb] [Multiget] Introduced multiget into db_bench Summary: Preliminary! Introduced the --use_multiget=1 and --keys_per_multiget=n flags for db_bench. Also updated and tested the ReadRandom() method to include an option to use multiget. By default, keys_per_multiget=100. Preliminary tests imply that multiget is at least 1.25x faster per key than regular get. Will continue adding Multiget for ReadMissing, ReadHot, RandomWithVerify, ReadRandomWriteRandom; soon. Will also think about ways to better verify benchmarks. Test Plan: 1. make db_bench 2. ./db_bench --benchmarks=fillrandom 3. ./db_bench --benchmarks=readrandom --use_existing_db=1 --use_multiget=1 --threads=4 --keys_per_multiget=100 4. ./db_bench --benchmarks=readrandom --use_existing_db=1 --threads=4 5. Verify ops/sec (and 1000000 of 1000000 keys found) Reviewers: haobo, MarkCallaghan, dhruba Reviewed By: MarkCallaghan CC: leveldb Differential Revision: https://reviews.facebook.net/D11127 --- db/db_bench.cc | 336 +++++++++++++++++++++++++++++++++++++++---------- db/db_impl.cc | 1 - 2 files changed, 271 insertions(+), 66 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 7105e0cdb..e9902dd2b 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -301,6 +301,17 @@ static bool FLAGS_advise_random_on_open = static auto FLAGS_compaction_fadvice = leveldb::Options().access_hint_on_compaction_start; +// Use multiget to access a series of keys instead of get +static bool FLAGS_use_multiget = false; + +// If FLAGS_use_multiget is true, determines number of keys to group per call +// Arbitrary default. 90 is good because it agrees with FLAGS_readwritepercent +static long FLAGS_keys_per_multiget = 90; + +// Print a message to user when a key is missing in a Get/MultiGet call +// TODO: Apply this flag to generic Get calls too. Currently only with Multiget +static bool FLAGS_warn_missing_keys = true; + // Use adaptive mutex static auto FLAGS_use_adaptive_mutex = leveldb::Options().use_adaptive_mutex; @@ -497,7 +508,7 @@ class Stats { fprintf(stdout, "%-12s : %11.3f micros/op %ld ops/sec;%s%s\n", name.ToString().c_str(), - seconds_ * 1e6 / done_, + elapsed * 1e6 / done_, (long)throughput, (extra.empty() ? "" : " "), extra.c_str()); @@ -550,10 +561,12 @@ class Duration { } bool Done(int increment) { + if (increment <= 0) increment = 1; // avoid Done(0) and infinite loops ops_ += increment; if (max_seconds_) { - if (!(ops_ % 1000)) { + // Recheck every appx 1000 ops (exact iff increment is factor of 1000) + if ((ops_/1000) != ((ops_-increment)/1000)) { double now = FLAGS_env->NowMicros(); return ((now - start_at_) / 1000000.0) >= max_seconds_; } else { @@ -712,7 +725,7 @@ class Benchmark { HistogramData histogramData; dbstats->histogramData(histogram_type, &histogramData); fprintf(stdout, "%s statistics Percentiles :", name.c_str()); - fprintf(stdout, "50 : %f ",histogramData.median); + fprintf(stdout, "50 : %f ", histogramData.median); fprintf(stdout, "95 : %f ", histogramData.percentile95); fprintf(stdout, "99 : %f\n", histogramData.percentile99); } @@ -743,6 +756,7 @@ class Benchmark { PrintHistogram(WAL_FILE_SYNC_MICROS, "WAL FILE SYNC MICROS"); PrintHistogram(MANIFEST_FILE_SYNC_MICROS, "Manifest SYNC MICROS"); PrintHistogram(TABLE_OPEN_IO_MICROS, "Table Open IO Micros"); + PrintHistogram(DB_MULTIGET, "DB_MULTIGET"); } } @@ -1244,80 +1258,185 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") thread->stats.AddBytes(bytes); } + // Calls MultiGet over a list of keys from a random distribution. + // Returns the total number of keys found. + long MultiGetRandom(ReadOptions& options, int num_keys, + Random& rand, int range, const char* suffix) { + assert(num_keys > 0); + std::vector keys(num_keys); + std::vector values(num_keys); + std::vector > gen_keys(num_keys); + + int i; + long k; + + // Fill the keys vector + for(i=0; iGetSnapshot(); + } + + // Apply the operation + std::vector statuses = db_->MultiGet(options, keys, &values); + assert((long)statuses.size() == num_keys); + assert((long)keys.size() == num_keys); // Should always be the case. + assert((long)values.size() == num_keys); + + if (FLAGS_use_snapshot) { + db_->ReleaseSnapshot(options.snapshot); + options.snapshot = nullptr; + } + + // Count number found + long found = 0; + for(i=0; iNewIterator(options); Duration duration(FLAGS_duration, reads_); - std::string value; long found = 0; - while (!duration.Done(1)) { - const int k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); - if (FLAGS_use_snapshot) { - options.snapshot = db_->GetSnapshot(); - } - if (FLAGS_read_range < 2) { - if (db_->Get(options, key.get(), &value).ok()) { - found++; + if (FLAGS_use_multiget) { // MultiGet + const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group + long keys_left = reads_; + + // Recalculate number of keys per group, and call MultiGet until done + long num_keys; + while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { + found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num,""); + thread->stats.FinishedSingleOp(db_); + keys_left -= num_keys; + } + } else { // Regular case. Do one "get" at a time Get + Iterator* iter = db_->NewIterator(options); + std::string value; + while (!duration.Done(1)) { + const int k = thread->rand.Next() % FLAGS_num; + unique_ptr key = GenerateKeyFromInt(k); + if (FLAGS_use_snapshot) { + options.snapshot = db_->GetSnapshot(); } - } else { - Slice skey(key.get()); - int count = 1; - if (FLAGS_get_approx) { - unique_ptr key2 = - GenerateKeyFromInt(k + (int) FLAGS_read_range); - Slice skey2(key2.get()); - Range range(skey, skey2); - uint64_t sizes; - db_->GetApproximateSizes(&range, 1, &sizes); + if (FLAGS_read_range < 2) { + if (db_->Get(options, key.get(), &value).ok()) { + found++; + } + } else { + Slice skey(key.get()); + int count = 1; + + if (FLAGS_get_approx) { + unique_ptr key2 = + GenerateKeyFromInt(k + (int) FLAGS_read_range); + Slice skey2(key2.get()); + Range range(skey, skey2); + uint64_t sizes; + db_->GetApproximateSizes(&range, 1, &sizes); + } + + for (iter->Seek(skey); + iter->Valid() && count <= FLAGS_read_range; + ++count, iter->Next()) { + found++; + } } - for (iter->Seek(skey); - iter->Valid() && count <= FLAGS_read_range; - ++count, iter->Next()) { - found++; + if (FLAGS_use_snapshot) { + db_->ReleaseSnapshot(options.snapshot); + options.snapshot = nullptr; } - } - if (FLAGS_use_snapshot) { - db_->ReleaseSnapshot(options.snapshot); - options.snapshot = nullptr; + thread->stats.FinishedSingleOp(db_); } - thread->stats.FinishedSingleOp(db_); + delete iter; } - delete iter; + char msg[100]; - snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, num_); + snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, reads_); thread->stats.AddMessage(msg); } - void ReadMissing(ThreadState* thread) { +void ReadMissing(ThreadState* thread) { + FLAGS_warn_missing_keys = false; // Never warn about missing keys + Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); - std::string value; - while (!duration.Done(1)) { - const int k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k, "."); - db_->Get(options, key.get(), &value); - thread->stats.FinishedSingleOp(db_); + + if (FLAGS_use_multiget) { + const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group + long keys_left = reads_; + + // Recalculate number of keys per group, and call MultiGet until done + long num_keys; + long found; + while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { + found = MultiGetRandom(options, num_keys, thread->rand, FLAGS_num,"."); + assert(!found); + thread->stats.FinishedSingleOp(db_); + keys_left -= num_keys; + } + } else { // Regular case (not MultiGet) + std::string value; + Status s; + while (!duration.Done(1)) { + const int k = thread->rand.Next() % FLAGS_num; + unique_ptr key = GenerateKeyFromInt(k, "."); + s = db_->Get(options, key.get(), &value); + assert(!s.ok() && s.IsNotFound()); + thread->stats.FinishedSingleOp(db_); + } } } void ReadHot(ThreadState* thread) { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); - std::string value; const long range = (FLAGS_num + 99) / 100; - while (!duration.Done(1)) { - const int k = thread->rand.Next() % range; - unique_ptr key = GenerateKeyFromInt(k); - db_->Get(options, key.get(), &value); - thread->stats.FinishedSingleOp(db_); + long found = 0; + + if (FLAGS_use_multiget) { + const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group + long keys_left = reads_; + + // Recalculate number of keys per group, and call MultiGet until done + long num_keys; + while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { + found += MultiGetRandom(options, num_keys, thread->rand, range, ""); + thread->stats.FinishedSingleOp(db_); + keys_left -= num_keys; + } + } else { + std::string value; + while (!duration.Done(1)) { + const int k = thread->rand.Next() % range; + unique_ptr key = GenerateKeyFromInt(k); + if (db_->Get(options, key.get(), &value).ok()){ + ++found; + } + thread->stats.FinishedSingleOp(db_); + } } + + char msg[100]; + snprintf(msg, sizeof(msg), "(%ld of %ld found)", found, reads_); + thread->stats.AddMessage(msg); } void SeekRandom(ThreadState* thread) { @@ -1424,8 +1543,8 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") } // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V) - // in DB atomically i.e in a single batch. Also refer MultiGet. - Status MultiPut(const WriteOptions& writeoptions, + // in DB atomically i.e in a single batch. Also refer GetMany. + Status PutMany(const WriteOptions& writeoptions, const Slice& key, const Slice& value) { std::string suffixes[3] = {"2", "1", "0"}; std::string keys[3]; @@ -1443,8 +1562,8 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") // Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V) - // in DB atomically i.e in a single batch. Also refer MultiGet. - Status MultiDelete(const WriteOptions& writeoptions, + // in DB atomically i.e in a single batch. Also refer GetMany. + Status DeleteMany(const WriteOptions& writeoptions, const Slice& key) { std::string suffixes[3] = {"1", "2", "0"}; std::string keys[3]; @@ -1462,8 +1581,8 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2" // in the same snapshot, and verifies that all the values are identical. - // ASSUMES that MultiPut was used to put (K, V) into the DB. - Status MultiGet(const ReadOptions& readoptions, + // ASSUMES that PutMany was used to put (K, V) into the DB. + Status GetMany(const ReadOptions& readoptions, const Slice& key, std::string* value) { std::string suffixes[3] = {"0", "1", "2"}; std::string keys[3]; @@ -1501,11 +1620,12 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") } // Differs from readrandomwriterandom in the following ways: - // (a) Uses MultiGet/MultiPut to read/write key values. Refer to those funcs. + // (a) Uses GetMany/PutMany to read/write key values. Refer to those funcs. // (b) Does deletes as well (per FLAGS_deletepercent) // (c) In order to achieve high % of 'found' during lookups, and to do // multiple writes (including puts and deletes) it uses upto // FLAGS_numdistinct distinct keys instead of FLAGS_num distinct keys. + // (d) Does not have a MultiGet option. void RandomWithVerify(ThreadState* thread) { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; @@ -1517,21 +1637,22 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") long gets_done = 0; long puts_done = 0; long deletes_done = 0; + // the number of iterations is the larger of read_ or write_ for (long i = 0; i < readwrites_; i++) { const int k = thread->rand.Next() % (FLAGS_numdistinct); unique_ptr key = GenerateKeyFromInt(k); if (get_weight == 0 && put_weight == 0 && delete_weight == 0) { - // one batch complated, reinitialize for next batch + // one batch completed, reinitialize for next batch get_weight = FLAGS_readwritepercent; delete_weight = FLAGS_deletepercent; put_weight = 100 - get_weight - delete_weight; } if (get_weight > 0) { // do all the gets first - Status s = MultiGet(options, key.get(), &value); + Status s = GetMany(options, key.get(), &value); if (!s.ok() && !s.IsNotFound()) { - fprintf(stderr, "get error: %s\n", s.ToString().c_str()); + fprintf(stderr, "getmany error: %s\n", s.ToString().c_str()); // we continue after error rather than exiting so that we can // find more errors if any } else if (!s.IsNotFound()) { @@ -1542,17 +1663,17 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") } else if (put_weight > 0) { // then do all the corresponding number of puts // for all the gets we have done earlier - Status s = MultiPut(write_options_, key.get(), gen.Generate(value_size_)); + Status s = PutMany(write_options_, key.get(), gen.Generate(value_size_)); if (!s.ok()) { - fprintf(stderr, "multiput error: %s\n", s.ToString().c_str()); + fprintf(stderr, "putmany error: %s\n", s.ToString().c_str()); exit(1); } put_weight--; puts_done++; } else if (delete_weight > 0) { - Status s = MultiDelete(write_options_, key.get()); + Status s = DeleteMany(write_options_, key.get()); if (!s.ok()) { - fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str()); + fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str()); exit(1); } delete_weight--; @@ -1572,6 +1693,12 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") // an extra thread. // void ReadRandomWriteRandom(ThreadState* thread) { + if (FLAGS_use_multiget){ + // Separate function for multiget (for ease of reading) + ReadRandomWriteRandomMultiGet(thread); + return; + } + ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; @@ -1617,9 +1744,6 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") found++; } - if (db_->Get(options, key.get(), &value).ok()) { - found++; - } get_weight--; reads_done++; @@ -1646,9 +1770,85 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") thread->stats.AddMessage(msg); } + // ReadRandomWriteRandom (with multiget) + // Does FLAGS_keys_per_multiget reads (per multiget), followed by some puts. + // FLAGS_readwritepercent will specify the ratio of gets to puts. + // e.g.: If FLAGS_keys_per_multiget == 100 and FLAGS_readwritepercent == 75 + // Then each block will do 100 multigets and 33 puts + // So there are 133 operations in-total: 100 of them (75%) are gets, and 33 + // of them (25%) are puts. + void ReadRandomWriteRandomMultiGet(ThreadState* thread) { + ReadOptions options(FLAGS_verify_checksum, true); + RandomGenerator gen; + + // For multiget + const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group + + long keys_left = readwrites_; // number of keys still left to read + long num_keys; // number of keys to read in current group + long num_put_keys; // number of keys to put in current group + + long found = 0; + long reads_done = 0; + long writes_done = 0; + long multigets_done = 0; + + // the number of iterations is the larger of read_ or write_ + Duration duration(FLAGS_duration, readwrites_); + while(true) { + // Read num_keys keys, then write num_put_keys keys. + // The ratio of num_keys to num_put_keys is always FLAGS_readwritepercent + // And num_keys is set to be FLAGS_keys_per_multiget (kpg) + // num_put_keys is calculated accordingly (to maintain the ratio) + // Note: On the final iteration, num_keys and num_put_keys will be smaller + num_keys = std::min(keys_left*(FLAGS_readwritepercent + 99)/100, kpg); + num_put_keys = num_keys * (100-FLAGS_readwritepercent) + / FLAGS_readwritepercent; + + // This will break the loop when duration is complete + if (duration.Done(num_keys + num_put_keys)) { + break; + } + + // A quick check to make sure our formula doesn't break on edge cases + assert(num_keys >= 1); + assert(num_keys + num_put_keys <= keys_left); + + // Apply the MultiGet operations + found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num,""); + ++multigets_done; + reads_done+=num_keys; + thread->stats.FinishedSingleOp(db_); + + // Now do the puts + int i; + long k; + for(i=0; irand.Next() % FLAGS_num; + unique_ptr key = GenerateKeyFromInt(k); + Status s = db_->Put(write_options_, key.get(), + gen.Generate(value_size_)); + if (!s.ok()) { + fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + exit(1); + } + writes_done++; + thread->stats.FinishedSingleOp(db_); + } + + keys_left -= (num_keys + num_put_keys); + } + char msg[100]; + snprintf(msg, sizeof(msg), + "( reads:%ld writes:%ld total:%ld multiget_ops:%ld found:%ld)", + reads_done, writes_done, readwrites_, multigets_done, found); + thread->stats.AddMessage(msg); + } + // // Read-modify-write for random keys // + // TODO: Implement MergeOperator tests here (Read-modify-write) void UpdateRandom(ThreadState* thread) { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; @@ -1971,6 +2171,12 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--use_adaptive_mutex=%d%c", &n, &junk) == 1 && (n == 0 || n ==1 )) { FLAGS_use_adaptive_mutex = n; + } else if (sscanf(argv[i], "--use_multiget=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_use_multiget = n; + } else if (sscanf(argv[i], "--keys_per_multiget=%d%c", + &n, &junk) == 1) { + FLAGS_keys_per_multiget = n; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 9726a7166..3e5f962c8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2087,7 +2087,6 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. // value will contain the current merge operand in the latter case. - // TODO: Maybe these could be run concurrently? for(int i=0; i