提交 a291f3a1 编写于 作者: Z Zhongyi Xie 提交者: Facebook Github Bot

Collect compaction stats by priority and dump to info LOG (#5050)

Summary:
In order to better understand compaction done by different priority thread pool, we now collect compaction stats by priority and also print them to info LOG through stats dump.

```
** Compaction Stats [default] **
Priority    Files   Size     Score Read(GB)  Rn(GB) Rnp1(GB) Write(GB) Wnew(GB) Moved(GB) W-Amp Rd(MB/s) Wr(MB/s) Comp(sec) CompMergeCPU(sec) Comp(cnt) Avg(sec) KeyIn KeyDrop
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Low      0/0    0.00 KB   0.0     16.8    11.3      5.5       5.6      0.1       0.0   0.0    406.4    136.1     42.24             34.96        45    0.939     13M  8865K
High      0/0    0.00 KB   0.0      0.0     0.0      0.0      11.4     11.4       0.0   0.0      0.0     76.2    153.00             35.74     12185    0.013       0      0
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5050

Differential Revision: D14408583

Pulled By: miasantreble

fbshipit-source-id: e53746586ea27cb8abc9fec35805bd80ed30f608
上级 e50326f3
......@@ -312,7 +312,8 @@ CompactionJob::CompactionJob(
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats)
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
......@@ -340,7 +341,8 @@ CompactionJob::CompactionJob(
bottommost_level_(false),
paranoid_file_checks_(paranoid_file_checks),
measure_io_stats_(measure_io_stats),
write_hint_(Env::WLTH_NOT_SET) {
write_hint_(Env::WLTH_NOT_SET),
thread_pri_(thread_pri) {
assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
......@@ -717,7 +719,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
Status status = compact_->status;
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), compaction_stats_);
compact_->compaction->output_level(), thread_pri_, compaction_stats_);
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
......
......@@ -62,17 +62,17 @@ class CompactionJob {
const EnvOptions env_options, VersionSet* versions,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory,
Statistics* stats, InstrumentedMutex* db_mutex,
ErrorHandler* db_error_handler,
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname,
CompactionJobStats* compaction_job_stats);
CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri);
~CompactionJob();
......@@ -172,6 +172,7 @@ class CompactionJob {
// Stores the approx size of keys covered in the range of each subcompaction
std::vector<uint64_t> sizes_;
Env::WriteLifeTimeHint write_hint_;
Env::Priority thread_pri_;
};
} // namespace rocksdb
......@@ -262,7 +262,8 @@ class CompactionJobTest : public testing::Test {
&shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr,
nullptr, nullptr, &mutex_, &error_handler_, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_);
&event_logger, false, false, dbname_, &compaction_job_stats_,
Env::Priority::USER);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
compaction_job.Prepare();
......
......@@ -948,7 +948,8 @@ class DBImpl : public DB {
SuperVersionContext* superversion_context,
std::vector<SequenceNumber>& snapshot_seqs,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, LogBuffer* log_buffer);
SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
Env::Priority thread_pri);
// Argument required by background flush thread.
struct BGFlushArg {
......@@ -971,15 +972,22 @@ class DBImpl : public DB {
SuperVersionContext* superversion_context_;
};
// Argument passed to flush thread.
struct FlushThreadArg {
DBImpl* db_;
Env::Priority thread_pri_;
};
// Flush the memtables of (multiple) column families to multiple files on
// persistent storage.
Status FlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer);
JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
Status AtomicFlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer);
JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
......@@ -1122,18 +1130,21 @@ class DBImpl : public DB {
// Runs a pre-chosen universal compaction involving bottom level in a
// separate, bottom-pri thread pool.
static void BGWorkBottomCompaction(void* arg);
static void BGWorkFlush(void* db);
static void BGWorkFlush(void* arg);
static void BGWorkPurge(void* arg);
static void UnscheduleCallback(void* arg);
static void UnscheduleCompactionCallback(void* arg);
static void UnscheduleFlushCallback(void* arg);
void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
Env::Priority bg_thread_pri);
void BackgroundCallFlush();
Env::Priority thread_pri);
void BackgroundCallFlush(Env::Priority thread_pri);
void BackgroundCallPurge();
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction);
PrepickedCompaction* prepicked_compaction,
Env::Priority thread_pri);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer, FlushReason* reason);
LogBuffer* log_buffer, FlushReason* reason,
Env::Priority thread_pri);
bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
const std::vector<CompactionInputFiles>& inputs,
......
......@@ -135,7 +135,8 @@ Status DBImpl::FlushMemTableToOutputFile(
SuperVersionContext* superversion_context,
std::vector<SequenceNumber>& snapshot_seqs,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, LogBuffer* log_buffer) {
SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
Env::Priority thread_pri) {
mutex_.AssertHeld();
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
......@@ -149,7 +150,8 @@ Status DBImpl::FlushMemTableToOutputFile(
GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */);
true /* sync_output_directory */, true /* write_manifest */,
thread_pri);
FileMetaData file_meta;
......@@ -232,10 +234,11 @@ Status DBImpl::FlushMemTableToOutputFile(
Status DBImpl::FlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer) {
JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
if (immutable_db_options_.atomic_flush) {
return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer);
job_context, log_buffer,
thread_pri);
}
std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot;
......@@ -250,7 +253,7 @@ Status DBImpl::FlushMemTablesToOutputFiles(
Status s = FlushMemTableToOutputFile(
cfd, mutable_cf_options, made_progress, job_context,
superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, log_buffer);
snapshot_checker, log_buffer, thread_pri);
if (!s.ok()) {
status = s;
if (!s.IsShutdownInProgress()) {
......@@ -274,7 +277,7 @@ Status DBImpl::FlushMemTablesToOutputFiles(
*/
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer) {
JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
mutex_.AssertHeld();
autovector<ColumnFamilyData*> cfds;
......@@ -331,7 +334,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */);
false /* sync_output_directory */, false /* write_manifest */,
thread_pri);
jobs.back().PickMemTable();
}
......@@ -957,7 +961,7 @@ Status DBImpl::CompactFilesImpl(
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats);
&compaction_job_stats, Env::Priority::USER);
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
......@@ -1445,7 +1449,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
manual.incomplete = false;
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
&DBImpl::UnscheduleCallback);
&DBImpl::UnscheduleCompactionCallback);
scheduled = true;
}
}
......@@ -1785,7 +1789,11 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ < bg_job_limits.max_flushes) {
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
FlushThreadArg* fta = new FlushThreadArg;
fta->db_ = this;
fta->thread_pri_ = Env::Priority::HIGH;
env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
&DBImpl::UnscheduleFlushCallback);
}
// special case -- if high-pri (flush) thread pool is empty, then schedule
......@@ -1795,7 +1803,11 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bg_flush_scheduled_ + bg_compaction_scheduled_ <
bg_job_limits.max_flushes) {
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
FlushThreadArg* fta = new FlushThreadArg;
fta->db_ = this;
fta->thread_pri_ = Env::Priority::LOW;
env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
&DBImpl::UnscheduleFlushCallback);
}
}
......@@ -1825,7 +1837,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bg_compaction_scheduled_++;
unscheduled_compactions_--;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
&DBImpl::UnscheduleCallback);
&DBImpl::UnscheduleCompactionCallback);
}
}
......@@ -1940,10 +1952,13 @@ void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
purge_queue_.push_back(std::move(file_info));
}
void DBImpl::BGWorkFlush(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
void DBImpl::BGWorkFlush(void* arg) {
FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
delete reinterpret_cast<FlushThreadArg*>(arg);
IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
TEST_SYNC_POINT("DBImpl::BGWorkFlush");
reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
reinterpret_cast<DBImpl*>(fta.db_)->BackgroundCallFlush(fta.thread_pri_);
TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
}
......@@ -1978,7 +1993,7 @@ void DBImpl::BGWorkPurge(void* db) {
TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
}
void DBImpl::UnscheduleCallback(void* arg) {
void DBImpl::UnscheduleCompactionCallback(void* arg) {
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
delete reinterpret_cast<CompactionArg*>(arg);
if (ca.prepicked_compaction != nullptr) {
......@@ -1987,11 +2002,17 @@ void DBImpl::UnscheduleCallback(void* arg) {
}
delete ca.prepicked_compaction;
}
TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
}
void DBImpl::UnscheduleFlushCallback(void* arg) {
delete reinterpret_cast<FlushThreadArg*>(arg);
TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
}
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
LogBuffer* log_buffer, FlushReason* reason) {
LogBuffer* log_buffer, FlushReason* reason,
Env::Priority thread_pri) {
mutex_.AssertHeld();
Status status;
......@@ -2052,7 +2073,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
bg_compaction_scheduled_);
}
status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer);
job_context, log_buffer, thread_pri);
// All the CFDs in the FlushReq must have the same flush reason, so just
// grab the first one
*reason = bg_flush_args[0].cfd_->GetFlushReason();
......@@ -2067,7 +2088,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
return status;
}
void DBImpl::BackgroundCallFlush() {
void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
bool made_progress = false;
JobContext job_context(next_job_id_.fetch_add(1), true);
......@@ -2084,8 +2105,8 @@ void DBImpl::BackgroundCallFlush() {
CaptureCurrentFileNumberInPendingOutputs();
FlushReason reason;
Status s =
BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason);
Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
&reason, thread_pri);
if (!s.ok() && !s.IsShutdownInProgress() &&
reason != FlushReason::kErrorRecovery) {
// Wait a little bit before retrying background flush in
......@@ -2168,7 +2189,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
bg_bottom_compaction_scheduled_) ||
(bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
prepicked_compaction);
prepicked_compaction, bg_thread_pri);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
......@@ -2255,7 +2276,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
Status DBImpl::BackgroundCompaction(bool* made_progress,
JobContext* job_context,
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction) {
PrepickedCompaction* prepicked_compaction,
Env::Priority thread_pri) {
ManualCompactionState* manual_compaction =
prepicked_compaction == nullptr
? nullptr
......@@ -2568,7 +2590,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
ca->prepicked_compaction->task_token = std::move(task_token);
++bg_bottom_compaction_scheduled_;
env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
this, &DBImpl::UnscheduleCallback);
this, &DBImpl::UnscheduleCompactionCallback);
} else {
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
c->column_family_data());
......@@ -2587,11 +2609,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
env_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
&mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
&mutex_, &error_handler_, snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats);
&compaction_job_stats, thread_pri);
compaction_job.Prepare();
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
......
......@@ -1059,7 +1059,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.fd.GetFileSize();
stats.num_output_files = 1;
cfd->internal_stats()->AddCompactionStats(level, stats);
cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta.fd.GetFileSize());
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
......
......@@ -227,7 +227,8 @@ void ExternalSstFileIngestionJob::UpdateStats() {
stats.bytes_moved = f.fd.GetFileSize();
}
stats.num_output_files = 1;
cfd_->internal_stats()->AddCompactionStats(f.picked_level, stats);
cfd_->internal_stats()->AddCompactionStats(f.picked_level,
Env::Priority::USER, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
f.fd.GetFileSize());
total_keys += f.num_entries;
......
......@@ -100,7 +100,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
Directory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest)
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri)
: dbname_(dbname),
cfd_(cfd),
db_options_(db_options),
......@@ -125,7 +126,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
write_manifest_(write_manifest),
edit_(nullptr),
base_(nullptr),
pick_memtable_called(false) {
pick_memtable_called(false),
thread_pri_(thread_pri) {
// Update the thread status to indicate flush.
ReportStartedFlush();
TEST_SYNC_POINT("FlushJob::FlushJob()");
......@@ -417,7 +419,7 @@ Status FlushJob::WriteLevel0Table() {
stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
stats.bytes_written = meta_.fd.GetFileSize();
RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta_.fd.GetFileSize());
RecordFlushIOStats();
......
......@@ -68,7 +68,8 @@ class FlushJob {
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory, CompressionType output_compression,
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest);
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri);
~FlushJob();
......@@ -137,6 +138,7 @@ class FlushJob {
VersionEdit* edit_;
Version* base_;
bool pick_memtable_called;
Env::Priority thread_pri_;
};
} // namespace rocksdb
......@@ -118,13 +118,14 @@ TEST_F(FlushJobTest, Empty) {
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_, {},
kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
nullptr, kNoCompression, nullptr, &event_logger, false,
true /* sync_output_directory */, true /* write_manifest */);
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
nullptr /* memtable_id */, env_options_, versions_.get(),
&mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr,
kNoCompression, nullptr, &event_logger, false,
true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
{
InstrumentedMutexLock l(&mutex_);
flush_job.PickMemTable();
......@@ -165,13 +166,14 @@ TEST_F(FlushJobTest, NonEmpty) {
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_, {},
kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
true, true /* sync_output_directory */, true /* write_manifest */);
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
nullptr /* memtable_id */, env_options_, versions_.get(),
&mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr,
kNoCompression, db_options_.statistics.get(),
&event_logger, true, true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
HistogramData hist;
FileMetaData file_meta;
......@@ -228,13 +230,14 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) {
uint64_t smallest_memtable_id = memtable_ids.front();
uint64_t flush_memtable_id = smallest_memtable_id + num_mems_to_flush - 1;
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), &flush_memtable_id, env_options_,
versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
true /* sync_output_directory */, true /* write_manifest */);
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
&flush_memtable_id, env_options_, versions_.get(), &mutex_,
&shutting_down_, {}, kMaxSequenceNumber, snapshot_checker,
&job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
HistogramData hist;
FileMetaData file_meta;
mutex_.Lock();
......@@ -304,7 +307,8 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
&shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker,
&job_context, nullptr, nullptr, nullptr, kNoCompression,
db_options_.statistics.get(), &event_logger, true,
false /* sync_output_directory */, false /* write_manifest */);
false /* sync_output_directory */, false /* write_manifest */,
Env::Priority::USER);
k++;
}
HistogramData hist;
......@@ -413,13 +417,14 @@ TEST_F(FlushJobTest, Snapshots) {
EventLogger event_logger(db_options_.info_log.get());
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), nullptr /* memtable_id */,
env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots,
kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr,
nullptr, kNoCompression, db_options_.statistics.get(), &event_logger,
true, true /* sync_output_directory */, true /* write_manifest */);
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
nullptr /* memtable_id */, env_options_, versions_.get(),
&mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
snapshot_checker, &job_context, nullptr, nullptr, nullptr,
kNoCompression, db_options_.statistics.get(),
&event_logger, true, true /* sync_output_directory */,
true /* write_manifest */, Env::Priority::USER);
mutex_.Lock();
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run());
......
......@@ -58,7 +58,8 @@ const double kMB = 1048576.0;
const double kGB = kMB * 1024;
const double kMicrosInSec = 1000000.0;
void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name) {
void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name,
const std::string& group_by) {
int written_size =
snprintf(buf, len, "\n** Compaction Stats [%s] **\n", cf_name.c_str());
auto hdr = [](LevelStatType t) {
......@@ -66,17 +67,18 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name) {
};
int line_size = snprintf(
buf + written_size, len - written_size,
"Level %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n",
"%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n",
// Note that we skip COMPACTED_FILES and merge it with Files column
hdr(LevelStatType::NUM_FILES), hdr(LevelStatType::SIZE_BYTES),
hdr(LevelStatType::SCORE), hdr(LevelStatType::READ_GB),
hdr(LevelStatType::RN_GB), hdr(LevelStatType::RNP1_GB),
hdr(LevelStatType::WRITE_GB), hdr(LevelStatType::W_NEW_GB),
hdr(LevelStatType::MOVED_GB), hdr(LevelStatType::WRITE_AMP),
hdr(LevelStatType::READ_MBPS), hdr(LevelStatType::WRITE_MBPS),
hdr(LevelStatType::COMP_SEC), hdr(LevelStatType::COMP_CPU_SEC),
hdr(LevelStatType::COMP_COUNT), hdr(LevelStatType::AVG_SEC),
hdr(LevelStatType::KEY_IN), hdr(LevelStatType::KEY_DROP));
group_by.c_str(), hdr(LevelStatType::NUM_FILES),
hdr(LevelStatType::SIZE_BYTES), hdr(LevelStatType::SCORE),
hdr(LevelStatType::READ_GB), hdr(LevelStatType::RN_GB),
hdr(LevelStatType::RNP1_GB), hdr(LevelStatType::WRITE_GB),
hdr(LevelStatType::W_NEW_GB), hdr(LevelStatType::MOVED_GB),
hdr(LevelStatType::WRITE_AMP), hdr(LevelStatType::READ_MBPS),
hdr(LevelStatType::WRITE_MBPS), hdr(LevelStatType::COMP_SEC),
hdr(LevelStatType::COMP_CPU_SEC), hdr(LevelStatType::COMP_COUNT),
hdr(LevelStatType::AVG_SEC), hdr(LevelStatType::KEY_IN),
hdr(LevelStatType::KEY_DROP));
written_size += line_size;
snprintf(buf + written_size, len - written_size, "%s\n",
......@@ -1152,6 +1154,20 @@ void InternalStats::DumpCFMapStats(
(*levels_stats)[-1] = sum_stats; // -1 is for the Sum level
}
void InternalStats::DumpCFMapStatsByPriority(
std::map<int, std::map<LevelStatType, double>>* priorities_stats) {
for (size_t priority = 0; priority < comp_stats_by_pri_.size(); priority++) {
if (comp_stats_by_pri_[priority].micros > 0) {
std::map<LevelStatType, double> priority_stats;
PrepareLevelStats(&priority_stats, 0 /* num_files */,
0 /* being_compacted */, 0 /* total_file_size */,
0 /* compaction_score */, 0 /* w_amp */,
comp_stats_by_pri_[priority]);
(*priorities_stats)[static_cast<int>(priority)] = priority_stats;
}
}
}
void InternalStats::DumpCFMapStatsIOStalls(
std::map<std::string, std::string>* cf_stats) {
(*cf_stats)["io_stalls.level0_slowdown"] =
......@@ -1192,7 +1208,7 @@ void InternalStats::DumpCFStats(std::string* value) {
void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
char buf[2000];
// Per-ColumnFamily stats
PrintLevelStatsHeader(buf, sizeof(buf), cfd_->GetName());
PrintLevelStatsHeader(buf, sizeof(buf), cfd_->GetName(), "Level");
value->append(buf);
// Print stats for each level
......@@ -1238,6 +1254,21 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
PrintLevelStats(buf, sizeof(buf), "Int", 0, 0, 0, 0, w_amp, interval_stats);
value->append(buf);
PrintLevelStatsHeader(buf, sizeof(buf), cfd_->GetName(), "Priority");
value->append(buf);
std::map<int, std::map<LevelStatType, double>> priorities_stats;
DumpCFMapStatsByPriority(&priorities_stats);
for (size_t priority = 0; priority < comp_stats_by_pri_.size(); ++priority) {
if (priorities_stats.find(static_cast<int>(priority)) !=
priorities_stats.end()) {
PrintLevelStats(
buf, sizeof(buf),
Env::PriorityToString(static_cast<Env::Priority>(priority)),
priorities_stats[static_cast<int>(priority)]);
value->append(buf);
}
}
double seconds_up = (env_->NowMicros() - started_at_ + 1) / kMicrosInSec;
double interval_seconds_up = seconds_up - cf_stats_snapshot_.seconds_up;
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
......
......@@ -125,6 +125,7 @@ class InternalStats {
cf_stats_value_{},
cf_stats_count_{},
comp_stats_(num_levels),
comp_stats_by_pri_(Env::Priority::TOTAL),
file_read_latency_(num_levels),
bg_error_count_(0),
number_levels_(num_levels),
......@@ -318,8 +319,10 @@ class InternalStats {
started_at_ = env_->NowMicros();
}
void AddCompactionStats(int level, const CompactionStats& stats) {
void AddCompactionStats(int level, Env::Priority thread_pri,
const CompactionStats& stats) {
comp_stats_[level].Add(stats);
comp_stats_by_pri_[thread_pri].Add(stats);
}
void IncBytesMoved(int level, uint64_t amount) {
......@@ -381,6 +384,8 @@ class InternalStats {
void DumpCFMapStats(
std::map<int, std::map<LevelStatType, double>>* level_stats,
CompactionStats* compaction_stats_sum);
void DumpCFMapStatsByPriority(
std::map<int, std::map<LevelStatType, double>>* priorities_stats);
void DumpCFMapStatsIOStalls(std::map<std::string, std::string>* cf_stats);
void DumpCFStats(std::string* value);
void DumpCFStatsNoFileHistogram(std::string* value);
......@@ -395,6 +400,7 @@ class InternalStats {
uint64_t cf_stats_count_[INTERNAL_CF_STATS_ENUM_MAX];
// Per-ColumnFamily/level compaction stats
std::vector<CompactionStats> comp_stats_;
std::vector<CompactionStats> comp_stats_by_pri_;
std::vector<HistogramImpl> file_read_latency_;
// Used to compute per-interval statistics
......@@ -625,7 +631,8 @@ class InternalStats {
void Subtract(const CompactionStats& /*c*/) {}
};
void AddCompactionStats(int /*level*/, const CompactionStats& /*stats*/) {}
void AddCompactionStats(int /*level*/, Env::Priority /*thread_pri*/,
const CompactionStats& /*stats*/) {}
void IncBytesMoved(int /*level*/, uint64_t /*amount*/) {}
......
......@@ -30,6 +30,8 @@ std::string Env::PriorityToString(Env::Priority priority) {
return "Low";
case Env::Priority::HIGH:
return "High";
case Env::Priority::USER:
return "User";
case Env::Priority::TOTAL:
assert(false);
}
......
......@@ -319,7 +319,7 @@ class Env {
virtual Status UnlockFile(FileLock* lock) = 0;
// Priority for scheduling job in thread pool
enum Priority { BOTTOM, LOW, HIGH, TOTAL };
enum Priority { BOTTOM, LOW, HIGH, USER, TOTAL };
static std::string PriorityToString(Priority priority);
......
......@@ -292,6 +292,9 @@ void* ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
case Env::Priority::BOTTOM:
thread_type = ThreadStatus::BOTTOM_PRIORITY;
break;
case Env::Priority::USER:
thread_type = ThreadStatus::USER;
break;
case Env::Priority::TOTAL:
assert(false);
return nullptr;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册