提交 4b124fb9 编写于 作者: A Anand Ananthabhotla 提交者: Facebook Github Bot

Handle error return from WriteBuffer()

Summary:
There are a couple of places where we swallow any error from
WriteBuffer() - in SwitchMemtable() and DBImpl::CloseImpl(). Propagate
the error up in those cases rather than ignoring it.
Closes https://github.com/facebook/rocksdb/pull/3404

Differential Revision: D6879954

Pulled By: anand1976

fbshipit-source-id: 2ef88b554be5286b0a8bad7384ba17a105395bdb
上级 c3401846
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
### Bug Fixes ### Bug Fixes
* Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete. * Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete.
* Fix DB::Flush() keep waiting after flush finish under certain condition. * Fix DB::Flush() keep waiting after flush finish under certain condition.
* Fix Handle error return from WriteBuffer() during WAL file close and DB close
## 5.10.0 (12/11/2017) ## 5.10.0 (12/11/2017)
### Public API Change ### Public API Change
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/perf_context.h" #include "rocksdb/perf_context.h"
#include "util/fault_injection_test_env.h"
#if !defined(ROCKSDB_LITE) #if !defined(ROCKSDB_LITE)
#include "util/sync_point.h" #include "util/sync_point.h"
#endif #endif
...@@ -898,6 +899,28 @@ TEST_F(DBBasicTest, DBClose) { ...@@ -898,6 +899,28 @@ TEST_F(DBBasicTest, DBClose) {
delete options.env; delete options.env;
} }
TEST_F(DBBasicTest, DBCloseFlushError) {
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(Env::Default()));
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.manual_wal_flush = true;
options.write_buffer_size=100;
options.env = fault_injection_env.get();
Reopen(options);
ASSERT_OK(Put("key1", "value1"));
ASSERT_OK(Put("key2", "value2"));
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
ASSERT_OK(Put("key3", "value3"));
fault_injection_env->SetFilesystemActive(false);
Status s = dbfull()->Close();
fault_injection_env->SetFilesystemActive(true);
ASSERT_NE(s, Status::OK());
Destroy(options);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {
......
...@@ -185,6 +185,26 @@ TEST_P(DBFlushDirectIOTest, DirectIO) { ...@@ -185,6 +185,26 @@ TEST_P(DBFlushDirectIOTest, DirectIO) {
delete options.env; delete options.env;
} }
TEST_F(DBFlushTest, FlushError) {
Options options;
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
new FaultInjectionTestEnv(env_));
options.write_buffer_size = 100;
options.max_write_buffer_number = 4;
options.min_write_buffer_number_to_merge = 3;
options.disable_auto_compactions = true;
options.env = fault_injection_env.get();
Reopen(options);
ASSERT_OK(Put("key1", "value1"));
ASSERT_OK(Put("key2", "value2"));
fault_injection_env->SetFilesystemActive(false);
Status s = dbfull()->TEST_SwitchMemtable();
fault_injection_env->SetFilesystemActive(true);
Destroy(options);
ASSERT_NE(s, Status::OK());
}
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool()); testing::Bool());
......
...@@ -287,6 +287,7 @@ Status DBImpl::CloseImpl() { ...@@ -287,6 +287,7 @@ Status DBImpl::CloseImpl() {
env_->UnSchedule(this, Env::Priority::BOTTOM); env_->UnSchedule(this, Env::Priority::BOTTOM);
int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW); int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH); int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
Status ret;
mutex_.Lock(); mutex_.Lock();
bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled; bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
bg_compaction_scheduled_ -= compactions_unscheduled; bg_compaction_scheduled_ -= compactions_unscheduled;
...@@ -349,7 +350,18 @@ Status DBImpl::CloseImpl() { ...@@ -349,7 +350,18 @@ Status DBImpl::CloseImpl() {
delete l; delete l;
} }
for (auto& log : logs_) { for (auto& log : logs_) {
log.ClearWriter(); uint64_t log_number = log.writer->get_log_number();
Status s = log.ClearWriter();
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
s.ToString().c_str());
// Retain the first error
if (ret.ok()) {
ret = s;
}
}
} }
logs_.clear(); logs_.clear();
...@@ -383,11 +395,13 @@ Status DBImpl::CloseImpl() { ...@@ -383,11 +395,13 @@ Status DBImpl::CloseImpl() {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete"); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
LogFlush(immutable_db_options_.info_log); LogFlush(immutable_db_options_.info_log);
Status s = Status::OK();
if (immutable_db_options_.info_log && own_info_log_) { if (immutable_db_options_.info_log && own_info_log_) {
s = immutable_db_options_.info_log->Close(); Status s = immutable_db_options_.info_log->Close();
if (ret.ok()) {
ret = s;
}
} }
return s; return ret;
} }
DBImpl::~DBImpl() { Close(); } DBImpl::~DBImpl() { Close(); }
......
...@@ -993,9 +993,11 @@ class DBImpl : public DB { ...@@ -993,9 +993,11 @@ class DBImpl : public DB {
writer = nullptr; writer = nullptr;
return w; return w;
} }
void ClearWriter() { Status ClearWriter() {
Status s = writer->WriteBuffer();
delete writer; delete writer;
writer = nullptr; writer = nullptr;
return s;
} }
uint64_t number; uint64_t number;
......
...@@ -1292,17 +1292,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { ...@@ -1292,17 +1292,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
". Immutable memtables: %d.\n", ". Immutable memtables: %d.\n",
cfd->GetName().c_str(), new_log_number, num_imm_unflushed); cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
mutex_.Lock(); mutex_.Lock();
if (!s.ok()) { if (s.ok() && creating_new_log) {
// how do we fail if we're not creating new log?
assert(creating_new_log);
assert(!new_mem);
assert(!new_log);
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
return s;
}
if (creating_new_log) {
log_write_mutex_.Lock(); log_write_mutex_.Lock();
logfile_number_ = new_log_number; logfile_number_ = new_log_number;
assert(new_log != nullptr); assert(new_log != nullptr);
...@@ -1311,12 +1301,31 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { ...@@ -1311,12 +1301,31 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
if (!logs_.empty()) { if (!logs_.empty()) {
// Alway flush the buffer of the last log before switching to a new one // Alway flush the buffer of the last log before switching to a new one
log::Writer* cur_log_writer = logs_.back().writer; log::Writer* cur_log_writer = logs_.back().writer;
cur_log_writer->WriteBuffer(); s = cur_log_writer->WriteBuffer();
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
" WAL file -- %s\n",
cfd->GetName().c_str(), cur_log_writer->get_log_number(),
new_log_number);
}
} }
logs_.emplace_back(logfile_number_, new_log); logs_.emplace_back(logfile_number_, new_log);
alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
log_write_mutex_.Unlock(); log_write_mutex_.Unlock();
} }
if (!s.ok()) {
// how do we fail if we're not creating new log?
assert(creating_new_log);
assert(!new_mem);
assert(!new_log);
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
return s;
}
for (auto loop_cfd : *versions_->GetColumnFamilySet()) { for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
// all this is just optimization to delete logs that // all this is just optimization to delete logs that
// are no longer needed -- if CF is empty, that means it // are no longer needed -- if CF is empty, that means it
......
...@@ -173,8 +173,9 @@ class DB { ...@@ -173,8 +173,9 @@ class DB {
// Close the DB by releasing resources, closing files etc. This should be // Close the DB by releasing resources, closing files etc. This should be
// called before calling the desctructor so that the caller can get back a // called before calling the desctructor so that the caller can get back a
// status in case there are any errors. Regardless of the return status, the // status in case there are any errors. This will not fsync the WAL files.
// DB must be freed // If syncing is required, the caller must first call SyncWAL.
// Regardless of the return status, the DB must be freed
virtual Status Close() { return Status::OK(); } virtual Status Close() { return Status::OK(); }
// ListColumnFamilies will open the DB specified by argument name // ListColumnFamilies will open the DB specified by argument name
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册