提交 15053f3a 编写于 作者: Y Yu Zhang 提交者: Facebook GitHub Bot

Logically strip timestamp during flush (#11557)

Summary:
Logically strip the user-defined timestamp when L0 files are created during flush when `AdvancedColumnFamilyOptions.persist_user_defined_timestamps` is false. Logically stripping timestamp here means replacing the original user-defined timestamp with a mininum timestamp, which for now is hard coded to be all zeros bytes.

While working on this, I caught a missing piece on the `BlockBuilder` level for this feature. The current quick path `std::min(buffer_size, last_key_size)` needs a bit tweaking to work for this feature. When user-defined timestamp is stripped during block building, on writing first entry or right after resetting, `buffer` is empty and `buffer_size` is zero as usual. However, in follow-up writes, depending on the size of the stripped user-defined timestamp, and the size of the value, what's in `buffer` can sometimes be smaller than `last_key_size`, leading `std::min(buffer_size, last_key_size)` to truncate the `last_key`. Previous test doesn't caught the bug because in those tests, the size of the stripped user-defined timestamps bytes is smaller than the length of the value. In order to avoid the conditional operation, this PR changed the original trivial `std::min` operation into an arithmetic operation. Since this is a change in a hot and performance critical path, I did the following benchmark to check no observable regression is introduced.
```TEST_TMPDIR=/dev/shm/rocksdb1 ./db_bench -benchmarks=fillseq -memtablerep=vector -allow_concurrent_memtable_write=false -num=50000000```
Compiled with DEBUG_LEVEL=0
Test vs. control runs simulaneous for better accuracy, units = ops/sec
                       PR  vs base:
Round 1: 350652 vs 349055
Round 2: 365733 vs 364308
Round 3: 355681 vs 354475

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11557

Test Plan:
New timestamp specific test added or existing tests augmented, both are parameterized with `UserDefinedTimestampTestMode`:
`UserDefinedTimestampTestMode::kNormal` -> UDT feature enabled, write / read with min timestamp
`UserDefinedTimestampTestMode::kStripUserDefinedTimestamps` -> UDT feature enabled, write / read with min timestamp, set Options.persist_user_defined_timestamps to false.

```
make all check
./db_wal_test --gtest_filter="*WithTimestamp*"
./flush_job_test --gtest_filter="*WithTimestamp*"
./repair_test --gtest_filter="*WithTimestamp*"
./block_based_table_reader_test
```

Reviewed By: pdillinger

Differential Revision: D47027664

Pulled By: jowlyzhang

fbshipit-source-id: e729193b6334dfc63aaa736d684d907a022571f5
上级 bfdc9101
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "db/blob/blob_file_builder.h" #include "db/blob/blob_file_builder.h"
#include "db/compaction/compaction_iterator.h" #include "db/compaction/compaction_iterator.h"
#include "db/dbformat.h"
#include "db/event_helpers.h" #include "db/event_helpers.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/merge_helper.h" #include "db/merge_helper.h"
...@@ -205,21 +206,38 @@ Status BuildTable( ...@@ -205,21 +206,38 @@ Status BuildTable(
/*compaction=*/nullptr, compaction_filter.get(), /*compaction=*/nullptr, compaction_filter.get(),
/*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low); /*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);
const size_t ts_sz = ucmp->timestamp_size();
const bool strip_timestamp =
ts_sz > 0 && !ioptions.persist_user_defined_timestamps;
std::string key_after_flush_buf;
c_iter.SeekToFirst(); c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) { for (; c_iter.Valid(); c_iter.Next()) {
const Slice& key = c_iter.key(); const Slice& key = c_iter.key();
const Slice& value = c_iter.value(); const Slice& value = c_iter.value();
const ParsedInternalKey& ikey = c_iter.ikey(); const ParsedInternalKey& ikey = c_iter.ikey();
// Generate a rolling 64-bit hash of the key and values Slice key_after_flush = key;
// Note : // If user defined timestamps will be stripped from user key after flush,
// Here "key" integrates 'sequence_number'+'kType'+'user key'. // the in memory version of the key act logically the same as one with a
s = output_validator.Add(key, value); // minimum timestamp. We update the timestamp here so file boundary and
// output validator, block builder all see the effect of the stripping.
if (strip_timestamp) {
key_after_flush_buf.clear();
ReplaceInternalKeyWithMinTimestamp(&key_after_flush_buf, key, ts_sz);
key_after_flush = key_after_flush_buf;
}
// Generate a rolling 64-bit hash of the key and values
// Note :
// Here "key" integrates 'sequence_number'+'kType'+'user key'.
s = output_validator.Add(key_after_flush, value);
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
builder->Add(key, value); builder->Add(key_after_flush, value);
s = meta->UpdateBoundaries(key, value, ikey.sequence, ikey.type); s = meta->UpdateBoundaries(key_after_flush, value, ikey.sequence,
ikey.type);
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
...@@ -244,6 +262,7 @@ Status BuildTable( ...@@ -244,6 +262,7 @@ Status BuildTable(
range_del_it->Next()) { range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone(); auto tombstone = range_del_it->Tombstone();
auto kv = tombstone.Serialize(); auto kv = tombstone.Serialize();
// TODO(yuzhangyu): handle range deletion for UDT in memtables only.
builder->Add(kv.first.Encode(), kv.second); builder->Add(kv.first.Encode(), kv.second);
InternalKey tombstone_end = tombstone.SerializeEndKey(); InternalKey tombstone_end = tombstone.SerializeEndKey();
meta->UpdateBoundariesForRange(kv.first, tombstone_end, tombstone.seq_, meta->UpdateBoundariesForRange(kv.first, tombstone_end, tombstone.seq_,
...@@ -354,6 +373,8 @@ Status BuildTable( ...@@ -354,6 +373,8 @@ Status BuildTable(
s = *io_status; s = *io_status;
} }
// TODO(yuzhangyu): handle the key copy in the blob when ts should be
// stripped.
if (blob_file_builder) { if (blob_file_builder) {
if (s.ok()) { if (s.ok()) {
s = blob_file_builder->Finish(); s = blob_file_builder->Finish();
......
...@@ -311,7 +311,9 @@ TEST_F(DBWALTest, Recover) { ...@@ -311,7 +311,9 @@ TEST_F(DBWALTest, Recover) {
} while (ChangeWalOptions()); } while (ChangeWalOptions());
} }
class DBWALTestWithTimestamp : public DBBasicTestWithTimestampBase { class DBWALTestWithTimestamp
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<test::UserDefinedTimestampTestMode> {
public: public:
DBWALTestWithTimestamp() DBWALTestWithTimestamp()
: DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {} : DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {}
...@@ -401,6 +403,54 @@ TEST_F(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) { ...@@ -401,6 +403,54 @@ TEST_F(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument()); ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument());
} }
TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
// Set up the option that enables user defined timestmp size.
std::string min_ts = Timestamp(0, 0);
std::string write_ts = Timestamp(1, 0);
const size_t kTimestampSize = write_ts.size();
TestComparator test_cmp(kTimestampSize);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
bool persist_udt = test::ShouldPersistUDT(GetParam());
ts_options.persist_user_defined_timestamps = persist_udt;
std::string smallest_ukey_without_ts = "baz";
std::string largest_ukey_without_ts = "foo";
ASSERT_OK(CreateAndReopenWithCFWithTs({"pikachu"}, ts_options));
ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1"));
ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5"));
// Very small write buffer size to force flush memtables recovered from WAL.
ts_options.write_buffer_size = 16;
ts_options.arena_block_size = 16;
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files);
ASSERT_GT(level_to_files.size(), 1);
// L0 only has one SST file.
ASSERT_EQ(level_to_files[0].size(), 1);
auto meta = level_to_files[0][0];
if (persist_udt) {
ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key());
} else {
ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key());
ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key());
}
}
// Param 0: test mode for the user-defined timestamp feature
INSTANTIATE_TEST_CASE_P(
RecoverAndFlush, DBWALTestWithTimestamp,
::testing::Values(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal));
TEST_F(DBWALTest, RecoverWithTableHandle) { TEST_F(DBWALTest, RecoverWithTableHandle) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
......
...@@ -120,6 +120,16 @@ void StripTimestampFromInternalKey(std::string* result, const Slice& key, ...@@ -120,6 +120,16 @@ void StripTimestampFromInternalKey(std::string* result, const Slice& key,
kNumInternalBytes); kNumInternalBytes);
} }
void ReplaceInternalKeyWithMinTimestamp(std::string* result, const Slice& key,
size_t ts_sz) {
const size_t key_sz = key.size();
assert(key_sz >= ts_sz + kNumInternalBytes);
result->reserve(key_sz);
result->append(key.data(), key_sz - kNumInternalBytes - ts_sz);
result->append(ts_sz, static_cast<unsigned char>(0));
result->append(key.data() + key_sz - kNumInternalBytes, kNumInternalBytes);
}
std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex) const { std::string ParsedInternalKey::DebugString(bool log_err_key, bool hex) const {
std::string result = "'"; std::string result = "'";
if (log_err_key) { if (log_err_key) {
......
...@@ -168,51 +168,57 @@ inline void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ...@@ -168,51 +168,57 @@ inline void UnPackSequenceAndType(uint64_t packed, uint64_t* seq,
EntryType GetEntryType(ValueType value_type); EntryType GetEntryType(ValueType value_type);
// Append the serialization of "key" to *result. // Append the serialization of "key" to *result.
extern void AppendInternalKey(std::string* result, void AppendInternalKey(std::string* result, const ParsedInternalKey& key);
const ParsedInternalKey& key);
// Append the serialization of "key" to *result, replacing the original // Append the serialization of "key" to *result, replacing the original
// timestamp with argument ts. // timestamp with argument ts.
extern void AppendInternalKeyWithDifferentTimestamp( void AppendInternalKeyWithDifferentTimestamp(std::string* result,
std::string* result, const ParsedInternalKey& key, const Slice& ts); const ParsedInternalKey& key,
const Slice& ts);
// Serialized internal key consists of user key followed by footer. // Serialized internal key consists of user key followed by footer.
// This function appends the footer to *result, assuming that *result already // This function appends the footer to *result, assuming that *result already
// contains the user key at the end. // contains the user key at the end.
extern void AppendInternalKeyFooter(std::string* result, SequenceNumber s, void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
ValueType t); ValueType t);
// Append the key and a minimal timestamp to *result // Append the key and a minimal timestamp to *result
extern void AppendKeyWithMinTimestamp(std::string* result, const Slice& key, void AppendKeyWithMinTimestamp(std::string* result, const Slice& key,
size_t ts_sz); size_t ts_sz);
// Append the key and a maximal timestamp to *result // Append the key and a maximal timestamp to *result
extern void AppendKeyWithMaxTimestamp(std::string* result, const Slice& key, void AppendKeyWithMaxTimestamp(std::string* result, const Slice& key,
size_t ts_sz); size_t ts_sz);
// `key` is a user key with timestamp. Append the user key without timestamp // `key` is a user key with timestamp. Append the user key without timestamp
// and the maximal timestamp to *result. // and the maximal timestamp to *result.
extern void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key, void AppendUserKeyWithMaxTimestamp(std::string* result, const Slice& key,
size_t ts_sz); size_t ts_sz);
// `key` is an internal key containing a user key without timestamp. Create a // `key` is an internal key containing a user key without timestamp. Create a
// new key in *result by padding a min timestamp of size `ts_sz` to the user key // new key in *result by padding a min timestamp of size `ts_sz` to the user key
// and copying the remaining internal key bytes. // and copying the remaining internal key bytes.
extern void PadInternalKeyWithMinTimestamp(std::string* result, void PadInternalKeyWithMinTimestamp(std::string* result, const Slice& key,
const Slice& key, size_t ts_sz); size_t ts_sz);
// `key` is an internal key containing a user key with timestamp of size // `key` is an internal key containing a user key with timestamp of size
// `ts_sz`. Create a new internal key in *result by stripping the timestamp from // `ts_sz`. Create a new internal key in *result by stripping the timestamp from
// the user key and copying the remaining internal key bytes. // the user key and copying the remaining internal key bytes.
extern void StripTimestampFromInternalKey(std::string* result, const Slice& key, void StripTimestampFromInternalKey(std::string* result, const Slice& key,
size_t ts_sz); size_t ts_sz);
// `key` is an internal key containing a user key with timestamp of size
// `ts_sz`. Create a new internal key in *result while replace the original
// timestamp with min timestamp.
void ReplaceInternalKeyWithMinTimestamp(std::string* result, const Slice& key,
size_t ts_sz);
// Attempt to parse an internal key from "internal_key". On success, // Attempt to parse an internal key from "internal_key". On success,
// stores the parsed data in "*result", and returns true. // stores the parsed data in "*result", and returns true.
// //
// On error, returns false, leaves "*result" in an undefined state. // On error, returns false, leaves "*result" in an undefined state.
extern Status ParseInternalKey(const Slice& internal_key, Status ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result,
ParsedInternalKey* result, bool log_err_key); bool log_err_key);
// Returns the user key portion of an internal key. // Returns the user key portion of an internal key.
inline Slice ExtractUserKey(const Slice& internal_key) { inline Slice ExtractUserKey(const Slice& internal_key) {
...@@ -783,8 +789,7 @@ class InternalKeySliceTransform : public SliceTransform { ...@@ -783,8 +789,7 @@ class InternalKeySliceTransform : public SliceTransform {
// Read the key of a record from a write batch. // Read the key of a record from a write batch.
// if this record represent the default column family then cf_record // if this record represent the default column family then cf_record
// must be passed as false, otherwise it must be passed as true. // must be passed as false, otherwise it must be passed as true.
extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record);
bool cf_record);
// Read record from a write batch piece from input. // Read record from a write batch piece from input.
// tag, column_family, key, value and blob are return values. Callers own the // tag, column_family, key, value and blob are return values. Callers own the
...@@ -793,9 +798,9 @@ extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, ...@@ -793,9 +798,9 @@ extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key,
// input will be advanced to after the record. // input will be advanced to after the record.
// If user-defined timestamp is enabled for a column family, then the `key` // If user-defined timestamp is enabled for a column family, then the `key`
// resulting from this call will include timestamp. // resulting from this call will include timestamp.
extern Status ReadRecordFromWriteBatch(Slice* input, char* tag, Status ReadRecordFromWriteBatch(Slice* input, char* tag,
uint32_t* column_family, Slice* key, uint32_t* column_family, Slice* key,
Slice* value, Slice* blob, Slice* xid); Slice* value, Slice* blob, Slice* xid);
// When user call DeleteRange() to delete a range of keys, // When user call DeleteRange() to delete a range of keys,
// we will store a serialized RangeTombstone in MemTable and SST. // we will store a serialized RangeTombstone in MemTable and SST.
......
...@@ -312,6 +312,27 @@ TEST_F(FormatTest, StripTimestampFromInternalKey) { ...@@ -312,6 +312,27 @@ TEST_F(FormatTest, StripTimestampFromInternalKey) {
ASSERT_EQ(kTypeValue, key_without_timestamp.type); ASSERT_EQ(kTypeValue, key_without_timestamp.type);
} }
TEST_F(FormatTest, ReplaceInternalKeyWithMinTimestamp) {
std::string orig_user_key = "foo";
size_t ts_sz = 8;
orig_user_key.append(ts_sz, static_cast<unsigned char>(1));
std::string orig_internal_key = IKey(orig_user_key, 100, kTypeValue);
std::string key_buf;
ReplaceInternalKeyWithMinTimestamp(&key_buf, orig_internal_key, ts_sz);
ParsedInternalKey new_key;
Slice in(key_buf);
ASSERT_OK(ParseInternalKey(in, &new_key, true /*log_err_key*/));
std::string min_timestamp(ts_sz, static_cast<unsigned char>(0));
size_t ukey_diff_offset = new_key.user_key.difference_offset(orig_user_key);
ASSERT_EQ(min_timestamp,
Slice(new_key.user_key.data() + ukey_diff_offset, ts_sz));
ASSERT_EQ(orig_user_key.size(), new_key.user_key.size());
ASSERT_EQ(100, new_key.sequence);
ASSERT_EQ(kTypeValue, new_key.type);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {
......
...@@ -117,6 +117,8 @@ class FlushJobTestBase : public testing::Test { ...@@ -117,6 +117,8 @@ class FlushJobTestBase : public testing::Test {
db_options_.statistics = CreateDBStatistics(); db_options_.statistics = CreateDBStatistics();
cf_options_.comparator = ucmp_; cf_options_.comparator = ucmp_;
cf_options_.persist_user_defined_timestamps = persist_udt_;
cf_options_.paranoid_file_checks = paranoid_file_checks_;
std::vector<ColumnFamilyDescriptor> column_families; std::vector<ColumnFamilyDescriptor> column_families;
cf_options_.table_factory = mock_table_factory_; cf_options_.table_factory = mock_table_factory_;
...@@ -149,6 +151,9 @@ class FlushJobTestBase : public testing::Test { ...@@ -149,6 +151,9 @@ class FlushJobTestBase : public testing::Test {
std::atomic<bool> shutting_down_; std::atomic<bool> shutting_down_;
std::shared_ptr<mock::MockTableFactory> mock_table_factory_; std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
bool persist_udt_ = true;
bool paranoid_file_checks_ = false;
SeqnoToTimeMapping empty_seqno_to_time_mapping_; SeqnoToTimeMapping empty_seqno_to_time_mapping_;
}; };
...@@ -600,7 +605,13 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) { ...@@ -600,7 +605,13 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) {
} }
} }
class FlushJobTimestampTest : public FlushJobTestBase { // Test parameters:
// param 0): paranoid file check
// param 1): user-defined timestamp test mode
class FlushJobTimestampTest
: public FlushJobTestBase,
public testing::WithParamInterface<
std::tuple<bool, test::UserDefinedTimestampTestMode>> {
public: public:
FlushJobTimestampTest() FlushJobTimestampTest()
: FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"), : FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"),
...@@ -616,13 +627,36 @@ class FlushJobTimestampTest : public FlushJobTestBase { ...@@ -616,13 +627,36 @@ class FlushJobTimestampTest : public FlushJobTestBase {
} }
protected: protected:
void SetUp() override {
paranoid_file_checks_ = std::get<0>(GetParam());
auto udt_test_mode = std::get<1>(GetParam());
persist_udt_ = test::ShouldPersistUDT(udt_test_mode);
FlushJobTestBase::SetUp();
}
static constexpr uint64_t kStartTs = 10; static constexpr uint64_t kStartTs = 10;
static constexpr SequenceNumber kStartSeq = 0; static constexpr SequenceNumber kStartSeq = 0;
SequenceNumber curr_seq_{kStartSeq}; SequenceNumber curr_seq_{kStartSeq};
std::atomic<uint64_t> curr_ts_{kStartTs}; std::atomic<uint64_t> curr_ts_{kStartTs};
void CheckFileMetaData(ColumnFamilyData* cfd,
const InternalKey& expected_smallest,
const InternalKey& expected_largest,
const FileMetaData* meta_from_flush) const {
ASSERT_EQ(expected_smallest.Encode(), meta_from_flush->smallest.Encode());
ASSERT_EQ(expected_largest.Encode(), meta_from_flush->largest.Encode());
const VersionStorageInfo* storage_info = cfd->current()->storage_info();
const std::vector<FileMetaData*>& l0_files = storage_info->LevelFiles(0);
ASSERT_EQ(l0_files.size(), 1);
auto installed_file_meta = l0_files[0];
ASSERT_EQ(expected_smallest.Encode(),
installed_file_meta->smallest.Encode());
ASSERT_EQ(expected_largest.Encode(), installed_file_meta->largest.Encode());
}
}; };
TEST_F(FlushJobTimestampTest, AllKeysExpired) { TEST_P(FlushJobTimestampTest, AllKeysExpired) {
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
autovector<MemTable*> to_delete; autovector<MemTable*> to_delete;
...@@ -669,17 +703,24 @@ TEST_F(FlushJobTimestampTest, AllKeysExpired) { ...@@ -669,17 +703,24 @@ TEST_F(FlushJobTimestampTest, AllKeysExpired) {
{ {
std::string key = test::EncodeInt(0); std::string key = test::EncodeInt(0);
key.append(test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1)); if (!persist_udt_) {
// When `AdvancedColumnFamilyOptions.persist_user_defined_timestamps` flag
// is set to false. The user-defined timestamp is stripped from user key
// during flush, making the user key logically containing the minimum
// timestamp.
key.append(test::EncodeInt(0));
} else {
key.append(test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1));
}
InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp); InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp);
ASSERT_EQ(ikey.Encode(), fmeta.smallest.Encode()); CheckFileMetaData(cfd, ikey, ikey, &fmeta);
ASSERT_EQ(ikey.Encode(), fmeta.largest.Encode());
} }
job_context.Clean(); job_context.Clean();
ASSERT_TRUE(to_delete.empty()); ASSERT_TRUE(to_delete.empty());
} }
TEST_F(FlushJobTimestampTest, NoKeyExpired) { TEST_P(FlushJobTimestampTest, NoKeyExpired) {
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
autovector<MemTable*> to_delete; autovector<MemTable*> to_delete;
...@@ -722,18 +763,38 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) { ...@@ -722,18 +763,38 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) {
{ {
std::string ukey = test::EncodeInt(0); std::string ukey = test::EncodeInt(0);
std::string smallest_key = std::string smallest_key;
ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1); std::string largest_key;
std::string largest_key = ukey + test::EncodeInt(kStartTs); if (!persist_udt_) {
// When `AdvancedColumnFamilyOptions.persist_user_defined_timestamps` flag
// is set to false. The user-defined timestamp is stripped from user key
// during flush, making the user key logically containing the minimum
// timestamp, which is hardcoded to be all zeros for now.
smallest_key = ukey + test::EncodeInt(0);
largest_key = ukey + test::EncodeInt(0);
} else {
smallest_key =
ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1);
largest_key = ukey + test::EncodeInt(kStartTs);
}
InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue); InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue);
InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue); InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue);
ASSERT_EQ(smallest.Encode(), fmeta.smallest.Encode()); CheckFileMetaData(cfd, smallest, largest, &fmeta);
ASSERT_EQ(largest.Encode(), fmeta.largest.Encode());
} }
job_context.Clean(); job_context.Clean();
ASSERT_TRUE(to_delete.empty()); ASSERT_TRUE(to_delete.empty());
} }
// Param 0: paranoid file check
// Param 1: test mode for the user-defined timestamp feature
INSTANTIATE_TEST_CASE_P(
FlushJobTimestampTest, FlushJobTimestampTest,
::testing::Combine(
::testing::Bool(),
::testing::Values(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal)));
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {
......
...@@ -315,7 +315,13 @@ TEST_F(RepairTest, UnflushedSst) { ...@@ -315,7 +315,13 @@ TEST_F(RepairTest, UnflushedSst) {
ASSERT_EQ(Get("key"), "val"); ASSERT_EQ(Get("key"), "val");
} }
class RepairTestWithTimestamp : public DBBasicTestWithTimestampBase { // Test parameters:
// param 0): paranoid file check
// param 1): user-defined timestamp test mode
class RepairTestWithTimestamp
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<
std::tuple<bool, test::UserDefinedTimestampTestMode>> {
public: public:
RepairTestWithTimestamp() RepairTestWithTimestamp()
: DBBasicTestWithTimestampBase("repair_test_with_timestamp") {} : DBBasicTestWithTimestampBase("repair_test_with_timestamp") {}
...@@ -326,23 +332,46 @@ class RepairTestWithTimestamp : public DBBasicTestWithTimestampBase { ...@@ -326,23 +332,46 @@ class RepairTestWithTimestamp : public DBBasicTestWithTimestampBase {
} }
void CheckGet(const ReadOptions& read_opts, const Slice& key, void CheckGet(const ReadOptions& read_opts, const Slice& key,
const std::string& expected_value) { const std::string& expected_value,
const std::string& expected_ts) {
std::string actual_value; std::string actual_value;
ASSERT_OK(db_->Get(read_opts, handles_[0], key, &actual_value)); std::string actual_ts;
ASSERT_OK(db_->Get(read_opts, handles_[0], key, &actual_value, &actual_ts));
ASSERT_EQ(expected_value, actual_value); ASSERT_EQ(expected_value, actual_value);
ASSERT_EQ(expected_ts, actual_ts);
}
void CheckFileBoundaries(const Slice& smallest_user_key,
const Slice& largest_user_key) {
std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
&level_to_files);
ASSERT_GT(level_to_files.size(), 1);
// L0 only has one SST file.
ASSERT_EQ(level_to_files[0].size(), 1);
auto file_meta = level_to_files[0][0];
ASSERT_EQ(smallest_user_key, file_meta.smallest.user_key());
ASSERT_EQ(largest_user_key, file_meta.largest.user_key());
} }
}; };
TEST_F(RepairTestWithTimestamp, UnflushedSst) { TEST_P(RepairTestWithTimestamp, UnflushedSst) {
Destroy(last_options_); Destroy(last_options_);
bool paranoid_file_checks = std::get<0>(GetParam());
bool persist_udt = test::ShouldPersistUDT(std::get<1>(GetParam()));
std::string smallest_ukey_without_ts = "bar";
std::string largest_ukey_without_ts = "foo";
Options options = CurrentOptions(); Options options = CurrentOptions();
options.env = env_; options.env = env_;
options.create_if_missing = true; options.create_if_missing = true;
std::string ts = Timestamp(0, 0); std::string min_ts = Timestamp(0, 0);
const size_t kTimestampSize = ts.size(); std::string write_ts = Timestamp(1, 0);
const size_t kTimestampSize = write_ts.size();
TestComparator test_cmp(kTimestampSize); TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp; options.comparator = &test_cmp;
options.persist_user_defined_timestamps = persist_udt;
options.paranoid_file_checks = paranoid_file_checks;
ColumnFamilyOptions cf_options(options); ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families; std::vector<ColumnFamilyDescriptor> column_families;
...@@ -351,7 +380,10 @@ TEST_F(RepairTestWithTimestamp, UnflushedSst) { ...@@ -351,7 +380,10 @@ TEST_F(RepairTestWithTimestamp, UnflushedSst) {
ASSERT_OK(DB::Open(options, dbname_, column_families, &handles_, &db_)); ASSERT_OK(DB::Open(options, dbname_, column_families, &handles_, &db_));
ASSERT_OK(Put("key", ts, "val")); ASSERT_OK(Put(smallest_ukey_without_ts, write_ts,
smallest_ukey_without_ts + ":val"));
ASSERT_OK(
Put(largest_ukey_without_ts, write_ts, largest_ukey_without_ts + ":val"));
VectorLogPtr wal_files; VectorLogPtr wal_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
ASSERT_EQ(wal_files.size(), 1); ASSERT_EQ(wal_files.size(), 1);
...@@ -381,12 +413,46 @@ TEST_F(RepairTestWithTimestamp, UnflushedSst) { ...@@ -381,12 +413,46 @@ TEST_F(RepairTestWithTimestamp, UnflushedSst) {
ASSERT_GT(total_ssts_size, 0); ASSERT_GT(total_ssts_size, 0);
} }
// Check file boundaries are correct for different
// `persist_user_defined_timestamps` option values.
if (persist_udt) {
CheckFileBoundaries(smallest_ukey_without_ts + write_ts,
largest_ukey_without_ts + write_ts);
} else {
CheckFileBoundaries(smallest_ukey_without_ts + min_ts,
largest_ukey_without_ts + min_ts);
}
ReadOptions read_opts; ReadOptions read_opts;
Slice read_ts_slice = ts; Slice read_ts_slice = write_ts;
read_opts.timestamp = &read_ts_slice; read_opts.timestamp = &read_ts_slice;
CheckGet(read_opts, "key", "val"); if (persist_udt) {
CheckGet(read_opts, smallest_ukey_without_ts,
smallest_ukey_without_ts + ":val", write_ts);
CheckGet(read_opts, largest_ukey_without_ts,
largest_ukey_without_ts + ":val", write_ts);
} else {
// TODO(yuzhangyu): currently when `persist_user_defined_timestamps` is
// false, ts is unconditionally stripped during flush.
// When `full_history_ts_low` is set and respected during flush.
// We should prohibit reading below `full_history_ts_low` all together.
CheckGet(read_opts, smallest_ukey_without_ts,
smallest_ukey_without_ts + ":val", min_ts);
CheckGet(read_opts, largest_ukey_without_ts,
largest_ukey_without_ts + ":val", min_ts);
}
} }
// Param 0: paranoid file check
// Param 1: test mode for the user-defined timestamp feature
INSTANTIATE_TEST_CASE_P(
UnflushedSst, RepairTestWithTimestamp,
::testing::Combine(
::testing::Bool(),
::testing::Values(
test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp,
test::UserDefinedTimestampTestMode::kNormal)));
TEST_F(RepairTest, SeparateWalDir) { TEST_F(RepairTest, SeparateWalDir) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
......
...@@ -1120,11 +1120,30 @@ struct AdvancedColumnFamilyOptions { ...@@ -1120,11 +1120,30 @@ struct AdvancedColumnFamilyOptions {
// //
// When it's false, the user-defined timestamps will be removed from the user // When it's false, the user-defined timestamps will be removed from the user
// keys when data is flushed from memtables to SST files. Other places that // keys when data is flushed from memtables to SST files. Other places that
// user keys can be persisted like WAL and blob files go through a similar // user keys can be persisted like file boundaries in file metadata and blob
// process. Users should call `DB::IncreaseFullHistoryTsLow` to set a cutoff // files go through a similar process. There are two major motivations
// timestamp. RocksDB refrains from flushing a memtable with data still above // for this flag:
// the cutoff timestamp with best effort. When users try to read below the // 1) backward compatibility: if the user later decides to
// cutoff timestamp, an error will be returned. // disable the user-defined timestamp feature for the column family, these SST
// files can be handled by a user comparator that is not aware of user-defined
// timestamps.
// 2) enable user-defined timestamp feature for an existing column family
// while set this flag to be `false`: user keys in the newly generated SST
// files are of the same format as the existing SST files.
//
// When setting this flag to `false`, users should also call
// `DB::IncreaseFullHistoryTsLow` to set a cutoff timestamp for flush. RocksDB
// refrains from flushing a memtable with data still above
// the cutoff timestamp with best effort. Users can do user-defined
// multi-versioned read above the cutoff timestamp. When users try to read
// below the cutoff timestamp, an error will be returned.
//
// Note that if WAL is enabled, unlike SST files, user-defined timestamps are
// persisted to WAL even if this flag is set to `false`. The benefit of this
// is that user-defined timestamps can be recovered with the caveat that users
// should flush all memtables so there is no active WAL files before doing a
// downgrade or toggling on / off the user-defined timestamp feature on a
// column family.
// //
// Default: true (user-defined timestamps are persisted) // Default: true (user-defined timestamps are persisted)
// Not dynamically changeable, change it requires db restart and // Not dynamically changeable, change it requires db restart and
......
...@@ -53,8 +53,7 @@ BlockBuilder::BlockBuilder( ...@@ -53,8 +53,7 @@ BlockBuilder::BlockBuilder(
: block_restart_interval_(block_restart_interval), : block_restart_interval_(block_restart_interval),
use_delta_encoding_(use_delta_encoding), use_delta_encoding_(use_delta_encoding),
use_value_delta_encoding_(use_value_delta_encoding), use_value_delta_encoding_(use_value_delta_encoding),
ts_sz_(ts_sz), strip_ts_sz_(persist_user_defined_timestamps ? 0 : ts_sz),
persist_user_defined_timestamps_(persist_user_defined_timestamps),
is_user_key_(is_user_key), is_user_key_(is_user_key),
restarts_(1, 0), // First restart point is at offset 0 restarts_(1, 0), // First restart point is at offset 0
counter_(0), counter_(0),
...@@ -100,8 +99,8 @@ size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key, ...@@ -100,8 +99,8 @@ size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key,
// Note: this is an imprecise estimate as it accounts for the whole key size // Note: this is an imprecise estimate as it accounts for the whole key size
// instead of non-shared key size. // instead of non-shared key size.
estimate += key.size(); estimate += key.size();
if (ts_sz_ > 0 && !persist_user_defined_timestamps_) { if (strip_ts_sz_ > 0) {
estimate -= ts_sz_; estimate -= strip_ts_sz_;
} }
// In value delta encoding we estimate the value delta size as half the full // In value delta encoding we estimate the value delta size as half the full
// value size since only the size field of block handle is encoded. // value size since only the size field of block handle is encoded.
...@@ -175,13 +174,13 @@ void BlockBuilder::AddWithLastKey(const Slice& key, const Slice& value, ...@@ -175,13 +174,13 @@ void BlockBuilder::AddWithLastKey(const Slice& key, const Slice& value,
// or Reset. This is more convenient for the caller and we can be more // or Reset. This is more convenient for the caller and we can be more
// clever inside BlockBuilder. On this hot code path, we want to avoid // clever inside BlockBuilder. On this hot code path, we want to avoid
// conditional jumps like `buffer_.empty() ? ... : ...` so we can use a // conditional jumps like `buffer_.empty() ? ... : ...` so we can use a
// fast min operation instead, with an assertion to be sure our logic is // fast arithmetic operation instead, with an assertion to be sure our logic
// sound. // is sound.
size_t buffer_size = buffer_.size(); size_t buffer_size = buffer_.size();
size_t last_key_size = last_key_param.size(); size_t last_key_size = last_key_param.size();
assert(buffer_size == 0 || buffer_size >= last_key_size); assert(buffer_size == 0 || buffer_size >= last_key_size - strip_ts_sz_);
Slice last_key(last_key_param.data(), std::min(buffer_size, last_key_size)); Slice last_key(last_key_param.data(), last_key_size * (buffer_size > 0));
AddWithLastKeyImpl(key, value, last_key, delta_value, buffer_size); AddWithLastKeyImpl(key, value, last_key, delta_value, buffer_size);
} }
...@@ -255,11 +254,11 @@ inline void BlockBuilder::AddWithLastKeyImpl(const Slice& key, ...@@ -255,11 +254,11 @@ inline void BlockBuilder::AddWithLastKeyImpl(const Slice& key,
const Slice BlockBuilder::MaybeStripTimestampFromKey(std::string* key_buf, const Slice BlockBuilder::MaybeStripTimestampFromKey(std::string* key_buf,
const Slice& key) { const Slice& key) {
Slice stripped_key = key; Slice stripped_key = key;
if (ts_sz_ > 0 && !persist_user_defined_timestamps_) { if (strip_ts_sz_ > 0) {
if (is_user_key_) { if (is_user_key_) {
stripped_key.remove_suffix(ts_sz_); stripped_key.remove_suffix(strip_ts_sz_);
} else { } else {
StripTimestampFromInternalKey(key_buf, key, ts_sz_); StripTimestampFromInternalKey(key_buf, key, strip_ts_sz_);
stripped_key = *key_buf; stripped_key = *key_buf;
} }
} }
......
...@@ -94,11 +94,10 @@ class BlockBuilder { ...@@ -94,11 +94,10 @@ class BlockBuilder {
const bool use_delta_encoding_; const bool use_delta_encoding_;
// Refer to BlockIter::DecodeCurrentValue for format of delta encoded values // Refer to BlockIter::DecodeCurrentValue for format of delta encoded values
const bool use_value_delta_encoding_; const bool use_value_delta_encoding_;
// Size in bytes for the user-defined timestamp in a user key. // Size in bytes for the user-defined timestamp to strip in a user key.
const size_t ts_sz_; // This is non-zero if there is user-defined timestamp in the user key and it
// Whether the user-defined timestamp part in user keys should be persisted. // should not be persisted.
// If false, it will be stripped from the key before it's encoded. const size_t strip_ts_sz_;
const bool persist_user_defined_timestamps_;
// Whether the keys provided to build this block are user keys. If not, // Whether the keys provided to build this block are user keys. If not,
// the keys are internal keys. This will affect how timestamp stripping is // the keys are internal keys. This will affect how timestamp stripping is
// done for the key if `persisted_user_defined_timestamps_` is false and // done for the key if `persisted_user_defined_timestamps_` is false and
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册