diff --git a/HISTORY.md b/HISTORY.md index bb18de05e008b09691413485e8b38e83b9082a47..79a18591dfd77ee377fb50137b12f071268e440a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -20,6 +20,9 @@ ### Behavior changes * DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984) +### Bug Fixes +* Fixed a race condition in WAL size tracking caused by unsafe iterator access after the container is modified. + ## 7.3.0 (05/20/2022) ### Bug Fixes * Fixed a bug where manual flush would block forever even though flush options had wait=false. diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 0891d001b8359f2c934243716edef2861bbabe7a..c3a4b814c3b475b7e8e88e72d3cc011593f6a07f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1834,12 +1834,13 @@ class DBImpl : public DB { IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, Env::IOPriority rate_limiter_priority, - bool with_db_mutex = false, bool with_log_mutex = false); + LogFileNumberSize& log_file_number_size); IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, bool need_log_sync, bool need_log_dir_sync, - SequenceNumber sequence); + SequenceNumber sequence, + LogFileNumberSize& log_file_number_size); IOStatus ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, uint64_t* log_used, @@ -2169,11 +2170,7 @@ class DBImpl : public DB { // are protected by locking both mutex_ and log_write_mutex_, and reads must // be under either mutex_ or log_write_mutex_. std::deque alive_log_files_; - // Caching the result of `alive_log_files_.back()` so that we do not have to - // call `alive_log_files_.back()` in the write thread (WriteToWAL()) which - // requires locking db mutex if log_mutex_ is not already held in - // two-write-queues mode. - std::deque::reverse_iterator alive_log_files_tail_; + // Log files that aren't fully synced, and the current log file. 
// Synchronization: // - push_back() is done from write_thread_ with locked mutex_ and diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 9a2e912037f6ec3d1034316ac7b1fd130825b1bd..c87ac00ec5c4729b5eae4a6de4ed79fe054d4d5f 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1420,7 +1420,6 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { total_log_size_ += log.size; alive_log_files_.push_back(log); } - alive_log_files_tail_ = alive_log_files_.rbegin(); if (two_write_queues_) { log_write_mutex_.Unlock(); } @@ -1807,7 +1806,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); - impl->alive_log_files_tail_ = impl->alive_log_files_.rbegin(); if (impl->two_write_queues_) { impl->log_write_mutex_.Unlock(); } @@ -1828,8 +1826,12 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, WriteOptions write_options; uint64_t log_used, log_size; log::Writer* log_writer = impl->logs_.back().writer; + LogFileNumberSize& log_file_number_size = impl->alive_log_files_.back(); + + assert(log_writer->get_log_number() == log_file_number_size.number); + impl->mutex_.AssertHeld(); s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size, - Env::IO_TOTAL, /*with_db_mutex==*/true); + Env::IO_TOTAL, log_file_number_size); if (s.ok()) { // Need to fsync, otherwise it might get lost after a power reset. 
s = impl->FlushWAL(false); diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index c980b2dc6e1d313367de0f47d126c6a446f94a0c..62d68bc088bdc7b95b1845b0d44a4cfac8153023 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -319,7 +319,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); } + log::Writer* log_writer = logs_.back().writer; + LogFileNumberSize& log_file_number_size = alive_log_files_.back(); + + assert(log_writer->get_log_number() == log_file_number_size.number); mutex_.Unlock(); @@ -419,7 +423,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync, - need_log_dir_sync, last_sequence + 1); + need_log_dir_sync, last_sequence + 1, + log_file_number_size); } } else { if (status.ok() && !write_options.disableWAL) { @@ -586,6 +591,10 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); PERF_TIMER_START(write_pre_and_post_process_time); log::Writer* log_writer = logs_.back().writer; + LogFileNumberSize& log_file_number_size = alive_log_files_.back(); + + assert(log_writer->get_log_number() == log_file_number_size.number); + mutex_.Unlock(); // This can set non-OK status if callback fail. 
@@ -649,8 +658,9 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } - io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync, - need_log_dir_sync, current_sequence); + io_s = + WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync, + need_log_dir_sync, current_sequence, log_file_number_size); w.status = io_s; } @@ -1178,17 +1188,9 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, uint64_t* log_used, uint64_t* log_size, Env::IOPriority rate_limiter_priority, - bool with_db_mutex, bool with_log_mutex) { + LogFileNumberSize& log_file_number_size) { assert(log_size != nullptr); - // Assert mutex explicitly. - if (with_db_mutex) { - mutex_.AssertHeld(); - } else if (two_write_queues_) { - log_write_mutex_.AssertHeld(); - assert(with_log_mutex); - } - Slice log_entry = WriteBatchInternal::Contents(&merged_batch); *log_size = log_entry.size(); // When two_write_queues_ WriteToWAL has to be protected from concurretn calls @@ -1211,12 +1213,7 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, *log_used = logfile_number_; } total_log_size_ += log_entry.size(); - if (with_db_mutex || with_log_mutex) { - assert(alive_log_files_tail_ == alive_log_files_.rbegin()); - assert(alive_log_files_tail_ != alive_log_files_.rend()); - } - LogFileNumberSize& last_alive_log = *alive_log_files_tail_; - last_alive_log.AddSize(*log_size); + log_file_number_size.AddSize(*log_size); log_empty_ = false; return io_s; } @@ -1224,7 +1221,8 @@ IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer, uint64_t* log_used, bool need_log_sync, bool need_log_dir_sync, - SequenceNumber sequence) { + SequenceNumber sequence, + LogFileNumberSize& log_file_number_size) { IOStatus io_s; assert(!two_write_queues_); 
assert(!write_group.leader->disable_wal); @@ -1245,7 +1243,8 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, uint64_t log_size; io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, - write_group.leader->rate_limiter_priority); + write_group.leader->rate_limiter_priority, + log_file_number_size); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1339,10 +1338,14 @@ IOStatus DBImpl::ConcurrentWriteToWAL( WriteBatchInternal::SetSequence(merged_batch, sequence); log::Writer* log_writer = logs_.back().writer; + LogFileNumberSize& log_file_number_size = alive_log_files_.back(); + + assert(log_writer->get_log_number() == log_file_number_size.number); + uint64_t log_size; io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, write_group.leader->rate_limiter_priority, - /*with_db_mutex=*/false, /*with_log_mutex=*/true); + log_file_number_size); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; @@ -1998,7 +2001,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log_dir_synced_ = false; logs_.emplace_back(logfile_number_, new_log); alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); - alive_log_files_tail_ = alive_log_files_.rbegin(); } log_write_mutex_.Unlock(); }