提交 cad248f5 编写于 作者: A Abhishek Madan 提交者: Facebook Github Bot

Prepare FragmentedRangeTombstoneIterator for use in compaction (#4740)

Summary:
To support the flush/compaction use cases of RangeDelAggregator
in v2, FragmentedRangeTombstoneIterator now supports dropping tombstones
that cannot be read in the compaction output file. Furthermore,
FragmentedRangeTombstoneIterator supports the "snapshot striping" use
case by allowing an iterator to be split by a list of snapshots.
RangeDelAggregatorV2 will use these changes in a follow-up change.

In the process of making these changes, other miscellaneous cleanups
were also done in these files.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4740

Differential Revision: D13287382

Pulled By: abhimadan

fbshipit-source-id: f5aeb03e1b3058049b80c02a558ee48f723fa48c
上级 d3daa0db
......@@ -428,7 +428,7 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
comparator_.comparator);
auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
fragmented_tombstone_list, read_seq, comparator_.comparator);
fragmented_tombstone_list, comparator_.comparator, read_seq);
return fragmented_iter;
}
......
......@@ -220,8 +220,8 @@ int main(int argc, char** argv) {
std::unique_ptr<rocksdb::FragmentedRangeTombstoneIterator>
fragmented_range_del_iter(
new rocksdb::FragmentedRangeTombstoneIterator(
fragmented_range_tombstone_lists.back().get(),
rocksdb::kMaxSequenceNumber, icmp));
fragmented_range_tombstone_lists.back().get(), icmp,
rocksdb::kMaxSequenceNumber));
if (FLAGS_use_v2_aggregator) {
rocksdb::StopWatchNano stop_watch_add_tombstones(
......
......@@ -173,8 +173,8 @@ TEST_F(RangeDelAggregatorV2Test, EmptyTruncatedIter) {
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
bytewise_icmp);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
bytewise_icmp));
new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp,
kMaxSequenceNumber));
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr,
nullptr);
......@@ -192,8 +192,8 @@ TEST_F(RangeDelAggregatorV2Test, UntruncatedIter) {
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
bytewise_icmp);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
bytewise_icmp));
new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp,
kMaxSequenceNumber));
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr,
nullptr);
......@@ -226,8 +226,8 @@ TEST_F(RangeDelAggregatorV2Test, UntruncatedIterWithSnapshot) {
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
bytewise_icmp);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, 9 /* snapshot */,
bytewise_icmp));
new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp,
9 /* snapshot */));
TruncatedRangeDelIterator iter(std::move(input_iter), &bytewise_icmp, nullptr,
nullptr);
......@@ -259,8 +259,8 @@ TEST_F(RangeDelAggregatorV2Test, TruncatedIter) {
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
bytewise_icmp);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
bytewise_icmp));
new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp,
kMaxSequenceNumber));
InternalKey smallest("d", 7, kTypeValue);
InternalKey largest("m", 9, kTypeValue);
......@@ -294,8 +294,8 @@ TEST_F(RangeDelAggregatorV2Test, SingleIterInAggregator) {
FragmentedRangeTombstoneList fragment_list(std::move(range_del_iter),
bytewise_icmp);
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(&fragment_list, kMaxSequenceNumber,
bytewise_icmp));
new FragmentedRangeTombstoneIterator(&fragment_list, bytewise_icmp,
kMaxSequenceNumber));
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber);
range_del_agg.AddTombstones(std::move(input_iter));
......@@ -321,8 +321,8 @@ TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregator) {
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, kMaxSequenceNumber);
for (const auto& fragment_list : fragment_lists) {
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(
fragment_list.get(), kMaxSequenceNumber, bytewise_icmp));
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
kMaxSequenceNumber));
range_del_agg.AddTombstones(std::move(input_iter));
}
......@@ -353,8 +353,8 @@ TEST_F(RangeDelAggregatorV2Test, MultipleItersInAggregatorWithUpperBound) {
RangeDelAggregatorV2 range_del_agg(&bytewise_icmp, 19);
for (const auto& fragment_list : fragment_lists) {
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(fragment_list.get(),
19 /* snapshot */, bytewise_icmp));
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
19 /* snapshot */));
range_del_agg.AddTombstones(std::move(input_iter));
}
......@@ -392,8 +392,8 @@ TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregator) {
const auto& fragment_list = fragment_lists[i];
const auto& bounds = iter_bounds[i];
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(fragment_list.get(),
19 /* snapshot */, bytewise_icmp));
new FragmentedRangeTombstoneIterator(fragment_list.get(), bytewise_icmp,
19 /* snapshot */));
range_del_agg.AddTombstones(std::move(input_iter), &bounds.first,
&bounds.second);
}
......@@ -432,7 +432,7 @@ TEST_F(RangeDelAggregatorV2Test, MultipleTruncatedItersInAggregatorSameLevel) {
auto add_iter_to_agg = [&](size_t i) {
std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter(
new FragmentedRangeTombstoneIterator(fragment_lists[i].get(),
19 /* snapshot */, bytewise_icmp));
bytewise_icmp, 19 /* snapshot */));
range_del_agg.AddTombstones(std::move(input_iter), &iter_bounds[i].first,
&iter_bounds[i].second);
};
......
......@@ -20,7 +20,8 @@ namespace rocksdb {
FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
std::unique_ptr<InternalIterator> unfragmented_tombstones,
const InternalKeyComparator& icmp) {
const InternalKeyComparator& icmp, bool for_compaction,
const std::vector<SequenceNumber>& snapshots) {
if (unfragmented_tombstones == nullptr) {
return;
}
......@@ -43,7 +44,8 @@ FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
}
}
if (is_sorted) {
FragmentTombstones(std::move(unfragmented_tombstones), icmp);
FragmentTombstones(std::move(unfragmented_tombstones), icmp, for_compaction,
snapshots);
return;
}
......@@ -61,12 +63,13 @@ FragmentedRangeTombstoneList::FragmentedRangeTombstoneList(
// VectorIterator implicitly sorts by key during construction.
auto iter = std::unique_ptr<VectorIterator>(
new VectorIterator(std::move(keys), std::move(values), &icmp));
FragmentTombstones(std::move(iter), icmp);
FragmentTombstones(std::move(iter), icmp, for_compaction, snapshots);
}
void FragmentedRangeTombstoneList::FragmentTombstones(
std::unique_ptr<InternalIterator> unfragmented_tombstones,
const InternalKeyComparator& icmp) {
const InternalKeyComparator& icmp, bool for_compaction,
const std::vector<SequenceNumber>& snapshots) {
Slice cur_start_key(nullptr, 0);
auto cmp = ParsedInternalKeyComparator(&icmp);
......@@ -117,10 +120,38 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
}
std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(),
std::greater<SequenceNumber>());
size_t start_idx = tombstone_seqs_.size();
size_t end_idx = start_idx + seqnums_to_flush.size();
tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
seqnums_to_flush.end());
if (for_compaction) {
// Drop all tombstone seqnums that are not preserved by a snapshot.
SequenceNumber next_snapshot = kMaxSequenceNumber;
for (auto seq : seqnums_to_flush) {
if (seq <= next_snapshot) {
// This seqnum is visible by a lower snapshot.
tombstone_seqs_.push_back(seq);
seq_set_.insert(seq);
auto upper_bound_it =
std::lower_bound(snapshots.begin(), snapshots.end(), seq);
if (upper_bound_it == snapshots.begin()) {
// This seqnum is the topmost one visible by the earliest
// snapshot. None of the seqnums below it will be visible, so we
// can skip them.
break;
}
next_snapshot = *std::prev(upper_bound_it);
}
}
end_idx = tombstone_seqs_.size();
} else {
// The fragmentation is being done for reads, so preserve all seqnums.
tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
seqnums_to_flush.end());
seq_set_.insert(seqnums_to_flush.begin(), seqnums_to_flush.end());
}
assert(start_idx < end_idx);
tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx, end_idx);
cur_start_key = cur_end_key;
......@@ -178,33 +209,41 @@ void FragmentedRangeTombstoneList::FragmentTombstones(
}
}
bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower,
SequenceNumber upper) const {
auto seq_it = seq_set_.lower_bound(lower);
return seq_it != seq_set_.end() && *seq_it <= upper;
}
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
const FragmentedRangeTombstoneList* tombstones, SequenceNumber snapshot,
const InternalKeyComparator& icmp)
const FragmentedRangeTombstoneList* tombstones,
const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
SequenceNumber _lower_bound)
: tombstone_start_cmp_(icmp.user_comparator()),
tombstone_end_cmp_(icmp.user_comparator()),
icmp_(&icmp),
ucmp_(icmp.user_comparator()),
tombstones_(tombstones),
snapshot_(snapshot) {
upper_bound_(_upper_bound),
lower_bound_(_lower_bound) {
assert(tombstones_ != nullptr);
pos_ = tombstones_->end();
pinned_pos_ = tombstones_->end();
Invalidate();
}
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
SequenceNumber snapshot, const InternalKeyComparator& icmp)
const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
SequenceNumber _lower_bound)
: tombstone_start_cmp_(icmp.user_comparator()),
tombstone_end_cmp_(icmp.user_comparator()),
icmp_(&icmp),
ucmp_(icmp.user_comparator()),
tombstones_ref_(tombstones),
tombstones_(tombstones_ref_.get()),
snapshot_(snapshot) {
upper_bound_(_upper_bound),
lower_bound_(_lower_bound) {
assert(tombstones_ != nullptr);
pos_ = tombstones_->end();
seq_pos_ = tombstones_->seq_end();
pinned_pos_ = tombstones_->end();
pinned_seq_pos_ = tombstones_->seq_end();
Invalidate();
}
void FragmentedRangeTombstoneIterator::SeekToFirst() {
......@@ -220,7 +259,7 @@ void FragmentedRangeTombstoneIterator::SeekToTopFirst() {
pos_ = tombstones_->begin();
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
upper_bound_, std::greater<SequenceNumber>());
ScanForwardToVisibleTombstone();
}
......@@ -237,7 +276,7 @@ void FragmentedRangeTombstoneIterator::SeekToTopLast() {
pos_ = std::prev(tombstones_->end());
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
upper_bound_, std::greater<SequenceNumber>());
ScanBackwardToVisibleTombstone();
}
......@@ -270,7 +309,7 @@ void FragmentedRangeTombstoneIterator::SeekToCoveringTombstone(
}
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
upper_bound_, std::greater<SequenceNumber>());
}
void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
......@@ -289,25 +328,28 @@ void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
--pos_;
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
upper_bound_, std::greater<SequenceNumber>());
}
void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() {
while (pos_ != tombstones_->end() &&
seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
(seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
*seq_pos_ < lower_bound_)) {
++pos_;
if (pos_ == tombstones_->end()) {
Invalidate();
return;
}
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
upper_bound_, std::greater<SequenceNumber>());
}
}
void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() {
while (pos_ != tombstones_->end() &&
seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx)) {
(seq_pos_ == tombstones_->seq_iter(pos_->seq_end_idx) ||
*seq_pos_ < lower_bound_)) {
if (pos_ == tombstones_->begin()) {
Invalidate();
return;
......@@ -315,7 +357,7 @@ void FragmentedRangeTombstoneIterator::ScanBackwardToVisibleTombstone() {
--pos_;
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
upper_bound_, std::greater<SequenceNumber>());
}
}
......@@ -333,14 +375,13 @@ void FragmentedRangeTombstoneIterator::TopNext() {
}
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
upper_bound_, std::greater<SequenceNumber>());
ScanForwardToVisibleTombstone();
}
void FragmentedRangeTombstoneIterator::Prev() {
if (seq_pos_ == tombstones_->seq_begin()) {
pos_ = tombstones_->end();
seq_pos_ = tombstones_->seq_end();
Invalidate();
return;
}
--seq_pos_;
......@@ -358,7 +399,7 @@ void FragmentedRangeTombstoneIterator::TopPrev() {
--pos_;
seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
tombstones_->seq_iter(pos_->seq_end_idx),
snapshot_, std::greater<SequenceNumber>());
upper_bound_, std::greater<SequenceNumber>());
ScanBackwardToVisibleTombstone();
}
......@@ -372,4 +413,27 @@ SequenceNumber FragmentedRangeTombstoneIterator::MaxCoveringTombstoneSeqnum(
return ValidPos() && ucmp_->Compare(start_key(), user_key) <= 0 ? seq() : 0;
}
std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
FragmentedRangeTombstoneIterator::SplitBySnapshot(
const std::vector<SequenceNumber>& snapshots) {
std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
splits;
SequenceNumber lower = 0;
SequenceNumber upper;
for (size_t i = 0; i <= snapshots.size(); i++) {
if (i >= snapshots.size()) {
upper = kMaxSequenceNumber;
} else {
upper = snapshots[i];
}
if (tombstones_->ContainsRange(lower, upper)) {
splits.emplace(upper, std::unique_ptr<FragmentedRangeTombstoneIterator>(
new FragmentedRangeTombstoneIterator(
tombstones_, *icmp_, upper, lower)));
}
lower = upper + 1;
}
return splits;
}
} // namespace rocksdb
......@@ -7,6 +7,7 @@
#include <list>
#include <memory>
#include <set>
#include <string>
#include <vector>
......@@ -38,7 +39,8 @@ struct FragmentedRangeTombstoneList {
};
FragmentedRangeTombstoneList(
std::unique_ptr<InternalIterator> unfragmented_tombstones,
const InternalKeyComparator& icmp);
const InternalKeyComparator& icmp, bool for_compaction = false,
const std::vector<SequenceNumber>& snapshots = {});
std::vector<RangeTombstoneStack>::const_iterator begin() const {
return tombstones_.begin();
......@@ -60,7 +62,11 @@ struct FragmentedRangeTombstoneList {
return tombstone_seqs_.end();
}
bool empty() const { return tombstones_.size() == 0; }
bool empty() const { return tombstones_.empty(); }
// Returns true if the stored tombstones contain with one with a sequence
// number in [lower, upper].
bool ContainsRange(SequenceNumber lower, SequenceNumber upper) const;
private:
// Given an ordered range tombstone iterator unfragmented_tombstones,
......@@ -68,10 +74,12 @@ struct FragmentedRangeTombstoneList {
// tombstones_ and tombstone_seqs_.
void FragmentTombstones(
std::unique_ptr<InternalIterator> unfragmented_tombstones,
const InternalKeyComparator& icmp);
const InternalKeyComparator& icmp, bool for_compaction,
const std::vector<SequenceNumber>& snapshots);
std::vector<RangeTombstoneStack> tombstones_;
std::vector<SequenceNumber> tombstone_seqs_;
std::set<SequenceNumber> seq_set_;
std::list<std::string> pinned_slices_;
PinnedIteratorsManager pinned_iters_mgr_;
};
......@@ -88,11 +96,13 @@ struct FragmentedRangeTombstoneList {
class FragmentedRangeTombstoneIterator : public InternalIterator {
public:
FragmentedRangeTombstoneIterator(
const FragmentedRangeTombstoneList* tombstones, SequenceNumber snapshot,
const InternalKeyComparator& icmp);
const FragmentedRangeTombstoneList* tombstones,
const InternalKeyComparator& icmp, SequenceNumber upper_bound,
SequenceNumber lower_bound = 0);
FragmentedRangeTombstoneIterator(
const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
SequenceNumber snapshot, const InternalKeyComparator& icmp);
const InternalKeyComparator& icmp, SequenceNumber upper_bound,
SequenceNumber lower_bound = 0);
void SeekToFirst() override;
void SeekToLast() override;
......@@ -136,10 +146,6 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
seq_pos_ = tombstones_->seq_end();
}
// TODO: implement properly
RangeTombstone tombstone() const {
return RangeTombstone(start_key(), end_key(), seq());
}
Slice start_key() const { return pos_->start_key; }
Slice end_key() const { return pos_->end_key; }
SequenceNumber seq() const { return *seq_pos_; }
......@@ -151,12 +157,24 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
return ParsedInternalKey(pos_->end_key, kMaxSequenceNumber,
kTypeRangeDeletion);
}
ParsedInternalKey internal_key() const {
return ParsedInternalKey(pos_->start_key, *seq_pos_, kTypeRangeDeletion);
}
SequenceNumber MaxCoveringTombstoneSeqnum(const Slice& user_key);
// Splits the iterator into n+1 iterators (where n is the number of
// snapshots), each providing a view over a "stripe" of sequence numbers. The
// iterators are keyed by the upper bound of their ranges (the provided
// snapshots + kMaxSequenceNumber).
//
// NOTE: the iterators in the returned map are no longer valid if their
// parent iterator is deleted, since they do not modify the refcount of the
// underlying tombstone list. Therefore, this map should be deleted before
// the parent iterator.
std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
SplitBySnapshot(const std::vector<SequenceNumber>& snapshots);
SequenceNumber upper_bound() const { return upper_bound_; }
SequenceNumber lower_bound() const { return lower_bound_; }
private:
using RangeTombstoneStack = FragmentedRangeTombstoneList::RangeTombstoneStack;
......@@ -217,10 +235,12 @@ class FragmentedRangeTombstoneIterator : public InternalIterator {
const RangeTombstoneStackStartComparator tombstone_start_cmp_;
const RangeTombstoneStackEndComparator tombstone_end_cmp_;
const InternalKeyComparator* icmp_;
const Comparator* ucmp_;
std::shared_ptr<const FragmentedRangeTombstoneList> tombstones_ref_;
const FragmentedRangeTombstoneList* tombstones_;
SequenceNumber snapshot_;
SequenceNumber upper_bound_;
SequenceNumber lower_bound_;
std::vector<RangeTombstoneStack>::const_iterator pos_;
std::vector<SequenceNumber>::const_iterator seq_pos_;
mutable std::vector<RangeTombstoneStack>::const_iterator pinned_pos_;
......
......@@ -2412,7 +2412,7 @@ FragmentedRangeTombstoneIterator* BlockBasedTable::NewRangeTombstoneIterator(
snapshot = read_options.snapshot->GetSequenceNumber();
}
return new FragmentedRangeTombstoneIterator(
rep_->fragmented_range_dels, snapshot, rep_->internal_comparator);
rep_->fragmented_range_dels, rep_->internal_comparator, snapshot);
}
InternalIterator* BlockBasedTable::NewUnfragmentedRangeTombstoneIterator(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册