//  Copyright (c) 2016-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.

#include "db/range_del_aggregator.h"

#include <algorithm>

namespace rocksdb {

RangeDelAggregator::RangeDelAggregator(
    const InternalKeyComparator& icmp,
    const std::vector<SequenceNumber>& snapshots)
    : icmp_(icmp) {
  // Pin tombstone iterators for this aggregator's lifetime so the Slices held
  // in the tombstone maps remain valid.
  pinned_iters_mgr_.StartPinning();
  // One stripe per snapshot; a key can only be covered by tombstones within
  // its own stripe, preserving snapshot visibility.
  for (auto snapshot : snapshots) {
    stripe_map_.emplace(snapshot,
                        TombstoneMap(stl_wrappers::LessOfComparator(&icmp_)));
  }
  // Data newer than any snapshot falls in this catch-all stripe.
  // Construct it with the same comparator as the snapshot stripes; a
  // default-constructed TombstoneMap would not order by icmp_'s user
  // comparator, which is wrong when a custom comparator is in use.
  stripe_map_.emplace(kMaxSequenceNumber,
                      TombstoneMap(stl_wrappers::LessOfComparator(&icmp_)));
}

bool RangeDelAggregator::ShouldDelete(const Slice& internal_key) {
26 27 28 29
  ParsedInternalKey parsed;
  if (!ParseInternalKey(internal_key, &parsed)) {
    assert(false);
  }
A
Andrew Kryczka 已提交
30
  return ShouldDelete(parsed);
A
Andrew Kryczka 已提交
31 32
}

bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed) {
34 35
  assert(IsValueType(parsed.type));

A
Andrew Kryczka 已提交
36 37 38 39 40
  const auto& tombstone_map = GetTombstoneMap(parsed.sequence);
  for (const auto& start_key_and_tombstone : tombstone_map) {
    const auto& tombstone = start_key_and_tombstone.second;
    if (icmp_.user_comparator()->Compare(parsed.user_key,
                                         tombstone.start_key_) < 0) {
41 42
      break;
    }
A
Andrew Kryczka 已提交
43
    if (parsed.sequence < tombstone.seq_ &&
44
        icmp_.user_comparator()->Compare(parsed.user_key, tombstone.end_key_) <
A
Andrew Kryczka 已提交
45 46 47
            0) {
      return true;
    }
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
  }
  return false;
}

bool RangeDelAggregator::ShouldAddTombstones(
    bool bottommost_level /* = false */) {
  auto stripe_map_iter = stripe_map_.begin();
  assert(stripe_map_iter != stripe_map_.end());
  if (bottommost_level) {
    // For the bottommost level, keys covered by tombstones in the first
    // (oldest) stripe have been compacted away, so the tombstones are obsolete.
    ++stripe_map_iter;
  }
  while (stripe_map_iter != stripe_map_.end()) {
    if (!stripe_map_iter->second.empty()) {
      return true;
    }
    ++stripe_map_iter;
  }
  return false;
}

Status RangeDelAggregator::AddTombstones(ScopedArenaIterator input) {
  // Arena-allocated iterator: release it from the scoped wrapper and hand
  // ownership to the pinned iterators manager.
  return AddTombstones(input.release(), true /* arena */);
}

Status RangeDelAggregator::AddTombstones(
    std::unique_ptr<InternalIterator> input) {
  // Heap-allocated iterator: transfer ownership to the pinned iterators
  // manager.
  return AddTombstones(input.release(), false /* arena */);
}

Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
  // Takes ownership of `input`. Pinning keeps it alive -- and hence the memory
  // backing the tombstones' key/value Slices -- until this aggregator is
  // destroyed.
  pinned_iters_mgr_.PinIterator(input, arena);
  input->SeekToFirst();
  while (input->Valid()) {
    ParsedInternalKey parsed_key;
    if (!ParseInternalKey(input->key(), &parsed_key)) {
      return Status::Corruption("Unable to parse range tombstone InternalKey");
    }
    RangeTombstone tombstone(parsed_key, input->value());
    // File the tombstone under the stripe determined by its sequence number.
    auto& tombstone_map = GetTombstoneMap(tombstone.seq_);
    tombstone_map.emplace(input->key(), std::move(tombstone));
    input->Next();
  }
  return Status::OK();
}

RangeDelAggregator::TombstoneMap& RangeDelAggregator::GetTombstoneMap(
    SequenceNumber seq) {
  // Returns the tombstone map for the snapshot stripe containing `seq`.
  // The stripe includes seqnum for the snapshot above and excludes seqnum for
  // the snapshot below.
  StripeMap::iterator iter;
  if (seq > 0) {
    // upper_bound() checks strict inequality so need to subtract one
    iter = stripe_map_.upper_bound(seq - 1);
  } else {
    iter = stripe_map_.begin();
  }
  // catch-all stripe justifies this assertion in either of above cases
  assert(iter != stripe_map_.end());
  return iter->second;
}

