提交 78afb4d8 编写于 作者: A Akanksha Mahajan 提交者: Facebook GitHub Bot

Support SingleDelete for user-defined timestamps (#8921)

Summary:
Added support for SingleDelete for user-defined timestamps. Users can now Get and Iterate over keys deleted with SingleDelete. It also includes changes in CompactionIterator which  preserves the same user key with different timestamps, unless the timestamp is below a certain threshold full_history_ts_low.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8921

Test Plan: Added new unit tests

Reviewed By: riversand963

Differential Revision: D31098191

Pulled By: akankshamahajan15

fbshipit-source-id: 78a59ef4b4884ae324fcd10f56e62a27d5ee2f49
上级 0774d640
......@@ -543,9 +543,21 @@ void CompactionIterator::NextFromInput() {
// we can choose how to handle such a combinations of operations. We will
// try to compact out as much as we can in these cases.
// We will report counts on these anomalous cases.
//
// Note: If timestamp is enabled, then record will be eligible for
// deletion, only if, along with above conditions (Rule 1 and Rule 2)
// full_history_ts_low_ is specified and timestamp for that key is less
// than *full_history_ts_low_. If it's not eligible for deletion, then we
// will output the SingleDelete. For Optimization 3 also, if
// full_history_ts_low_ is specified and timestamp for the key is less
// than *full_history_ts_low_ then only optimization will be applied.
// The easiest way to process a SingleDelete during iteration is to peek
// ahead at the next key.
const bool is_timestamp_eligible_for_gc =
(timestamp_size_ == 0 ||
(full_history_ts_low_ && cmp_with_history_ts_low_ < 0));
ParsedInternalKey next_ikey;
AdvanceInputIter();
......@@ -554,7 +566,7 @@ void CompactionIterator::NextFromInput() {
if (input_.Valid() &&
ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_)
.ok() &&
cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) {
#ifndef NDEBUG
const Compaction* c =
compaction_ ? compaction_->real_compaction() : nullptr;
......@@ -562,7 +574,6 @@ void CompactionIterator::NextFromInput() {
TEST_SYNC_POINT_CALLBACK(
"CompactionIterator::NextFromInput:SingleDelete:1",
const_cast<Compaction*>(c));
// Check whether the next key belongs to the same snapshot as the
// SingleDelete.
if (prev_snapshot == 0 ||
......@@ -570,15 +581,20 @@ void CompactionIterator::NextFromInput() {
TEST_SYNC_POINT_CALLBACK(
"CompactionIterator::NextFromInput:SingleDelete:2", nullptr);
if (next_ikey.type == kTypeSingleDeletion) {
// We encountered two SingleDeletes in a row. This could be due to
// unexpected user input.
// Skip the first SingleDelete and let the next iteration decide how
// to handle the second SingleDelete
// We encountered two SingleDeletes for same key in a row. This
// could be due to unexpected user input. Skip the first
// SingleDelete and let the next iteration decide how to handle the
// second SingleDelete
// First SingleDelete has been skipped since we already called
// input_.Next().
++iter_stats_.num_record_drop_obsolete;
++iter_stats_.num_single_del_mismatch;
} else if (!is_timestamp_eligible_for_gc) {
// We cannot drop the SingleDelete as timestamp is enabled, and
// timestamp of this key is greater than or equal to
// *full_history_ts_low_. We will output the SingleDelete.
valid_ = true;
} else if (has_outputted_key_ ||
DefinitelyInSnapshot(ikey_.sequence,
earliest_write_conflict_snapshot_)) {
......@@ -635,7 +651,8 @@ void CompactionIterator::NextFromInput() {
if (compaction_ != nullptr &&
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) {
&level_ptrs_) &&
is_timestamp_eligible_for_gc) {
// Key doesn't exist outside of this range.
// Can compact out this SingleDelete.
++iter_stats_.num_record_drop_obsolete;
......@@ -987,8 +1004,12 @@ void CompactionIterator::PrepareOutput() {
ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ &&
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
assert(ikey_.type != kTypeDeletion);
assert(ikey_.type != kTypeSingleDeletion ||
(timestamp_size_ || full_history_ts_low_));
if (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeSingleDeletion &&
(!timestamp_size_ || !full_history_ts_low_))) {
ROCKS_LOG_FATAL(info_log_,
"Unexpected key type %d for seq-zero optimization",
ikey_.type);
......
......@@ -1258,6 +1258,101 @@ TEST_P(CompactionIteratorTsGcTest, RewriteTs) {
}
}
TEST_P(CompactionIteratorTsGcTest, SingleDeleteNoKeyEligibleForGC) {
constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/104, user_key[0], /*seq=*/4, kTypeSingleDeletion),
test::KeyStr(/*ts=*/103, user_key[0], /*seq=*/3, kTypeValue),
test::KeyStr(/*ts=*/102, user_key[1], /*seq=*/2, kTypeValue)};
const std::vector<std::string> input_values = {"", "a3", "b2"};
std::string full_history_ts_low;
// All keys' timestamps are newer than or equal to 102, thus none of them
// will be eligible for GC.
PutFixed64(&full_history_ts_low, 102);
const std::vector<std::string>& expected_keys = input_keys;
const std::vector<std::string>& expected_values = input_values;
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
for (const std::pair<bool, bool>& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
bottommost_level,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
}
TEST_P(CompactionIteratorTsGcTest, SingleDeleteDropTombstones) {
constexpr char user_key[] = "a";
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeSingleDeletion),
test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue),
test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeSingleDeletion),
test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "", "a0"};
const std::vector<std::string> expected_keys = {input_keys[0], input_keys[1]};
const std::vector<std::string> expected_values = {"", "a2"};
// Take a snapshot at seq 2.
AddSnapshot(2);
{
const std::vector<std::pair<bool, bool>> params = {
{false, false}, {false, true}, {true, true}};
for (const std::pair<bool, bool>& param : params) {
const bool bottommost_level = param.first;
const bool key_not_exists_beyond_output_level = param.second;
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 102);
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
bottommost_level,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
key_not_exists_beyond_output_level, &full_history_ts_low);
}
}
}
TEST_P(CompactionIteratorTsGcTest, SingleDeleteAllKeysOlderThanThreshold) {
constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
const std::vector<std::string> input_keys = {
test::KeyStr(/*ts=*/103, user_key[0], /*seq=*/4, kTypeSingleDeletion),
test::KeyStr(/*ts=*/102, user_key[0], /*seq=*/3, kTypeValue),
test::KeyStr(/*ts=*/104, user_key[1], /*seq=*/5, kTypeValue)};
const std::vector<std::string> input_values = {"", "a2", "b5"};
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
{
// With a snapshot at seq 3, both the deletion marker and the key at 3 must
// be preserved.
AddSnapshot(3);
const std::vector<std::string> expected_keys = {
input_keys[0], input_keys[1], input_keys[2]};
const std::vector<std::string> expected_values = {"", "a2", "b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
/*bottommost_level=*/false,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
/*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
ClearSnapshots();
}
{
// No snapshot.
const std::vector<std::string> expected_keys = {input_keys[2]};
const std::vector<std::string> expected_values = {"b5"};
RunTest(input_keys, input_values, expected_keys, expected_values,
/*last_committed_seq=*/kMaxSequenceNumber,
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
/*bottommost_level=*/false,
/*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
/*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
}
}
INSTANTIATE_TEST_CASE_P(CompactionIteratorTsGcTestInstance,
CompactionIteratorTsGcTest,
testing::Values(true, false));
......
......@@ -1293,13 +1293,16 @@ TEST_F(CompactionJobTimestampTest, GCDisabled) {
auto file1 =
mock::MakeMockFile({{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
{KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
{KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"}});
{KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
{KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"}});
AddMockFile(file1);
auto file2 = mock::MakeMockFile(
{{KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""},
{KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
{KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}});
{{KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
{KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
{KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
{KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
AddMockFile(file2);
SetLastSequence(10);
......@@ -1308,9 +1311,11 @@ TEST_F(CompactionJobTimestampTest, GCDisabled) {
{{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
{KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
{KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
{KeyStr("b", 7, ValueType::kTypeDeletionWithTimestamp, 97), ""},
{KeyStr("c", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
{KeyStr("c", 5, ValueType::kTypeValue, 95), "c5"}});
{KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
{KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
{KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
{KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"},
{KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
}
......@@ -1348,19 +1353,21 @@ TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
auto file1 = mock::MakeMockFile(
{{KeyStr("a", 5, ValueType::kTypeDeletionWithTimestamp, 100), ""},
{KeyStr("b", 6, ValueType::kTypeValue, 99), "b6"}});
{KeyStr("b", 6, ValueType::kTypeSingleDeletion, 99), ""},
{KeyStr("c", 7, ValueType::kTypeValue, 98), "c7"}});
AddMockFile(file1);
auto file2 = mock::MakeMockFile(
{{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
{KeyStr("b", 3, ValueType::kTypeDeletionWithTimestamp, 97), ""},
{KeyStr("b", 2, ValueType::kTypeValue, 96), "b2"}});
{{KeyStr("a", 4, ValueType::kTypeValue, 97), "a4"},
{KeyStr("b", 3, ValueType::kTypeValue, 96), "b3"},
{KeyStr("c", 2, ValueType::kTypeDeletionWithTimestamp, 95), ""},
{KeyStr("c", 1, ValueType::kTypeValue, 94), "c1"}});
AddMockFile(file2);
SetLastSequence(6);
SetLastSequence(7);
auto expected_results =
mock::MakeMockFile({{KeyStr("b", 0, ValueType::kTypeValue, 0), "b6"}});
mock::MakeMockFile({{KeyStr("c", 0, ValueType::kTypeValue, 0), "c7"}});
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
full_history_ts_low_ = encode_u64_ts_(std::numeric_limits<uint64_t>::max());
......
......@@ -2065,12 +2065,36 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
Status DB::SingleDelete(const WriteOptions& opt,
ColumnFamilyHandle* column_family, const Slice& key) {
Status s;
if (opt.timestamp == nullptr) {
WriteBatch batch;
s = batch.SingleDelete(column_family, key);
if (!s.ok()) {
return s;
}
s = Write(opt, &batch);
return s;
}
const Slice* ts = opt.timestamp;
assert(ts != nullptr);
size_t ts_sz = ts->size();
assert(column_family->GetComparator());
assert(ts_sz == column_family->GetComparator()->timestamp_size());
WriteBatch batch;
Status s = batch.SingleDelete(column_family, key);
if (key.data() + key.size() == ts->data()) {
Slice key_with_ts = Slice(key.data(), key.size() + ts_sz);
s = batch.SingleDelete(column_family, key_with_ts);
} else {
std::array<Slice, 2> key_with_ts_slices{{key, *ts}};
SliceParts key_with_ts(key_with_ts_slices.data(), 2);
s = batch.SingleDelete(column_family, key_with_ts);
}
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
s = Write(opt, &batch);
return s;
}
Status DB::DeleteRange(const WriteOptions& opt,
......
......@@ -983,13 +983,27 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) {
Close();
}
TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
class DBBasicDeletionTestWithTimestamp
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<enum ValueType> {
public:
DBBasicDeletionTestWithTimestamp()
: DBBasicTestWithTimestampBase("db_basic_deletion_test_with_timestamp") {}
};
INSTANTIATE_TEST_CASE_P(
Timestamp, DBBasicDeletionTestWithTimestamp,
::testing::Values(ValueType::kTypeSingleDeletion,
ValueType::kTypeDeletionWithTimestamp));
TEST_P(DBBasicDeletionTestWithTimestamp, ForwardIterateStartSeqnum) {
const int kNumKeysPerFile = 128;
const uint64_t kMaxKey = 0xffffffffffffffff;
const uint64_t kMinKey = kMaxKey - 1023;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
ValueType op_type = GetParam();
// Need to disable compaction to bottommost level when sequence number will be
// zeroed out, causing the verification of sequence number to fail in this
// test.
......@@ -1016,7 +1030,11 @@ TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
if (k % 2) {
s = db_->Put(write_opts, Key1(k), "value" + std::to_string(i));
} else {
s = db_->Delete(write_opts, Key1(k));
if (op_type == ValueType::kTypeDeletionWithTimestamp) {
s = db_->Delete(write_opts, Key1(k));
} else if (op_type == ValueType::kTypeSingleDeletion) {
s = db_->SingleDelete(write_opts, Key1(k));
}
}
ASSERT_OK(s);
}
......@@ -1038,8 +1056,7 @@ TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
uint64_t key = kMinKey;
for (iter->Seek(Key1(kMinKey)); iter->Valid(); iter->Next()) {
CheckIterEntry(
iter.get(), Key1(key), expected_seq,
(key % 2) ? kTypeValue : kTypeDeletionWithTimestamp,
iter.get(), Key1(key), expected_seq, (key % 2) ? kTypeValue : op_type,
(key % 2) ? "value" + std::to_string(i + 1) : std::string(),
write_ts_list[i + 1]);
++key;
......@@ -1062,7 +1079,7 @@ TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
SequenceNumber expected_seq = start_seqs[i] + (kMaxKey - kMinKey) + 1;
for (it->Seek(Key1(kMinKey)); it->Valid(); it->Next()) {
CheckIterEntry(it.get(), Key1(key), expected_seq,
(key % 2) ? kTypeValue : kTypeDeletionWithTimestamp,
(key % 2) ? kTypeValue : op_type,
"value" + std::to_string(i + 1), write_ts_list[i + 1]);
++key;
--expected_seq;
......@@ -2429,13 +2446,15 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
const size_t kNumL0Files =
static_cast<size_t>(Options().level0_file_num_compaction_trigger);
{
// Generate enough L0 files with ts=1 to trigger compaction to L1
// Half of the keys will go through Deletion and remaining half with
// SingleDeletion. Generate enough L0 files with ts=1 to trigger compaction
// to L1
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
WriteOptions wopts;
wopts.timestamp = &ts;
for (size_t i = 0; i != kNumL0Files; ++i) {
for (int j = 0; j != kNumKeysPerFile; ++j) {
for (size_t i = 0; i < kNumL0Files; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(db_->Put(wopts, Key1(j), "value" + std::to_string(i)));
}
ASSERT_OK(db_->Flush(FlushOptions()));
......@@ -2445,11 +2464,15 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
ts_str = Timestamp(3, 0);
ts = ts_str;
wopts.timestamp = &ts;
for (int i = 0; i != kNumKeysPerFile; ++i) {
for (int i = 0; i < kNumKeysPerFile; ++i) {
std::string key_str = Key1(i);
Slice key(key_str);
if ((i % 3) == 0) {
ASSERT_OK(db_->Delete(wopts, key));
if (i < kNumKeysPerFile / 2) {
ASSERT_OK(db_->Delete(wopts, key));
} else {
ASSERT_OK(db_->SingleDelete(wopts, key));
}
} else {
ASSERT_OK(db_->Put(wopts, key, "new_value"));
}
......@@ -2463,7 +2486,11 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
std::string key_str = Key1(i);
Slice key(key_str);
if ((i % 3) == 1) {
ASSERT_OK(db_->Delete(wopts, key));
if (i < kNumKeysPerFile / 2) {
ASSERT_OK(db_->Delete(wopts, key));
} else {
ASSERT_OK(db_->SingleDelete(wopts, key));
}
} else if ((i % 3) == 2) {
ASSERT_OK(db_->Put(wopts, key, "new_value_2"));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册