From a291f3a1e5aa62e6a0ff33cfcf5ddca26ea45d6a Mon Sep 17 00:00:00 2001 From: Zhongyi Xie Date: Tue, 19 Mar 2019 17:24:09 -0700 Subject: [PATCH] Collect compaction stats by priority and dump to info LOG (#5050) Summary: In order to better understand compaction done by different priority thread pool, we now collect compaction stats by priority and also print them to info LOG through stats dump. ``` ** Compaction Stats [default] ** Priority Files Size Score Read(GB) Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Low 0/0 0.00 KB 0.0 16.8 11.3 5.5 5.6 0.1 0.0 0.0 406.4 136.1 42.24 34.96 45 0.939 13M 8865K High 0/0 0.00 KB 0.0 0.0 0.0 0.0 11.4 11.4 0.0 0.0 0.0 76.2 153.00 35.74 12185 0.013 0 0 ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/5050 Differential Revision: D14408583 Pulled By: miasantreble fbshipit-source-id: e53746586ea27cb8abc9fec35805bd80ed30f608 --- db/compaction_job.cc | 8 ++- db/compaction_job.h | 11 ++-- db/compaction_job_test.cc | 3 +- db/db_impl.h | 29 +++++++--- db/db_impl_compaction_flush.cc | 80 +++++++++++++++++---------- db/db_impl_open.cc | 2 +- db/external_sst_file_ingestion_job.cc | 3 +- db/flush_job.cc | 8 ++- db/flush_job.h | 4 +- db/flush_job_test.cc | 63 +++++++++++---------- db/internal_stats.cc | 55 ++++++++++++++---- db/internal_stats.h | 11 +++- env/env.cc | 2 + include/rocksdb/env.h | 2 +- util/threadpool_imp.cc | 3 + 15 files changed, 187 insertions(+), 97 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index b208b444c..65e9719a3 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -312,7 +312,8 @@ CompactionJob::CompactionJob( SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, - const std::string& dbname, CompactionJobStats* compaction_job_stats) + const std::string& dbname, CompactionJobStats* compaction_job_stats, + Env::Priority thread_pri) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), @@ -340,7 +341,8 @@ CompactionJob::CompactionJob( bottommost_level_(false), paranoid_file_checks_(paranoid_file_checks), measure_io_stats_(measure_io_stats), - write_hint_(Env::WLTH_NOT_SET) { + write_hint_(Env::WLTH_NOT_SET), + thread_pri_(thread_pri) { assert(log_buffer_ != nullptr); const auto* cfd = compact_->compaction->column_family_data(); ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env, @@ -717,7 +719,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { Status status = compact_->status; ColumnFamilyData* cfd = compact_->compaction->column_family_data(); cfd->internal_stats()->AddCompactionStats( - compact_->compaction->output_level(), compaction_stats_); + compact_->compaction->output_level(), thread_pri_, compaction_stats_); if (status.ok()) { status = InstallCompactionResults(mutable_cf_options); diff --git a/db/compaction_job.h b/db/compaction_job.h index 596b5cc60..9767985f3 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -62,17 +62,17 @@ class CompactionJob { const EnvOptions env_options, VersionSet* versions, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, - LogBuffer* log_buffer, - Directory* db_directory, Directory* output_directory, - Statistics* stats, InstrumentedMutex* db_mutex, - ErrorHandler* db_error_handler, + LogBuffer* log_buffer, Directory* db_directory, + Directory* output_directory, Statistics* stats, + InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, - CompactionJobStats* compaction_job_stats); + CompactionJobStats* compaction_job_stats, + Env::Priority thread_pri); ~CompactionJob(); @@ -172,6 +172,7 @@ class CompactionJob { // Stores the approx size of keys covered in the range of each subcompaction std::vector sizes_; Env::WriteLifeTimeHint write_hint_; + Env::Priority thread_pri_; }; } // namespace rocksdb diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index d41b4d1ac..f05a8ec2f 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -262,7 +262,8 @@ class CompactionJobTest : public testing::Test { &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, - &event_logger, false, false, dbname_, &compaction_job_stats_); + &event_logger, false, false, dbname_, &compaction_job_stats_, + Env::Priority::USER); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); compaction_job.Prepare(); diff --git a/db/db_impl.h b/db/db_impl.h index 39ac83c1b..f7ead885b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -948,7 +948,8 @@ class DBImpl : public DB { SuperVersionContext* superversion_context, std::vector& snapshot_seqs, SequenceNumber earliest_write_conflict_snapshot, - SnapshotChecker* snapshot_checker, LogBuffer* log_buffer); + SnapshotChecker* snapshot_checker, LogBuffer* log_buffer, + Env::Priority thread_pri); // Argument required by background flush thread. struct BGFlushArg { @@ -971,15 +972,22 @@ class DBImpl : public DB { SuperVersionContext* superversion_context_; }; + // Argument passed to flush thread. + struct FlushThreadArg { + DBImpl* db_; + + Env::Priority thread_pri_; + }; + // Flush the memtables of (multiple) column families to multiple files on // persistent storage. Status FlushMemTablesToOutputFiles( const autovector& bg_flush_args, bool* made_progress, - JobContext* job_context, LogBuffer* log_buffer); + JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri); Status AtomicFlushMemTablesToOutputFiles( const autovector& bg_flush_args, bool* made_progress, - JobContext* job_context, LogBuffer* log_buffer); + JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri); // REQUIRES: log_numbers are sorted in ascending order Status RecoverLogFiles(const std::vector& log_numbers, @@ -1122,18 +1130,21 @@ class DBImpl : public DB { // Runs a pre-chosen universal compaction involving bottom level in a // separate, bottom-pri thread pool. static void BGWorkBottomCompaction(void* arg); - static void BGWorkFlush(void* db); + static void BGWorkFlush(void* arg); static void BGWorkPurge(void* arg); - static void UnscheduleCallback(void* arg); + static void UnscheduleCompactionCallback(void* arg); + static void UnscheduleFlushCallback(void* arg); void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, - Env::Priority bg_thread_pri); - void BackgroundCallFlush(); + Env::Priority thread_pri); + void BackgroundCallFlush(Env::Priority thread_pri); void BackgroundCallPurge(); Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer, - PrepickedCompaction* prepicked_compaction); + PrepickedCompaction* prepicked_compaction, + Env::Priority thread_pri); Status BackgroundFlush(bool* madeProgress, JobContext* job_context, - LogBuffer* log_buffer, FlushReason* reason); + LogBuffer* log_buffer, FlushReason* reason, + Env::Priority thread_pri); bool EnoughRoomForCompaction(ColumnFamilyData* cfd, const std::vector& inputs, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 2298237a8..ed4d5d13b 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -135,7 +135,8 @@ Status DBImpl::FlushMemTableToOutputFile( SuperVersionContext* superversion_context, std::vector& snapshot_seqs, SequenceNumber earliest_write_conflict_snapshot, - SnapshotChecker* snapshot_checker, LogBuffer* log_buffer) { + SnapshotChecker* snapshot_checker, LogBuffer* log_buffer, + Env::Priority thread_pri) { mutex_.AssertHeld(); assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->IsFlushPending()); @@ -149,7 +150,8 @@ Status DBImpl::FlushMemTableToOutputFile( GetDataDir(cfd, 0U), GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, - true /* sync_output_directory */, true /* write_manifest */); + true /* sync_output_directory */, true /* write_manifest */, + thread_pri); FileMetaData file_meta; @@ -232,10 +234,11 @@ Status DBImpl::FlushMemTableToOutputFile( Status DBImpl::FlushMemTablesToOutputFiles( const autovector& bg_flush_args, bool* made_progress, - JobContext* job_context, LogBuffer* log_buffer) { + JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) { if (immutable_db_options_.atomic_flush) { return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress, - job_context, log_buffer); + job_context, log_buffer, + thread_pri); } std::vector snapshot_seqs; SequenceNumber earliest_write_conflict_snapshot; @@ -250,7 +253,7 @@ Status DBImpl::FlushMemTablesToOutputFiles( Status s = FlushMemTableToOutputFile( cfd, mutable_cf_options, made_progress, job_context, superversion_context, snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker, log_buffer); + snapshot_checker, log_buffer, thread_pri); if (!s.ok()) { status = s; if (!s.IsShutdownInProgress()) { @@ -274,7 +277,7 @@ Status DBImpl::FlushMemTablesToOutputFiles( */ Status DBImpl::AtomicFlushMemTablesToOutputFiles( const autovector& bg_flush_args, bool* made_progress, - JobContext* job_context, LogBuffer* log_buffer) { + JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) { mutex_.AssertHeld(); autovector cfds; @@ -331,7 +334,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( snapshot_checker, job_context, log_buffer, directories_.GetDbDir(), data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, - false /* sync_output_directory */, false /* write_manifest */); + false /* sync_output_directory */, false /* write_manifest */, + thread_pri); jobs.back().PickMemTable(); } @@ -957,7 +961,7 @@ Status DBImpl::CompactFilesImpl( snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, - &compaction_job_stats); + &compaction_job_stats, Env::Priority::USER); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already @@ -1445,7 +1449,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, manual.incomplete = false; bg_compaction_scheduled_++; env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, - &DBImpl::UnscheduleCallback); + &DBImpl::UnscheduleCompactionCallback); scheduled = true; } } @@ -1785,7 +1789,11 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { while (!is_flush_pool_empty && unscheduled_flushes_ > 0 && bg_flush_scheduled_ < bg_job_limits.max_flushes) { bg_flush_scheduled_++; - env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this); + FlushThreadArg* fta = new FlushThreadArg; + fta->db_ = this; + fta->thread_pri_ = Env::Priority::HIGH; + env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this, + &DBImpl::UnscheduleFlushCallback); } // special case -- if high-pri (flush) thread pool is empty, then schedule @@ -1795,7 +1803,11 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { bg_flush_scheduled_ + bg_compaction_scheduled_ < bg_job_limits.max_flushes) { bg_flush_scheduled_++; - env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this); + FlushThreadArg* fta = new FlushThreadArg; + fta->db_ = this; + fta->thread_pri_ = Env::Priority::LOW; + env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this, + &DBImpl::UnscheduleFlushCallback); } } @@ -1825,7 +1837,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { bg_compaction_scheduled_++; unscheduled_compactions_--; env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, - &DBImpl::UnscheduleCallback); + &DBImpl::UnscheduleCompactionCallback); } } @@ -1940,10 +1952,13 @@ void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync, purge_queue_.push_back(std::move(file_info)); } -void DBImpl::BGWorkFlush(void* db) { - IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); +void DBImpl::BGWorkFlush(void* arg) { + FlushThreadArg fta = *(reinterpret_cast(arg)); + delete reinterpret_cast(arg); + + IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_); TEST_SYNC_POINT("DBImpl::BGWorkFlush"); - reinterpret_cast(db)->BackgroundCallFlush(); + reinterpret_cast(fta.db_)->BackgroundCallFlush(fta.thread_pri_); TEST_SYNC_POINT("DBImpl::BGWorkFlush:done"); } @@ -1978,7 +1993,7 @@ void DBImpl::BGWorkPurge(void* db) { TEST_SYNC_POINT("DBImpl::BGWorkPurge:end"); } -void DBImpl::UnscheduleCallback(void* arg) { +void DBImpl::UnscheduleCompactionCallback(void* arg) { CompactionArg ca = *(reinterpret_cast(arg)); delete reinterpret_cast(arg); if (ca.prepicked_compaction != nullptr) { @@ -1987,11 +2002,17 @@ void DBImpl::UnscheduleCallback(void* arg) { } delete ca.prepicked_compaction; } - TEST_SYNC_POINT("DBImpl::UnscheduleCallback"); + TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback"); +} + +void DBImpl::UnscheduleFlushCallback(void* arg) { + delete reinterpret_cast(arg); + TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback"); } Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, - LogBuffer* log_buffer, FlushReason* reason) { + LogBuffer* log_buffer, FlushReason* reason, + Env::Priority thread_pri) { mutex_.AssertHeld(); Status status; @@ -2052,7 +2073,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, bg_compaction_scheduled_); } status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress, - job_context, log_buffer); + job_context, log_buffer, thread_pri); // All the CFDs in the FlushReq must have the same flush reason, so just // grab the first one *reason = bg_flush_args[0].cfd_->GetFlushReason(); @@ -2067,7 +2088,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, return status; } -void DBImpl::BackgroundCallFlush() { +void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) { bool made_progress = false; JobContext job_context(next_job_id_.fetch_add(1), true); @@ -2084,8 +2105,8 @@ void DBImpl::BackgroundCallFlush() { CaptureCurrentFileNumberInPendingOutputs(); FlushReason reason; - Status s = - BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason); + Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer, + &reason, thread_pri); if (!s.ok() && !s.IsShutdownInProgress() && reason != FlushReason::kErrorRecovery) { // Wait a little bit before retrying background flush in @@ -2168,7 +2189,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, bg_bottom_compaction_scheduled_) || (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_)); Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, - prepicked_compaction); + prepicked_compaction, bg_thread_pri); TEST_SYNC_POINT("BackgroundCallCompaction:1"); if (s.IsBusy()) { bg_cv_.SignalAll(); // In case a waiter can proceed despite the error @@ -2255,7 +2276,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, Status DBImpl::BackgroundCompaction(bool* made_progress, JobContext* job_context, LogBuffer* log_buffer, - PrepickedCompaction* prepicked_compaction) { + PrepickedCompaction* prepicked_compaction, + Env::Priority thread_pri) { ManualCompactionState* manual_compaction = prepicked_compaction == nullptr ? nullptr @@ -2568,7 +2590,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ca->prepicked_compaction->task_token = std::move(task_token); ++bg_bottom_compaction_scheduled_; env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM, - this, &DBImpl::UnscheduleCallback); + this, &DBImpl::UnscheduleCompactionCallback); } else { TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", c->column_family_data()); @@ -2587,11 +2609,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, env_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), GetDataDir(c->column_family_data(), c->output_path_id()), stats_, - &mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, - snapshot_checker, table_cache_, &event_logger_, - c->mutable_cf_options()->paranoid_file_checks, + &mutex_, &error_handler_, snapshot_seqs, + earliest_write_conflict_snapshot, snapshot_checker, table_cache_, + &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, - &compaction_job_stats); + &compaction_job_stats, thread_pri); compaction_job.Prepare(); NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 0806c486e..52ee53748 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -1059,7 +1059,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.fd.GetFileSize(); stats.num_output_files = 1; - cfd->internal_stats()->AddCompactionStats(level, stats); + cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats); cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize()); RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize()); diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 7c927c079..28b481678 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -227,7 +227,8 @@ void ExternalSstFileIngestionJob::UpdateStats() { stats.bytes_moved = f.fd.GetFileSize(); } stats.num_output_files = 1; - cfd_->internal_stats()->AddCompactionStats(f.picked_level, stats); + cfd_->internal_stats()->AddCompactionStats(f.picked_level, + Env::Priority::USER, stats); cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE, f.fd.GetFileSize()); total_keys += f.num_entries; diff --git a/db/flush_job.cc b/db/flush_job.cc index f8837352f..f03188141 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -100,7 +100,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, Directory* output_file_directory, CompressionType output_compression, Statistics* stats, EventLogger* event_logger, bool measure_io_stats, - const bool sync_output_directory, const bool write_manifest) + const bool sync_output_directory, const bool write_manifest, + Env::Priority thread_pri) : dbname_(dbname), cfd_(cfd), db_options_(db_options), @@ -125,7 +126,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, write_manifest_(write_manifest), edit_(nullptr), base_(nullptr), - pick_memtable_called(false) { + pick_memtable_called(false), + thread_pri_(thread_pri) { // Update the thread status to indicate flush. ReportStartedFlush(); TEST_SYNC_POINT("FlushJob::FlushJob()"); @@ -417,7 +419,7 @@ Status FlushJob::WriteLevel0Table() { stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros; stats.bytes_written = meta_.fd.GetFileSize(); RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); - cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats); + cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED, meta_.fd.GetFileSize()); RecordFlushIOStats(); diff --git a/db/flush_job.h b/db/flush_job.h index d993e410d..c40819456 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -68,7 +68,8 @@ class FlushJob { LogBuffer* log_buffer, Directory* db_directory, Directory* output_file_directory, CompressionType output_compression, Statistics* stats, EventLogger* event_logger, bool measure_io_stats, - const bool sync_output_directory, const bool write_manifest); + const bool sync_output_directory, const bool write_manifest, + Env::Priority thread_pri); ~FlushJob(); @@ -137,6 +138,7 @@ class FlushJob { VersionEdit* edit_; Version* base_; bool pick_memtable_called; + Env::Priority thread_pri_; }; } // namespace rocksdb diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 4f9060be4..199ed29ca 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -118,13 +118,14 @@ TEST_F(FlushJobTest, Empty) { auto cfd = versions_->GetColumnFamilySet()->GetDefault(); EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job( - dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, - env_options_, versions_.get(), &mutex_, &shutting_down_, {}, - kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, - nullptr, kNoCompression, nullptr, &event_logger, false, - true /* sync_output_directory */, true /* write_manifest */); + FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), + db_options_, *cfd->GetLatestMutableCFOptions(), + nullptr /* memtable_id */, env_options_, versions_.get(), + &mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, + kNoCompression, nullptr, &event_logger, false, + true /* sync_output_directory */, + true /* write_manifest */, Env::Priority::USER); { InstrumentedMutexLock l(&mutex_); flush_job.PickMemTable(); @@ -165,13 +166,14 @@ TEST_F(FlushJobTest, NonEmpty) { EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job( - dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, - env_options_, versions_.get(), &mutex_, &shutting_down_, {}, - kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, - nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, - true, true /* sync_output_directory */, true /* write_manifest */); + FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), + db_options_, *cfd->GetLatestMutableCFOptions(), + nullptr /* memtable_id */, env_options_, versions_.get(), + &mutex_, &shutting_down_, {}, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, + kNoCompression, db_options_.statistics.get(), + &event_logger, true, true /* sync_output_directory */, + true /* write_manifest */, Env::Priority::USER); HistogramData hist; FileMetaData file_meta; @@ -228,13 +230,14 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { uint64_t smallest_memtable_id = memtable_ids.front(); uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1; - FlushJob flush_job( - dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_, - versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, - db_options_.statistics.get(), &event_logger, true, - true /* sync_output_directory */, true /* write_manifest */); + FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), + db_options_, *cfd->GetLatestMutableCFOptions(), + &flush_memtable_id, env_options_, versions_.get(), &mutex_, + &shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, + &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, + true /* write_manifest */, Env::Priority::USER); HistogramData hist; FileMetaData file_meta; mutex_.Lock(); @@ -304,7 +307,8 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, - false /* sync_output_directory */, false /* write_manifest */); + false /* sync_output_directory */, false /* write_manifest */, + Env::Priority::USER); k++; } HistogramData hist; @@ -413,13 +417,14 @@ TEST_F(FlushJobTest, Snapshots) { EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job( - dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */, - env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots, - kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, - nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, - true, true /* sync_output_directory */, true /* write_manifest */); + FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), + db_options_, *cfd->GetLatestMutableCFOptions(), + nullptr /* memtable_id */, env_options_, versions_.get(), + &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber, + snapshot_checker, &job_context, nullptr, nullptr, nullptr, + kNoCompression, db_options_.statistics.get(), + &event_logger, true, true /* sync_output_directory */, + true /* write_manifest */, Env::Priority::USER); mutex_.Lock(); flush_job.PickMemTable(); ASSERT_OK(flush_job.Run()); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 6d58552e2..51e55f583 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -58,7 +58,8 @@ const double kMB = 1048576.0; const double kGB = kMB * 1024; const double kMicrosInSec = 1000000.0; -void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name) { +void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name, + const std::string& group_by) { int written_size = snprintf(buf, len, "\n** Compaction Stats [%s] **\n", cf_name.c_str()); auto hdr = [](LevelStatType t) { @@ -66,17 +67,18 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name) { }; int line_size = snprintf( buf + written_size, len - written_size, - "Level %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n", + "%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n", // Note that we skip COMPACTED_FILES and merge it with Files column - hdr(LevelStatType::NUM_FILES), hdr(LevelStatType::SIZE_BYTES), - hdr(LevelStatType::SCORE), hdr(LevelStatType::READ_GB), - hdr(LevelStatType::RN_GB), hdr(LevelStatType::RNP1_GB), - hdr(LevelStatType::WRITE_GB), hdr(LevelStatType::W_NEW_GB), - hdr(LevelStatType::MOVED_GB), hdr(LevelStatType::WRITE_AMP), - hdr(LevelStatType::READ_MBPS), hdr(LevelStatType::WRITE_MBPS), - hdr(LevelStatType::COMP_SEC), hdr(LevelStatType::COMP_CPU_SEC), - hdr(LevelStatType::COMP_COUNT), hdr(LevelStatType::AVG_SEC), - hdr(LevelStatType::KEY_IN), hdr(LevelStatType::KEY_DROP)); + group_by.c_str(), hdr(LevelStatType::NUM_FILES), + hdr(LevelStatType::SIZE_BYTES), hdr(LevelStatType::SCORE), + hdr(LevelStatType::READ_GB), hdr(LevelStatType::RN_GB), + hdr(LevelStatType::RNP1_GB), hdr(LevelStatType::WRITE_GB), + hdr(LevelStatType::W_NEW_GB), hdr(LevelStatType::MOVED_GB), + hdr(LevelStatType::WRITE_AMP), hdr(LevelStatType::READ_MBPS), + hdr(LevelStatType::WRITE_MBPS), hdr(LevelStatType::COMP_SEC), + hdr(LevelStatType::COMP_CPU_SEC), hdr(LevelStatType::COMP_COUNT), + hdr(LevelStatType::AVG_SEC), hdr(LevelStatType::KEY_IN), + hdr(LevelStatType::KEY_DROP)); written_size += line_size; snprintf(buf + written_size, len - written_size, "%s\n", @@ -1152,6 +1154,20 @@ void InternalStats::DumpCFMapStats( (*levels_stats)[-1] = sum_stats; // -1 is for the Sum level } +void InternalStats::DumpCFMapStatsByPriority( + std::map>* priorities_stats) { + for (size_t priority = 0; priority < comp_stats_by_pri_.size(); priority++) { + if (comp_stats_by_pri_[priority].micros > 0) { + std::map priority_stats; + PrepareLevelStats(&priority_stats, 0 /* num_files */, + 0 /* being_compacted */, 0 /* total_file_size */, + 0 /* compaction_score */, 0 /* w_amp */, + comp_stats_by_pri_[priority]); + (*priorities_stats)[static_cast(priority)] = priority_stats; + } + } +} + void InternalStats::DumpCFMapStatsIOStalls( std::map* cf_stats) { (*cf_stats)["io_stalls.level0_slowdown"] = @@ -1192,7 +1208,7 @@ void InternalStats::DumpCFStats(std::string* value) { void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) { char buf[2000]; // Per-ColumnFamily stats - PrintLevelStatsHeader(buf, sizeof(buf), cfd_->GetName()); + PrintLevelStatsHeader(buf, sizeof(buf), cfd_->GetName(), "Level"); value->append(buf); // Print stats for each level @@ -1238,6 +1254,21 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) { PrintLevelStats(buf, sizeof(buf), "Int", 0, 0, 0, 0, w_amp, interval_stats); value->append(buf); + PrintLevelStatsHeader(buf, sizeof(buf), cfd_->GetName(), "Priority"); + value->append(buf); + std::map> priorities_stats; + DumpCFMapStatsByPriority(&priorities_stats); + for (size_t priority = 0; priority < comp_stats_by_pri_.size(); ++priority) { + if (priorities_stats.find(static_cast(priority)) != + priorities_stats.end()) { + PrintLevelStats( + buf, sizeof(buf), + Env::PriorityToString(static_cast(priority)), + priorities_stats[static_cast(priority)]); + value->append(buf); + } + } + double seconds_up = (env_->NowMicros() - started_at_ + 1) / kMicrosInSec; double interval_seconds_up = seconds_up - cf_stats_snapshot_.seconds_up; snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", diff --git a/db/internal_stats.h b/db/internal_stats.h index 6fa8727a4..20fb07f48 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -125,6 +125,7 @@ class InternalStats { cf_stats_value_{}, cf_stats_count_{}, comp_stats_(num_levels), + comp_stats_by_pri_(Env::Priority::TOTAL), file_read_latency_(num_levels), bg_error_count_(0), number_levels_(num_levels), @@ -318,8 +319,10 @@ class InternalStats { started_at_ = env_->NowMicros(); } - void AddCompactionStats(int level, const CompactionStats& stats) { + void AddCompactionStats(int level, Env::Priority thread_pri, + const CompactionStats& stats) { comp_stats_[level].Add(stats); + comp_stats_by_pri_[thread_pri].Add(stats); } void IncBytesMoved(int level, uint64_t amount) { @@ -381,6 +384,8 @@ class InternalStats { void DumpCFMapStats( std::map>* level_stats, CompactionStats* compaction_stats_sum); + void DumpCFMapStatsByPriority( + std::map>* priorities_stats); void DumpCFMapStatsIOStalls(std::map* cf_stats); void DumpCFStats(std::string* value); void DumpCFStatsNoFileHistogram(std::string* value); @@ -395,6 +400,7 @@ class InternalStats { uint64_t cf_stats_count_[INTERNAL_CF_STATS_ENUM_MAX]; // Per-ColumnFamily/level compaction stats std::vector comp_stats_; + std::vector comp_stats_by_pri_; std::vector file_read_latency_; // Used to compute per-interval statistics @@ -625,7 +631,8 @@ class InternalStats { void Subtract(const CompactionStats& /*c*/) {} }; - void AddCompactionStats(int /*level*/, const CompactionStats& /*stats*/) {} + void AddCompactionStats(int /*level*/, Env::Priority /*thread_pri*/, + const CompactionStats& /*stats*/) {} void IncBytesMoved(int /*level*/, uint64_t /*amount*/) {} diff --git a/env/env.cc b/env/env.cc index e98f082ff..fde03577d 100644 --- a/env/env.cc +++ b/env/env.cc @@ -30,6 +30,8 @@ std::string Env::PriorityToString(Env::Priority priority) { return "Low"; case Env::Priority::HIGH: return "High"; + case Env::Priority::USER: + return "User"; case Env::Priority::TOTAL: assert(false); } diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 07bc3aba3..814903311 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -319,7 +319,7 @@ class Env { virtual Status UnlockFile(FileLock* lock) = 0; // Priority for scheduling job in thread pool - enum Priority { BOTTOM, LOW, HIGH, TOTAL }; + enum Priority { BOTTOM, LOW, HIGH, USER, TOTAL }; static std::string PriorityToString(Priority priority); diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index b431830ee..acac0063b 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -292,6 +292,9 @@ void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) { case Env::Priority::BOTTOM: thread_type = ThreadStatus::BOTTOM_PRIORITY; break; + case Env::Priority::USER: + thread_type = ThreadStatus::USER; + break; case Env::Priority::TOTAL: assert(false); return nullptr; -- GitLab