提交 9be3e6b4 编写于 作者: Y Yanqin Jin 提交者: Facebook Github Bot

Allow file-ingest-triggered flush to skip waiting for write-stall clear (#4751)

Summary:
When write stall has already been triggered due to number of L0 files reaching
threshold, file ingestion must proceed with its flush without waiting for the
write stall condition to cleared by the compaction because compaction can wait
for ingestion to finish (circular wait).

In order to avoid this wait, we can set `FlushOptions.allow_write_stall` to be
true (default is false). Setting it to false can cause deadlock.

This can happen when the number of compaction threads is low.

Considere the following
```
Time  compaction_thread                        ingestion_thread
 |                                             num_running_ingest_file_++
 |    while(num_running_ingest_file_>0){wait}
 |                                             flush
 V
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4751

Differential Revision: D13343037

Pulled By: riversand963

fbshipit-source-id: d3b95938814af46ec4c463feff0b50c70bd8b23f
上级 b96fccb1
......@@ -4071,6 +4071,83 @@ TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
Close();
}
// FixFileIngestionCompactionDeadlock tests and verifies that compaction and
// file ingestion do not cause deadlock in the event of write stall triggered
// by number of L0 files reaching level0_stop_writes_trigger.
TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
const int kNumKeysPerFile = 100;
// Generate SST files.
Options options = CurrentOptions();
// Generate an external SST file containing a single key, i.e. 99
std::string sst_files_dir = dbname_ + "/sst_files/";
test::DestroyDir(env_, sst_files_dir);
ASSERT_OK(env_->CreateDir(sst_files_dir));
SstFileWriter sst_writer(EnvOptions(), options);
const std::string sst_file_path = sst_files_dir + "test.sst";
ASSERT_OK(sst_writer.Open(sst_file_path));
ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
ASSERT_OK(sst_writer.Finish());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
"BackgroundCallCompaction:0"},
});
SyncPoint::GetInstance()->EnableProcessing();
options.write_buffer_size = 110 << 10; // 110KB
options.level0_file_num_compaction_trigger =
options.level0_stop_writes_trigger;
options.max_subcompactions = max_subcompactions_;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
Random rnd(301);
// Generate level0_stop_writes_trigger L0 files to trigger write stop
for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
for (int j = 0; j != kNumKeysPerFile; ++j) {
ASSERT_OK(Put(Key(j), RandomString(&rnd, 990)));
}
if (0 == i) {
// When we reach here, the memtables have kNumKeysPerFile keys. Note that
// flush is not yet triggered. We need to write an extra key so that the
// write path will call PreprocessWrite and flush the previous key-value
// pairs to e flushed. After that, there will be the newest key in the
// memtable, and a bunch of L0 files. Since there is already one key in
// the memtable, then for i = 1, 2, ..., we do not have to write this
// extra key to trigger flush.
ASSERT_OK(Put("", ""));
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
}
// When we reach this point, there will be level0_stop_writes_trigger L0
// files and one extra key (99) in memory, which overlaps with the external
// SST file. Write stall triggers, and can be cleared only after compaction
// reduces the number of L0 files.
// Compaction will also be triggered since we have reached the threshold for
// auto compaction. Note that compaction may begin after the following file
// ingestion thread and waits for ingestion to finish.
// Thread to ingest file with overlapping key range with the current
// memtable. Consequently ingestion will trigger a flush. The flush MUST
// proceed without waiting for the write stall condition to clear, otherwise
// deadlock can happen.
port::Thread ingestion_thr([&]() {
IngestExternalFileOptions ifo;
Status s = db_->IngestExternalFile({sst_file_path}, ifo);
ASSERT_OK(s);
});
// More write to trigger write stop
ingestion_thr.join();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
Close();
}
#endif // !defined(ROCKSDB_LITE)
} // namespace rocksdb
......
......@@ -3116,6 +3116,7 @@ Status DBImpl::IngestExternalFile(
}
num_running_ingest_file_++;
TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
// We cannot ingest a file into a dropped CF
if (cfd->IsDropped()) {
......@@ -3130,16 +3131,18 @@ Status DBImpl::IngestExternalFile(
TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
&need_flush);
if (status.ok() && need_flush) {
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
status = AtomicFlushMemTables(cfds, FlushOptions(),
status = AtomicFlushMemTables(cfds, flush_opts,
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
} else {
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(),
status = FlushMemTable(cfd, flush_opts,
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册