From 3b2a1ddd2e9ca0998aa711644258675324febf6a Mon Sep 17 00:00:00 2001 From: Venkatesh Radhakrishnan Date: Thu, 28 Jan 2016 11:56:16 -0800 Subject: [PATCH] Add options.base_background_compactions as a number of compaction threads for low compaction debt Summary: If options.base_background_compactions is given, we try to schedule number of compactions not existing this number, only when L0 files increase to certain number, or pending compaction bytes more than certain threshold, we schedule compactions based on options.max_background_compactions. The watermarks are calculated based on slowdown thresholds. Test Plan: Add new test cases in column_family_test. Adding more unit tests. Reviewers: IslamAbdelRahman, yhchiang, kradhakrishnan, rven, anthony Reviewed By: anthony Subscribers: leveldb, dhruba, yoshinorim Differential Revision: https://reviews.facebook.net/D53409 --- db/column_family.cc | 49 +++++++++++++ db/column_family_test.cc | 142 ++++++++++++++++++++++++++++++++++++++ db/db_compaction_test.cc | 117 +++++++++++++++++++++++++++++++ db/db_impl.cc | 28 ++++++-- db/db_impl.h | 4 ++ db/write_controller.cc | 12 ++++ db/write_controller.h | 18 ++++- include/rocksdb/options.h | 11 +++ util/options.cc | 5 ++ util/options_helper.h | 5 +- util/options_test.cc | 45 +----------- 11 files changed, 384 insertions(+), 52 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 4c12a35bd..ca3be7855 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -239,6 +239,17 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, result.level0_slowdown_writes_trigger, result.level0_file_num_compaction_trigger); } + + if (result.soft_pending_compaction_bytes_limit == 0) { + result.soft_pending_compaction_bytes_limit = + result.hard_pending_compaction_bytes_limit; + } else if (result.hard_pending_compaction_bytes_limit > 0 && + result.soft_pending_compaction_bytes_limit > + result.hard_pending_compaction_bytes_limit) { + result.soft_pending_compaction_bytes_limit = + result.hard_pending_compaction_bytes_limit; + } + if (result.level_compaction_dynamic_level_bytes) { if (result.compaction_style != kCompactionStyleLevel || db_options.db_paths.size() > 1U) { @@ -513,6 +524,21 @@ std::unique_ptr SetupDelay( } return write_controller->GetDelayToken(write_rate); } + +int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, + int level0_slowdown_writes_trigger) { + // SanitizeOptions() ensures it. + assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger); + + // 1/4 of the way between L0 compaction trigger threshold and slowdown + // condition. + // Or twice as compaction trigger, if it is smaller. + return std::min(level0_file_num_compaction_trigger * 2, + level0_file_num_compaction_trigger + + (level0_slowdown_writes_trigger - + level0_file_num_compaction_trigger) / + 4); +} } // namespace void ColumnFamilyData::RecalculateWriteStallConditions( @@ -598,6 +624,29 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "bytes %" PRIu64 " rate %" PRIu64, name_.c_str(), vstorage->estimated_compaction_needed_bytes(), write_controller->delayed_write_rate()); + } else if (vstorage->l0_delay_trigger_count() >= + GetL0ThresholdSpeedupCompaction( + mutable_cf_options.level0_file_num_compaction_trigger, + mutable_cf_options.level0_slowdown_writes_trigger)) { + write_controller_token_ = write_controller->GetCompactionPressureToken(); + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Increasing compaction threads because we have %d level-0 " + "files ", + name_.c_str(), vstorage->l0_delay_trigger_count()); + } else if (vstorage->estimated_compaction_needed_bytes() >= + mutable_cf_options.soft_pending_compaction_bytes_limit / 4) { + // Increase compaction threads if bytes needed for compaction exceeds + // 1/4 of threshold for slowing down. + // If soft pending compaction byte limit is not set, always speed up + // compaction. + write_controller_token_ = write_controller->GetCompactionPressureToken(); + if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) { + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Increasing compaction threads because of estimated pending " + "compaction " + "bytes %" PRIu64, + name_.c_str(), vstorage->estimated_compaction_needed_bytes()); + } } else { write_controller_token_.reset(); } diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 62fadbbee..114451464 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -2137,6 +2137,9 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) { TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { const uint64_t kBaseRate = 810000u; db_options_.delayed_write_rate = kBaseRate; + db_options_.base_background_compactions = 2; + db_options_.max_background_compactions = 6; + Open({"default"}); ColumnFamilyData* cfd = static_cast(db_->DefaultColumnFamily())->cfd(); @@ -2162,6 +2165,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(400); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2169,6 +2173,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.2, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(500); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2224,6 +2229,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->TEST_set_estimated_compaction_needed_bytes(3001); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2248,6 +2254,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate / 1.2, dbfull()->TEST_write_controler().delayed_write_rate()); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); vstorage->set_l0_delay_trigger_count(101); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2320,6 +2327,73 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { dbfull()->TEST_write_controler().delayed_write_rate()); } +TEST_F(ColumnFamilyTest, CompactionSpeedupSingleColumnFamily) { + db_options_.base_background_compactions = 2; + db_options_.max_background_compactions = 6; + Open({"default"}); + ColumnFamilyData* cfd = + static_cast(db_->DefaultColumnFamily())->cfd(); + + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + + MutableCFOptions mutable_cf_options( + Options(db_options_, column_family_options_), + ImmutableCFOptions(Options(db_options_, column_family_options_))); + + // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8 + mutable_cf_options.level0_file_num_compaction_trigger = 4; + mutable_cf_options.level0_slowdown_writes_trigger = 36; + mutable_cf_options.level0_stop_writes_trigger = 50; + // Speedup threshold = 200 / 4 = 50 + mutable_cf_options.soft_pending_compaction_bytes_limit = 200; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + + vstorage->TEST_set_estimated_compaction_needed_bytes(40); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(50); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(300); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(45); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(7); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(9); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(6); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + // Speed up threshold = min(4 * 2, 4 + (12 - 4)/4) = 6 + mutable_cf_options.level0_file_num_compaction_trigger = 4; + mutable_cf_options.level0_slowdown_writes_trigger = 16; + mutable_cf_options.level0_stop_writes_trigger = 30; + + vstorage->set_l0_delay_trigger_count(5); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(7); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(3); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); +} + TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) { const uint64_t kBaseRate = 810000u; db_options_.delayed_write_rate = kBaseRate; @@ -2401,6 +2475,74 @@ TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) { ASSERT_EQ(kBaseRate / 1.2, dbfull()->TEST_write_controler().delayed_write_rate()); } + +TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) { + db_options_.base_background_compactions = 2; + db_options_.max_background_compactions = 6; + column_family_options_.soft_pending_compaction_bytes_limit = 200; + column_family_options_.hard_pending_compaction_bytes_limit = 2000; + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyData* cfd = + static_cast(db_->DefaultColumnFamily())->cfd(); + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + + ColumnFamilyData* cfd1 = + static_cast(handles_[1])->cfd(); + VersionStorageInfo* vstorage1 = cfd1->current()->storage_info(); + + MutableCFOptions mutable_cf_options( + Options(db_options_, column_family_options_), + ImmutableCFOptions(Options(db_options_, column_family_options_))); + // Speed up threshold = min(4 * 2, 4 + (36 - 4)/4) = 8 + mutable_cf_options.level0_file_num_compaction_trigger = 4; + mutable_cf_options.level0_slowdown_writes_trigger = 36; + mutable_cf_options.level0_stop_writes_trigger = 30; + // Speedup threshold = 200 / 4 = 50 + mutable_cf_options.soft_pending_compaction_bytes_limit = 200; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + + MutableCFOptions mutable_cf_options1 = mutable_cf_options; + mutable_cf_options1.level0_slowdown_writes_trigger = 16; + + vstorage->TEST_set_estimated_compaction_needed_bytes(40); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(60); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(30); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(70); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(20); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(3); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(9); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage1->set_l0_delay_trigger_count(2); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(6, dbfull()->BGCompactionsAllowed()); + + vstorage->set_l0_delay_trigger_count(0); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_EQ(2, dbfull()->BGCompactionsAllowed()); +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index d29b50e7f..5d9e0536f 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -533,6 +533,104 @@ TEST_P(DBCompactionTestWithParam, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1); } +TEST_F(DBCompactionTest, BGCompactionsAllowed) { + // Create several column families. Make compaction triggers in all of them + // and see number of compactions scheduled to be less than allowed. + const int kNumKeysPerFile = 100; + + Options options; + options.write_buffer_size = 110 << 10; // 110KB + options.arena_block_size = 4 << 10; + options.num_levels = 3; + // Should speed up compaction when there are 4 files. + options.level0_file_num_compaction_trigger = 2; + options.level0_slowdown_writes_trigger = 20; + options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large + options.base_background_compactions = 1; + options.max_background_compactions = 3; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + options = CurrentOptions(options); + + // Block all threads in thread pool. + const size_t kTotalTasks = 4; + env_->SetBackgroundThreads(4, Env::LOW); + test::SleepingBackgroundTask sleeping_tasks[kTotalTasks]; + for (size_t i = 0; i < kTotalTasks; i++) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_tasks[i], Env::Priority::LOW); + sleeping_tasks[i].WaitUntilSleeping(); + } + + CreateAndReopenWithCF({"one", "two", "three"}, options); + + Random rnd(301); + for (int cf = 0; cf < 4; cf++) { + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1); + } + } + + // Now all column families qualify compaction but only one should be + // scheduled, because no column family hits speed up condition. + ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + + // Create two more files for one column family, which triggers speed up + // condition, three compactions will be scheduled. + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(2, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(2, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[2]); + ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1, + NumTableFilesAtLevel(0, 2)); + } + ASSERT_EQ(3, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + + // Unblock all threads to unblock all compactions. + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].WakeUp(); + sleeping_tasks[i].WaitUntilDone(); + } + dbfull()->TEST_WaitForCompact(); + + // Verify number of compactions allowed will come back to 1. + + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].Reset(); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_tasks[i], Env::Priority::LOW); + sleeping_tasks[i].WaitUntilSleeping(); + } + for (int cf = 0; cf < 4; cf++) { + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1); + } + } + + // Now all column families qualify compaction but only one should be + // scheduled, because no column family hits speed up condition. + ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + + for (size_t i = 0; i < kTotalTasks; i++) { + sleeping_tasks[i].WakeUp(); + sleeping_tasks[i].WaitUntilDone(); + } +} + TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) { Options options; options.write_buffer_size = 100000000; // Large write buffer @@ -2198,6 +2296,25 @@ TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) { Destroy(options); } +TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) { + Options options = CurrentOptions(); + options.max_background_compactions = 5; + options.soft_pending_compaction_bytes_limit = 0; + options.hard_pending_compaction_bytes_limit = 100; + options.create_if_missing = true; + DestroyAndReopen(options); + ASSERT_EQ(5, db_->GetOptions().base_background_compactions); + ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit); + + options.base_background_compactions = 4; + options.max_background_compactions = 3; + options.soft_pending_compaction_bytes_limit = 200; + options.hard_pending_compaction_bytes_limit = 150; + DestroyAndReopen(options); + ASSERT_EQ(3, db_->GetOptions().base_background_compactions); + ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit); +} + // This tests for a bug that could cause two level0 compactions running // concurrently // TODO(aekmekji): Make sure that the reason this fails when run with diff --git a/db/db_impl.cc b/db/db_impl.cc index 6db05ae66..2afab389d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -146,6 +146,12 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.info_log = nullptr; } } + if (result.base_background_compactions == -1) { + result.base_background_compactions = result.max_background_compactions; + } + if (result.base_background_compactions > result.max_background_compactions) { + result.base_background_compactions = result.max_background_compactions; + } result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions, Env::Priority::LOW); result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes, @@ -2448,12 +2454,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); } + auto bg_compactions_allowed = BGCompactionsAllowed(); + // special case -- if max_background_flushes == 0, then schedule flush on a // compaction thread if (db_options_.max_background_flushes == 0) { while (unscheduled_flushes_ > 0 && bg_flush_scheduled_ + bg_compaction_scheduled_ < - db_options_.max_background_compactions) { + bg_compactions_allowed) { unscheduled_flushes_--; bg_flush_scheduled_++; env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this); @@ -2466,7 +2474,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { return; } - while (bg_compaction_scheduled_ < db_options_.max_background_compactions && + while (bg_compaction_scheduled_ < bg_compactions_allowed && unscheduled_compactions_ > 0) { CompactionArg* ca = new CompactionArg; ca->db = this; @@ -2478,6 +2486,14 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } +int DBImpl::BGCompactionsAllowed() const { + if (write_controller_.NeedSpeedupCompaction()) { + return db_options_.max_background_compactions; + } else { + return db_options_.base_background_compactions; + } +} + void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { assert(!cfd->pending_compaction()); cfd->Ref(); @@ -2590,10 +2606,10 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, LogToBuffer( log_buffer, "Calling FlushMemTableToOutputFile with column " - "family [%s], flush slots available %d, compaction slots available %d", - cfd->GetName().c_str(), - db_options_.max_background_flushes - bg_flush_scheduled_, - db_options_.max_background_compactions - bg_compaction_scheduled_); + "family [%s], flush slots available %d, compaction slots allowed %d, " + "compaction slots scheduled %d", + cfd->GetName().c_str(), db_options_.max_background_flushes, + bg_flush_scheduled_, BGCompactionsAllowed() - bg_compaction_scheduled_); status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress, job_context, log_buffer); if (cfd->Unref()) { diff --git a/db/db_impl.h b/db/db_impl.h index 683fd49dc..d09d645d7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -347,6 +347,10 @@ class DBImpl : public DB { #endif // NDEBUG + // Return maximum background compaction alowed to be scheduled based on + // compaction status. + int BGCompactionsAllowed() const; + // Returns the list of live files in 'live' and the list // of all files in the filesystem in 'candidate_files'. // If force == false and the last call was less than diff --git a/db/write_controller.cc b/db/write_controller.cc index 7a933ec42..a0c18835f 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -26,6 +26,13 @@ std::unique_ptr WriteController::GetDelayToken( return std::unique_ptr(new DelayWriteToken(this)); } +std::unique_ptr +WriteController::GetCompactionPressureToken() { + ++total_compaction_pressure_; + return std::unique_ptr( + new CompactionPressureToken(this)); +} + bool WriteController::IsStopped() const { return total_stopped_ > 0; } // This is inside DB mutex, so we can't sleep and need to minimize // frequency to get time. @@ -106,4 +113,9 @@ DelayWriteToken::~DelayWriteToken() { assert(controller_->total_delayed_ >= 0); } +CompactionPressureToken::~CompactionPressureToken() { + controller_->total_compaction_pressure_--; + assert(controller_->total_compaction_pressure_ >= 0); +} + } // namespace rocksdb diff --git a/db/write_controller.h b/db/write_controller.h index a5d498c3a..6cba2c643 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -23,6 +23,7 @@ class WriteController { explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u) : total_stopped_(0), total_delayed_(0), + total_compaction_pressure_(0), bytes_left_(0), last_refill_time_(0) { set_delayed_write_rate(_delayed_write_rate); @@ -38,10 +39,16 @@ class WriteController { // which returns number of microseconds to sleep. std::unique_ptr GetDelayToken( uint64_t delayed_write_rate); + // When an actor (column family) requests a moderate token, compaction + // threads will be increased + std::unique_ptr GetCompactionPressureToken(); - // these two metods are querying the state of the WriteController + // these three metods are querying the state of the WriteController bool IsStopped() const; bool NeedsDelay() const { return total_delayed_ > 0; } + bool NeedSpeedupCompaction() const { + return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0; + } // return how many microseconds the caller needs to sleep after the call // num_bytes: how many number of bytes to put into the DB. // Prerequisite: DB mutex held. @@ -59,9 +66,11 @@ class WriteController { friend class WriteControllerToken; friend class StopWriteToken; friend class DelayWriteToken; + friend class CompactionPressureToken; int total_stopped_; int total_delayed_; + int total_compaction_pressure_; uint64_t bytes_left_; uint64_t last_refill_time_; uint64_t delayed_write_rate_; @@ -96,4 +105,11 @@ class DelayWriteToken : public WriteControllerToken { virtual ~DelayWriteToken(); }; +class CompactionPressureToken : public WriteControllerToken { + public: + explicit CompactionPressureToken(WriteController* controller) + : WriteControllerToken(controller) {} + virtual ~CompactionPressureToken(); +}; + } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a3f410422..9eee07ac0 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -933,8 +933,19 @@ struct DBOptions { // regardless of this setting uint64_t delete_obsolete_files_period_micros; + // Suggested number of concurrent background compaction jobs, submitted to + // the default LOW priority thread pool. + // + // Default: max_background_compactions + int base_background_compactions; + // Maximum number of concurrent background compaction jobs, submitted to // the default LOW priority thread pool. + // We first try to schedule compactions based on + // `base_background_compactions`. If the compaction cannot catch up , we + // will increase number of compaction threads up to + // `max_background_compactions`. + // // If you're increasing this, also consider increasing number of threads in // LOW priority thread pool. For more information, see // Env::SetBackgroundThreads diff --git a/util/options.cc b/util/options.cc index d21d2a24b..00d797167 100644 --- a/util/options.cc +++ b/util/options.cc @@ -229,6 +229,7 @@ DBOptions::DBOptions() db_log_dir(""), wal_dir(""), delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000), + base_background_compactions(-1), max_background_compactions(1), max_subcompactions(1), max_background_flushes(1), @@ -295,6 +296,7 @@ DBOptions::DBOptions(const Options& options) wal_dir(options.wal_dir), delete_obsolete_files_period_micros( options.delete_obsolete_files_period_micros), + base_background_compactions(options.base_background_compactions), max_background_compactions(options.max_background_compactions), max_subcompactions(options.max_subcompactions), max_background_flushes(options.max_background_flushes), @@ -383,6 +385,8 @@ void DBOptions::Dump(Logger* log) const { table_cache_numshardbits); Header(log, " Options.delete_obsolete_files_period_micros: %" PRIu64, delete_obsolete_files_period_micros); + Header(log, " Options.base_background_compactions: %d", + base_background_compactions); Header(log, " Options.max_background_compactions: %d", max_background_compactions); Header(log, " Options.max_subcompactions: %" PRIu32, @@ -652,6 +656,7 @@ Options::PrepareForBulkLoad() // to L1. This is helpful so that all files that are // input to the manual compaction are all at L0. max_background_compactions = 2; + base_background_compactions = 2; // The compaction would create large files in L1. target_file_size_base = 256 * 1024 * 1024; diff --git a/util/options_helper.h b/util/options_helper.h index 84d547cfc..4c4555aca 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -208,7 +208,7 @@ static std::unordered_map db_options_type_info = { {offsetof(struct DBOptions, random_access_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal}}, {"writable_file_max_buffer_size", - {offsetof(struct DBOptions, writable_file_max_buffer_size), + {offsetof(struct DBOptions, writable_file_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal}}, {"use_adaptive_mutex", {offsetof(struct DBOptions, use_adaptive_mutex), OptionType::kBoolean, @@ -219,6 +219,9 @@ static std::unordered_map db_options_type_info = { {"max_background_compactions", {offsetof(struct DBOptions, max_background_compactions), OptionType::kInt, OptionVerificationType::kNormal}}, + {"base_background_compactions", + {offsetof(struct DBOptions, base_background_compactions), OptionType::kInt, + OptionVerificationType::kNormal}}, {"max_background_flushes", {offsetof(struct DBOptions, max_background_flushes), OptionType::kInt, OptionVerificationType::kNormal}}, diff --git a/util/options_test.cc b/util/options_test.cc index 65c45c2b0..961593bd0 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -1669,50 +1669,7 @@ TEST_F(OptionsParserTest, DBOptionsAllFieldsSettable) { "table_cache_numshardbits=28;" "max_open_files=72;" "max_file_opening_threads=35;" - "max_background_compactions=33;" - "use_fsync=true;" - "use_adaptive_mutex=false;" - "max_total_wal_size=4295005604;" - "compaction_readahead_size=0;" - "new_table_reader_for_compaction_inputs=false;" - "keep_log_file_num=4890;" - "skip_stats_update_on_db_open=false;" - "max_manifest_file_size=4295009941;" - "db_log_dir=path/to/db_log_dir;" - "skip_log_error_on_recovery=true;" - "writable_file_max_buffer_size=1048576;" - "paranoid_checks=true;" - "is_fd_close_on_exec=false;" - "bytes_per_sync=4295013613;" - "enable_thread_tracking=false;" - "disable_data_sync=false;" - "recycle_log_file_num=0;" - "disableDataSync=false;" - "create_missing_column_families=true;" - "log_file_time_to_roll=3097;" - "max_background_flushes=35;" - "create_if_missing=false;" - "error_if_exists=true;" - "allow_os_buffer=false;" - "delayed_write_rate=4294976214;" - "manifest_preallocation_size=1222;" - "allow_mmap_writes=false;" - "stats_dump_period_sec=70127;" - "allow_fallocate=true;" - "allow_mmap_reads=false;" - "max_log_file_size=4607;" - "random_access_max_buffer_size=1048576;" - "advise_random_on_open=true;" - "wal_bytes_per_sync=4295048118;" - "delete_obsolete_files_period_micros=4294967758;" - "WAL_ttl_seconds=4295008036;" - "WAL_size_limit_MB=4295036161;" - "wal_dir=path/to/wal_dir;" - "db_write_buffer_size=2587;" - "max_subcompactions=64330;" - "table_cache_numshardbits=28;" - "max_open_files=72;" - "max_file_opening_threads=35;" + "base_background_compactions=3;" "max_background_compactions=33;" "use_fsync=true;" "use_adaptive_mutex=false;" -- GitLab