diff --git a/db/db_impl.cc b/db/db_impl.cc index 49fd3c9fb76177364e6882777c052172ed9b1ce3..fd336935349408334e72cf7eda32cc11af12cc75 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -276,6 +276,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) super_version_number_(0), local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), tmp_batch_(), + bg_schedule_needed_(false), bg_compaction_scheduled_(0), bg_manual_only_(0), bg_flush_scheduled_(0), @@ -1830,17 +1831,21 @@ Status DBImpl::TEST_WaitForCompact() { void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); + bg_schedule_needed_ = false; if (bg_work_gate_closed_) { // gate closed for backgrond work } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { bool is_flush_pending = imm_.IsFlushPending(); - if (is_flush_pending && - (bg_flush_scheduled_ < options_.max_background_flushes)) { - // memtable flush needed - bg_flush_scheduled_++; - env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); + if (is_flush_pending) { + if (bg_flush_scheduled_ < options_.max_background_flushes) { + // memtable flush needed + bg_flush_scheduled_++; + env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); + } else { + bg_schedule_needed_ = true; + } } // Schedule BGWorkCompaction if there's a compaction pending (or a memtable @@ -1850,11 +1855,13 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if ((manual_compaction_ || versions_->current()->NeedsCompaction() || (is_flush_pending && (options_.max_background_flushes <= 0))) && - bg_compaction_scheduled_ < options_.max_background_compactions && (!bg_manual_only_ || manual_compaction_)) { - - bg_compaction_scheduled_++; - env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); + if (bg_compaction_scheduled_ < options_.max_background_compactions) { + bg_compaction_scheduled_++; + env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); + } else { + bg_schedule_needed_ = true; + } } } } @@ -1912,15 +1919,26 @@ void DBImpl::BackgroundCallFlush() { // to delete all obsolete files and we force FindObsoleteFiles() FindObsoleteFiles(deletion_state, !s.ok()); // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete()) { + if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { mutex_.Unlock(); + // Have to flush the info logs before bg_flush_scheduled_-- + // because if bg_flush_scheduled_ becomes 0 and the lock is + // released, the deconstructor of DB can kick in and destroy all the + // states of DB so info_log might not be available after that point. + // It also applies to access other states that DB owns. log_buffer.FlushBufferToLog(); - PurgeObsoleteFiles(deletion_state); + if (deletion_state.HaveSomethingToDelete()) { + PurgeObsoleteFiles(deletion_state); + } mutex_.Lock(); } bg_flush_scheduled_--; - if (madeProgress) { + // Any time the mutex is released After finding the work to do, another + // thread might execute MaybeScheduleFlushOrCompaction(). It is possible + // that there is a pending job but it is not scheduled because of the + // max thread limit. + if (madeProgress || bg_schedule_needed_) { MaybeScheduleFlushOrCompaction(); } log_buffer.FlushBufferToLog(); @@ -1979,10 +1997,17 @@ void DBImpl::BackgroundCallCompaction() { FindObsoleteFiles(deletion_state, !s.ok()); // delete unnecessary files if any, this is done outside the mutex - if (deletion_state.HaveSomethingToDelete()) { + if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { mutex_.Unlock(); + // Have to flush the info logs before bg_compaction_scheduled_-- + // because if bg_flush_scheduled_ becomes 0 and the lock is + // released, the deconstructor of DB can kick in and destroy all the + // states of DB so info_log might not be available after that point. + // It also applies to access other states that DB owns. log_buffer.FlushBufferToLog(); - PurgeObsoleteFiles(deletion_state); + if (deletion_state.HaveSomethingToDelete()) { + PurgeObsoleteFiles(deletion_state); + } mutex_.Lock(); } @@ -1993,7 +2018,12 @@ void DBImpl::BackgroundCallCompaction() { // Previous compaction may have produced too many files in a level, // So reschedule another compaction if we made progress in the // last compaction. - if (madeProgress) { + // + // Also, any time the mutex is released After finding the work to do, + // another thread might execute MaybeScheduleFlushOrCompaction(). It is + // possible that there is a pending job but it is not scheduled because of + // the max thread limit. + if (madeProgress || bg_schedule_needed_) { MaybeScheduleFlushOrCompaction(); } log_buffer.FlushBufferToLog(); diff --git a/db/db_impl.h b/db/db_impl.h index 96e3f1ea31e6c62aa00f3d2525271c9a4763cd79..e42848d11c7b67af6e2754fec60770bd05d5757f 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -454,6 +454,10 @@ class DBImpl : public DB { // part of ongoing compactions. std::set pending_outputs_; + // At least one compaction or flush job is pending but not yet scheduled + // because of the max background thread limit. + bool bg_schedule_needed_; + // count how many background compactions are running or have been scheduled int bg_compaction_scheduled_; diff --git a/util/log_buffer.h b/util/log_buffer.h index 76503a084a67e0521d114d6e849c294c93606943..8ebe92e0345bb134c4f200d40cf0334bd17c8f9f 100644 --- a/util/log_buffer.h +++ b/util/log_buffer.h @@ -23,6 +23,8 @@ class LogBuffer { // Add a log entry to the buffer. void AddLogToBuffer(const char* format, va_list ap); + size_t IsEmpty() const { return logs_.empty(); } + // Flush all buffered log to the info log. void FlushBufferToLog();