diff --git a/db/db_impl.cc b/db/db_impl.cc index 85fbc112dcf923a5d22f6b194a1c4414fe7b9a10..46b03d27e59e0cfd0c7c67a0b8386f748be0e0ad 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -346,7 +346,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) has_unpersisted_data_(false), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), num_running_addfile_(0), - addfile_cv_(&mutex_), #ifndef ROCKSDB_LITE wal_manager_(immutable_db_options_, env_options_), #endif // ROCKSDB_LITE @@ -2033,7 +2032,6 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options, int max_level_with_files = 0; { InstrumentedMutexLock l(&mutex_); - WaitForAddFile(); Version* base = cfd->current(); for (int level = 1; level < base->storage_info()->num_non_empty_levels(); level++) { @@ -2746,6 +2744,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, manual.end = &end_storage; } + TEST_SYNC_POINT("DBImpl::RunManualCompaction:0"); + TEST_SYNC_POINT("DBImpl::RunManualCompaction:1"); InstrumentedMutexLock l(&mutex_); // When a manual compaction arrives, temporarily disable scheduling of @@ -2813,6 +2813,10 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, ca->m = &manual; manual.incomplete = false; bg_compaction_scheduled_++; + // manual.compaction will be added to running_compactions_ and erased + // inside BackgroundCompaction() but we need to put it now since we + // will unlock the mutex. + running_compactions_.insert(manual.compaction); env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, &DBImpl::UnscheduleCallback); scheduled = true; @@ -3653,6 +3657,11 @@ void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) { } bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) { + if (num_running_addfile_ > 0) { + // We need to wait for other AddFile() calls to finish + // before running a manual compaction. + return true; + } if (m->exclusive) { return (bg_compaction_scheduled_ > 0); } diff --git a/db/db_impl.h b/db/db_impl.h index 4b0768ae1c2e486b4f875039bc98c46e675fc57a..4eff025a5c948c2edd791d73fe77504028ee799c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -655,11 +655,12 @@ class DBImpl : public DB { // REQUIRES: mutex_ held void WaitForAddFile(); - Status CompactFilesImpl( - const CompactionOptions& compact_options, ColumnFamilyData* cfd, - Version* version, const std::vector& input_file_names, - const int output_level, int output_path_id, JobContext* job_context, - LogBuffer* log_buffer); + Status CompactFilesImpl(const CompactionOptions& compact_options, + ColumnFamilyData* cfd, Version* version, + const std::vector& input_file_names, + const int output_level, int output_path_id, + JobContext* job_context, LogBuffer* log_buffer); + Status ReadExternalSstFileInfo(ColumnFamilyHandle* column_family, const std::string& file_path, ExternalSstFileInfo* file_info); @@ -737,6 +738,7 @@ class DBImpl : public DB { // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases // (i.e. whenever a flush is done, even if it didn't make any progress) // * whenever there is an error in background purge, flush or compaction + // * whenever num_running_addfile_ goes to 0. InstrumentedCondVar bg_cv_; uint64_t logfile_number_; std::deque @@ -986,10 +988,6 @@ class DBImpl : public DB { // REQUIRES: mutex held int num_running_addfile_; - // A condition variable that will be signaled whenever - // num_running_addfile_ goes to 0. - InstrumentedCondVar addfile_cv_; - #ifndef ROCKSDB_LITE WalManager wal_manager_; #endif // ROCKSDB_LITE diff --git a/db/db_impl_add_file.cc b/db/db_impl_add_file.cc index e90a5669449951bb5a923a38f231170cceb0bd22..4277e93c02ff17c066d8506754b98205cd712ce5 100644 --- a/db/db_impl_add_file.cc +++ b/db/db_impl_add_file.cc @@ -340,7 +340,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, num_running_addfile_--; if (num_running_addfile_ == 0) { - addfile_cv_.SignalAll(); + bg_cv_.SignalAll(); } TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock"); } // mutex_ is unlocked here; @@ -426,7 +426,7 @@ int DBImpl::PickLevelForIngestedFile(ColumnFamilyData* cfd, void DBImpl::WaitForAddFile() { mutex_.AssertHeld(); while (num_running_addfile_ > 0) { - addfile_cv_.Wait(); + bg_cv_.Wait(); } } #endif // ROCKSDB_LITE diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index d752786809bfc6ee6f86e40f0232ebcf9baaa77a..b79f1d957248a8f7645caa5a572d00863c10a2dd 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1011,10 +1011,13 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) { // We have 2 overlapping files in L0 EXPECT_EQ(FilesPerLevel(), "2"); - rocksdb::SyncPoint::GetInstance()->LoadDependency({ - {"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"}, - {"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"}, - }); + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::AddFile:MutexLock", "ExternalSSTFileTest::PickedLevelBug:0"}, + {"ExternalSSTFileTest::PickedLevelBug:1", "DBImpl::AddFile:MutexUnlock"}, + {"ExternalSSTFileTest::PickedLevelBug:2", + "DBImpl::RunManualCompaction:0"}, + {"ExternalSSTFileTest::PickedLevelBug:3", + "DBImpl::RunManualCompaction:1"}}); std::atomic bg_compact_started(false); rocksdb::SyncPoint::GetInstance()->SetCallBack( @@ -1023,6 +1026,12 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) { rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // While writing the MANIFEST start a thread that will ask for compaction + std::thread bg_compact([&]() { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + }); + TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:2"); + // Start a thread that will ingest a new file std::thread bg_addfile([&]() { file_keys = {1, 2, 3}; @@ -1032,10 +1041,7 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) { // Wait for AddFile to start picking levels and writing MANIFEST TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:0"); - // While writing the MANIFEST start a thread that will ask for compaction - std::thread bg_compact([&]() { - ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); - }); + TEST_SYNC_POINT("ExternalSSTFileTest::PickedLevelBug:3"); // We need to verify that no compactions can run while AddFile is // ingesting the files into the levels it find suitable. So we will @@ -1065,6 +1071,51 @@ TEST_F(ExternalSSTFileTest, PickedLevelBug) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { + Options options = CurrentOptions(); + options.disable_auto_compactions = false; + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 2; + options.env = env_; + DestroyAndReopen(options); + + std::function bg_compact = [&]() { + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + }; + + int range_id = 0; + std::vector file_keys; + std::function bg_addfile = [&]() { + ASSERT_OK(GenerateAndAddExternalFile(options, file_keys, range_id)); + }; + + std::vector threads; + while (range_id < 5000) { + int range_start = (range_id * 20); + int range_end = range_start + 10; + + file_keys.clear(); + for (int k = range_start + 1; k < range_end; k++) { + file_keys.push_back(k); + } + ASSERT_OK(Put(Key(range_start), Key(range_start))); + ASSERT_OK(Put(Key(range_end), Key(range_end))); + ASSERT_OK(Flush()); + + if (range_id % 10 == 0) { + threads.emplace_back(bg_compact); + } + threads.emplace_back(bg_addfile); + + for (auto& t : threads) { + t.join(); + } + threads.clear(); + + range_id++; + } +} + TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { Options options = CurrentOptions(); options.disable_auto_compactions = false;