From 2ea1219eb615f03130a6c0ef713cd3ab31d67553 Mon Sep 17 00:00:00 2001
From: sdong
Date: Fri, 31 Oct 2014 18:36:07 -0700
Subject: [PATCH] Fix RecordIn and RecordDrop stats

Summary:
1. fix possible overflow of the two stats by using uint64_t
2. use a similar source of data to calculate RecordDrop. Previous one is not correct.

Test Plan: See outputs of db_bench settings, and the results look reasonable

Reviewers: MarkCallaghan, ljin, igor

Reviewed By: igor

Subscribers: rven, leveldb, yhchiang, dhruba

Differential Revision: https://reviews.facebook.net/D28155
---
 db/compaction_job.cc | 43 ++++++++++++------------
 db/compaction_job.h  |  3 +-
 db/internal_stats.cc | 80 ++++++++++++++++++++------------------------
 db/internal_stats.h  |  4 +--
 4 files changed, 60 insertions(+), 70 deletions(-)

diff --git a/db/compaction_job.cc b/db/compaction_job.cc
index 5db087b3c..dc472233b 100644
--- a/db/compaction_job.cc
+++ b/db/compaction_job.cc
@@ -81,7 +81,11 @@ struct CompactionJob::CompactionState {
 
   Output* current_output() { return &outputs[outputs.size() - 1]; }
 
-  explicit CompactionState(Compaction* c) : compaction(c), total_bytes(0) {}
+  explicit CompactionState(Compaction* c)
+      : compaction(c),
+        total_bytes(0),
+        num_input_records(0),
+        num_output_records(0) {}
 
   // Create a client visible context of this compaction
   CompactionFilter::Context GetFilterContextV1() {
@@ -117,6 +121,9 @@ struct CompactionJob::CompactionState {
 
   std::string cur_prefix_;
 
+  uint64_t num_input_records;
+  uint64_t num_output_records;
+
   // Buffers the kv-pair that will be run through compaction filter V2
   // in the future.
   void BufferKeyValueSlices(const Slice& key, const Slice& value) {
@@ -271,7 +278,6 @@ Status CompactionJob::Run() {
   log_buffer_->FlushBufferToLog();
 
   ColumnFamilyData* cfd = compact_->compaction->column_family_data();
-  int num_output_records = 0;
   const uint64_t start_micros = env_->NowMicros();
   std::unique_ptr<Iterator> input(
       versions_->MakeInputIterator(compact_->compaction));
@@ -289,8 +295,7 @@ Status CompactionJob::Run() {
 
   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
   if (!compaction_filter_v2) {
-    status = ProcessKeyValueCompaction(&imm_micros, input.get(), false,
-                                       &num_output_records);
+    status = ProcessKeyValueCompaction(&imm_micros, input.get(), false);
   } else {
     // temp_backup_input always point to the start of the current buffer
     // temp_backup_input = backup_input;
@@ -361,8 +366,7 @@ Status CompactionJob::Run() {
 
           // Done buffering for the current prefix. Spit it out to disk
          // Now just iterate through all the kv-pairs
-          status = ProcessKeyValueCompaction(&imm_micros, input.get(), true,
-                                             &num_output_records);
+          status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
 
           if (!status.ok()) {
             break;
@@ -387,8 +391,7 @@ Status CompactionJob::Run() {
       }
 
       compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
-      status = ProcessKeyValueCompaction(&imm_micros, input.get(), true,
-                                         &num_output_records);
+      status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
 
       compact_->CleanupBatchBuffer();
       compact_->CleanupMergedBuffer();
@@ -399,8 +402,7 @@ Status CompactionJob::Run() {
       CallCompactionFilterV2(compaction_filter_v2);
     }
     compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
-    status = ProcessKeyValueCompaction(&imm_micros, input.get(), true,
-                                       &num_output_records);
+    status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
   }  // checking for compaction filter v2
 
   if (status.ok() &&
@@ -434,27 +436,26 @@ Status CompactionJob::Run() {
   }
   compaction_stats_.files_out_levelnp1 = num_output_files;
 
-  uint64_t num_input_records = 0;
-
   for (int i = 0; i < compact_->compaction->num_input_files(0); i++) {
     compaction_stats_.bytes_readn +=
         compact_->compaction->input(0, i)->fd.GetFileSize();
     compaction_stats_.num_input_records +=
-        compact_->compaction->input(0, i)->num_entries;
-    num_input_records += compact_->compaction->input(0, i)->num_entries;
+        static_cast<uint64_t>(compact_->compaction->input(0, i)->num_entries);
   }
 
   for (int i = 0; i < compact_->compaction->num_input_files(1); i++) {
     compaction_stats_.bytes_readnp1 +=
         compact_->compaction->input(1, i)->fd.GetFileSize();
-    num_input_records += compact_->compaction->input(1, i)->num_entries;
   }
 
   for (int i = 0; i < num_output_files; i++) {
     compaction_stats_.bytes_written += compact_->outputs[i].file_size;
   }
-  compaction_stats_.num_dropped_records =
-      static_cast<int>(num_input_records) - num_output_records;
+  if (compact_->num_input_records > compact_->num_output_records) {
+    compaction_stats_.num_dropped_records +=
+        compact_->num_input_records - compact_->num_output_records;
+    compact_->num_input_records = compact_->num_output_records = 0;
+  }
 
   RecordCompactionIOStats();
 
@@ -518,10 +519,7 @@ void CompactionJob::AllocateCompactionOutputFileNumbers() {
 
 Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
                                                 Iterator* input,
-                                                bool is_compaction_v2,
-                                                int* num_output_records) {
-  assert(num_output_records != nullptr);
-
+                                                bool is_compaction_v2) {
   size_t combined_idx = 0;
   Status status;
   std::string compaction_filter_value;
@@ -553,6 +551,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
   int64_t loop_cnt = 0;
   while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
          !cfd->IsDropped() && status.ok()) {
+    compact_->num_input_records++;
     if (++loop_cnt > 1000) {
       if (key_drop_user > 0) {
         RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
@@ -795,7 +794,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
       }
       compact_->current_output()->largest.DecodeFrom(newkey);
       compact_->builder->Add(newkey, value);
-      (*num_output_records)++,
+      compact_->num_output_records++,
       compact_->current_output()->largest_seqno =
           std::max(compact_->current_output()->largest_seqno, seqno);
 
diff --git a/db/compaction_job.h b/db/compaction_job.h
index 7b91e012a..f090c351d 100644
--- a/db/compaction_job.h
+++ b/db/compaction_job.h
@@ -83,8 +83,7 @@ class CompactionJob {
   // Call compaction filter if is_compaction_v2 is not true. Then iterate
  // through input and compact the kv-pairs
   Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
-                                   bool is_compaction_v2,
-                                   int* num_output_records);
+                                   bool is_compaction_v2);
   // Call compaction_filter_v2->Filter() on kv-pairs in compact
   void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2);
   Status FinishCompactionOutputFile(Iterator* input);
diff --git a/db/internal_stats.cc b/db/internal_stats.cc
index cda75e0c8..617626cb1 100644
--- a/db/internal_stats.cc
+++ b/db/internal_stats.cc
@@ -46,50 +46,42 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
 
   double elapsed = (stats.micros + 1) / 1000000.0;
 
   snprintf(buf, len,
-           "%4s %5d/%-3d %8.0f %5.1f " /* Level, Files, Size(MB), Score */
-           "%8.1f " /* Read(GB) */
-           "%7.1f " /* Rn(GB) */
-           "%8.1f " /* Rnp1(GB) */
-           "%9.1f " /* Write(GB) */
-           "%8.1f " /* Wnew(GB) */
-           "%6.1f " /* RW-Amp */
-           "%5.1f " /* W-Amp */
-           "%8.1f " /* Rd(MB/s) */
-           "%8.1f " /* Wr(MB/s) */
-           "%8d " /* Rn(cnt) */
-           "%9d " /* Rnp1(cnt) */
-           "%9d " /* Wnp1(cnt) */
-           "%9d " /* Wnew(cnt) */
-           "%10.0f " /* Comp(sec) */
-           "%9d " /* Comp(cnt) */
-           "%8.3f " /* Avg(sec) */
-           "%10.2f " /* Stall(sec) */
-           "%10" PRIu64 " " /* Stall(cnt) */
-           "%7.2f " /* Avg(ms) */
-           "%12d " /* input entries */
-           "%12d\n" /* number of records reduced */,
-           name.c_str(), num_files, being_compacted, total_file_size / kMB, score,
-           bytes_read / kGB,
-           stats.bytes_readn / kGB,
-           stats.bytes_readnp1 / kGB,
-           stats.bytes_written / kGB,
-           bytes_new / kGB,
-           rw_amp,
-           w_amp,
-           bytes_read / kMB / elapsed,
-           stats.bytes_written / kMB / elapsed,
-           stats.files_in_leveln,
-           stats.files_in_levelnp1,
-           stats.files_out_levelnp1,
-           stats.files_out_levelnp1 - stats.files_in_levelnp1,
-           stats.micros / 1000000.0,
-           stats.count,
-           stats.count == 0 ? 0 : stats.micros / 1000000.0 / stats.count,
-           stall_us / 1000000.0,
-           stalls,
-           stalls == 0 ? 0 : stall_us / 1000.0 / stalls,
-           stats.num_input_records,
-           stats.num_dropped_records);
+           "%4s %5d/%-3d %8.0f %5.1f " /* Level, Files, Size(MB), Score */
+           "%8.1f " /* Read(GB) */
+           "%7.1f " /* Rn(GB) */
+           "%8.1f " /* Rnp1(GB) */
+           "%9.1f " /* Write(GB) */
+           "%8.1f " /* Wnew(GB) */
+           "%6.1f " /* RW-Amp */
+           "%5.1f " /* W-Amp */
+           "%8.1f " /* Rd(MB/s) */
+           "%8.1f " /* Wr(MB/s) */
+           "%8d " /* Rn(cnt) */
+           "%9d " /* Rnp1(cnt) */
+           "%9d " /* Wnp1(cnt) */
+           "%9d " /* Wnew(cnt) */
+           "%10.0f " /* Comp(sec) */
+           "%9d " /* Comp(cnt) */
+           "%8.3f " /* Avg(sec) */
+           "%10.2f " /* Stall(sec) */
+           "%10" PRIu64
+           " " /* Stall(cnt) */
+           "%7.2f " /* Avg(ms) */
+           "%12" PRIu64
+           " " /* input entries */
+           "%12" PRIu64 "\n" /* number of records reduced */,
+           name.c_str(), num_files, being_compacted, total_file_size / kMB,
+           score, bytes_read / kGB, stats.bytes_readn / kGB,
+           stats.bytes_readnp1 / kGB, stats.bytes_written / kGB,
+           bytes_new / kGB, rw_amp, w_amp, bytes_read / kMB / elapsed,
+           stats.bytes_written / kMB / elapsed, stats.files_in_leveln,
+           stats.files_in_levelnp1, stats.files_out_levelnp1,
+           stats.files_out_levelnp1 - stats.files_in_levelnp1,
+           stats.micros / 1000000.0, stats.count,
+           stats.count == 0 ? 0 : stats.micros / 1000000.0 / stats.count,
+           stall_us / 1000000.0, stalls,
+           stalls == 0 ? 0 : stall_us / 1000.0 / stalls,
+           stats.num_input_records, stats.num_dropped_records);
 }
 
diff --git a/db/internal_stats.h b/db/internal_stats.h
index 0c3ee6db7..2fbcefd4c 100644
--- a/db/internal_stats.h
+++ b/db/internal_stats.h
@@ -127,11 +127,11 @@ class InternalStats {
     int files_out_levelnp1;
 
     // Total incoming entries during compaction between levels N and N+1
-    int num_input_records;
+    uint64_t num_input_records;
 
     // Accumulated diff number of entries
     // (num input entries - num output entires) for compaction levels N and N+1
-    int num_dropped_records;
+    uint64_t num_dropped_records;
 
     // Number of compactions done
     int count;
-- 
GitLab
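
As context for the two changes the summary describes (widening the stats to uint64_t and deriving RecordDrop from one consistent source of counters), here is a minimal, self-contained sketch of that accounting scheme. It is not RocksDB code: the LevelStats and CompactionCounters types and the FoldIntoStats helper are hypothetical names used only to illustrate the idea of counting input and output records in 64-bit counters during a compaction and deriving the dropped-record count from that same pair of counters.

#include <stdint.h>   // uint64_t, matching the widened counters in the patch
#include <iostream>

// Hypothetical stand-ins for the per-level stats and per-compaction counters.
struct LevelStats {
  uint64_t num_input_records = 0;    // "RecordIn"
  uint64_t num_dropped_records = 0;  // "RecordDrop"
};

struct CompactionCounters {
  uint64_t num_input_records = 0;   // bumped once per key read by the compaction
  uint64_t num_output_records = 0;  // bumped once per key written to an output file
};

// Fold one finished compaction into the level stats. The dropped-record count
// is derived from the same pair of counters that was maintained while the
// compaction ran, and the subtraction is guarded so it cannot wrap around.
void FoldIntoStats(CompactionCounters* c, LevelStats* stats) {
  stats->num_input_records += c->num_input_records;
  if (c->num_input_records > c->num_output_records) {
    stats->num_dropped_records += c->num_input_records - c->num_output_records;
  }
  c->num_input_records = 0;
  c->num_output_records = 0;
}

int main() {
  LevelStats stats;
  CompactionCounters counters;
  // A compaction that reads 3 billion keys and keeps 2.5 billion of them;
  // both values already exceed what a 32-bit int can hold.
  counters.num_input_records = 3000000000ULL;
  counters.num_output_records = 2500000000ULL;
  FoldIntoStats(&counters, &stats);
  std::cout << "RecordIn=" << stats.num_input_records
            << " RecordDrop=" << stats.num_dropped_records << std::endl;
  return 0;
}

The two properties this sketch mirrors are the ones the commit message names: 64-bit counters so multi-billion-record levels cannot overflow, and a dropped-record figure computed as input minus output from a single consistent source rather than from two unrelated measurements.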