diff --git a/HISTORY.md b/HISTORY.md index bbe2d02035a139cbb3ceeea0ffd230720a56f387..7e32e73f7b9a611dd83ecca96430d687cfb014aa 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -14,6 +14,7 @@ * Add DB properties "rocksdb.block-cache-capacity", "rocksdb.block-cache-usage", "rocksdb.block-cache-pinned-usage" to show block cache usage. * Add `Env::LowerThreadPoolCPUPriority(Priority)` method, which lowers the CPU priority of background (esp. compaction) threads to minimize interference with foreground tasks. * Fsync parent directory after deleting a file in delete scheduler. +* In level-based compaction, if bottom-pri thread pool was setup via `Env::SetBackgroundThreads()`, compactions to the bottom level will be delegated to that thread pool. ### Bug Fixes * Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob. diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 3cc02ecbe61a65e0b01c6dc6387caed5c3d63a3f..7092afa3797394c824fd95cfc56bd7aa607b8a8e 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3128,6 +3128,48 @@ TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) { ASSERT_TRUE(db_->Get(roptions, Key(0), &result).IsNotFound()); } +TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) { + const int kNumFilesTrigger = 3; + Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM); + for (bool use_universal_compaction : {false, true}) { + Options options = CurrentOptions(); + if (use_universal_compaction) { + options.compaction_style = kCompactionStyleUniversal; + } else { + options.compaction_style = kCompactionStyleLevel; + options.level_compaction_dynamic_level_bytes = true; + } + options.num_levels = 4; + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB + options.level0_file_num_compaction_trigger = kNumFilesTrigger; + // Trigger compaction if size amplification exceeds 110% + options.compaction_options_universal.max_size_amplification_percent = 110; + DestroyAndReopen(options); + + int num_bottom_pri_compactions = 0; + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BGWorkBottomCompaction", + [&](void* /*arg*/) { ++num_bottom_pri_compactions; }); + SyncPoint::GetInstance()->EnableProcessing(); + + Random rnd(301); + for (int num = 0; num < kNumFilesTrigger; num++) { + ASSERT_EQ(NumSortedRuns(), num); + int key_idx = 0; + GenerateNewFile(&rnd, &key_idx); + } + dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(1, num_bottom_pri_compactions); + + // Verify that size amplification did occur + ASSERT_EQ(NumSortedRuns(), 1); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + } + Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM); +} + TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) { // Deletions can be dropped when compacted to non-last level if they fall // outside the lower-level files' key-ranges. diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 9c43b6ab9557c56e67afd0835754d3a17e631d85..34d870e8d6f0eec570ed606c470140aeb0cb576e 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -1866,9 +1866,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // Clear Instrument ThreadStatusUtil::ResetThreadStatus(); - } else if (c->column_family_data()->ioptions()->compaction_style == - kCompactionStyleUniversal && - !is_prepicked && c->output_level() > 0 && + } else if (!is_prepicked && c->output_level() > 0 && c->output_level() == c->column_family_data() ->current() @@ -1876,9 +1874,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ->MaxOutputLevel( immutable_db_options_.allow_ingest_behind) && env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) { - // Forward universal compactions involving last level to the bottom pool - // if it exists, such that long-running compactions can't block short- - // lived ones, like L0->L0s. + // Forward compactions involving last level to the bottom pool if it exists, + // such that compactions unlikely to contribute to write stalls can be + // delayed or deprioritized. TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool"); CompactionArg* ca = new CompactionArg; ca->db = this; diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index 09727c690dffe1b3f459b4b81cab26518681eea2..d9dcf03e762fb826228ad2185d13be5706329503 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -1680,50 +1680,6 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionSecondPathRatio) { Destroy(options); } -TEST_P(DBTestUniversalCompaction, FullCompactionInBottomPriThreadPool) { - const int kNumFilesTrigger = 3; - Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM); - for (bool allow_ingest_behind : {false, true}) { - Options options = CurrentOptions(); - options.allow_ingest_behind = allow_ingest_behind; - options.compaction_style = kCompactionStyleUniversal; - options.num_levels = num_levels_; - options.write_buffer_size = 100 << 10; // 100KB - options.target_file_size_base = 32 << 10; // 32KB - options.level0_file_num_compaction_trigger = kNumFilesTrigger; - // Trigger compaction if size amplification exceeds 110% - options.compaction_options_universal.max_size_amplification_percent = 110; - DestroyAndReopen(options); - - int num_bottom_pri_compactions = 0; - SyncPoint::GetInstance()->SetCallBack( - "DBImpl::BGWorkBottomCompaction", - [&](void* /*arg*/) { ++num_bottom_pri_compactions; }); - SyncPoint::GetInstance()->EnableProcessing(); - - Random rnd(301); - for (int num = 0; num < kNumFilesTrigger; num++) { - ASSERT_EQ(NumSortedRuns(), num); - int key_idx = 0; - GenerateNewFile(&rnd, &key_idx); - } - dbfull()->TEST_WaitForCompact(); - - if (allow_ingest_behind || num_levels_ > 1) { - // allow_ingest_behind increases number of levels while sanitizing. - ASSERT_EQ(1, num_bottom_pri_compactions); - } else { - // for single-level universal, everything's bottom level so nothing should - // be executed in bottom-pri thread pool. - ASSERT_EQ(0, num_bottom_pri_compactions); - } - // Verify that size amplification did occur - ASSERT_EQ(NumSortedRuns(), 1); - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - } - Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM); -} - TEST_P(DBTestUniversalCompaction, ConcurrentBottomPriLowPriCompactions) { if (num_levels_ == 1) { // for single-level universal, everything's bottom level so nothing should