diff --git a/db/db_impl.cc b/db/db_impl.cc index bc9f18177d18e3a271e0d03b5864e66cc8a3b408..d86f32bb66c11e1c46513453f154f5efbbfda4bf 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2159,6 +2159,7 @@ void DBImpl::NotifyOnCompactionCompleted( } // release lock while notifying events mutex_.Unlock(); + TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex"); { CompactionJobInfo info; info.cf_name = cfd->GetName(); @@ -3267,11 +3268,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, *made_progress = true; } if (c != nullptr) { + c->ReleaseCompactionFiles(status); + *made_progress = true; NotifyOnCompactionCompleted( c->column_family_data(), c.get(), status, compaction_job_stats, job_context->job_id); - c->ReleaseCompactionFiles(status); - *made_progress = true; } // this will unref its input_version and column_family_data c.reset(); diff --git a/db/db_test2.cc b/db/db_test2.cc index 4c80bb446f536827de7364c72323380e07a3a035..87a09595edfdc23575e6a4fa1a88f308e47c43d3 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -897,6 +897,75 @@ TEST_F(DBTest2, CompressionOptions) { ASSERT_EQ(listener->max_level_checked, 6); } } + +class CompactionStallTestListener : public EventListener { + public: + CompactionStallTestListener() : compacted_files_cnt_(0) {} + + void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override { + ASSERT_EQ(ci.cf_name, "default"); + ASSERT_EQ(ci.base_input_level, 0); + ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum); + compacted_files_cnt_ += ci.input_files.size(); + } + std::atomic compacted_files_cnt_; +}; + +TEST_F(DBTest2, CompactionStall) { + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"}, + {"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"}, + {"DBTest2::CompactionStall:2", + "DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}}); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = 4; + options.max_background_compactions = 40; + CompactionStallTestListener* listener = new CompactionStallTestListener(); + options.listeners.emplace_back(listener); + DestroyAndReopen(options); + + Random rnd(301); + + // 4 Files in L0 + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10))); + } + ASSERT_OK(Flush()); + } + + // Wait for compaction to be triggered + TEST_SYNC_POINT("DBTest2::CompactionStall:0"); + + // Clear "DBImpl::BGWorkCompaction" SYNC_POINT since we want to hold it again + // at DBTest2::CompactionStall::1 + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + + // Another 6 L0 files to trigger compaction again + for (int i = 0; i < 6; i++) { + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10))); + } + ASSERT_OK(Flush()); + } + + // Wait for another compaction to be triggered + TEST_SYNC_POINT("DBTest2::CompactionStall:1"); + + // Hold NotifyOnCompactionCompleted in the unlock mutex section + TEST_SYNC_POINT("DBTest2::CompactionStall:2"); + + dbfull()->TEST_WaitForCompact(); + ASSERT_LT(NumTableFilesAtLevel(0), + options.level0_file_num_compaction_trigger); + ASSERT_GT(listener->compacted_files_cnt_.load(), + 10 - options.level0_file_num_compaction_trigger); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + #endif // ROCKSDB_LITE TEST_F(DBTest2, FirstSnapshotTest) {