From 9be3e6b48884c80733fa791b982a02f62a199e92 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 5 Dec 2018 14:57:06 -0800 Subject: [PATCH] Allow file-ingest-triggered flush to skip waiting for write-stall clear (#4751) Summary: When write stall has already been triggered due to number of L0 files reaching threshold, file ingestion must proceed with its flush without waiting for the write stall condition to cleared by the compaction because compaction can wait for ingestion to finish (circular wait). In order to avoid this wait, we can set `FlushOptions.allow_write_stall` to be true (default is false). Setting it to false can cause deadlock. This can happen when the number of compaction threads is low. Considere the following ``` Time compaction_thread ingestion_thread | num_running_ingest_file_++ | while(num_running_ingest_file_>0){wait} | flush V ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/4751 Differential Revision: D13343037 Pulled By: riversand963 fbshipit-source-id: d3b95938814af46ec4c463feff0b50c70bd8b23f --- db/db_compaction_test.cc | 77 ++++++++++++++++++++++++++++++++++++++++ db/db_impl.cc | 7 ++-- 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 711d34e55..f20be594e 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -4071,6 +4071,83 @@ TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) { Close(); } +// FixFileIngestionCompactionDeadlock tests and verifies that compaction and +// file ingestion do not cause deadlock in the event of write stall triggered +// by number of L0 files reaching level0_stop_writes_trigger. +TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) { + const int kNumKeysPerFile = 100; + // Generate SST files. + Options options = CurrentOptions(); + + // Generate an external SST file containing a single key, i.e. 99 + std::string sst_files_dir = dbname_ + "/sst_files/"; + test::DestroyDir(env_, sst_files_dir); + ASSERT_OK(env_->CreateDir(sst_files_dir)); + SstFileWriter sst_writer(EnvOptions(), options); + const std::string sst_file_path = sst_files_dir + "test.sst"; + ASSERT_OK(sst_writer.Open(sst_file_path)); + ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value")); + ASSERT_OK(sst_writer.Finish()); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter", + "BackgroundCallCompaction:0"}, + }); + SyncPoint::GetInstance()->EnableProcessing(); + + options.write_buffer_size = 110 << 10; // 110KB + options.level0_file_num_compaction_trigger = + options.level0_stop_writes_trigger; + options.max_subcompactions = max_subcompactions_; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + Random rnd(301); + + // Generate level0_stop_writes_trigger L0 files to trigger write stop + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + for (int j = 0; j != kNumKeysPerFile; ++j) { + ASSERT_OK(Put(Key(j), RandomString(&rnd, 990))); + } + if (0 == i) { + // When we reach here, the memtables have kNumKeysPerFile keys. Note that + // flush is not yet triggered. We need to write an extra key so that the + // write path will call PreprocessWrite and flush the previous key-value + // pairs to e flushed. After that, there will be the newest key in the + // memtable, and a bunch of L0 files. Since there is already one key in + // the memtable, then for i = 1, 2, ..., we do not have to write this + // extra key to trigger flush. + ASSERT_OK(Put("", "")); + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1); + } + // When we reach this point, there will be level0_stop_writes_trigger L0 + // files and one extra key (99) in memory, which overlaps with the external + // SST file. Write stall triggers, and can be cleared only after compaction + // reduces the number of L0 files. + + // Compaction will also be triggered since we have reached the threshold for + // auto compaction. Note that compaction may begin after the following file + // ingestion thread and waits for ingestion to finish. + + // Thread to ingest file with overlapping key range with the current + // memtable. Consequently ingestion will trigger a flush. The flush MUST + // proceed without waiting for the write stall condition to clear, otherwise + // deadlock can happen. + port::Thread ingestion_thr([&]() { + IngestExternalFileOptions ifo; + Status s = db_->IngestExternalFile({sst_file_path}, ifo); + ASSERT_OK(s); + }); + + // More write to trigger write stop + ingestion_thr.join(); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + Close(); +} + #endif // !defined(ROCKSDB_LITE) } // namespace rocksdb diff --git a/db/db_impl.cc b/db/db_impl.cc index 404d51da0..e22ce20a4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3116,6 +3116,7 @@ Status DBImpl::IngestExternalFile( } num_running_ingest_file_++; + TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter"); // We cannot ingest a file into a dropped CF if (cfd->IsDropped()) { @@ -3130,16 +3131,18 @@ Status DBImpl::IngestExternalFile( TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush", &need_flush); if (status.ok() && need_flush) { + FlushOptions flush_opts; + flush_opts.allow_write_stall = true; if (immutable_db_options_.atomic_flush) { autovector cfds; SelectColumnFamiliesForAtomicFlush(&cfds); mutex_.Unlock(); - status = AtomicFlushMemTables(cfds, FlushOptions(), + status = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kExternalFileIngestion, true /* writes_stopped */); } else { mutex_.Unlock(); - status = FlushMemTable(cfd, FlushOptions(), + status = FlushMemTable(cfd, flush_opts, FlushReason::kExternalFileIngestion, true /* writes_stopped */); } -- GitLab