提交 9f1c84ca 编写于 作者: Y Yanqin Jin 提交者: Facebook GitHub Bot

Fix a bug in compaction iterator with timestamp (#7645)

Summary:
https://github.com/facebook/rocksdb/issues/7556 introduced support for compaction iterator to perform timestamp-aware garbage collection.
However, there was a bug. The comparison between `ikey_.user_key` and `current_user_key_` should happen
before `key_ = current_key_.SetInternalKey(key_, &ikey_);` (line 336 of compaction_iterator.cc).
Otherwise, after this line, `current_key_` is always the same as `ikey_.user_key`.

This PR also re-arranged the order of some data members because some of them are state variables of `CompactionIterator` while others are inputs from callers.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7645

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D24845028

Pulled By: riversand963

fbshipit-source-id: c7e79914832701462b86867e8463cd463b6c0c25
上级 c3911f1a
......@@ -87,14 +87,14 @@ CompactionIterator::CompactionIterator(
shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused),
preserve_deletes_seqnum_(preserve_deletes_seqnum),
current_user_key_sequence_(0),
current_user_key_snapshot_(0),
merge_out_iter_(merge_helper_),
current_key_committed_(false),
info_log_(info_log),
allow_data_in_errors_(allow_data_in_errors),
timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
full_history_ts_low_(full_history_ts_low),
current_user_key_sequence_(0),
current_user_key_snapshot_(0),
merge_out_iter_(merge_helper_),
current_key_committed_(false),
cmp_with_history_ts_low_(0) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
assert(snapshots_ != nullptr);
......@@ -311,15 +311,32 @@ void CompactionIterator::NextFromInput() {
// merge_helper_->compaction_filter_skip_until_.
Slice skip_until;
int cmp_user_key_without_ts = 0;
int cmp_ts = 0;
if (has_current_user_key_) {
cmp_user_key_without_ts =
timestamp_size_
? cmp_->CompareWithoutTimestamp(ikey_.user_key, current_user_key_)
: cmp_->Compare(ikey_.user_key, current_user_key_);
// if timestamp_size_ > 0, then curr_ts_ has been initialized by a
// previous key.
cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
ExtractTimestampFromUserKey(
ikey_.user_key, timestamp_size_),
curr_ts_)
: 0;
}
// Check whether the user key changed. After this if statement current_key_
// is a copy of the current input key (maybe converted to a delete by the
// compaction filter). ikey_.user_key is pointing to the copy.
if (!has_current_user_key_ ||
!cmp_->Equal(ikey_.user_key, current_user_key_)) {
if (!has_current_user_key_ || cmp_user_key_without_ts != 0 || cmp_ts != 0) {
// First occurrence of this user key
// Copy key for output
key_ = current_key_.SetInternalKey(key_, &ikey_);
// If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use
// in next iteration to compare with the timestamp of next key.
UpdateTimestampAndCompareWithFullHistoryLow();
// If
......@@ -334,9 +351,7 @@ void CompactionIterator::NextFromInput() {
// consider this key for GC, e.g. it may be dropped if certain conditions
// match.
if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
0 != cmp_->CompareWithoutTimestamp(ikey_.user_key,
current_user_key_) ||
cmp_with_history_ts_low_ >= 0) {
0 != cmp_user_key_without_ts || cmp_with_history_ts_low_ >= 0) {
// Initialize for future comparison for rule (A) and etc.
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
......
......@@ -157,12 +157,14 @@ class CompactionIterator {
// Extract user-defined timestamp from user key if possible and compare it
// with *full_history_ts_low_ if applicable.
inline void UpdateTimestampAndCompareWithFullHistoryLow() {
if (!timestamp_size_) {
return;
}
Slice ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
curr_ts_.assign(ts.data(), ts.size());
if (full_history_ts_low_) {
assert(timestamp_size_ > 0);
current_ts_ =
ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
cmp_with_history_ts_low_ =
cmp_->CompareTimestamp(current_ts_, *full_history_ts_low_);
cmp_->CompareTimestamp(ts, *full_history_ts_low_);
}
}
......@@ -195,6 +197,20 @@ class CompactionIterator {
SequenceNumber earliest_snapshot_;
SequenceNumber latest_snapshot_;
std::shared_ptr<Logger> info_log_;
bool allow_data_in_errors_;
// Comes from comparator.
const size_t timestamp_size_;
// Lower bound timestamp to retain full history in terms of user-defined
// timestamp. If a key's timestamp is older than full_history_ts_low_, then
// the key *may* be eligible for garbage collection (GC). The skipping logic
// is in `NextFromInput()` and `PrepareOutput()`.
// If nullptr, NO GC will be performed and all history will be preserved.
const std::string* const full_history_ts_low_;
// State
//
// Points to a copy of the current compaction iterator output (current_key_)
......@@ -219,7 +235,7 @@ class CompactionIterator {
IterKey current_key_;
Slice current_user_key_;
Slice current_ts_;
std::string curr_ts_;
SequenceNumber current_user_key_sequence_;
SequenceNumber current_user_key_snapshot_;
......@@ -249,19 +265,6 @@ class CompactionIterator {
// Used to avoid purging uncommitted values. The application can specify
// uncommitted values by providing a SnapshotChecker object.
bool current_key_committed_;
std::shared_ptr<Logger> info_log_;
bool allow_data_in_errors_;
// Comes from comparator.
const size_t timestamp_size_;
// Lower bound timestamp to retain full history in terms of user-defined
// timestamp. If a key's timestamp is older than full_history_ts_low_, then
// the key *may* be eligible for garbage collection (GC). The skipping logic
// is in `NextFromInput()` and `PrepareOutput()`.
// If nullptr, NO GC will be performed and all history will be preserved.
const std::string* const full_history_ts_low_;
// Saved result of ucmp->CompareTimestamp(current_ts_, *full_history_ts_low_)
int cmp_with_history_ts_low_;
......
......@@ -1062,12 +1062,13 @@ class CompactionIteratorTsGcTest : public CompactionIteratorTest {
};
TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) {
constexpr char user_key[] = "a";
constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeValue),
test::KeyStr(/*ts=*/102, user_key, /*seq=*/3,
kTypeDeletionWithTimestamp)};
const std::vector<std::string> input_values = {"a3", ""};
test::KeyStr(/*ts=*/103, user_key[0], /*seq=*/4, kTypeValue),
test::KeyStr(/*ts=*/102, user_key[0], /*seq=*/3,
kTypeDeletionWithTimestamp),
test::KeyStr(/*ts=*/104, user_key[1], /*seq=*/5, kTypeValue)};
const std::vector<std::string> input_values = {"a3", "", "b2"};
std::string full_history_ts_low;
// All keys' timestamps are newer than or equal to 102, thus none of them
// will be eligible for GC.
......@@ -1089,22 +1090,23 @@ TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) {
}
TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
constexpr char user_key[] = "a";
constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeDeletionWithTimestamp),
test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue),
test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "a1"};
test::KeyStr(/*ts=*/103, user_key[0], /*seq=*/4,
kTypeDeletionWithTimestamp),
test::KeyStr(/*ts=*/102, user_key[0], /*seq=*/3, kTypeValue),
test::KeyStr(/*ts=*/101, user_key[0], /*seq=*/2, kTypeValue),
test::KeyStr(/*ts=*/104, user_key[1], /*seq=*/5, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "a1", "b5"};
std::string full_history_ts_low;
// All keys' timestamps are older than 104.
PutFixed64(&full_history_ts_low, 104);
PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
{
// With a snapshot at seq 3, both the deletion marker and the key at 3 must
// be preserved.
AddSnapshot(3);
const std::vector<std::string> expected_keys = {input_keys[0],
input_keys[1]};
const std::vector<std::string> expected_values = {"", "a2"};
const std::vector<std::string> expected_keys = {
input_keys[0], input_keys[1], input_keys[3]};
const std::vector<std::string> expected_values = {"", "a2", "b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
......@@ -1116,8 +1118,9 @@ TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
{
// No snapshot, the deletion marker should be preserved because the user
// key may appear beyond output level.
const std::vector<std::string> expected_keys = {input_keys[0]};
const std::vector<std::string> expected_values = {""};
const std::vector<std::string> expected_keys = {input_keys[0],
input_keys[3]};
const std::vector<std::string> expected_values = {"", "b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
......@@ -1128,8 +1131,8 @@ TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
{
// No snapshot, the deletion marker can be dropped because the user key
// does not appear in higher levels.
const std::vector<std::string> expected_keys = {};
const std::vector<std::string> expected_values = {};
const std::vector<std::string> expected_keys = {input_keys[3]};
const std::vector<std::string> expected_values = {"b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册