From 01542400a87ad130aed790bf895c029082745cbe Mon Sep 17 00:00:00 2001 From: Adrien Schildknecht Date: Thu, 5 Oct 2017 18:00:38 -0700 Subject: [PATCH] Inform caller when rocksdb is stalling writes Summary: Add a new function in Listener to let the caller know when rocksdb is stalling writes. Closes https://github.com/facebook/rocksdb/pull/2897 Differential Revision: D5860124 Pulled By: schischi fbshipit-source-id: ee791606169aa64f772c86f817cebf02624e05e1 --- db/column_family.cc | 45 ++++++++++++------ db/column_family.h | 14 +++--- db/compacted_db_impl.cc | 4 +- db/db_impl.cc | 28 ++++++----- db/db_impl.h | 19 ++++---- db/db_impl_compaction_flush.cc | 56 +++++++++++----------- db/db_impl_experimental.cc | 5 +- db/db_impl_open.cc | 6 ++- db/db_impl_readonly.cc | 5 +- db/db_impl_write.cc | 7 ++- db/db_test.cc | 21 +++++++++ db/job_context.h | 85 +++++++++++++++++++++++++++------- include/rocksdb/listener.h | 24 ++++++++++ 13 files changed, 219 insertions(+), 100 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index b4ff92951..766508c36 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -613,8 +613,9 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, } } // namespace -void ColumnFamilyData::RecalculateWriteStallConditions( +WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options) { + auto write_stall_condition = WriteStallCondition::kNormal; if (current_ != nullptr) { auto* vstorage = current_->storage_info(); auto write_controller = column_family_set_->write_controller_; @@ -627,6 +628,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions( if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1); + write_stall_condition = WriteStallCondition::kStopped; ROCKS_LOG_WARN( ioptions_.info_log, "[%s] Stopping writes because we have %d immutable memtables " @@ -638,6 +640,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions( mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1); + write_stall_condition = WriteStallCondition::kStopped; if (compaction_picker_->IsLevel0CompactionInProgress()) { internal_stats_->AddCFStats( InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1); @@ -652,6 +655,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions( write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats( InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1); + write_stall_condition = WriteStallCondition::kStopped; ROCKS_LOG_WARN( ioptions_.info_log, "[%s] Stopping writes because of estimated pending compaction " @@ -665,6 +669,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions( prev_compaction_needed_bytes_, was_stopped, mutable_cf_options.disable_auto_compactions); internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1); + write_stall_condition = WriteStallCondition::kDelayed; ROCKS_LOG_WARN( ioptions_.info_log, "[%s] Stalling writes because we have %d immutable memtables " @@ -686,6 +691,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions( mutable_cf_options.disable_auto_compactions); internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1); + write_stall_condition = WriteStallCondition::kDelayed; if (compaction_picker_->IsLevel0CompactionInProgress()) { internal_stats_->AddCFStats( InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1); @@ -716,6 +722,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions( mutable_cf_options.disable_auto_compactions); internal_stats_->AddCFStats( InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1); + write_stall_condition = WriteStallCondition::kDelayed; ROCKS_LOG_WARN( ioptions_.info_log, "[%s] Stalling writes because of estimated pending compaction " @@ -770,6 +777,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions( } prev_compaction_needed_bytes_ = compaction_needed_bytes; } + return write_stall_condition; } const EnvOptions* ColumnFamilyData::soptions() const { @@ -915,15 +923,16 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) { return false; } -SuperVersion* ColumnFamilyData::InstallSuperVersion( - SuperVersion* new_superversion, InstrumentedMutex* db_mutex) { +void ColumnFamilyData::InstallSuperVersion( + SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) { db_mutex->AssertHeld(); - return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_); + return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_); } -SuperVersion* ColumnFamilyData::InstallSuperVersion( - SuperVersion* new_superversion, InstrumentedMutex* db_mutex, +void ColumnFamilyData::InstallSuperVersion( + SuperVersionContext* sv_context, InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) { + SuperVersion* new_superversion = sv_context->new_superversion.release(); new_superversion->db_mutex = db_mutex; new_superversion->mutable_cf_options = mutable_cf_options; new_superversion->Init(mem_, imm_.current(), current_); @@ -931,16 +940,24 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion( super_version_ = new_superversion; ++super_version_number_; super_version_->version_number = super_version_number_; + super_version_->write_stall_condition = + RecalculateWriteStallConditions(mutable_cf_options); + + if (old_superversion != nullptr) { + if (old_superversion->write_stall_condition != + new_superversion->write_stall_condition) { + sv_context->PushWriteStallNotification( + old_superversion->write_stall_condition, + new_superversion->write_stall_condition, GetName(), ioptions()); + } + if (old_superversion->Unref()) { + old_superversion->Cleanup(); + sv_context->superversions_to_free.push_back(old_superversion); + } + } + // Reset SuperVersions cached in thread local storage ResetThreadLocalSuperVersions(); - - RecalculateWriteStallConditions(mutable_cf_options); - - if (old_superversion != nullptr && old_superversion->Unref()) { - old_superversion->Cleanup(); - return old_superversion; // will let caller delete outside of mutex - } - return nullptr; } void ColumnFamilyData::ResetThreadLocalSuperVersions() { diff --git a/db/column_family.h b/db/column_family.h index 3a807d22b..02ea43dde 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -41,6 +41,7 @@ class DBImpl; class LogBuffer; class InstrumentedMutex; class InstrumentedMutexLock; +struct SuperVersionContext; extern const double kIncSlowdownRatio; @@ -95,6 +96,7 @@ struct SuperVersion { MutableCFOptions mutable_cf_options; // Version number of the current SuperVersion uint64_t version_number; + WriteStallCondition write_stall_condition; InstrumentedMutex* db_mutex; @@ -311,11 +313,11 @@ class ColumnFamilyData { // As argument takes a pointer to allocated SuperVersion to enable // the clients to allocate SuperVersion outside of mutex. // IMPORTANT: Only call this from DBImpl::InstallSuperVersion() - SuperVersion* InstallSuperVersion(SuperVersion* new_superversion, - InstrumentedMutex* db_mutex, - const MutableCFOptions& mutable_cf_options); - SuperVersion* InstallSuperVersion(SuperVersion* new_superversion, - InstrumentedMutex* db_mutex); + void InstallSuperVersion(SuperVersionContext* sv_context, + InstrumentedMutex* db_mutex, + const MutableCFOptions& mutable_cf_options); + void InstallSuperVersion(SuperVersionContext* sv_context, + InstrumentedMutex* db_mutex); void ResetThreadLocalSuperVersions(); @@ -330,7 +332,7 @@ class ColumnFamilyData { // recalculation of compaction score. These values are used in // DBImpl::MakeRoomForWrite function to decide, if it need to make // a write stall - void RecalculateWriteStallConditions( + WriteStallCondition RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options); void set_initialized() { initialized_.store(true); } diff --git a/db/compacted_db_impl.cc b/db/compacted_db_impl.cc index d1007d972..7cff496f2 100644 --- a/db/compacted_db_impl.cc +++ b/db/compacted_db_impl.cc @@ -93,6 +93,7 @@ std::vector CompactedDBImpl::MultiGet(const ReadOptions& options, } Status CompactedDBImpl::Init(const Options& options) { + SuperVersionContext sv_context(/* create_superversion */ true); mutex_.Lock(); ColumnFamilyDescriptor cf(kDefaultColumnFamilyName, ColumnFamilyOptions(options)); @@ -100,9 +101,10 @@ Status CompactedDBImpl::Init(const Options& options) { if (s.ok()) { cfd_ = reinterpret_cast( DefaultColumnFamily())->cfd(); - delete cfd_->InstallSuperVersion(new SuperVersion(), &mutex_); + cfd_->InstallSuperVersion(&sv_context, &mutex_); } mutex_.Unlock(); + sv_context.Clean(); if (!s.ok()) { return s; } diff --git a/db/db_impl.cc b/db/db_impl.cc index fce79eacc..0b6a0bcab 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -484,6 +484,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, Status s; Status persist_options_status; WriteThread::Writer w; + SuperVersionContext sv_context(/* create_superversion */ true); { InstrumentedMutexLock l(&mutex_); s = cfd->SetOptions(options_map); @@ -496,14 +497,13 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, // Trigger possible flush/compactions. This has to be before we persist // options to file, otherwise there will be a deadlock with writer // thread. - auto* old_sv = - InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options); - delete old_sv; + InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options); persist_options_status = WriteOptionsFile( false /*need_mutex_lock*/, true /*need_enter_write_thread*/); } } + sv_context.Clean(); ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetOptions() on column family [%s], inputs:", @@ -1229,6 +1229,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, return s; } + SuperVersionContext sv_context(/* create_superversion */ true); { InstrumentedMutexLock l(&mutex_); @@ -1260,8 +1261,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, auto* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); - delete InstallSuperVersionAndScheduleWork( - cfd, nullptr, *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWork( + cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); if (!cfd->mem()->IsSnapshotSupported()) { is_snapshot_supported_ = false; @@ -1280,6 +1281,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, } } // InstrumentedMutexLock l(&mutex_) + sv_context.Clean(); // this is outside the mutex if (s.ok()) { NewThreadStatusCfInfo( @@ -2035,8 +2037,9 @@ Status DBImpl::DeleteFile(std::string name) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionAndScheduleWorkWrapper( - cfd, &job_context, *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWork( + cfd, &job_context.superversion_context, + *cfd->GetLatestMutableCFOptions()); } FindObsoleteFiles(&job_context, false); } // lock released here @@ -2113,8 +2116,9 @@ Status DBImpl::DeleteFilesInRange(ColumnFamilyHandle* column_family, status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionAndScheduleWorkWrapper( - cfd, &job_context, *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWork( + cfd, &job_context.superversion_context, + *cfd->GetLatestMutableCFOptions()); } for (auto* deleted_file : deleted_files) { deleted_file->being_compacted = false; @@ -2678,6 +2682,7 @@ Status DBImpl::IngestExternalFile( return status; } + SuperVersionContext sv_context(/* create_superversion */ true); TEST_SYNC_POINT("DBImpl::AddFile:Start"); { // Lock db mutex @@ -2726,8 +2731,8 @@ Status DBImpl::IngestExternalFile( &mutex_, directories_.GetDbDir()); } if (status.ok()) { - delete InstallSuperVersionAndScheduleWork(cfd, nullptr, - *mutable_cf_options); + InstallSuperVersionAndScheduleWork(cfd, &sv_context, + *mutable_cf_options); } // Resume writes to the DB @@ -2753,6 +2758,7 @@ Status DBImpl::IngestExternalFile( // mutex_ is unlocked here // Cleanup + sv_context.Clean(); ingestion_job.Cleanup(status); if (status.ok()) { diff --git a/db/db_impl.h b/db/db_impl.h index 2253010aa..0b6146e19 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -666,13 +666,14 @@ class DBImpl : public DB { struct CompactionState; struct WriteContext { - autovector superversions_to_free_; + SuperVersionContext superversion_context; autovector memtables_to_free_; + explicit WriteContext(bool create_superversion = false) + : superversion_context(create_superversion) {} + ~WriteContext() { - for (auto& sv : superversions_to_free_) { - delete sv; - } + superversion_context.Clean(); for (auto& m : memtables_to_free_) { delete m; } @@ -1236,17 +1237,13 @@ class DBImpl : public DB { // Background threads call this function, which is just a wrapper around // the InstallSuperVersion() function. Background threads carry - // job_context which can have new_superversion already + // sv_context which can have new_superversion already // allocated. - void InstallSuperVersionAndScheduleWorkWrapper( - ColumnFamilyData* cfd, JobContext* job_context, - const MutableCFOptions& mutable_cf_options); - // All ColumnFamily state changes go through this function. Here we analyze // the new state and we schedule background work if we detect that the new // state needs flush or compaction. - SuperVersion* InstallSuperVersionAndScheduleWork( - ColumnFamilyData* cfd, SuperVersion* new_sv, + void InstallSuperVersionAndScheduleWork( + ColumnFamilyData* cfd, SuperVersionContext* sv_context, const MutableCFOptions& mutable_cf_options); #ifndef ROCKSDB_LITE diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index db2988131..178e886e3 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -130,8 +130,8 @@ Status DBImpl::FlushMemTableToOutputFile( } if (s.ok()) { - InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context, - mutable_cf_options); + InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context, + mutable_cf_options); if (made_progress) { *made_progress = 1; } @@ -573,8 +573,9 @@ Status DBImpl::CompactFilesImpl( Status status = compaction_job.Install(*c->mutable_cf_options()); if (status.ok()) { - InstallSuperVersionAndScheduleWorkWrapper( - c->column_family_data(), job_context, *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWork( + c->column_family_data(), &job_context->superversion_context, + *c->mutable_cf_options()); } c->ReleaseCompactionFiles(s); @@ -707,8 +708,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { return Status::InvalidArgument("Target level exceeds number of levels"); } - std::unique_ptr superversion_to_free; - std::unique_ptr new_superversion(new SuperVersion()); + SuperVersionContext sv_context(/* create_superversion */ true); Status status; @@ -763,8 +763,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, directories_.GetDbDir()); - superversion_to_free.reset(InstallSuperVersionAndScheduleWork( - cfd, new_superversion.release(), mutable_cf_options)); + InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(), status.ToString().data()); @@ -776,6 +775,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { } } + sv_context.Clean(); refitting_level_ = false; return status; @@ -1576,8 +1576,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); - InstallSuperVersionAndScheduleWorkWrapper( - c->column_family_data(), job_context, *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWork( + c->column_family_data(), &job_context->superversion_context, + *c->mutable_cf_options()); ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n", c->column_family_data()->GetName().c_str(), c->num_input_files(0)); @@ -1622,8 +1623,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); // Use latest MutableCFOptions - InstallSuperVersionAndScheduleWorkWrapper( - c->column_family_data(), job_context, *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWork( + c->column_family_data(), &job_context->superversion_context, + *c->mutable_cf_options()); VersionStorageInfo::LevelSummaryStorage tmp; c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(), @@ -1696,8 +1698,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, status = compaction_job.Install(*c->mutable_cf_options()); if (status.ok()) { - InstallSuperVersionAndScheduleWorkWrapper( - c->column_family_data(), job_context, *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWork( + c->column_family_data(), &job_context->superversion_context, + *c->mutable_cf_options()); } *made_progress = true; } @@ -1863,30 +1866,21 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) { return true; } -// JobContext gets created and destructed outside of the lock -- +// SuperVersionContext gets created and destructed outside of the lock -- // we // use this convinently to: // * malloc one SuperVersion() outside of the lock -- new_superversion // * delete SuperVersion()s outside of the lock -- superversions_to_free // // However, if InstallSuperVersionAndScheduleWork() gets called twice with the -// same job_context, we can't reuse the SuperVersion() that got +// same sv_context, we can't reuse the SuperVersion() that got // malloced because // first call already used it. In that rare case, we take a hit and create a // new SuperVersion() inside of the mutex. We do similar thing // for superversion_to_free -void DBImpl::InstallSuperVersionAndScheduleWorkWrapper( - ColumnFamilyData* cfd, JobContext* job_context, - const MutableCFOptions& mutable_cf_options) { - mutex_.AssertHeld(); - SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork( - cfd, job_context->new_superversion, mutable_cf_options); - job_context->new_superversion = nullptr; - job_context->superversions_to_free.push_back(old_superversion); -} -SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( - ColumnFamilyData* cfd, SuperVersion* new_sv, +void DBImpl::InstallSuperVersionAndScheduleWork( + ColumnFamilyData* cfd, SuperVersionContext* sv_context, const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); @@ -1898,8 +1892,11 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( old_sv->mutable_cf_options.max_write_buffer_number; } - auto* old = cfd->InstallSuperVersion( - new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options); + if (sv_context->new_superversion == nullptr) { + sv_context->NewSuperVersion(); + } + cfd->InstallSuperVersion(sv_context, &mutex_, + mutable_cf_options); // Whenever we install new SuperVersion, we might need to issue new flushes or // compactions. @@ -1912,6 +1909,5 @@ SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( max_total_in_memory_state_ - old_memtable_size + mutable_cf_options.write_buffer_size * mutable_cf_options.max_write_buffer_number; - return old; } } // namespace rocksdb diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc index ac1aa4fda..961c284c9 100644 --- a/db/db_impl_experimental.cc +++ b/db/db_impl_experimental.cc @@ -138,8 +138,9 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionAndScheduleWorkWrapper( - cfd, &job_context, *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWork( + cfd, &job_context.superversion_context, + *cfd->GetLatestMutableCFOptions()); } } // lock released here LogFlush(immutable_db_options_.info_log); diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 9a273b850..458106a0a 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -1046,10 +1046,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } } if (s.ok()) { + SuperVersionContext sv_context(/* create_superversion */ true); for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - delete impl->InstallSuperVersionAndScheduleWork( - cfd, nullptr, *cfd->GetLatestMutableCFOptions()); + impl->InstallSuperVersionAndScheduleWork( + cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); } + sv_context.Clean(); if (impl->concurrent_prepare_) { impl->log_write_mutex_.Lock(); } diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index d69eecb98..08379c5a9 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -140,6 +140,7 @@ Status DB::OpenForReadOnly( *dbptr = nullptr; handles->clear(); + SuperVersionContext sv_context(/* create_superversion */ true); DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname); impl->mutex_.Lock(); Status s = impl->Recover(column_families, true /* read only */, @@ -158,10 +159,12 @@ Status DB::OpenForReadOnly( } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_); + sv_context.NewSuperVersion(); + cfd->InstallSuperVersion(&sv_context, &impl->mutex_); } } impl->mutex_.Unlock(); + sv_context.Clean(); if (s.ok()) { *dbptr = impl; for (auto* h : *handles) { diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index eb8e5dd20..b0abd7c4a 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1090,7 +1090,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } uint64_t new_log_number = creating_new_log ? versions_->NewFileNumber() : logfile_number_; - SuperVersion* new_superversion = nullptr; const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); // Set memtable_info for memtable sealed callback @@ -1146,7 +1145,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { if (s.ok()) { SequenceNumber seq = versions_->LastSequence(); new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); - new_superversion = new SuperVersion(); + context->superversion_context.NewSuperVersion(); } #ifndef ROCKSDB_LITE @@ -1204,8 +1203,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); new_mem->Ref(); cfd->SetMemtable(new_mem); - context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork( - cfd, new_superversion, mutable_cf_options)); + InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, + mutable_cf_options); if (concurrent_prepare_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); } diff --git a/db/db_test.cc b/db/db_test.cc index 766a4c2c6..2e9a5075a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5160,6 +5160,19 @@ TEST_F(DBTest, HardLimit) { } #ifndef ROCKSDB_LITE +class WriteStallListener : public EventListener { + public: + WriteStallListener() : condition_(WriteStallCondition::kNormal) {} + void OnStallConditionsChanged(const WriteStallInfo& info) override { + condition_ = info.condition.cur; + } + bool CheckCondition(WriteStallCondition expected) { + return expected == condition_; + } + private: + WriteStallCondition condition_; +}; + TEST_F(DBTest, SoftLimit) { Options options = CurrentOptions(); options.env = env_; @@ -5175,6 +5188,8 @@ TEST_F(DBTest, SoftLimit) { options.max_bytes_for_level_multiplier = 10; options.max_background_compactions = 1; options.compression = kNoCompression; + WriteStallListener* listener = new WriteStallListener(); + options.listeners.emplace_back(listener); Reopen(options); @@ -5214,6 +5229,7 @@ TEST_F(DBTest, SoftLimit) { Flush(); } ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); sleeping_task_low.WakeUp(); sleeping_task_low.WaitUntilDone(); @@ -5224,6 +5240,7 @@ TEST_F(DBTest, SoftLimit) { // The L1 file size is around 30KB. ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal)); // Only allow one compactin going through. rocksdb::SyncPoint::GetInstance()->SetCallBack( @@ -5258,6 +5275,7 @@ TEST_F(DBTest, SoftLimit) { // doesn't trigger soft_pending_compaction_bytes_limit ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal)); // Create 3 L0 files, making score of L0 to be 3, higher than L0. for (int i = 0; i < 3; i++) { @@ -5278,11 +5296,13 @@ TEST_F(DBTest, SoftLimit) { // triggerring soft_pending_compaction_bytes_limit ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); sleeping_task_low.WakeUp(); sleeping_task_low.WaitUntilSleeping(); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal)); // shrink level base so L2 will hit soft limit easier. ASSERT_OK(dbfull()->SetOptions({ @@ -5292,6 +5312,7 @@ TEST_F(DBTest, SoftLimit) { Put("", ""); Flush(); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed)); sleeping_task_low.WaitUntilSleeping(); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); diff --git a/db/job_context.h b/db/job_context.h index 950a3a667..e0af71031 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -13,17 +13,77 @@ #include #include "db/log_writer.h" +#include "db/column_family.h" namespace rocksdb { class MemTable; +struct SuperVersion; + +struct SuperVersionContext { + struct WriteStallNotification { + WriteStallInfo write_stall_info; + const ImmutableCFOptions* immutable_cf_options; + }; + + autovector superversions_to_free; + autovector write_stall_notifications; + unique_ptr new_superversion; // if nullptr no new superversion + + explicit SuperVersionContext(bool create_superversion = false) + : new_superversion(create_superversion ? new SuperVersion() : nullptr) {} + + void NewSuperVersion() { + new_superversion = unique_ptr(new SuperVersion()); + } + + inline bool HaveSomethingToDelete() const { + return superversions_to_free.size() > 0 || + write_stall_notifications.size() > 0; + } + + void PushWriteStallNotification( + WriteStallCondition old_cond, WriteStallCondition new_cond, + const std::string& name, const ImmutableCFOptions* ioptions) { +#ifndef ROCKSDB_LITE + WriteStallNotification notif; + notif.write_stall_info.cf_name = name; + notif.write_stall_info.condition.prev = old_cond; + notif.write_stall_info.condition.cur = new_cond; + notif.immutable_cf_options = ioptions; + write_stall_notifications.push_back(notif); +#endif // !ROCKSDB_LITE + } + + void Clean() { +#ifndef ROCKSDB_LITE + // notify listeners on changed write stall conditions + for (auto& notif : write_stall_notifications) { + for (auto listener : notif.immutable_cf_options->listeners) { + listener->OnStallConditionsChanged(notif.write_stall_info); + } + } + write_stall_notifications.clear(); +#endif // !ROCKSDB_LITE + // free superversions + for (auto s : superversions_to_free) { + delete s; + } + superversions_to_free.clear(); + } + + ~SuperVersionContext() { + assert(write_stall_notifications.size() == 0); + assert(superversions_to_free.size() == 0); + } +}; struct JobContext { inline bool HaveSomethingToDelete() const { return full_scan_candidate_files.size() || sst_delete_files.size() || log_delete_files.size() || manifest_delete_files.size() || - new_superversion != nullptr || superversions_to_free.size() > 0 || - memtables_to_free.size() > 0 || logs_to_free.size() > 0; + memtables_to_free.size() > 0 || logs_to_free.size() > 0 || + superversion_context.HaveSomethingToDelete(); } // Structure to store information for candidate files to delete. @@ -65,12 +125,10 @@ struct JobContext { // a list of memtables to be free autovector memtables_to_free; - autovector superversions_to_free; + SuperVersionContext superversion_context; autovector logs_to_free; - SuperVersion* new_superversion; // if nullptr no new superversion - // the current manifest_file_number, log_number and prev_log_number // that corresponds to the set of files in 'live'. uint64_t manifest_file_number; @@ -83,13 +141,13 @@ struct JobContext { size_t num_alive_log_files = 0; uint64_t size_log_to_delete = 0; - explicit JobContext(int _job_id, bool create_superversion = false) { + explicit JobContext(int _job_id, bool create_superversion = false) + : superversion_context(create_superversion) { job_id = _job_id; manifest_file_number = 0; pending_manifest_file_number = 0; log_number = 0; prev_log_number = 0; - new_superversion = create_superversion ? new SuperVersion() : nullptr; } // For non-empty JobContext Clean() has to be called at least once before @@ -97,31 +155,22 @@ struct JobContext { // unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally // doing potentially slow Clean() with locked DB mutex. void Clean() { + // free superversions + superversion_context.Clean(); // free pending memtables for (auto m : memtables_to_free) { delete m; } - // free superversions - for (auto s : superversions_to_free) { - delete s; - } for (auto l : logs_to_free) { delete l; } - // if new_superversion was not used, it will be non-nullptr and needs - // to be freed here - delete new_superversion; memtables_to_free.clear(); - superversions_to_free.clear(); logs_to_free.clear(); - new_superversion = nullptr; } ~JobContext() { assert(memtables_to_free.size() == 0); - assert(superversions_to_free.size() == 0); - assert(new_superversion == nullptr); assert(logs_to_free.size() == 0); } }; diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index e132033db..44a765c0c 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -86,6 +86,22 @@ enum class BackgroundErrorReason { kMemTable, }; +enum class WriteStallCondition { + kNormal, + kDelayed, + kStopped, +}; + +struct WriteStallInfo { + // the name of the column family + std::string cf_name; + // state of the write controller + struct { + WriteStallCondition cur; + WriteStallCondition prev; + } condition; +}; + #ifndef ROCKSDB_LITE struct TableFileDeletionInfo { @@ -372,6 +388,14 @@ class EventListener { virtual void OnBackgroundError(BackgroundErrorReason /* reason */, Status* /* bg_error */) {} + // A call-back function for RocksDB which will be called whenever a change + // of superversion triggers a change of the stall conditions. + // + // Note that the this function must be implemented in a way such that + // it should not run for an extended period of time before the function + // returns. Otherwise, RocksDB may be blocked. + virtual void OnStallConditionsChanged(const WriteStallInfo& /*info*/) {} + // Factory method to return CompactionEventListener. If multiple listeners // provides CompactionEventListner, only the first one will be used. virtual CompactionEventListener* GetCompactionEventListener() { -- GitLab