From 4234a6a301f5a5e25e8bf20629375ca3d4996309 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Wed, 30 Aug 2023 09:34:31 -0700 Subject: [PATCH] Increase full_history_ts_low when flush happens during recovery (#11774) Summary: This PR adds a missing piece for the UDT in memtable only feature, which is to automatically increase `full_history_ts_low` when flush happens during recovery. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11774 Test Plan: Added unit test make all check Reviewed By: ltamasi Differential Revision: D48799109 Pulled By: jowlyzhang fbshipit-source-id: fd681ed66d9d40904ca2c919b2618eb692686035 --- db/db_impl/db_impl.cc | 3 ++- db/db_impl/db_impl_open.cc | 16 ++++++++++++++++ db/db_wal_test.cc | 11 +++++++++++ db/flush_job.cc | 9 +++------ include/rocksdb/advanced_options.h | 4 +++- util/udt_util.cc | 8 ++++++++ util/udt_util.h | 8 ++++++++ util/udt_util_test.cc | 14 ++++++++++++++ 8 files changed, 65 insertions(+), 8 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 93681c70b..8bda80bf4 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1693,7 +1693,8 @@ Status DBImpl::GetFullHistoryTsLow(ColumnFamilyHandle* column_family, } InstrumentedMutexLock l(&mutex_); *ts_low = cfd->GetFullHistoryTsLow(); - assert(cfd->user_comparator()->timestamp_size() == ts_low->size()); + assert(ts_low->empty() || + cfd->user_comparator()->timestamp_size() == ts_low->size()); return Status::OK(); } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index e6d97b125..8db53dac9 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1722,6 +1722,22 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, for (const auto& blob : blob_file_additions) { edit->AddBlobFile(blob); } + + // For UDT in memtable only feature, move up the cutoff timestamp whenever + // a flush happens. + const Comparator* ucmp = cfd->user_comparator(); + size_t ts_sz = ucmp->timestamp_size(); + if (ts_sz > 0 && !cfd->ioptions()->persist_user_defined_timestamps) { + Slice mem_newest_udt = mem->GetNewestUDT(); + std::string full_history_ts_low = cfd->GetFullHistoryTsLow(); + if (full_history_ts_low.empty() || + ucmp->CompareTimestamp(mem_newest_udt, full_history_ts_low) >= 0) { + std::string new_full_history_ts_low; + GetFullHistoryTsLowFromU64CutoffTs(&mem_newest_udt, + &new_full_history_ts_low); + edit->SetFullHistoryTsLow(new_full_history_ts_low); + } + } } InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 72b6f7c7b..01dc84a0f 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -14,6 +14,7 @@ #include "port/stack_trace.h" #include "rocksdb/file_system.h" #include "test_util/sync_point.h" +#include "util/udt_util.h" #include "utilities/fault_injection_env.h" #include "utilities/fault_injection_fs.h" @@ -384,6 +385,7 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { ts_options.persist_user_defined_timestamps = persist_udt; bool avoid_flush_during_recovery = true; + std::string full_history_ts_low; ReadOptions read_opts; do { Slice ts_slice = ts1; @@ -439,6 +441,8 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { CheckGet(read_opts, 1, "foo", "v4", ts3); CheckGet(read_opts, 1, "bar", "v2", ts2); CheckGet(read_opts, 1, "baz", "v5", ts1); + ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low)); + ASSERT_TRUE(full_history_ts_low.empty()); } while (ChangeWalOptions()); } @@ -470,6 +474,8 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { std::vector> level_to_files; dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files); + std::string full_history_ts_low; + ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low)); ASSERT_GT(level_to_files.size(), 1); // L0 only has one SST file. ASSERT_EQ(level_to_files[0].size(), 1); @@ -477,9 +483,14 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { if (persist_udt) { ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key()); ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key()); + ASSERT_TRUE(full_history_ts_low.empty()); } else { ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key()); ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key()); + std::string effective_cutoff; + Slice write_ts_slice = write_ts; + GetFullHistoryTsLowFromU64CutoffTs(&write_ts_slice, &effective_cutoff); + ASSERT_EQ(effective_cutoff, full_history_ts_low); } } diff --git a/db/flush_job.cc b/db/flush_job.cc index 451e61937..0e6c66cac 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -1147,16 +1147,13 @@ Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() { ucmp->CompareTimestamp(cutoff_udt_, full_history_ts_low) < 0)) { return Status::OK(); } - Slice cutoff_udt_slice = cutoff_udt_; - uint64_t cutoff_udt_ts = 0; - bool format_res = GetFixed64(&cutoff_udt_slice, &cutoff_udt_ts); - assert(format_res); - (void)format_res; std::string new_full_history_ts_low; + Slice cutoff_udt_slice = cutoff_udt_; // TODO(yuzhangyu): Add a member to AdvancedColumnFamilyOptions for an // operation to get the next immediately larger user-defined timestamp to // expand this feature to other user-defined timestamp formats. - PutFixed64(&new_full_history_ts_low, cutoff_udt_ts + 1); + GetFullHistoryTsLowFromU64CutoffTs(&cutoff_udt_slice, + &new_full_history_ts_low); VersionEdit edit; edit.SetColumnFamily(cfd_->GetID()); edit.SetFullHistoryTsLow(new_full_history_ts_low); diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 18eb02081..08e4e08b7 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -1190,7 +1190,9 @@ struct AdvancedColumnFamilyOptions { // persisted to WAL even if this flag is set to `false`. The benefit of this // is that user-defined timestamps can be recovered with the caveat that users // should flush all memtables so there is no active WAL files before doing a - // downgrade. + // downgrade. In order to use WAL to recover user-defined timestamps, users of + // this feature would want to set both `avoid_flush_during_shutdown` and + // `avoid_flush_during_recovery` to be true. // // Note that setting this flag to false is not supported in combination with // atomic flush, or concurrent memtable write enabled by diff --git a/util/udt_util.cc b/util/udt_util.cc index 9380f4560..7d549acab 100644 --- a/util/udt_util.cc +++ b/util/udt_util.cc @@ -8,6 +8,7 @@ #include "db/dbformat.h" #include "rocksdb/types.h" +#include "util/coding.h" #include "util/write_batch_util.h" namespace ROCKSDB_NAMESPACE { @@ -340,4 +341,11 @@ Status ValidateUserDefinedTimestampsOptions( return Status::InvalidArgument( "Unsupported user defined timestamps settings change."); } + +void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts, + std::string* full_history_ts_low) { + uint64_t cutoff_udt_ts = 0; + [[maybe_unused]] bool format_res = GetFixed64(cutoff_ts, &cutoff_udt_ts); + PutFixed64(full_history_ts_low, cutoff_udt_ts + 1); +} } // namespace ROCKSDB_NAMESPACE diff --git a/util/udt_util.h b/util/udt_util.h index 4bc837739..706b02e90 100644 --- a/util/udt_util.h +++ b/util/udt_util.h @@ -246,4 +246,12 @@ Status ValidateUserDefinedTimestampsOptions( const Comparator* new_comparator, const std::string& old_comparator_name, bool new_persist_udt, bool old_persist_udt, bool* mark_sst_files_has_no_udt); + +// Given a cutoff user-defined timestamp formatted as uint64_t, get the +// effective `full_history_ts_low` timestamp, which is the next immediately +// bigger timestamp. Used by the UDT in memtable only feature when flushing +// memtables and remove timestamps. This process collapses history and increase +// the effective `full_history_ts_low`. +void GetFullHistoryTsLowFromU64CutoffTs(Slice* cutoff_ts, + std::string* full_history_ts_low); } // namespace ROCKSDB_NAMESPACE diff --git a/util/udt_util_test.cc b/util/udt_util_test.cc index 47e1edf34..44ee567f7 100644 --- a/util/udt_util_test.cc +++ b/util/udt_util_test.cc @@ -438,6 +438,20 @@ TEST(ValidateUserDefinedTimestampsOptionsTest, InvalidUserComparatorChange) { &mark_sst_files) .IsInvalidArgument()); } + +TEST(GetFullHistoryTsLowFromU64CutoffTsTest, Success) { + std::string cutoff_ts; + uint64_t cutoff_ts_int = 3; + PutFixed64(&cutoff_ts, 3); + Slice cutoff_ts_slice = cutoff_ts; + std::string actual_full_history_ts_low; + GetFullHistoryTsLowFromU64CutoffTs(&cutoff_ts_slice, + &actual_full_history_ts_low); + + std::string expected_ts_low; + PutFixed64(&expected_ts_low, cutoff_ts_int + 1); + ASSERT_EQ(expected_ts_low, actual_full_history_ts_low); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { -- GitLab