diff --git a/HISTORY.md b/HISTORY.md index 1d91a97064d9f416b7ebc485119c049b66da613d..8bac7fd1e14dc1c48a1599268392a31a2a44a636 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,7 @@ ### Bug Fixes * Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete. * Fix DB::Flush() keep waiting after flush finish under certain condition. +* Fix Handle error return from WriteBuffer() during WAL file close and DB close ## 5.10.0 (12/11/2017) ### Public API Change diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 92de4d5d66443ec67e1009fcb340e7fb89404988..23e6215f6bfd2a4e7ce226fe21b1e07264df88ac 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -9,6 +9,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/perf_context.h" +#include "util/fault_injection_test_env.h" #if !defined(ROCKSDB_LITE) #include "util/sync_point.h" #endif @@ -898,6 +899,28 @@ TEST_F(DBBasicTest, DBClose) { delete options.env; } +TEST_F(DBBasicTest, DBCloseFlushError) { + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.manual_wal_flush = true; + options.write_buffer_size=100; + options.env = fault_injection_env.get(); + + Reopen(options); + ASSERT_OK(Put("key1", "value1")); + ASSERT_OK(Put("key2", "value2")); + ASSERT_OK(dbfull()->TEST_SwitchMemtable()); + ASSERT_OK(Put("key3", "value3")); + fault_injection_env->SetFilesystemActive(false); + Status s = dbfull()->Close(); + fault_injection_env->SetFilesystemActive(true); + ASSERT_NE(s, Status::OK()); + + Destroy(options); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 6c24b716ea67e1f69b47a9e88376aea093e42c06..87f894a7b480d55026c69fc0c504114f40ddc95e 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -185,6 +185,26 @@ TEST_P(DBFlushDirectIOTest, DirectIO) { delete options.env; } +TEST_F(DBFlushTest, FlushError) { + Options options; + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(env_)); + options.write_buffer_size = 100; + options.max_write_buffer_number = 4; + options.min_write_buffer_number_to_merge = 3; + options.disable_auto_compactions = true; + options.env = fault_injection_env.get(); + Reopen(options); + + ASSERT_OK(Put("key1", "value1")); + ASSERT_OK(Put("key2", "value2")); + fault_injection_env->SetFilesystemActive(false); + Status s = dbfull()->TEST_SwitchMemtable(); + fault_injection_env->SetFilesystemActive(true); + Destroy(options); + ASSERT_NE(s, Status::OK()); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl.cc b/db/db_impl.cc index a66f22857851c263458a1b1de7aee9d2e5c28ce3..837eee61678c65a67e2545c9749a3d30c0da7ffd 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -287,6 +287,7 @@ Status DBImpl::CloseImpl() { env_->UnSchedule(this, Env::Priority::BOTTOM); int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW); int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH); + Status ret; mutex_.Lock(); bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled; bg_compaction_scheduled_ -= compactions_unscheduled; @@ -349,7 +350,18 @@ Status DBImpl::CloseImpl() { delete l; } for (auto& log : logs_) { - log.ClearWriter(); + uint64_t log_number = log.writer->get_log_number(); + Status s = log.ClearWriter(); + if (!s.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Unable to Sync WAL file %s with error -- %s", + LogFileName(immutable_db_options_.wal_dir, log_number).c_str(), + s.ToString().c_str()); + // Retain the first error + if (ret.ok()) { + ret = s; + } + } } logs_.clear(); @@ -383,11 +395,13 @@ Status DBImpl::CloseImpl() { ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete"); LogFlush(immutable_db_options_.info_log); - Status s = Status::OK(); if (immutable_db_options_.info_log && own_info_log_) { - s = immutable_db_options_.info_log->Close(); + Status s = immutable_db_options_.info_log->Close(); + if (ret.ok()) { + ret = s; + } } - return s; + return ret; } DBImpl::~DBImpl() { Close(); } diff --git a/db/db_impl.h b/db/db_impl.h index fe51847e5257c4b47e817c2415a0010c1f5ed618..bc251492b7780c80fd5a6dec4b1fad3c18efe084 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -993,9 +993,11 @@ class DBImpl : public DB { writer = nullptr; return w; } - void ClearWriter() { + Status ClearWriter() { + Status s = writer->WriteBuffer(); delete writer; writer = nullptr; + return s; } uint64_t number; diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index ac97934f09712368b51f621e78a743b73547892c..d1a2daf66c9a8330707a6ffc4f28b40f6755c489 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1292,17 +1292,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { ". Immutable memtables: %d.\n", cfd->GetName().c_str(), new_log_number, num_imm_unflushed); mutex_.Lock(); - if (!s.ok()) { - // how do we fail if we're not creating new log? - assert(creating_new_log); - assert(!new_mem); - assert(!new_log); - if (two_write_queues_) { - nonmem_write_thread_.ExitUnbatched(&nonmem_w); - } - return s; - } - if (creating_new_log) { + if (s.ok() && creating_new_log) { log_write_mutex_.Lock(); logfile_number_ = new_log_number; assert(new_log != nullptr); @@ -1311,12 +1301,31 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { if (!logs_.empty()) { // Alway flush the buffer of the last log before switching to a new one log::Writer* cur_log_writer = logs_.back().writer; - cur_log_writer->WriteBuffer(); + s = cur_log_writer->WriteBuffer(); + if (!s.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64 + " WAL file -- %s\n", + cfd->GetName().c_str(), cur_log_writer->get_log_number(), + new_log_number); + } } logs_.emplace_back(logfile_number_, new_log); alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); log_write_mutex_.Unlock(); } + + if (!s.ok()) { + // how do we fail if we're not creating new log? + assert(creating_new_log); + assert(!new_mem); + assert(!new_log); + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + return s; + } + for (auto loop_cfd : *versions_->GetColumnFamilySet()) { // all this is just optimization to delete logs that // are no longer needed -- if CF is empty, that means it diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 14fe02217e9bbf76f33a8cd5967b585a657178fd..96761d8bda2f7f87859ee1bb5a59d28df3fb0292 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -173,8 +173,9 @@ class DB { // Close the DB by releasing resources, closing files etc. This should be // called before calling the desctructor so that the caller can get back a - // status in case there are any errors. Regardless of the return status, the - // DB must be freed + // status in case there are any errors. This will not fsync the WAL files. + // If syncing is required, the caller must first call SyncWAL. + // Regardless of the return status, the DB must be freed virtual Status Close() { return Status::OK(); } // ListColumnFamilies will open the DB specified by argument name