diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 93681c70b3fd5cba1fd7ef26915269fafd0a74ad..8bda80bf4b7c70091610f359d43f020c2212d116 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1693,7 +1693,8 @@ Status DBImpl::GetFullHistoryTsLow(ColumnFamilyHandle* column_family, } InstrumentedMutexLock l(&mutex_); *ts_low = cfd->GetFullHistoryTsLow(); - assert(cfd->user_comparator()->timestamp_size() == ts_low->size()); + assert(ts_low->empty() || + cfd->user_comparator()->timestamp_size() == ts_low->size()); return Status::OK(); } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index e6d97b125700878fc033e8f16697eac1a059181b..8db53dac93f32f8f9e84f7913c50964db62b8d09 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1722,6 +1722,22 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, for (const auto& blob : blob_file_additions) { edit->AddBlobFile(blob); } + + // For UDT in memtable only feature, move up the cutoff timestamp whenever + // a flush happens. 
+ const Comparator* ucmp = cfd->user_comparator(); + size_t ts_sz = ucmp->timestamp_size(); + if (ts_sz > 0 && !cfd->ioptions()->persist_user_defined_timestamps) { + Slice mem_newest_udt = mem->GetNewestUDT(); + std::string full_history_ts_low = cfd->GetFullHistoryTsLow(); + if (full_history_ts_low.empty() || + ucmp->CompareTimestamp(mem_newest_udt, full_history_ts_low) >= 0) { + std::string new_full_history_ts_low; + GetFullHistoryTsLowFromU64CutoffTs(&mem_newest_udt, + &new_full_history_ts_low); + edit->SetFullHistoryTsLow(new_full_history_ts_low); + } + } } InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 72b6f7c7b8248414f459db9e60938a98a40612bd..01dc84a0fba3db19491dd3e93b2923da13a9bef2 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -14,6 +14,7 @@ #include "port/stack_trace.h" #include "rocksdb/file_system.h" #include "test_util/sync_point.h" +#include "util/udt_util.h" #include "utilities/fault_injection_env.h" #include "utilities/fault_injection_fs.h" @@ -384,6 +385,7 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { ts_options.persist_user_defined_timestamps = persist_udt; bool avoid_flush_during_recovery = true; + std::string full_history_ts_low; ReadOptions read_opts; do { Slice ts_slice = ts1; @@ -439,6 +441,8 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { CheckGet(read_opts, 1, "foo", "v4", ts3); CheckGet(read_opts, 1, "bar", "v2", ts2); CheckGet(read_opts, 1, "baz", "v5", ts1); + ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low)); + ASSERT_TRUE(full_history_ts_low.empty()); } while (ChangeWalOptions()); } @@ -470,6 +474,8 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { std::vector> level_to_files; dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files); + std::string full_history_ts_low; + ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low)); ASSERT_GT(level_to_files.size(), 1); // L0 only has one SST file. 
ASSERT_EQ(level_to_files[0].size(), 1); @@ -477,9 +483,14 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { if (persist_udt) { ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key()); ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key()); + ASSERT_TRUE(full_history_ts_low.empty()); } else { ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key()); ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key()); + std::string effective_cutoff; + Slice write_ts_slice = write_ts; + GetFullHistoryTsLowFromU64CutoffTs(&write_ts_slice, &effective_cutoff); + ASSERT_EQ(effective_cutoff, full_history_ts_low); } } diff --git a/db/flush_job.cc b/db/flush_job.cc index 451e61937b8a29d5e764e01fea879bb3f42b5286..0e6c66cacb3a63bbf8028265e6805e23f9ae3817 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -1147,16 +1147,13 @@ Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() { ucmp->CompareTimestamp(cutoff_udt_, full_history_ts_low) < 0)) { return Status::OK(); } - Slice cutoff_udt_slice = cutoff_udt_; - uint64_t cutoff_udt_ts = 0; - bool format_res = GetFixed64(&cutoff_udt_slice, &cutoff_udt_ts); - assert(format_res); - (void)format_res; std::string new_full_history_ts_low; + Slice cutoff_udt_slice = cutoff_udt_; // TODO(yuzhangyu): Add a member to AdvancedColumnFamilyOptions for an // operation to get the next immediately larger user-defined timestamp to // expand this feature to other user-defined timestamp formats. 
- PutFixed64(&new_full_history_ts_low, cutoff_udt_ts + 1); + GetFullHistoryTsLowFromU64CutoffTs(&cutoff_udt_slice, + &new_full_history_ts_low); VersionEdit edit; edit.SetColumnFamily(cfd_->GetID()); edit.SetFullHistoryTsLow(new_full_history_ts_low); diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 18eb020813fdd8611cb40e88d8a12841d19eba10..08e4e08b7b884d154244ddc465dbc678d16a6323 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -1190,7 +1190,9 @@ struct AdvancedColumnFamilyOptions { // persisted to WAL even if this flag is set to `false`. The benefit of this // is that user-defined timestamps can be recovered with the caveat that users // should flush all memtables so there is no active WAL files before doing a - // downgrade. + // downgrade. In order to use WAL to recover user-defined timestamps, users of + // this feature would want to set both `avoid_flush_during_shutdown` and + // `avoid_flush_during_recovery` to be true. 
// // Note that setting this flag to false is not supported in combination with // atomic flush, or concurrent memtable write enabled by diff --git a/util/udt_util.cc b/util/udt_util.cc index 9380f4560422bf885efb714256a8da66f52142f6..7d549acabcb62488a8c616c54983b8afd8ba7b62 100644 --- a/util/udt_util.cc +++ b/util/udt_util.cc @@ -8,6 +8,7 @@ #include "db/dbformat.h" #include "rocksdb/types.h" +#include "util/coding.h" #include "util/write_batch_util.h" namespace ROCKSDB_NAMESPACE { @@ -340,4 +341,13 @@ Status ValidateUserDefinedTimestampsOptions( return Status::InvalidArgument( "Unsupported user defined timestamps settings change."); } + +void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts, + std::string* full_history_ts_low) { + uint64_t cutoff_udt_ts = 0; + [[maybe_unused]] bool format_res = GetFixed64(cutoff_ts, &cutoff_udt_ts); + // A cutoff that cannot be decoded as a fixed64 is a programming error. + assert(format_res); + PutFixed64(full_history_ts_low, cutoff_udt_ts + 1); +} } // namespace ROCKSDB_NAMESPACE diff --git a/util/udt_util.h b/util/udt_util.h index 4bc83773983d48978463e72e1482deefc5689baa..706b02e90ce1c4be89e7ce115e2618717020d449 100644 --- a/util/udt_util.h +++ b/util/udt_util.h @@ -246,4 +246,12 @@ Status ValidateUserDefinedTimestampsOptions( const Comparator* new_comparator, const std::string& old_comparator_name, bool new_persist_udt, bool old_persist_udt, bool* mark_sst_files_has_no_udt); + +// Given a cutoff user-defined timestamp formatted as uint64_t, get the +// effective `full_history_ts_low` timestamp, which is the next immediately +// bigger timestamp. Used by the UDT in memtable only feature when flushing +// memtables and removing timestamps. This process collapses history and +// increases the effective `full_history_ts_low`. 
+void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts, + std::string* full_history_ts_low); } // namespace ROCKSDB_NAMESPACE diff --git a/util/udt_util_test.cc b/util/udt_util_test.cc index 47e1edf34ff53ad3ccb9b61a644792c16cbe54e9..44ee567f744545312092eacc37459770960d334f 100644 --- a/util/udt_util_test.cc +++ b/util/udt_util_test.cc @@ -438,6 +438,20 @@ TEST(ValidateUserDefinedTimestampsOptionsTest, InvalidUserComparatorChange) { &mark_sst_files) .IsInvalidArgument()); } + +TEST(GetFullHistoryTsLowFromU64CutoffTsTest, Success) { + std::string cutoff_ts; + uint64_t cutoff_ts_int = 3; + PutFixed64(&cutoff_ts, cutoff_ts_int); + Slice cutoff_ts_slice = cutoff_ts; + std::string actual_full_history_ts_low; + GetFullHistoryTsLowFromU64CutoffTs(&cutoff_ts_slice, + &actual_full_history_ts_low); + + std::string expected_ts_low; + PutFixed64(&expected_ts_low, cutoff_ts_int + 1); + ASSERT_EQ(expected_ts_low, actual_full_history_ts_low); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {