diff --git a/HISTORY.md b/HISTORY.md index 61809025fee537012e4f80965365cd4b10b49da7..e657127c3cc0935c0c4c7085b5e621bf0efb2643 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,7 +7,7 @@ ### Public API Changes * Removed class Env::RandomRWFile and Env::NewRandomRWFile(). -* Renamed DBOptions.num_subcompactions to DBOptions.max_subcompactions to make the name better match the actual funcionality of the option. +* Renamed DBOptions.num_subcompactions to DBOptions.max_subcompactions to make the name better match the actual functionality of the option. ## 3.13.0 (8/6/2015) ### New Features diff --git a/db/compaction_job.cc b/db/compaction_job.cc index edaf80933e5cebbb310373388b94297ef693e1b8..9bd333905c2086cca11fdac5b40b5d0aa2db0e42 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -100,9 +100,6 @@ struct CompactionJob::SubCompactionState { uint64_t total_bytes; uint64_t num_input_records; uint64_t num_output_records; - SequenceNumber earliest_snapshot; - SequenceNumber visible_at_tip; - SequenceNumber latest_snapshot; CompactionJobStats compaction_job_stats; // "level_ptrs" holds indices that remember which file of an associated @@ -113,20 +110,15 @@ struct CompactionJob::SubCompactionState { // is in or beyond the last file checked during the previous call std::vector level_ptrs; - SubCompactionState(Compaction* c, Slice* _start, Slice* _end, - SequenceNumber earliest, SequenceNumber visible, - SequenceNumber latest) - : compaction(c), - start(_start), - end(_end), - outfile(nullptr), - builder(nullptr), - total_bytes(0), - num_input_records(0), - num_output_records(0), - earliest_snapshot(earliest), - visible_at_tip(visible), - latest_snapshot(latest) { + SubCompactionState(Compaction* c, Slice* _start, Slice* _end) + : compaction(c), + start(_start), + end(_end), + outfile(nullptr), + builder(nullptr), + total_bytes(0), + num_input_records(0), + num_output_records(0) { assert(compaction != nullptr); level_ptrs = std::vector(compaction->number_levels(), 0); } @@ -146,9 +138,6 @@ struct CompactionJob::SubCompactionState { total_bytes = std::move(o.total_bytes); num_input_records = std::move(o.num_input_records); num_output_records = std::move(o.num_output_records); - earliest_snapshot = std::move(o.earliest_snapshot); - visible_at_tip = std::move(o.visible_at_tip); - latest_snapshot = std::move(o.latest_snapshot); level_ptrs = std::move(o.level_ptrs); return *this; } @@ -321,32 +310,29 @@ void CompactionJob::Prepare() { bottommost_level_ = c->bottommost_level(); // Initialize subcompaction states - SequenceNumber earliest_snapshot; - SequenceNumber latest_snapshot = 0; - SequenceNumber visible_at_tip = 0; + latest_snapshot_ = 0; + visible_at_tip_ = 0; if (existing_snapshots_.size() == 0) { // optimize for fast path if there are no snapshots - visible_at_tip = versions_->LastSequence(); - earliest_snapshot = visible_at_tip; + visible_at_tip_ = versions_->LastSequence(); + earliest_snapshot_ = visible_at_tip_; } else { - latest_snapshot = existing_snapshots_.back(); + latest_snapshot_ = existing_snapshots_.back(); // Add the current seqno as the 'latest' virtual // snapshot to the end of this list. existing_snapshots_.push_back(versions_->LastSequence()); - earliest_snapshot = existing_snapshots_[0]; + earliest_snapshot_ = existing_snapshots_[0]; } - InitializeSubCompactions(earliest_snapshot, visible_at_tip, latest_snapshot); + InitializeSubCompactions(); } // For L0-L1 compaction, iterators work in parallel by processing // different subsets of the full key range. This function sets up // the local states used by each of these subcompactions during // their execution -void CompactionJob::InitializeSubCompactions(const SequenceNumber& earliest, - const SequenceNumber& visible, - const SequenceNumber& latest) { +void CompactionJob::InitializeSubCompactions() { Compaction* c = compact_->compaction; auto& bounds = sub_compaction_boundaries_; if (c->IsSubCompaction()) { @@ -410,8 +396,7 @@ void CompactionJob::InitializeSubCompactions(const SequenceNumber& earliest, for (size_t i = 0; i <= bounds.size(); i++) { Slice *start = i == 0 ? nullptr : &bounds[i - 1]; Slice *end = i == bounds.size() ? nullptr : &bounds[i]; - compact_->sub_compact_states.emplace_back(compact_->compaction, start, - end, earliest, visible, latest); + compact_->sub_compact_states.emplace_back(compact_->compaction, start, end); } } @@ -658,8 +643,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) { visible_in_snapshot = kMaxSequenceNumber; // apply the compaction filter to the first occurrence of the user key if (compaction_filter && ikey.type == kTypeValue && - (sub_compact->visible_at_tip || - ikey.sequence > sub_compact->latest_snapshot)) { + (visible_at_tip_ || ikey.sequence > latest_snapshot_)) { // If the user has specified a compaction filter and the sequence // number is greater than any external snapshot, then invoke the // filter. If the return value of the compaction filter is true, @@ -695,9 +679,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) { // the earlist snapshot that is affected by this kv. SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot SequenceNumber visible = - sub_compact->visible_at_tip - ? sub_compact->visible_at_tip - : findEarliestVisibleSnapshot(ikey.sequence, &prev_snapshot); + visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot( + ikey.sequence, &prev_snapshot); if (visible_in_snapshot == visible) { // If the earliest snapshot is which this key is visible in @@ -709,7 +692,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubCompactionState* sub_compact) { ++key_drop_newer_entry; input->Next(); // (A) } else if (ikey.type == kTypeDeletion && - ikey.sequence <= sub_compact->earliest_snapshot && + ikey.sequence <= earliest_snapshot_ && sub_compact->compaction->KeyNotExistsBeyondOutputLevel( ikey.user_key, &sub_compact->level_ptrs)) { // For this user key: @@ -815,7 +798,7 @@ Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value, // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. - if (bottommost_level_ && ikey.sequence < sub_compact->earliest_snapshot && + if (bottommost_level_ && ikey.sequence < earliest_snapshot_ && ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems diff --git a/db/compaction_job.h b/db/compaction_job.h index eadd89feaf272604f01853518ade9ac792309b90..375c6e6cc0c5e883a8e49eff612f0aeecf26d385 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -83,9 +83,7 @@ class CompactionJob { void AggregateStatistics(); // Set up the individual states used by each subcompaction - void InitializeSubCompactions(const SequenceNumber& earliest, - const SequenceNumber& visible, - const SequenceNumber& latest); + void InitializeSubCompactions(); // update the thread status for starting a compaction. void ReportStartedCompaction(Compaction* compaction); @@ -132,6 +130,10 @@ class CompactionJob { InternalStats::CompactionStats compaction_stats_; + SequenceNumber earliest_snapshot_; + SequenceNumber latest_snapshot_; + SequenceNumber visible_at_tip_; + // DBImpl state const std::string& dbname_; const DBOptions& db_options_;