提交 cf9d8e45 编写于 作者: Y Yanqin Jin 提交者: Facebook GitHub Bot

Add full_history_ts_low_ to CompactionJob (#7657)

Summary:
https://github.com/facebook/rocksdb/issues/7556 enables `CompactionIterator` to perform garbage collection during compaction according
to a lower bound (user-defined) timestamp `full_history_ts_low_`.

This PR adds a data member `full_history_ts_low_` of type `std::string` to `CompactionJob`, and
`full_history_ts_low_` does not change during compaction. `CompactionJob` will pass a pointer to this
data member to the `CompactionIterator` used during compaction.

Also refactored compaction_job_test.cc to re-use some existing code, which is actually the majority of this PR.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7657

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D24913803

Pulled By: riversand963

fbshipit-source-id: 11ad5329ddac365667152e7b3b02f84182c0ca8e
上级 0dc437d6
...@@ -309,7 +309,7 @@ CompactionJob::CompactionJob( ...@@ -309,7 +309,7 @@ CompactionJob::CompactionJob(
const std::string& dbname, CompactionJobStats* compaction_job_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer, Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused, const std::string& db_id, const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
const std::string& db_session_id) const std::string& db_session_id, std::string full_history_ts_low)
: job_id_(job_id), : job_id_(job_id),
compact_(new CompactionState(compaction)), compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats), compaction_job_stats_(compaction_job_stats),
...@@ -344,7 +344,8 @@ CompactionJob::CompactionJob( ...@@ -344,7 +344,8 @@ CompactionJob::CompactionJob(
paranoid_file_checks_(paranoid_file_checks), paranoid_file_checks_(paranoid_file_checks),
measure_io_stats_(measure_io_stats), measure_io_stats_(measure_io_stats),
write_hint_(Env::WLTH_NOT_SET), write_hint_(Env::WLTH_NOT_SET),
thread_pri_(thread_pri) { thread_pri_(thread_pri),
full_history_ts_low_(std::move(full_history_ts_low)) {
assert(compaction_job_stats_ != nullptr); assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr); assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data(); const auto* cfd = compact_->compaction->column_family_data();
...@@ -995,6 +996,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { ...@@ -995,6 +996,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
} }
Status status; Status status;
const std::string* const full_history_ts_low =
full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
sub_compact->c_iter.reset(new CompactionIterator( sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_, &existing_snapshots_, earliest_write_conflict_snapshot_,
...@@ -1002,8 +1005,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { ...@@ -1002,8 +1005,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
/*expect_valid_internal_key=*/true, &range_del_agg, /*expect_valid_internal_key=*/true, &range_del_agg,
blob_file_builder.get(), db_options_.allow_data_in_errors, blob_file_builder.get(), db_options_.allow_data_in_errors,
sub_compact->compaction, compaction_filter, shutting_down_, sub_compact->compaction, compaction_filter, shutting_down_,
preserve_deletes_seqnum_, manual_compaction_paused_, preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
db_options_.info_log)); full_history_ts_low));
auto c_iter = sub_compact->c_iter.get(); auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst(); c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
......
...@@ -78,7 +78,8 @@ class CompactionJob { ...@@ -78,7 +78,8 @@ class CompactionJob {
const std::string& dbname, CompactionJobStats* compaction_job_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer, Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused = nullptr, const std::atomic<int>* manual_compaction_paused = nullptr,
const std::string& db_id = "", const std::string& db_session_id = ""); const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "");
~CompactionJob(); ~CompactionJob();
...@@ -201,6 +202,7 @@ class CompactionJob { ...@@ -201,6 +202,7 @@ class CompactionJob {
Env::WriteLifeTimeHint write_hint_; Env::WriteLifeTimeHint write_hint_;
Env::Priority thread_pri_; Env::Priority thread_pri_;
IOStatus io_status_; IOStatus io_status_;
std::string full_history_ts_low_;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
...@@ -67,13 +67,14 @@ void VerifyInitializationOfCompactionJobStats( ...@@ -67,13 +67,14 @@ void VerifyInitializationOfCompactionJobStats(
} // namespace } // namespace
// TODO(icanadi) Make it simpler once we mock out VersionSet class CompactionJobTestBase : public testing::Test {
class CompactionJobTest : public testing::Test { protected:
public: CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
CompactionJobTest() std::function<std::string(uint64_t)> encode_u64_ts)
: env_(Env::Default()), : env_(Env::Default()),
fs_(std::make_shared<LegacyFileSystemWrapper>(env_)), fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
dbname_(test::PerThreadDBPath("compaction_job_test")), dbname_(std::move(dbname)),
ucmp_(ucmp),
db_options_(), db_options_(),
mutable_cf_options_(cf_options_), mutable_cf_options_(cf_options_),
mutable_db_options_(), mutable_db_options_(),
...@@ -86,12 +87,17 @@ class CompactionJobTest : public testing::Test { ...@@ -86,12 +87,17 @@ class CompactionJobTest : public testing::Test {
shutting_down_(false), shutting_down_(false),
preserve_deletes_seqnum_(0), preserve_deletes_seqnum_(0),
mock_table_factory_(new mock::MockTableFactory()), mock_table_factory_(new mock::MockTableFactory()),
error_handler_(nullptr, db_options_, &mutex_) { error_handler_(nullptr, db_options_, &mutex_),
encode_u64_ts_(std::move(encode_u64_ts)) {}
void SetUp() override {
EXPECT_OK(env_->CreateDirIfMissing(dbname_)); EXPECT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.env = env_; db_options_.env = env_;
db_options_.fs = fs_; db_options_.fs = fs_;
db_options_.db_paths.emplace_back(dbname_, db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max()); std::numeric_limits<uint64_t>::max());
cf_options_.comparator = ucmp_;
cf_options_.table_factory = mock_table_factory_;
} }
std::string GenerateFileName(uint64_t file_number) { std::string GenerateFileName(uint64_t file_number) {
...@@ -102,9 +108,10 @@ class CompactionJobTest : public testing::Test { ...@@ -102,9 +108,10 @@ class CompactionJobTest : public testing::Test {
return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
} }
static std::string KeyStr(const std::string& user_key, std::string KeyStr(const std::string& user_key, const SequenceNumber seq_num,
const SequenceNumber seq_num, const ValueType t) { const ValueType t, uint64_t ts = 0) {
return InternalKey(user_key, seq_num, t).Encode().ToString(); std::string user_key_with_ts = user_key + encode_u64_ts_(ts);
return InternalKey(user_key_with_ts, seq_num, t).Encode().ToString();
} }
static std::string BlobStr(uint64_t blob_file_number, uint64_t offset, static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
...@@ -208,9 +215,9 @@ class CompactionJobTest : public testing::Test { ...@@ -208,9 +215,9 @@ class CompactionJobTest : public testing::Test {
// returns expected result after compaction // returns expected result after compaction
mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) { mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) {
stl_wrappers::KVMap expected_results; stl_wrappers::KVMap expected_results;
const int kKeysPerFile = 10000; constexpr int kKeysPerFile = 10000;
const int kCorruptKeysPerFile = 200; constexpr int kCorruptKeysPerFile = 200;
const int kMatchingKeys = kKeysPerFile / 2; constexpr int kMatchingKeys = kKeysPerFile / 2;
SequenceNumber sequence_number = 0; SequenceNumber sequence_number = 0;
auto corrupt_id = [&](int id) { auto corrupt_id = [&](int id) {
...@@ -239,7 +246,7 @@ class CompactionJobTest : public testing::Test { ...@@ -239,7 +246,7 @@ class CompactionJobTest : public testing::Test {
{bottommost_internal_key.Encode().ToString(), value}); {bottommost_internal_key.Encode().ToString(), value});
} }
} }
mock::SortKVVector(&contents); mock::SortKVVector(&contents, ucmp_);
AddMockFile(contents); AddMockFile(contents);
} }
...@@ -255,7 +262,7 @@ class CompactionJobTest : public testing::Test { ...@@ -255,7 +262,7 @@ class CompactionJobTest : public testing::Test {
} }
void NewDB() { void NewDB() {
DestroyDB(dbname_, Options()); EXPECT_OK(DestroyDB(dbname_, Options()));
EXPECT_OK(env_->CreateDirIfMissing(dbname_)); EXPECT_OK(env_->CreateDirIfMissing(dbname_));
versions_.reset( versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
...@@ -265,12 +272,6 @@ class CompactionJobTest : public testing::Test { ...@@ -265,12 +272,6 @@ class CompactionJobTest : public testing::Test {
SetIdentityFile(env_, dbname_); SetIdentityFile(env_, dbname_);
VersionEdit new_db; VersionEdit new_db;
if (db_options_.write_dbid_to_manifest) {
DBImpl* impl = new DBImpl(DBOptions(), dbname_);
std::string db_id;
impl->GetDbIdentityFromIdentityFile(&db_id);
new_db.SetDBId(db_id);
}
new_db.SetLogNumber(0); new_db.SetLogNumber(0);
new_db.SetNextFile(2); new_db.SetNextFile(2);
new_db.SetLastSequence(0); new_db.SetLastSequence(0);
...@@ -294,13 +295,12 @@ class CompactionJobTest : public testing::Test { ...@@ -294,13 +295,12 @@ class CompactionJobTest : public testing::Test {
ASSERT_OK(s); ASSERT_OK(s);
std::vector<ColumnFamilyDescriptor> column_families;
cf_options_.table_factory = mock_table_factory_;
cf_options_.merge_operator = merge_op_; cf_options_.merge_operator = merge_op_;
cf_options_.compaction_filter = compaction_filter_.get(); cf_options_.compaction_filter = compaction_filter_.get();
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(kDefaultColumnFamilyName, cf_options_); column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
EXPECT_OK(versions_->Recover(column_families, false)); ASSERT_OK(versions_->Recover(column_families, false));
cfd_ = versions_->GetColumnFamilySet()->GetDefault(); cfd_ = versions_->GetColumnFamilySet()->GetDefault();
} }
...@@ -338,19 +338,22 @@ class CompactionJobTest : public testing::Test { ...@@ -338,19 +338,22 @@ class CompactionJobTest : public testing::Test {
EventLogger event_logger(db_options_.info_log.get()); EventLogger event_logger(db_options_.info_log.get());
// TODO(yiwu) add a mock snapshot checker and add test for it. // TODO(yiwu) add a mock snapshot checker and add test for it.
SnapshotChecker* snapshot_checker = nullptr; SnapshotChecker* snapshot_checker = nullptr;
ASSERT_TRUE(full_history_ts_low_.empty() ||
ucmp_->timestamp_size() == full_history_ts_low_.size());
CompactionJob compaction_job( CompactionJob compaction_job(
0, &compaction, db_options_, env_options_, versions_.get(), 0, &compaction, db_options_, env_options_, versions_.get(),
&shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr, &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr,
nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots, nullptr, nullptr, nullptr, &mutex_, &error_handler_, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_, earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_, &event_logger, false, false, dbname_, &compaction_job_stats_,
Env::Priority::USER, nullptr /* IOTracer */); Env::Priority::USER, nullptr /* IOTracer */,
/*manual_compaction_paused=*/nullptr, /*db_id=*/"",
/*db_session_id=*/"", full_history_ts_low_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
compaction_job.Prepare(); compaction_job.Prepare();
mutex_.Unlock(); mutex_.Unlock();
Status s; Status s = compaction_job.Run();
s = compaction_job.Run();
ASSERT_OK(s); ASSERT_OK(s);
ASSERT_OK(compaction_job.io_status()); ASSERT_OK(compaction_job.io_status());
mutex_.Lock(); mutex_.Lock();
...@@ -380,6 +383,7 @@ class CompactionJobTest : public testing::Test { ...@@ -380,6 +383,7 @@ class CompactionJobTest : public testing::Test {
Env* env_; Env* env_;
std::shared_ptr<FileSystem> fs_; std::shared_ptr<FileSystem> fs_;
std::string dbname_; std::string dbname_;
const Comparator* const ucmp_;
EnvOptions env_options_; EnvOptions env_options_;
ImmutableDBOptions db_options_; ImmutableDBOptions db_options_;
ColumnFamilyOptions cf_options_; ColumnFamilyOptions cf_options_;
...@@ -398,6 +402,17 @@ class CompactionJobTest : public testing::Test { ...@@ -398,6 +402,17 @@ class CompactionJobTest : public testing::Test {
std::unique_ptr<CompactionFilter> compaction_filter_; std::unique_ptr<CompactionFilter> compaction_filter_;
std::shared_ptr<MergeOperator> merge_op_; std::shared_ptr<MergeOperator> merge_op_;
ErrorHandler error_handler_; ErrorHandler error_handler_;
std::string full_history_ts_low_;
const std::function<std::string(uint64_t)> encode_u64_ts_;
};
// TODO(icanadi) Make it simpler once we mock out VersionSet
class CompactionJobTest : public CompactionJobTestBase {
public:
CompactionJobTest()
: CompactionJobTestBase(test::PerThreadDBPath("compaction_job_test"),
BytewiseComparator(),
[](uint64_t /*ts*/) { return ""; }) {}
}; };
TEST_F(CompactionJobTest, Simple) { TEST_F(CompactionJobTest, Simple) {
...@@ -1078,6 +1093,118 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) { ...@@ -1078,6 +1093,118 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) {
/* expected_oldest_blob_file_number */ 19); /* expected_oldest_blob_file_number */ 19);
} }
class CompactionJobTimestampTest : public CompactionJobTestBase {
public:
CompactionJobTimestampTest()
: CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
test::ComparatorWithU64Ts(), test::EncodeInt) {}
};
TEST_F(CompactionJobTimestampTest, GCDisabled) {
NewDB();
auto file1 =
mock::MakeMockFile({{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
{KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
{KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"}});
AddMockFile(file1);
auto file2 = mock::MakeMockFile(
{{KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""},
{KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
{KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}});
AddMockFile(file2);
SetLastSequence(10);
auto expected_results = mock::MakeMockFile(
{{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
{KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
{KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
{KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""},
{KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
{KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
}
TEST_F(CompactionJobTimestampTest, NoKeyExpired) {
NewDB();
auto file1 =
mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
{KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
{KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"}});
AddMockFile(file1);
auto file2 =
mock::MakeMockFile({{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
{KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
AddMockFile(file2);
SetLastSequence(101);
auto expected_results =
mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
{KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
{KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"},
{KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
full_history_ts_low_ = encode_u64_ts_(0);
RunCompaction({files}, expected_results);
}
TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
NewDB();
auto file1 = mock::MakeMockFile(
{{KeyStr("a", 5, ValueType::kTypeDeletionWithTimestamp, 100), ""},
{KeyStr("b", 6, ValueType::kTypeValue, 99), "b6"}});
AddMockFile(file1);
auto file2 = mock::MakeMockFile(
{{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
{KeyStr("b", 3, ValueType::kTypeDeletionWithTimestamp, 97), ""},
{KeyStr("b", 2, ValueType::kTypeValue, 96), "b2"}});
AddMockFile(file2);
SetLastSequence(6);
auto expected_results =
mock::MakeMockFile({{KeyStr("b", 0, ValueType::kTypeValue, 0), "b6"}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
full_history_ts_low_ = encode_u64_ts_(std::numeric_limits<uint64_t>::max());
RunCompaction({files}, expected_results);
}
TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
NewDB();
auto file1 =
mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
{KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
AddMockFile(file1);
auto file2 = mock::MakeMockFile(
{{KeyStr("a", 3, ValueType::kTypeValue, 48), "a3"},
{KeyStr("a", 2, ValueType::kTypeValue, 46), "a2"},
{KeyStr("b", 4, ValueType::kTypeDeletionWithTimestamp, 47), ""}});
AddMockFile(file2);
SetLastSequence(6);
auto expected_results =
mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
{KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
full_history_ts_low_ = encode_u64_ts_(49);
RunCompaction({files}, expected_results);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {
......
...@@ -18,8 +18,8 @@ namespace mock { ...@@ -18,8 +18,8 @@ namespace mock {
KVVector MakeMockFile(std::initializer_list<KVPair> l) { return KVVector(l); } KVVector MakeMockFile(std::initializer_list<KVPair> l) { return KVVector(l); }
void SortKVVector(KVVector* kv_vector) { void SortKVVector(KVVector* kv_vector, const Comparator* ucmp) {
InternalKeyComparator icmp(BytewiseComparator()); InternalKeyComparator icmp(ucmp);
std::sort(kv_vector->begin(), kv_vector->end(), std::sort(kv_vector->begin(), kv_vector->end(),
[icmp](KVPair a, KVPair b) -> bool { [icmp](KVPair a, KVPair b) -> bool {
return icmp.Compare(a.first, b.first) < 0; return icmp.Compare(a.first, b.first) < 0;
......
...@@ -31,7 +31,8 @@ using KVPair = std::pair<std::string, std::string>; ...@@ -31,7 +31,8 @@ using KVPair = std::pair<std::string, std::string>;
using KVVector = std::vector<KVPair>; using KVVector = std::vector<KVPair>;
KVVector MakeMockFile(std::initializer_list<KVPair> l = {}); KVVector MakeMockFile(std::initializer_list<KVPair> l = {});
void SortKVVector(KVVector* kv_vector); void SortKVVector(KVVector* kv_vector,
const Comparator* ucmp = BytewiseComparator());
struct MockTableFileSystem { struct MockTableFileSystem {
port::Mutex mutex; port::Mutex mutex;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册