From 6595267980781bd872d2ecbf4f017c1d4068b381 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 23 Oct 2020 22:58:05 -0700 Subject: [PATCH] Allow compaction iterator to perform garbage collection (#7556) Summary: Add a threshold timestamp, `full_history_ts_low_`, of type `const std::string*`, to `CompactionIterator`, so that RocksDB can also perform garbage collection during compaction. * If full_history_ts_low_ is nullptr, then compaction iterator does not perform GC, preserving all timestamp history for all keys. Compaction iterator will treat user keys with different timestamps as different user keys. * If full_history_ts_low_ is not nullptr, then compaction iterator performs GC. GC will look at keys older than `*full_history_ts_low_` and determine their eligibility based on factors including snapshots. Current rules of GC: * If an internal key is in the same snapshot as a previous counterpart with the same user key, and this key is eligible for GC, and the key is not a single-delete or merge operand, then this key can be dropped. Note that the previous internal key cannot be a merge operand either. * If a tombstone is the most recent one in the earliest snapshot and it is eligible for GC, and KeyNotExistsBeyondOutputLevel() is true, then this tombstone can be dropped. * If a tombstone is the most recent one in a snapshot and it is eligible for GC, and the compaction is at bottommost level, then all other older internal keys of the same user key must also be eligible for GC, and thus can be dropped. * Single-delete, delete-range and merge are not currently supported. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7556 Test Plan: make check Reviewed By: ltamasi Differential Revision: D24507728 Pulled By: riversand963 fbshipit-source-id: 3c09c7301f41eed76dfcf4d1527e68cf6e0a8bb3 --- db/compaction/compaction.cc | 8 +- db/compaction/compaction_iterator.cc | 90 +++++++-- db/compaction/compaction_iterator.h | 39 +++- db/compaction/compaction_iterator_test.cc | 211 +++++++++++++++++++++- db/db_iter.cc | 4 +- db/dbformat.h | 13 +- test_util/testutil.cc | 10 + test_util/testutil.h | 4 + 8 files changed, 348 insertions(+), 31 deletions(-) diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 60e2681fa..efe27870d 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -383,7 +383,13 @@ bool Compaction::KeyNotExistsBeyondOutputLevel( auto* f = files[level_ptrs->at(lvl)]; if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { // We've advanced far enough - if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) { + // In the presence of user-defined timestamp, we may need to handle + // the case in which f->smallest.user_key() (including ts) has the + // same user key, but the ts part is smaller. If so, + // Compare(user_key, f->smallest.user_key()) returns -1. + // That's why we need CompareWithoutTimestamp(). 
+ if (user_cmp->CompareWithoutTimestamp(user_key, + f->smallest.user_key()) >= 0) { // Key falls in this file's range, so it may // exist beyond output level return false; diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 4555ec568..1e2aff520 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -44,7 +44,8 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, const std::atomic* manual_compaction_paused, - const std::shared_ptr info_log) + const std::shared_ptr info_log, + const std::string* full_history_ts_low) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, @@ -53,7 +54,7 @@ CompactionIterator::CompactionIterator( std::unique_ptr( compaction ? new CompactionProxy(compaction) : nullptr), compaction_filter, shutting_down, preserve_deletes_seqnum, - manual_compaction_paused, info_log) {} + manual_compaction_paused, info_log, full_history_ts_low) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -68,7 +69,8 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, const std::atomic* manual_compaction_paused, - const std::shared_ptr info_log) + const std::shared_ptr info_log, + const std::string* full_history_ts_low) : input_(input), cmp_(cmp), merge_helper_(merge_helper), @@ -90,7 +92,10 @@ CompactionIterator::CompactionIterator( merge_out_iter_(merge_helper_), current_key_committed_(false), info_log_(info_log), - allow_data_in_errors_(allow_data_in_errors) { + allow_data_in_errors_(allow_data_in_errors), + timestamp_size_(cmp_ ? 
cmp_->timestamp_size() : 0), + full_history_ts_low_(full_history_ts_low), + cmp_with_history_ts_low_(0) { assert(compaction_filter_ == nullptr || compaction_ != nullptr); assert(snapshots_ != nullptr); bottommost_level_ = compaction_ == nullptr @@ -117,6 +122,8 @@ CompactionIterator::CompactionIterator( for (size_t i = 1; i < snapshots_->size(); ++i) { assert(snapshots_->at(i - 1) < snapshots_->at(i)); } + assert(timestamp_size_ == 0 || !full_history_ts_low_ || + timestamp_size_ == full_history_ts_low_->size()); #endif input_->SetPinnedItersMgr(&pinned_iters_mgr_); TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get()); @@ -298,7 +305,8 @@ void CompactionIterator::NextFromInput() { TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); // Update input statistics - if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) { + if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion || + ikey_.type == kTypeDeletionWithTimestamp) { iter_stats_.num_input_deletion_records++; } iter_stats_.total_input_raw_key_bytes += key_.size(); @@ -319,11 +327,33 @@ void CompactionIterator::NextFromInput() { // First occurrence of this user key // Copy key for output key_ = current_key_.SetInternalKey(key_, &ikey_); + + UpdateTimestampAndCompareWithFullHistoryLow(); + + // If + // (1) !has_current_user_key_, OR + // (2) timestamp is disabled, OR + // (3) all history will be preserved, OR + // (4) user key (excluding timestamp) is different from previous key, OR + // (5) timestamp is NO older than *full_history_ts_low_ + // then current_user_key_ must be treated as a different user key. + // This means, if a user key (excluding ts) is the same as the previous + // user key, and its ts is older than *full_history_ts_low_, then we + // consider this key for GC, e.g. it may be dropped if certain conditions + // match. 
+ if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ || + 0 != cmp_->CompareWithoutTimestamp(ikey_.user_key, + current_user_key_) || + cmp_with_history_ts_low_ >= 0) { + // Initialize for future comparison for rule (A) and etc. + current_user_key_sequence_ = kMaxSequenceNumber; + current_user_key_snapshot_ = 0; + has_current_user_key_ = true; + } current_user_key_ = ikey_.user_key; - has_current_user_key_ = true; + has_outputted_key_ = false; - current_user_key_sequence_ = kMaxSequenceNumber; - current_user_key_snapshot_ = 0; + current_key_committed_ = KeyCommitted(ikey_.sequence); // Apply the compaction filter to the first committed version of the user @@ -543,9 +573,12 @@ void CompactionIterator::NextFromInput() { last_sequence, current_user_key_sequence_); } - ++iter_stats_.num_record_drop_hidden; // (A) + ++iter_stats_.num_record_drop_hidden; // rule (A) input_->Next(); - } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion && + } else if (compaction_ != nullptr && + (ikey_.type == kTypeDeletion || + (ikey_.type == kTypeDeletionWithTimestamp && + cmp_with_history_ts_low_ < 0)) && IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikeyNotNeededForIncrementalSnapshot() && compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, @@ -569,13 +602,19 @@ void CompactionIterator::NextFromInput() { // given that: // (1) The deletion is earlier than earliest_write_conflict_snapshot, and // (2) No value exist earlier than the deletion. + // + // Note also that a deletion marker of type kTypeDeletionWithTimestamp + // will be treated as a different user key unless the timestamp is older + // than *full_history_ts_low_. 
++iter_stats_.num_record_drop_obsolete; if (!bottommost_level_) { ++iter_stats_.num_optimized_del_drop_obsolete; } input_->Next(); - } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ && - ikeyNotNeededForIncrementalSnapshot()) { + } else if ((ikey_.type == kTypeDeletion || + (ikey_.type == kTypeDeletionWithTimestamp && + cmp_with_history_ts_low_ < 0)) && + bottommost_level_ && ikeyNotNeededForIncrementalSnapshot()) { // Handle the case where we have a delete key at the bottom most level // We can skip outputting the key iff there are no subsequent puts for this // key @@ -583,12 +622,17 @@ void CompactionIterator::NextFromInput() { ikey_.user_key, &level_ptrs_)); ParsedInternalKey next_ikey; input_->Next(); - // Skip over all versions of this key that happen to occur in the same snapshot - // range as the delete + // Skip over all versions of this key that happen to occur in the same + // snapshot range as the delete. + // + // Note that a deletion marker of type kTypeDeletionWithTimestamp will be + // considered to have a different user key unless the timestamp is older + // than *full_history_ts_low_. 
while (!IsPausingManualCompaction() && !IsShuttingDown() && input_->Valid() && (ParseInternalKey(input_->key(), &next_ikey) == Status::OK()) && - cmp_->Equal(ikey_.user_key, next_ikey.user_key) && + 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key, + next_ikey.user_key) && (prev_snapshot == 0 || DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) { input_->Next(); @@ -597,7 +641,8 @@ void CompactionIterator::NextFromInput() { // delete too if (input_->Valid() && (ParseInternalKey(input_->key(), &next_ikey) == Status::OK()) && - cmp_->Equal(ikey_.user_key, next_ikey.user_key)) { + 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key, + next_ikey.user_key)) { valid_ = true; at_next_ = true; } @@ -735,7 +780,18 @@ void CompactionIterator::PrepareOutput() { ikey_.type); } ikey_.sequence = 0; - current_key_.UpdateInternalKey(0, ikey_.type); + if (!timestamp_size_) { + current_key_.UpdateInternalKey(0, ikey_.type); + } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) { + // We can also zero out timestamp for better compression. + // For the same user key (excluding timestamp), the timestamp-based + // history can be collapsed to save some space if the timestamp is + // older than *full_history_ts_low_. 
+ const std::string kTsMin(timestamp_size_, static_cast(0)); + const Slice ts_slice = kTsMin; + ikey_.SetTimestamp(ts_slice); + current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice); + } } } } diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 29dedd3c7..b8454593c 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -75,7 +75,8 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, const std::atomic* manual_compaction_paused = nullptr, - const std::shared_ptr info_log = nullptr); + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); // Constructor with custom CompactionProxy, used for tests. CompactionIterator(InternalIterator* input, const Comparator* cmp, @@ -92,7 +93,8 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const SequenceNumber preserve_deletes_seqnum = 0, const std::atomic* manual_compaction_paused = nullptr, - const std::shared_ptr info_log = nullptr); + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr); ~CompactionIterator(); @@ -152,6 +154,18 @@ class CompactionIterator { bool IsInEarliestSnapshot(SequenceNumber sequence); + // Extract user-defined timestamp from user key if possible and compare it + // with *full_history_ts_low_ if applicable. + inline void UpdateTimestampAndCompareWithFullHistoryLow() { + if (full_history_ts_low_) { + assert(timestamp_size_ > 0); + current_ts_ = + ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_); + cmp_with_history_ts_low_ = + cmp_->CompareTimestamp(current_ts_, *full_history_ts_low_); + } + } + InternalIterator* input_; const Comparator* cmp_; MergeHelper* merge_helper_; @@ -199,11 +213,13 @@ class CompactionIterator { // Stores whether ikey_.user_key is valid. 
If set to false, the user key is // not compared against the current key in the underlying iterator. bool has_current_user_key_ = false; - bool at_next_ = false; // If false, the iterator - // Holds a copy of the current compaction iterator output (or current key in - // the underlying iterator during NextFromInput()). + // If false, the iterator holds a copy of the current compaction iterator + // output (or current key in the underlying iterator during NextFromInput()). + bool at_next_ = false; + IterKey current_key_; Slice current_user_key_; + Slice current_ts_; SequenceNumber current_user_key_sequence_; SequenceNumber current_user_key_snapshot_; @@ -237,6 +253,19 @@ class CompactionIterator { bool allow_data_in_errors_; + // Comes from comparator. + const size_t timestamp_size_; + + // Lower bound timestamp to retain full history in terms of user-defined + // timestamp. If a key's timestamp is older than full_history_ts_low_, then + // the key *may* be eligible for garbage collection (GC). The skipping logic + // is in `NextFromInput()` and `PrepareOutput()`. + // If nullptr, NO GC will be performed and all history will be preserved. + const std::string* const full_history_ts_low_; + + // Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_) + int cmp_with_history_ts_low_; + bool IsShuttingDown() { // This is a best-effort facility, so memory_order_relaxed is sufficient. 
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index 57db42489..a88809eab 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -216,6 +216,9 @@ class CompactionIteratorTest : public testing::TestWithParam { CompactionIteratorTest() : cmp_(BytewiseComparator()), icmp_(cmp_), snapshots_({}) {} + explicit CompactionIteratorTest(const Comparator* ucmp) + : cmp_(ucmp), icmp_(cmp_), snapshots_({}) {} + void InitIterators( const std::vector& ks, const std::vector& vs, const std::vector& range_del_ks, @@ -224,7 +227,9 @@ class CompactionIteratorTest : public testing::TestWithParam { SequenceNumber last_committed_sequence = kMaxSequenceNumber, MergeOperator* merge_op = nullptr, CompactionFilter* filter = nullptr, bool bottommost_level = false, - SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) { + SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, + bool key_not_exists_beyond_output_level = false, + const std::string* full_history_ts_low = nullptr) { std::unique_ptr unfragmented_range_del_iter( new test::VectorIterator(range_del_ks, range_del_vs)); auto tombstone_list = std::make_shared( @@ -236,10 +241,12 @@ class CompactionIteratorTest : public testing::TestWithParam { range_del_agg_->AddTombstones(std::move(range_del_iter)); std::unique_ptr compaction; - if (filter || bottommost_level) { + if (filter || bottommost_level || key_not_exists_beyond_output_level) { compaction_proxy_ = new FakeCompaction(); compaction_proxy_->is_bottommost_level = bottommost_level; compaction_proxy_->is_allow_ingest_behind = AllowIngestBehind(); + compaction_proxy_->key_not_exists_beyond_output_level = + key_not_exists_beyond_output_level; compaction.reset(compaction_proxy_); } bool use_snapshot_checker = UseSnapshotChecker() || GetParam(); @@ -252,6 +259,11 @@ class 
CompactionIteratorTest : public testing::TestWithParam { 0 /*latest_snapshot*/, snapshot_checker_.get(), 0 /*level*/, nullptr /*statistics*/, &shutting_down_)); + if (c_iter_) { + // Since iter_ is still used in ~CompactionIterator(), we call + // ~CompactionIterator() first. + c_iter_.reset(); + } iter_.reset(new LoggingForwardVectorIterator(ks, vs)); iter_->SeekToFirst(); c_iter_.reset(new CompactionIterator( @@ -260,7 +272,9 @@ class CompactionIteratorTest : public testing::TestWithParam { Env::Default(), false /* report_detailed_time */, false, range_del_agg_.get(), nullptr /* blob_file_builder */, false /*allow_data_in_errors*/, std::move(compaction), filter, - &shutting_down_)); + &shutting_down_, /*preserve_deletes_seqnum=*/0, + /*manual_compaction_paused=*/nullptr, /*info_log=*/nullptr, + full_history_ts_low)); } void AddSnapshot(SequenceNumber snapshot, @@ -282,10 +296,13 @@ class CompactionIteratorTest : public testing::TestWithParam { MergeOperator* merge_operator = nullptr, CompactionFilter* compaction_filter = nullptr, bool bottommost_level = false, - SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber) { + SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber, + bool key_not_exists_beyond_output_level = false, + const std::string* full_history_ts_low = nullptr) { InitIterators(input_keys, input_values, {}, {}, kMaxSequenceNumber, last_committed_seq, merge_operator, compaction_filter, - bottommost_level, earliest_write_conflict_snapshot); + bottommost_level, earliest_write_conflict_snapshot, + key_not_exists_beyond_output_level, full_history_ts_low); c_iter_->SeekToFirst(); for (size_t i = 0; i < expected_keys.size(); i++) { std::string info = "i = " + ToString(i); @@ -299,6 +316,11 @@ class CompactionIteratorTest : public testing::TestWithParam { ASSERT_FALSE(c_iter_->Valid()); } + void ClearSnapshots() { + snapshots_.clear(); + snapshot_map_.clear(); + } + const Comparator* cmp_; const InternalKeyComparator icmp_; 
std::vector snapshots_; @@ -1033,6 +1055,185 @@ INSTANTIATE_TEST_CASE_P(CompactionIteratorWithAllowIngestBehindTestInstance, CompactionIteratorWithAllowIngestBehindTest, testing::Values(true, false)); +class CompactionIteratorTsGcTest : public CompactionIteratorTest { + public: + CompactionIteratorTsGcTest() + : CompactionIteratorTest(test::ComparatorWithU64Ts()) {} +}; + +TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeValue), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, + kTypeDeletionWithTimestamp)}; + const std::vector input_values = {"a3", ""}; + std::string full_history_ts_low; + // All keys' timestamps are newer than or equal to 102, thus none of them + // will be eligible for GC. + PutFixed64(&full_history_ts_low, 102); + const std::vector& expected_keys = input_keys; + const std::vector& expected_values = input_values; + const std::vector> params = { + {false, false}, {false, true}, {true, true}}; + for (const std::pair& param : params) { + const bool bottommost_level = param.first; + const bool key_not_exists_beyond_output_level = param.second; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + bottommost_level, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + key_not_exists_beyond_output_level, &full_history_ts_low); + } +} + +TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue), + test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeValue)}; + const std::vector input_values = {"", "a2", "a1"}; + std::string full_history_ts_low; + // All keys' timestamps are older than 104. 
+ PutFixed64(&full_history_ts_low, 104); + { + // With a snapshot at seq 3, both the deletion marker and the key at 3 must + // be preserved. + AddSnapshot(3); + const std::vector expected_keys = {input_keys[0], + input_keys[1]}; + const std::vector expected_values = {"", "a2"}; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low); + ClearSnapshots(); + } + { + // No snapshot, the deletion marker should be preserved because the user + // key may appear beyond output level. + const std::vector expected_keys = {input_keys[0]}; + const std::vector expected_values = {""}; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low); + } + { + // No snapshot, the deletion marker can be dropped because the user key + // does not appear in higher levels. 
+ const std::vector expected_keys = {}; + const std::vector expected_values = {}; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/true, &full_history_ts_low); + } +} + +TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue), + test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeValue), + test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)}; + const std::vector input_values = {"", "a2", "a1", "a0"}; + { + std::string full_history_ts_low; + // Keys whose timestamps larger than or equal to 102 will be preserved. + PutFixed64(&full_history_ts_low, 102); + const std::vector expected_keys = {input_keys[0], + input_keys[1]}; + const std::vector expected_values = {"", "a2"}; + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low); + } +} + +TEST_P(CompactionIteratorTsGcTest, DropTombstones) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue), + test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)}; + const std::vector input_values = {"", "a2", "", "a0"}; + const std::vector expected_keys = 
{input_keys[0], input_keys[1]}; + const std::vector expected_values = {"", "a2"}; + + // Take a snapshot at seq 2. + AddSnapshot(2); + + { + // Non-bottommost level, but key does not exist beyond output level. + std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, 102); + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_sequence=*/kMaxSequenceNumber, + /*merge_op=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/false, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/true, &full_history_ts_low); + } + { + // Bottommost level + std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, 102); + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/true, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low); + } +} + +TEST_P(CompactionIteratorTsGcTest, RewriteTs) { + constexpr char user_key[] = "a"; + const std::vector input_keys = { + test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue), + test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeDeletionWithTimestamp), + test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)}; + const std::vector input_values = {"", "a2", "", "a0"}; + const std::vector expected_keys = { + input_keys[0], input_keys[1], input_keys[2], + test::KeyStr(/*ts=*/0, user_key, /*seq=*/0, kTypeValue)}; + const std::vector expected_values = {"", "a2", "", "a0"}; + + AddSnapshot(1); + AddSnapshot(2); + + { + // Bottommost level and need to rewrite both ts and seq. 
+ std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, 102); + RunTest(input_keys, input_values, expected_keys, expected_values, + /*last_committed_seq=*/kMaxSequenceNumber, + /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr, + /*bottommost_level=*/true, + /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber, + /*key_not_exists_beyond_output_level=*/true, &full_history_ts_low); + } +} + +INSTANTIATE_TEST_CASE_P(CompactionIteratorTsGcTestInstance, + CompactionIteratorTsGcTest, + testing::Values(true, false)); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_iter.cc b/db/db_iter.cc index a9eee88dd..c99383e2a 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -436,11 +436,11 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, &last_key, ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion)); } else { - std::string min_ts(timestamp_size_, static_cast(0)); + const std::string kTsMin(timestamp_size_, static_cast(0)); AppendInternalKeyWithDifferentTimestamp( &last_key, ParsedInternalKey(saved_key_.GetUserKey(), 0, kTypeDeletion), - min_ts); + kTsMin); } // Don't set skipping_saved_key = false because we may still see more // user-keys equal to saved_key_. diff --git a/db/dbformat.h b/db/dbformat.h index 81c852ac4..d55bbae44 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -119,6 +119,12 @@ struct ParsedInternalKey { sequence = 0; type = kTypeDeletion; } + + void SetTimestamp(const Slice& ts) { + assert(ts.size() <= user_key.size()); + const char* addr = user_key.data() - ts.size(); + memcpy(const_cast(addr), ts.data(), ts.size()); + } }; // Return the length of the encoding of "key". @@ -475,9 +481,14 @@ class IterKey { // Update the sequence number in the internal key. Guarantees not to // invalidate slices to the key (and the user key). 
- void UpdateInternalKey(uint64_t seq, ValueType t) { + void UpdateInternalKey(uint64_t seq, ValueType t, const Slice* ts = nullptr) { assert(!IsKeyPinned()); assert(key_size_ >= kNumInternalBytes); + if (ts) { + assert(key_size_ >= kNumInternalBytes + ts->size()); + memcpy(&buf_[key_size_ - kNumInternalBytes - ts->size()], ts->data(), + ts->size()); + } uint64_t newval = (seq << 8) | t; EncodeFixed64(&buf_[key_size_ - kNumInternalBytes], newval); } diff --git a/test_util/testutil.cc b/test_util/testutil.cc index 24005e782..18349cf4c 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -205,6 +205,16 @@ std::string KeyStr(const std::string& user_key, const SequenceNumber& seq, return k.Encode().ToString(); } +std::string KeyStr(uint64_t ts, const std::string& user_key, + const SequenceNumber& seq, const ValueType& t, + bool corrupt) { + std::string user_key_with_ts(user_key); + std::string ts_str; + PutFixed64(&ts_str, ts); + user_key_with_ts.append(ts_str); + return KeyStr(user_key_with_ts, seq, t, corrupt); +} + std::string RandomName(Random* rnd, const size_t len) { std::stringstream ss; for (size_t i = 0; i < len; ++i) { diff --git a/test_util/testutil.h b/test_util/testutil.h index 4255a48f2..320cccc29 100644 --- a/test_util/testutil.h +++ b/test_util/testutil.h @@ -394,6 +394,10 @@ extern std::string KeyStr(const std::string& user_key, const SequenceNumber& seq, const ValueType& t, bool corrupt = false); +extern std::string KeyStr(uint64_t ts, const std::string& user_key, + const SequenceNumber& seq, const ValueType& t, + bool corrupt = false); + class SleepingBackgroundTask { public: SleepingBackgroundTask() -- GitLab