diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index e1bdddcb750c49662f29ef8c346ef4bd2bc80b2f..00191978f4521d230e62d9ec914ce752f23ad9cf 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -221,39 +221,43 @@ void CompactionIterator::Next() { bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until) { + if (!compaction_filter_) { + return true; + } + // TODO: support compaction filter for wide-column entities - if (!compaction_filter_ || - (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) { + if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex) { return true; } - bool error = false; - // If the user has specified a compaction filter and the sequence - // number is greater than any external snapshot, then invoke the - // filter. If the return value of the compaction filter is true, - // replace the entry with a deletion marker. - CompactionFilter::Decision filter = CompactionFilter::Decision::kUndetermined; - compaction_filter_value_.clear(); - compaction_filter_skip_until_.Clear(); + + CompactionFilter::Decision decision = + CompactionFilter::Decision::kUndetermined; CompactionFilter::ValueType value_type = ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue : CompactionFilter::ValueType::kBlobIndex; + // Hack: pass internal key to BlobIndexCompactionFilter since it needs // to get sequence number. assert(compaction_filter_); - Slice& filter_key = - (ikey_.type == kTypeValue || + const Slice& filter_key = + (ikey_.type != kTypeBlobIndex || !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) ? ikey_.user_key : key_; + + compaction_filter_value_.clear(); + compaction_filter_skip_until_.Clear(); + { StopWatchNano timer(clock_, report_detailed_time_); - if (kTypeBlobIndex == ikey_.type) { - filter = compaction_filter_->FilterBlobByKey( + + if (ikey_.type == kTypeBlobIndex) { + decision = compaction_filter_->FilterBlobByKey( level_, filter_key, &compaction_filter_value_, compaction_filter_skip_until_.rep()); - if (CompactionFilter::Decision::kUndetermined == filter && + if (decision == CompactionFilter::Decision::kUndetermined && !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { - if (compaction_ == nullptr) { + if (!compaction_) { status_ = Status::Corruption("Unexpected blob index outside of compaction"); validity_info_.Invalidate(); @@ -299,17 +303,18 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, value_type = CompactionFilter::ValueType::kValue; } } - if (CompactionFilter::Decision::kUndetermined == filter) { - filter = compaction_filter_->FilterV2( + if (decision == CompactionFilter::Decision::kUndetermined) { + decision = compaction_filter_->FilterV2( level_, filter_key, value_type, blob_value_.empty() ? value_ : blob_value_, &compaction_filter_value_, compaction_filter_skip_until_.rep()); } + iter_stats_.total_filter_time += env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0; } - if (CompactionFilter::Decision::kUndetermined == filter) { + if (decision == CompactionFilter::Decision::kUndetermined) { // Should not reach here, since FilterV2 should never return kUndetermined. status_ = Status::NotSupported("FilterV2() should never return kUndetermined"); @@ -317,15 +322,15 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, return false; } - if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil && + if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil && cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= 0) { // Can't skip to a key smaller than the current one. // Keep the key as per FilterV2 documentation. - filter = CompactionFilter::Decision::kKeep; + decision = CompactionFilter::Decision::kKeep; } - if (filter == CompactionFilter::Decision::kRemove) { + if (decision == CompactionFilter::Decision::kRemove) { // convert the current key to a delete; key_ is pointing into // current_key_ at this point, so updating current_key_ updates key() ikey_.type = kTypeDeletion; @@ -333,7 +338,7 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, // no value associated with delete value_.clear(); iter_stats_.num_record_drop_user++; - } else if (filter == CompactionFilter::Decision::kPurge) { + } else if (decision == CompactionFilter::Decision::kPurge) { // convert the current key to a single delete; key_ is pointing into // current_key_ at this point, so updating current_key_ updates key() ikey_.type = kTypeSingleDeletion; @@ -341,19 +346,19 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, // no value associated with single delete value_.clear(); iter_stats_.num_record_drop_user++; - } else if (filter == CompactionFilter::Decision::kChangeValue) { - if (ikey_.type == kTypeBlobIndex) { - // value transfer from blob file to inlined data + } else if (decision == CompactionFilter::Decision::kChangeValue) { + if (ikey_.type != kTypeValue) { ikey_.type = kTypeValue; - current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + current_key_.UpdateInternalKey(ikey_.sequence, kTypeValue); } + value_ = compaction_filter_value_; - } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { + } else if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil) { *need_skip = true; compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, kValueTypeForSeek); *skip_until = compaction_filter_skip_until_.Encode(); - } else if (filter == CompactionFilter::Decision::kChangeBlobIndex) { + } else if (decision == CompactionFilter::Decision::kChangeBlobIndex) { // Only the StackableDB-based BlobDB impl's compaction filter should return // kChangeBlobIndex. Decision about rewriting blob and changing blob index // in the integrated BlobDB impl is made in subsequent call to @@ -365,23 +370,27 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, validity_info_.Invalidate(); return false; } - if (ikey_.type == kTypeValue) { - // value transfer from inlined data to blob file + + if (ikey_.type != kTypeBlobIndex) { ikey_.type = kTypeBlobIndex; - current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); + current_key_.UpdateInternalKey(ikey_.sequence, kTypeBlobIndex); } + value_ = compaction_filter_value_; - } else if (filter == CompactionFilter::Decision::kIOError) { + } else if (decision == CompactionFilter::Decision::kIOError) { if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { status_ = Status::NotSupported( "CompactionFilter for integrated BlobDB should not return kIOError"); validity_info_.Invalidate(); return false; } + status_ = Status::IOError("Failed to access blob during compaction filter"); - error = true; + validity_info_.Invalidate(); + return false; } - return !error; + + return true; } void CompactionIterator::NextFromInput() {