// TODO(andrewkr): We should implement an iterator over range tombstones in our
// map. It'd enable compaction to open tables on-demand, i.e., only once range
// tombstones are known to be available, without the code duplication we have
// in ShouldAddTombstones(). It'll also allow us to move the table-modifying
// code into more coherent places: CompactionJob and BuildTable().
// Writes this aggregator's range tombstones into *builder and widens *meta's
// smallest/largest keys and seqnum bounds to cover them.
//
// - extend_before_min_key: if true, tombstones may extend meta->smallest
//   backwards; if false, tombstones ending before meta->smallest are skipped
//   (the previous output table is assumed to include them).
// - next_table_min_key: if non-null, tombstones starting after this key are
//   left for the next table, and meta->largest is clamped so output files
//   appear key-space partitioned.
// - bottommost_level: if true, tombstones in the oldest stripe are skipped as
//   obsolete (the keys they covered have been compacted away).
void RangeDelAggregator::AddToBuilder(TableBuilder* builder,
                                      bool extend_before_min_key,
                                      const Slice* next_table_min_key,
                                      FileMetaData* meta,
                                      bool bottommost_level /* = false */) {
  auto stripe_map_iter = stripe_map_.begin();
  assert(stripe_map_iter != stripe_map_.end());
  if (bottommost_level) {
    // For the bottommost level, keys covered by tombstones in the first
    // (oldest) stripe have been compacted away, so the tombstones are obsolete.
    ++stripe_map_iter;
  }

  // Note the order in which tombstones are stored is insignificant since we
  // insert them into a std::map on the read path.
  bool first_added = false;
  while (stripe_map_iter != stripe_map_.end()) {
    for (const auto& start_key_and_tombstone : stripe_map_iter->second) {
      const auto& tombstone = start_key_and_tombstone.second;
      if (next_table_min_key != nullptr &&
          icmp_.user_comparator()->Compare(*next_table_min_key,
                                           tombstone.start_key_) < 0) {
        // Tombstones starting after next_table_min_key only need to be included
        // in the next table.
        break;
      }
      if (!extend_before_min_key && meta->smallest.size() != 0 &&
          icmp_.user_comparator()->Compare(tombstone.end_key_,
                                           meta->smallest.user_key()) < 0) {
        // Tombstones ending before this table's smallest key can conditionally
        // be excluded, e.g., when this table is a non-first compaction output,
        // we know such tombstones are included in the previous table. In that
        // case extend_before_min_key would be false.
        continue;
      }

      auto ikey_and_end_key = tombstone.Serialize();
      builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
      // Only the first added tombstone may pull meta->smallest backwards.
      if (!first_added) {
        first_added = true;
        if (extend_before_min_key &&
            (meta->smallest.size() == 0 ||
             icmp_.Compare(ikey_and_end_key.first, meta->smallest) < 0)) {
          meta->smallest = ikey_and_end_key.first;
        }
      }
      auto end_ikey = tombstone.SerializeEndKey();
      if (meta->largest.size() == 0 ||
          icmp_.Compare(meta->largest, end_ikey) < 0) {
        if (next_table_min_key != nullptr &&
            icmp_.Compare(*next_table_min_key, end_ikey.Encode()) < 0) {
          // Pretend the largest key has the same user key as the min key in the
          // following table in order for files to appear key-space partitioned.
          // Choose highest seqnum so this file's largest comes before the next
          // file's smallest. The fake seqnum is OK because the read path's
          // file-picking code only considers the user key portion.
          //
          // Note Seek() also creates InternalKey with (user_key,
          // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
          // kTypeRangeDeletion (0xF), so the range tombstone comes before the
          // Seek() key in InternalKey's ordering. So Seek() will look in the
          // next file for the user key.
          ParsedInternalKey parsed;
          ParseInternalKey(*next_table_min_key, &parsed);
          meta->largest = InternalKey(parsed.user_key, kMaxSequenceNumber,
                                      kTypeRangeDeletion);
        } else {
          meta->largest = std::move(end_ikey);
        }
      }
      // Widen the file's seqnum range to cover this tombstone.
      meta->smallest_seqno = std::min(meta->smallest_seqno, tombstone.seq_);
      meta->largest_seqno = std::max(meta->largest_seqno, tombstone.seq_);
    }
    ++stripe_map_iter;
  }
}

bool RangeDelAggregator::IsEmpty() {
  for (auto stripe_map_iter = stripe_map_.begin();
       stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) {
    if (!stripe_map_iter->second.empty()) {
      return false;
    }
  }
  return true;
}

}  // namespace rocksdb