diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 711d34e5572af20269f429b64021417de4c15304..f20be594e6657c9e895af657922879c1e1f11e37 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -4071,6 +4071,83 @@ TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) { Close(); } +// FixFileIngestionCompactionDeadlock tests and verifies that compaction and +// file ingestion do not cause deadlock in the event of write stall triggered +// by number of L0 files reaching level0_stop_writes_trigger. +TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) { + const int kNumKeysPerFile = 100; + // Generate SST files. + Options options = CurrentOptions(); + + // Generate an external SST file containing a single key, i.e. 99 + std::string sst_files_dir = dbname_ + "/sst_files/"; + test::DestroyDir(env_, sst_files_dir); + ASSERT_OK(env_->CreateDir(sst_files_dir)); + SstFileWriter sst_writer(EnvOptions(), options); + const std::string sst_file_path = sst_files_dir + "test.sst"; + ASSERT_OK(sst_writer.Open(sst_file_path)); + ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value")); + ASSERT_OK(sst_writer.Finish()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter", + "BackgroundCallCompaction:0"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + options.write_buffer_size = 110 << 10; // 110KB + options.level0_file_num_compaction_trigger = + options.level0_stop_writes_trigger; + options.max_subcompactions = max_subcompactions_; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + Random rnd(301); + + // Generate level0_stop_writes_trigger L0 files to trigger write stop + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + for (int j = 0; j != kNumKeysPerFile; ++j) { + ASSERT_OK(Put(Key(j), RandomString(&rnd, 990))); + } + if (0 == i) { + // When we reach here, the memtables have kNumKeysPerFile keys. Note that + // flush is not yet triggered. We need to write an extra key so that the + // write path will call PreprocessWrite and flush the previous key-value + // pairs to e flushed. After that, there will be the newest key in the + // memtable, and a bunch of L0 files. Since there is already one key in + // the memtable, then for i = 1, 2, ..., we do not have to write this + // extra key to trigger flush. + ASSERT_OK(Put("", "")); + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1); + } + // When we reach this point, there will be level0_stop_writes_trigger L0 + // files and one extra key (99) in memory, which overlaps with the external + // SST file. Write stall triggers, and can be cleared only after compaction + // reduces the number of L0 files. + + // Compaction will also be triggered since we have reached the threshold for + // auto compaction. Note that compaction may begin after the following file + // ingestion thread and waits for ingestion to finish. + + // Thread to ingest file with overlapping key range with the current + // memtable. Consequently ingestion will trigger a flush. The flush MUST + // proceed without waiting for the write stall condition to clear, otherwise + // deadlock can happen. + port::Thread ingestion_thr([&]() { + IngestExternalFileOptions ifo; + Status s = db_->IngestExternalFile({sst_file_path}, ifo); + ASSERT_OK(s); + }); + + // More write to trigger write stop + ingestion_thr.join(); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + Close(); +} + #endif // !defined(ROCKSDB_LITE) } // namespace rocksdb diff --git a/db/db_impl.cc b/db/db_impl.cc index 404d51da0c4a58a15ba7a553b0b9f26a62d0a3f4..e22ce20a43e29cb5cea237671b08e2c745c5fd09 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3116,6 +3116,7 @@ Status DBImpl::IngestExternalFile( } num_running_ingest_file_++; + TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter"); // We cannot ingest a file into a dropped CF if (cfd->IsDropped()) { @@ -3130,16 +3131,18 @@ Status DBImpl::IngestExternalFile( TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush", &need_flush); if (status.ok() && need_flush) { + FlushOptions flush_opts; + flush_opts.allow_write_stall = true; if (immutable_db_options_.atomic_flush) { autovector cfds; SelectColumnFamiliesForAtomicFlush(&cfds); mutex_.Unlock(); - status = AtomicFlushMemTables(cfds, FlushOptions(), + status = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kExternalFileIngestion, true /* writes_stopped */); } else { mutex_.Unlock(); - status = FlushMemTable(cfd, FlushOptions(), + status = FlushMemTable(cfd, flush_opts, FlushReason::kExternalFileIngestion, true /* writes_stopped */); }