From cf9d8e45c08dee5b6d607c71078a225d4af26a21 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 12 Nov 2020 11:40:52 -0800 Subject: [PATCH] Add full_history_ts_low_ to CompactionJob (#7657) Summary: https://github.com/facebook/rocksdb/issues/7556 enables `CompactionIterator` to perform garbage collection during compaction according to a lower bound (user-defined) timestamp `full_history_ts_low_`. This PR adds a data member `full_history_ts_low_` of type `std::string` to `CompactionJob`, and `full_history_ts_low_` does not change during compaction. `CompactionJob` will pass a pointer to this data member to the `CompactionIterator` used during compaction. Also refactored compaction_job_test.cc to re-use some existing code, which is actually the majority of this PR. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7657 Test Plan: make check Reviewed By: ltamasi Differential Revision: D24913803 Pulled By: riversand963 fbshipit-source-id: 11ad5329ddac365667152e7b3b02f84182c0ca8e --- db/compaction/compaction_job.cc | 11 +- db/compaction/compaction_job.h | 4 +- db/compaction/compaction_job_test.cc | 179 +++++++++++++++++++++++---- table/mock_table.cc | 4 +- table/mock_table.h | 3 +- 5 files changed, 167 insertions(+), 34 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index edf0cbdc6..a517a2015 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -309,7 +309,7 @@ CompactionJob::CompactionJob( const std::string& dbname, CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, const std::shared_ptr& io_tracer, const std::atomic* manual_compaction_paused, const std::string& db_id, - const std::string& db_session_id) + const std::string& db_session_id, std::string full_history_ts_low) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), @@ -344,7 +344,8 @@ CompactionJob::CompactionJob( paranoid_file_checks_(paranoid_file_checks), measure_io_stats_(measure_io_stats), write_hint_(Env::WLTH_NOT_SET), - thread_pri_(thread_pri) { + thread_pri_(thread_pri), + full_history_ts_low_(std::move(full_history_ts_low)) { assert(compaction_job_stats_ != nullptr); assert(log_buffer_ != nullptr); const auto* cfd = compact_->compaction->column_family_data(); @@ -995,6 +996,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } Status status; + const std::string* const full_history_ts_low = + full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_; sub_compact->c_iter.reset(new CompactionIterator( input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, @@ -1002,8 +1005,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { /*expect_valid_internal_key=*/true, &range_del_agg, blob_file_builder.get(), db_options_.allow_data_in_errors, sub_compact->compaction, compaction_filter, shutting_down_, - preserve_deletes_seqnum_, manual_compaction_paused_, - db_options_.info_log)); + preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log, + full_history_ts_low)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 2c36b408d..bbd6547da 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -78,7 +78,8 @@ class CompactionJob { const std::string& dbname, CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, const std::shared_ptr& io_tracer, const std::atomic* manual_compaction_paused = nullptr, - const std::string& db_id = "", const std::string& db_session_id = ""); + const std::string& db_id = "", const std::string& db_session_id = "", + std::string full_history_ts_low = ""); ~CompactionJob(); @@ -201,6 +202,7 @@ class CompactionJob { Env::WriteLifeTimeHint write_hint_; Env::Priority thread_pri_; IOStatus io_status_; + std::string full_history_ts_low_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index a9f2948ed..fe53f0b00 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -67,13 +67,14 @@ void VerifyInitializationOfCompactionJobStats( } // namespace -// TODO(icanadi) Make it simpler once we mock out VersionSet -class CompactionJobTest : public testing::Test { - public: - CompactionJobTest() +class CompactionJobTestBase : public testing::Test { + protected: + CompactionJobTestBase(std::string dbname, const Comparator* ucmp, + std::function encode_u64_ts) : env_(Env::Default()), fs_(std::make_shared(env_)), - dbname_(test::PerThreadDBPath("compaction_job_test")), + dbname_(std::move(dbname)), + ucmp_(ucmp), db_options_(), mutable_cf_options_(cf_options_), mutable_db_options_(), @@ -86,12 +87,17 @@ class CompactionJobTest : public testing::Test { shutting_down_(false), preserve_deletes_seqnum_(0), mock_table_factory_(new mock::MockTableFactory()), - error_handler_(nullptr, db_options_, &mutex_) { + error_handler_(nullptr, db_options_, &mutex_), + encode_u64_ts_(std::move(encode_u64_ts)) {} + + void SetUp() override { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); db_options_.env = env_; db_options_.fs = fs_; db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); + cf_options_.comparator = ucmp_; + cf_options_.table_factory = mock_table_factory_; } std::string GenerateFileName(uint64_t file_number) { @@ -102,9 +108,10 @@ class CompactionJobTest : public testing::Test { return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); } - static std::string KeyStr(const std::string& user_key, - const SequenceNumber seq_num, const ValueType t) { - return InternalKey(user_key, seq_num, t).Encode().ToString(); + std::string KeyStr(const std::string& user_key, const SequenceNumber seq_num, + const ValueType t, uint64_t ts = 0) { + std::string user_key_with_ts = user_key + encode_u64_ts_(ts); + return InternalKey(user_key_with_ts, seq_num, t).Encode().ToString(); } static std::string BlobStr(uint64_t blob_file_number, uint64_t offset, @@ -208,9 +215,9 @@ class CompactionJobTest : public testing::Test { // returns expected result after compaction mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) { stl_wrappers::KVMap expected_results; - const int kKeysPerFile = 10000; - const int kCorruptKeysPerFile = 200; - const int kMatchingKeys = kKeysPerFile / 2; + constexpr int kKeysPerFile = 10000; + constexpr int kCorruptKeysPerFile = 200; + constexpr int kMatchingKeys = kKeysPerFile / 2; SequenceNumber sequence_number = 0; auto corrupt_id = [&](int id) { @@ -239,7 +246,7 @@ class CompactionJobTest : public testing::Test { {bottommost_internal_key.Encode().ToString(), value}); } } - mock::SortKVVector(&contents); + mock::SortKVVector(&contents, ucmp_); AddMockFile(contents); } @@ -255,7 +262,7 @@ class CompactionJobTest : public testing::Test { } void NewDB() { - DestroyDB(dbname_, Options()); + EXPECT_OK(DestroyDB(dbname_, Options())); EXPECT_OK(env_->CreateDirIfMissing(dbname_)); versions_.reset( new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), @@ -265,12 +272,6 @@ class CompactionJobTest : public testing::Test { SetIdentityFile(env_, dbname_); VersionEdit new_db; - if (db_options_.write_dbid_to_manifest) { - DBImpl* impl = new DBImpl(DBOptions(), dbname_); - std::string db_id; - impl->GetDbIdentityFromIdentityFile(&db_id); - new_db.SetDBId(db_id); - } new_db.SetLogNumber(0); new_db.SetNextFile(2); new_db.SetLastSequence(0); @@ -294,13 +295,12 @@ class CompactionJobTest : public testing::Test { ASSERT_OK(s); - std::vector column_families; - cf_options_.table_factory = mock_table_factory_; cf_options_.merge_operator = merge_op_; cf_options_.compaction_filter = compaction_filter_.get(); + std::vector column_families; column_families.emplace_back(kDefaultColumnFamilyName, cf_options_); - EXPECT_OK(versions_->Recover(column_families, false)); + ASSERT_OK(versions_->Recover(column_families, false)); cfd_ = versions_->GetColumnFamilySet()->GetDefault(); } @@ -338,19 +338,22 @@ class CompactionJobTest : public testing::Test { EventLogger event_logger(db_options_.info_log.get()); // TODO(yiwu) add a mock snapshot checker and add test for it. SnapshotChecker* snapshot_checker = nullptr; + ASSERT_TRUE(full_history_ts_low_.empty() || + ucmp_->timestamp_size() == full_history_ts_low_.size()); CompactionJob compaction_job( 0, &compaction, db_options_, env_options_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr, nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger, false, false, dbname_, &compaction_job_stats_, - Env::Priority::USER, nullptr /* IOTracer */); + Env::Priority::USER, nullptr /* IOTracer */, + /*manual_compaction_paused=*/nullptr, /*db_id=*/"", + /*db_session_id=*/"", full_history_ts_low_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); compaction_job.Prepare(); mutex_.Unlock(); - Status s; - s = compaction_job.Run(); + Status s = compaction_job.Run(); ASSERT_OK(s); ASSERT_OK(compaction_job.io_status()); mutex_.Lock(); @@ -380,6 +383,7 @@ class CompactionJobTest : public testing::Test { Env* env_; std::shared_ptr fs_; std::string dbname_; + const Comparator* const ucmp_; EnvOptions env_options_; ImmutableDBOptions db_options_; ColumnFamilyOptions cf_options_; @@ -398,6 +402,17 @@ class CompactionJobTest : public testing::Test { std::unique_ptr compaction_filter_; std::shared_ptr merge_op_; ErrorHandler error_handler_; + std::string full_history_ts_low_; + const std::function encode_u64_ts_; +}; + +// TODO(icanadi) Make it simpler once we mock out VersionSet +class CompactionJobTest : public CompactionJobTestBase { + public: + CompactionJobTest() + : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"), + BytewiseComparator(), + [](uint64_t /*ts*/) { return ""; }) {} }; TEST_F(CompactionJobTest, Simple) { @@ -1078,6 +1093,118 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) { /* expected_oldest_blob_file_number */ 19); } +class CompactionJobTimestampTest : public CompactionJobTestBase { + public: + CompactionJobTimestampTest() + : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"), + test::ComparatorWithU64Ts(), test::EncodeInt) {} +}; + +TEST_F(CompactionJobTimestampTest, GCDisabled) { + NewDB(); + + auto file1 = + mock::MakeMockFile({{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"}, + {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"}, + {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"}}); + AddMockFile(file1); + + auto file2 = mock::MakeMockFile( + {{KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""}, + {KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""}, + {KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}}); + AddMockFile(file2); + + SetLastSequence(10); + + auto expected_results = mock::MakeMockFile( + {{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"}, + {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"}, + {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"}, + {KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""}, + {KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""}, + {KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}}); + const auto& files = cfd_->current()->storage_info()->LevelFiles(0); + RunCompaction({files}, expected_results); +} + +TEST_F(CompactionJobTimestampTest, NoKeyExpired) { + NewDB(); + + auto file1 = + mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"}, + {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"}, + {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"}}); + AddMockFile(file1); + + auto file2 = + mock::MakeMockFile({{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"}, + {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}}); + AddMockFile(file2); + + SetLastSequence(101); + + auto expected_results = + mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"}, + {KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"}, + {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"}, + {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"}, + {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}}); + const auto& files = cfd_->current()->storage_info()->LevelFiles(0); + + full_history_ts_low_ = encode_u64_ts_(0); + RunCompaction({files}, expected_results); +} + +TEST_F(CompactionJobTimestampTest, AllKeysExpired) { + NewDB(); + + auto file1 = mock::MakeMockFile( + {{KeyStr("a", 5, ValueType::kTypeDeletionWithTimestamp, 100), ""}, + {KeyStr("b", 6, ValueType::kTypeValue, 99), "b6"}}); + AddMockFile(file1); + + auto file2 = mock::MakeMockFile( + {{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"}, + {KeyStr("b", 3, ValueType::kTypeDeletionWithTimestamp, 97), ""}, + {KeyStr("b", 2, ValueType::kTypeValue, 96), "b2"}}); + AddMockFile(file2); + + SetLastSequence(6); + + auto expected_results = + mock::MakeMockFile({{KeyStr("b", 0, ValueType::kTypeValue, 0), "b6"}}); + const auto& files = cfd_->current()->storage_info()->LevelFiles(0); + + full_history_ts_low_ = encode_u64_ts_(std::numeric_limits::max()); + RunCompaction({files}, expected_results); +} + +TEST_F(CompactionJobTimestampTest, SomeKeysExpired) { + NewDB(); + + auto file1 = + mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"}, + {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}}); + AddMockFile(file1); + + auto file2 = mock::MakeMockFile( + {{KeyStr("a", 3, ValueType::kTypeValue, 48), "a3"}, + {KeyStr("a", 2, ValueType::kTypeValue, 46), "a2"}, + {KeyStr("b", 4, ValueType::kTypeDeletionWithTimestamp, 47), ""}}); + AddMockFile(file2); + + SetLastSequence(6); + + auto expected_results = + mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"}, + {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}}); + const auto& files = cfd_->current()->storage_info()->LevelFiles(0); + + full_history_ts_low_ = encode_u64_ts_(49); + RunCompaction({files}, expected_results); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/table/mock_table.cc b/table/mock_table.cc index c8a19a076..117639df1 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -18,8 +18,8 @@ namespace mock { KVVector MakeMockFile(std::initializer_list l) { return KVVector(l); } -void SortKVVector(KVVector* kv_vector) { - InternalKeyComparator icmp(BytewiseComparator()); +void SortKVVector(KVVector* kv_vector, const Comparator* ucmp) { + InternalKeyComparator icmp(ucmp); std::sort(kv_vector->begin(), kv_vector->end(), [icmp](KVPair a, KVPair b) -> bool { return icmp.Compare(a.first, b.first) < 0; diff --git a/table/mock_table.h b/table/mock_table.h index 0ab9674d6..4c57bee82 100644 --- a/table/mock_table.h +++ b/table/mock_table.h @@ -31,7 +31,8 @@ using KVPair = std::pair; using KVVector = std::vector; KVVector MakeMockFile(std::initializer_list l = {}); -void SortKVVector(KVVector* kv_vector); +void SortKVVector(KVVector* kv_vector, + const Comparator* ucmp = BytewiseComparator()); struct MockTableFileSystem { port::Mutex mutex; -- GitLab