From 70c42bf05f0727d2781b6a6533d196e8872d1821 Mon Sep 17 00:00:00 2001 From: Mark Callaghan Date: Fri, 26 Oct 2012 13:37:21 -0700 Subject: [PATCH] Adds DB::GetNextCompaction and then uses that for rate limiting db_bench Summary: Adds a method that returns the score for the next level that most needs compaction. That method is then used by db_bench to rate limit threads. Threads are put to sleep at the end of each stats interval until the score is less than the limit. The limit is set via the --rate_limit=$double option. The specified value must be > 1.0. Also adds the option --stats_per_interval to enable additional metrics reported every stats interval. Task ID: # Blame Rev: Test Plan: run db_bench Revert Plan: Database Impact: Memcache Impact: Other Notes: EImportant: - begin *PUBLIC* platform impact section - Bugzilla: # - end platform impact - Reviewers: dhruba Reviewed By: dhruba Differential Revision: https://reviews.facebook.net/D6243 --- db/db_bench.cc | 24 +++++++++++++++++++++--- db/db_impl.cc | 19 +++++++++++++++++-- db/db_impl.h | 1 + db/version_set.h | 5 +++++ include/leveldb/options.h | 4 ++++ util/options.cc | 5 ++++- 6 files changed, 52 insertions(+), 6 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 626cc163b..c5e838176 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -186,6 +186,15 @@ static leveldb::Env* FLAGS_env = leveldb::Env::Default(); // than zero. When 0 the interval grows over time. static int FLAGS_stats_interval = 0; +// Reports additional stats per interval when this is greater +// than 0. +static int FLAGS_stats_per_interval = 0; + +// When not equal to 0 this makes threads sleep at each stats +// reporting interval until the compaction score for all levels is +// less than or equal to this value. 
+static double FLAGS_rate_limit = 0; + extern bool useOsBuffer; extern bool useFsReadAhead; extern bool useMmapRead; @@ -336,9 +345,11 @@ class Stats { (done_ - last_report_done_) / ((now - last_report_finish_) / 1000000.0)); - std::string stats; - if (db && db->GetProperty("leveldb.stats", &stats)) - fprintf(stderr, stats.c_str()); + if (FLAGS_stats_per_interval) { + std::string stats; + if (db && db->GetProperty("leveldb.stats", &stats)) + fprintf(stderr, stats.c_str()); + } fflush(stderr); next_report_ += FLAGS_stats_interval; @@ -903,6 +914,7 @@ class Benchmark { options.disable_seek_compaction = FLAGS_disable_seek_compaction; options.delete_obsolete_files_period_micros = FLAGS_delete_obsolete_files_period_micros; + options.rate_limit = FLAGS_rate_limit; Status s = DB::Open(options, FLAGS_db, &db_); if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); @@ -1316,6 +1328,12 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--stats_interval=%d%c", &n, &junk) == 1 && n >= 0 && n < 2000000000) { FLAGS_stats_interval = n; + } else if (sscanf(argv[i], "--stats_per_interval=%d%c", &n, &junk) == 1 + && (n == 0 || n == 1)) { + FLAGS_stats_per_interval = n; + } else if (sscanf(argv[i], "--rate_limit=%lf%c", &d, &junk) == 1 && + d > 1.0) { + FLAGS_rate_limit = d; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 678acf057..cd4cb05f3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -184,6 +184,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) disable_delete_obsolete_files_(false), delete_obsolete_files_last_run_(0), stall_level0_slowdown_(0), + stall_leveln_slowdown_(0), stall_memtable_compaction_(0), stall_level0_num_files_(0), started_at_(options.env->NowMicros()) { @@ -1488,6 +1489,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { assert(!writers_.empty()); bool allow_delay = !force; Status s; + double score; while (true) { if (!bg_error_.ok()) { @@ 
-1528,6 +1530,18 @@ Status DBImpl::MakeRoomForWrite(bool force) { uint64_t t1 = env_->NowMicros(); bg_cv_.Wait(); stall_level0_num_files_ += env_->NowMicros() - t1; + } else if ( + allow_delay && + options_.rate_limit > 1.0 && + (score = versions_->MaxCompactionScore()) > options_.rate_limit) { + // Delay a write when the compaction score for any level is too large. + mutex_.Unlock(); + env_->SleepForMicroseconds(1000); + stall_leveln_slowdown_ += 1000; + allow_delay = false; // Do not delay a single write more than once + Log(options_.info_log, + "delaying write for rate limits with max score %.2f\n", score); + mutex_.Lock(); } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); @@ -1640,10 +1654,11 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { snprintf(buf, sizeof(buf), "Stalls(secs): %.3f level0_slowdown, %.3f level0_numfiles, " - "%.3f memtable_compaction\n", + "%.3f memtable_compaction, %.3f leveln_slowdown\n", stall_level0_slowdown_ / 1000000.0, stall_level0_num_files_ / 1000000.0, - stall_memtable_compaction_ / 1000000.0); + stall_memtable_compaction_ / 1000000.0, + stall_leveln_slowdown_ / 1000000.0); value->append(buf); return true; diff --git a/db/db_impl.h b/db/db_impl.h index fcedea010..7264a9627 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -217,6 +217,7 @@ class DBImpl : public DB { uint64_t stall_level0_slowdown_; uint64_t stall_memtable_compaction_; uint64_t stall_level0_num_files_; + uint64_t stall_leveln_slowdown_; // Time at which this instance was started. 
const uint64_t started_at_; diff --git a/db/version_set.h b/db/version_set.h index 5295bd058..66611fdb8 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -235,6 +235,11 @@ class VersionSet { return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL); } + // Returns the maximum compaction score for levels 1 to max + double MaxCompactionScore() const { + return current_->compaction_score_; + } + // Add all files listed in any live version to *live. // May also mutate some internal state. void AddLiveFiles(std::set* live); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index c090532bf..3b94d829f 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -253,6 +253,10 @@ struct Options { // log file. size_t max_log_file_size; + // Puts are delayed when any level has a compaction score that + // exceeds rate_limit. This is ignored when <= 1.0. + double rate_limit; + // Create an Options object with default values for all fields. Options(); diff --git a/util/options.cc b/util/options.cc index 0a2a70fc6..eb132877e 100644 --- a/util/options.cc +++ b/util/options.cc @@ -43,7 +43,8 @@ Options::Options() db_log_dir(""), disable_seek_compaction(false), max_log_file_size(0), - delete_obsolete_files_period_micros(0) { + delete_obsolete_files_period_micros(0), + rate_limit(0.0) { } void @@ -97,6 +98,8 @@ Options::Dump( disable_seek_compaction); Log(log," Options.delete_obsolete_files_period_micros: %ld", delete_obsolete_files_period_micros); + Log(log," Options.rate_limit: %.2f", + rate_limit); } // Options::Dump -- GitLab