提交 c8109471 编写于 作者: Z Zhichao Cao 提交者: Facebook GitHub Bot

Separate handling of WAL Sync io error with SST flush io error (#8049)

Summary:
In previous codebase, if WAL is used, all the retryable IO Error will be treated as hard error. So write is stalled. In this PR, the retryable IO error from WAL sync is separated from SST file flush io error. If WAL Sync is ok and retryable IO Error only happens during SST flush, the error is mapped to soft error. So user can continue insert to Memtable and append to WAL.

Resolve the bug that if WAL sync fails, the memtable status does not roll back due to calling PickMemtable early than calling and checking SyncClosedLog.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8049

Test Plan: added new unit test, make check

Reviewed By: anand1976

Differential Revision: D26965529

Pulled By: zhichao-cao

fbshipit-source-id: f5fecb66602212523c92ee49d7edcb6065982410
上级 e7a60d01
......@@ -24,6 +24,9 @@
* Enable backward iteration on keys with user-defined timestamps.
* Add statistics and info log for error handler: counters for bg error, bg io error, bg retryable io error, auto resume count, auto resume total retry number, and auto resume sucess; Histogram for auto resume retry count in each recovery call. Note that, each auto resume attempt will have one or multiple retries.
### Behavior Changes
* During flush, only WAL sync retryable IO error is mapped to hard error, which will stall the writes. When WAL is used but only SST file write has retryable IO error, it will be mapped to soft error and write will not be affected.
## 6.18.0 (02/19/2021)
### Behavior Changes
* When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper.
......
......@@ -82,29 +82,16 @@ TEST_F(DBFlushTest, SyncFail) {
options.env = fault_injection_env.get();
SyncPoint::GetInstance()->LoadDependency(
{{"DBFlushTest::SyncFail:GetVersionRefCount:1",
"DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},
{"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
"DBFlushTest::SyncFail:GetVersionRefCount:2"},
{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
{"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put("key", "value"));
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
->cfd();
FlushOptions flush_options;
flush_options.wait = false;
ASSERT_OK(dbfull()->Flush(flush_options));
// Flush installs a new super-version. Get the ref count after that.
auto current_before = cfd->current();
int refs_before = cfd->current()->TEST_refs();
TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");
TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");
int refs_after_picking_memtables = cfd->current()->TEST_refs();
ASSERT_EQ(refs_before + 1, refs_after_picking_memtables);
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
......@@ -115,9 +102,6 @@ TEST_F(DBFlushTest, SyncFail) {
#ifndef ROCKSDB_LITE
ASSERT_EQ("", FilesPerLevel()); // flush failed.
#endif // ROCKSDB_LITE
// Backgroun flush job should release ref count to current version.
ASSERT_EQ(current_before, cfd->current());
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
Destroy(options);
}
......
......@@ -131,16 +131,11 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
MarkLogsNotSynced(current_log_number - 1);
}
if (!io_s.ok()) {
if (total_log_size_ > 0) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
} else {
// If the WAL is empty, we use different error reason
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
}
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return io_s;
}
}
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:end");
return io_s;
}
......@@ -170,17 +165,14 @@ Status DBImpl::FlushMemTableToOutputFile(
&blob_callback_);
FileMetaData file_meta;
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
flush_job.PickMemTable();
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif // ROCKSDB_LITE
Status s;
IOStatus io_s = IOStatus::OK();
bool need_cancel = false;
IOStatus log_io_s = IOStatus::OK();
if (logfile_number_ > 0 &&
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
// If there are more than one column families, we need to make sure that
......@@ -189,16 +181,25 @@ Status DBImpl::FlushMemTableToOutputFile(
// flushed SST may contain data from write batches whose updates to
// other column families are missing.
// SyncClosedLogs() may unlock and re-lock the db_mutex.
io_s = SyncClosedLogs(job_context);
s = io_s;
if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
!io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
log_io_s = SyncClosedLogs(job_context);
s = log_io_s;
if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
!log_io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
}
} else {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
}
// If the log sync failed, we do not need to pick memtable. Otherwise,
// num_flush_not_started_ needs to be rollback.
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
if (s.ok()) {
flush_job.PickMemTable();
need_cancel = true;
}
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
// Within flush_job.Run, rocksdb may call event listener to notify
// file creation and deletion.
//
......@@ -207,11 +208,16 @@ Status DBImpl::FlushMemTableToOutputFile(
// is unlocked by the current thread.
if (s.ok()) {
s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
} else {
need_cancel = false;
}
if (!s.ok() && need_cancel) {
flush_job.Cancel();
}
if (io_s.ok()) {
io_s = flush_job.io_status();
IOStatus io_s = IOStatus::OK();
io_s = flush_job.io_status();
if (s.ok()) {
s = io_s;
}
if (s.ok()) {
......@@ -247,6 +253,7 @@ Status DBImpl::FlushMemTableToOutputFile(
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
!io_s.IsColumnFamilyDropped()) {
assert(log_io_s.ok());
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
......@@ -261,15 +268,17 @@ Status DBImpl::FlushMemTableToOutputFile(
error_handler_.SetBGError(io_s,
BackgroundErrorReason::kManifestWriteNoWAL);
}
} else if (total_log_size_ > 0) {
} else if (total_log_size_ > 0 || !log_io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
} else {
// If the WAL is empty, we use different error reason
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
}
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
if (log_io_s.ok()) {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}
} else {
// If we got here, then we decided not to care about the i_os status (either
......@@ -402,12 +411,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
false /* sync_output_directory */, false /* write_manifest */,
thread_pri, io_tracer_, db_id_, db_session_id_,
cfd->GetFullHistoryTsLow()));
jobs.back()->PickMemTable();
}
std::vector<FileMetaData> file_meta(num_cfs);
Status s;
IOStatus io_s;
IOStatus log_io_s = IOStatus::OK();
assert(num_cfs == static_cast<int>(jobs.size()));
#ifndef ROCKSDB_LITE
......@@ -422,18 +430,36 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (logfile_number_ > 0) {
// TODO (yanqin) investigate whether we should sync the closed logs for
// single column family case.
io_s = SyncClosedLogs(job_context);
s = io_s;
log_io_s = SyncClosedLogs(job_context);
s = log_io_s;
if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
!log_io_s.IsColumnFamilyDropped()) {
if (total_log_size_ > 0) {
error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
} else {
// If the WAL is empty, we use different error reason
error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlushNoWAL);
}
}
}
// exec_status stores the execution status of flush_jobs as
// <bool /* executed */, Status /* status code */>
autovector<std::pair<bool, Status>> exec_status;
autovector<IOStatus> io_status;
std::vector<bool> pick_status;
for (int i = 0; i != num_cfs; ++i) {
// Initially all jobs are not executed, with status OK.
exec_status.emplace_back(false, Status::OK());
io_status.emplace_back(IOStatus::OK());
pick_status.push_back(false);
}
if (s.ok()) {
for (int i = 0; i != num_cfs; ++i) {
jobs[i]->PickMemTable();
pick_status[i] = true;
}
}
if (s.ok()) {
......@@ -474,6 +500,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
s = error_status.ok() ? s : error_status;
}
IOStatus io_s = IOStatus::OK();
if (io_s.ok()) {
IOStatus io_error = IOStatus::OK();
for (int i = 0; i != static_cast<int>(io_status.size()); i++) {
......@@ -509,7 +536,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// Have to cancel the flush jobs that have NOT executed because we need to
// unref the versions.
for (int i = 0; i != num_cfs; ++i) {
if (!exec_status[i].first) {
if (pick_status[i] && !exec_status[i].first) {
jobs[i]->Cancel();
}
}
......@@ -653,6 +680,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// it is not because of CF drop.
if (!s.ok() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
assert(log_io_s.ok());
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
......@@ -674,8 +702,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
}
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
if (log_io_s.ok()) {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}
}
......
......@@ -351,7 +351,96 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteFileScopeError) {
Destroy(options);
}
TEST_F(DBErrorHandlingFSTest, FLushWriteNoWALRetryableError1) {
TEST_F(DBErrorHandlingFSTest, FLushWALWriteRetryableError) {
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
Options options = GetDefaultOptions();
options.env = fault_env_.get();
options.create_if_missing = true;
options.listeners.emplace_back(listener);
options.max_bgerror_resume_count = 0;
Status s;
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true);
listener->EnableAutoRecovery(false);
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncClosedLogs:Start",
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
WriteOptions wo = WriteOptions();
wo.disableWAL = false;
ASSERT_OK(Put(Key(1), "val1", wo));
s = Flush();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
SyncPoint::GetInstance()->DisableProcessing();
fault_fs_->SetFilesystemActive(true);
auto cfh = dbfull()->GetColumnFamilyHandle(1);
s = dbfull()->DropColumnFamily(cfh);
s = dbfull()->Resume();
ASSERT_OK(s);
ASSERT_EQ("val1", Get(Key(1)));
ASSERT_OK(Put(Key(3), "val3", wo));
ASSERT_EQ("val3", Get(Key(3)));
s = Flush();
ASSERT_OK(s);
ASSERT_EQ("val3", Get(Key(3)));
Destroy(options);
}
TEST_F(DBErrorHandlingFSTest, FLushWALAtomicWriteRetryableError) {
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
Options options = GetDefaultOptions();
options.env = fault_env_.get();
options.create_if_missing = true;
options.listeners.emplace_back(listener);
options.max_bgerror_resume_count = 0;
options.atomic_flush = true;
Status s;
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true);
listener->EnableAutoRecovery(false);
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncClosedLogs:Start",
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
WriteOptions wo = WriteOptions();
wo.disableWAL = false;
ASSERT_OK(Put(Key(1), "val1", wo));
s = Flush();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
SyncPoint::GetInstance()->DisableProcessing();
fault_fs_->SetFilesystemActive(true);
auto cfh = dbfull()->GetColumnFamilyHandle(1);
s = dbfull()->DropColumnFamily(cfh);
s = dbfull()->Resume();
ASSERT_OK(s);
ASSERT_EQ("val1", Get(Key(1)));
ASSERT_OK(Put(Key(3), "val3", wo));
ASSERT_EQ("val3", Get(Key(3)));
s = Flush();
ASSERT_OK(s);
ASSERT_EQ("val3", Get(Key(3)));
Destroy(options);
}
TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
Options options = GetDefaultOptions();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册