diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 0a480a4a2ebc11971dd4598420e307be37c116b8..27d48539c35ad961e2a39f1dd08892b89c9e25d5 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -3412,7 +3412,9 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
 #ifndef ROCKSDB_LITE
 Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
-                                       bool cache_only, SequenceNumber* seq,
+                                       bool cache_only,
+                                       SequenceNumber lower_bound_seq,
+                                       SequenceNumber* seq,
                                        bool* found_record_for_key,
                                        bool* is_blob_index) {
   Status s;
@@ -3445,6 +3447,13 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
     return Status::OK();
   }
 
+  SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
+  if (lower_bound_in_mem != kMaxSequenceNumber &&
+      lower_bound_in_mem < lower_bound_seq) {
+    *found_record_for_key = false;
+    return Status::OK();
+  }
+
   // Check if there is a record for this key in the immutable memtables
   sv->imm->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
                seq, read_options, nullptr /*read_callback*/, is_blob_index);
@@ -3464,6 +3473,13 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
     return Status::OK();
   }
 
+  SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
+  if (lower_bound_in_imm != kMaxSequenceNumber &&
+      lower_bound_in_imm < lower_bound_seq) {
+    *found_record_for_key = false;
+    return Status::OK();
+  }
+
   // Check if there is a record for this key in the immutable memtables
   sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context,
                           &max_covering_tombstone_seq, seq, read_options,
@@ -3485,6 +3501,10 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
     return Status::OK();
   }
 
+  // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
+  // check here to skip the history if possible. But currently the caller
+  // already does that. Maybe we should move the logic here later.
+
   // TODO(agiardullo): possible optimization: consider checking cached
   // SST files if cache_only=true?
   if (!cache_only) {
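Why the two early returns above are safe: data only flows downward, from the active memtable to the immutable memtables to the memtable history, so the earliest sequence number of the level just searched is an upper bound on every entry in the levels below it. Once that bound falls below `lower_bound_seq`, nothing deeper can contain a record the caller cares about. A minimal standalone sketch of the predicate, using stand-in types local to the sketch (RocksDB's real `kMaxSequenceNumber` is a smaller reserved value):

    #include <cstdint>
    #include <limits>

    // Stand-ins local to this sketch; not the RocksDB definitions.
    using SequenceNumber = uint64_t;
    constexpr SequenceNumber kMaxSequenceNumber =
        std::numeric_limits<SequenceNumber>::max();

    // True if every level below the one just searched can only hold data
    // older than `lower_bound_seq` and may therefore be skipped. An earliest
    // sequence number of kMaxSequenceNumber means "empty/unknown", so nothing
    // can be concluded and nothing is skipped.
    bool CanSkipOlderLevels(SequenceNumber earliest_seq_in_searched_level,
                            SequenceNumber lower_bound_seq) {
      return earliest_seq_in_searched_level != kMaxSequenceNumber &&
             earliest_seq_in_searched_level < lower_bound_seq;
    }

GetLatestSequenceForKey() applies this check twice: once after searching the active memtable (bounding the immutable list) and once after searching the immutable list (bounding its history).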
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 4c80b6a4d0c8fc2ca4ed91daff6e4fef2243ca50..4de15f0324d8fe5d7cf777c4923aa3342432c147 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -413,11 +413,17 @@ class DBImpl : public DB {
   // snapshot, we know that no key could have existed after this snapshot
   // (since we do not compact keys that have an earlier snapshot).
   //
+  // Only records newer than or at `lower_bound_seq` are guaranteed to be
+  // returned. Memtables and files may not be checked if they only contain
+  // data older than `lower_bound_seq`.
+  //
   // Returns OK or NotFound on success,
   // other status on unexpected error.
   // TODO(andrewkr): this API needs to be aware of range deletion operations
   Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
-                                 bool cache_only, SequenceNumber* seq,
+                                 bool cache_only,
+                                 SequenceNumber lower_bound_seq,
+                                 SequenceNumber* seq,
                                  bool* found_record_for_key,
                                  bool* is_blob_index = nullptr);
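Callers that cannot tolerate skipping simply pass 0: no earliest sequence number can be below 0, so the early returns never fire and the pre-change behavior is preserved, which is exactly what the BlobDB call site below does. A hypothetical wrapper showing the calling pattern (the helper name is invented, and `DBImpl` is an internal class, so this sketch assumes in-tree code):

    #include "db/db_impl/db_impl.h"  // internal header; assumed available

    namespace rocksdb {

    // Invented helper illustrating the changed API. With lower_bound_seq == 0
    // nothing is skipped; with a larger bound, memtables holding only older
    // data may be skipped, in which case *found_record_for_key can come back
    // false even though an older record for `key` exists.
    Status ProbeLatestSeq(DBImpl* db_impl, SuperVersion* sv, const Slice& key,
                          SequenceNumber lower_bound_seq, SequenceNumber* seq,
                          bool* found_record_for_key) {
      *seq = kMaxSequenceNumber;
      *found_record_for_key = false;
      return db_impl->GetLatestSequenceForKey(sv, key, false /*cache_only*/,
                                              lower_bound_seq, seq,
                                              found_record_for_key);
    }

    }  // namespace rocksdb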
diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc
index 25583fa981abd083483ca48c04456fc6dc65c0ea..86eb1460c15c1871c0406f628ec283108c344488 100644
--- a/utilities/blob_db/blob_db_impl.cc
+++ b/utilities/blob_db/blob_db_impl.cc
@@ -1426,8 +1426,8 @@ class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
     bool found_record_for_key = false;
     bool is_blob_index = false;
     Status s = db_impl->GetLatestSequenceForKey(
-        sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key,
-        &is_blob_index);
+        sv, key_, false /*cache_only*/, 0 /*lower_bound_seq*/, &latest_seq,
+        &found_record_for_key, &is_blob_index);
     db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
     if (!s.ok() && !s.IsNotFound()) {
       // Error.
diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc
index 5e1af2fb1f5a748c3cfc1bd7e926a2d363e787cb..3aa6c207a481ce7d4b61ee4917c39f9a8f80c9c9 100644
--- a/utilities/transactions/optimistic_transaction_test.cc
+++ b/utilities/transactions/optimistic_transaction_test.cc
@@ -9,11 +9,15 @@
 #include <functional>
 #include <string>
+
+#include "db/db_impl/db_impl.h"
 #include "logging/logging.h"
 #include "port/port.h"
 #include "rocksdb/db.h"
+#include "rocksdb/perf_context.h"
 #include "rocksdb/utilities/optimistic_transaction_db.h"
 #include "rocksdb/utilities/transaction.h"
+#include "test_util/sync_point.h"
 #include "test_util/testharness.h"
 #include "test_util/transaction_test_util.h"
 #include "util/crc32c.h"
@@ -308,6 +312,120 @@ TEST_F(OptimisticTransactionTest, FlushTest2) {
   delete txn;
 }
 
+// Trigger the condition where some old memtables are skipped when doing
+// TransactionUtil::CheckKey(), and make sure the result is still correct.
+TEST_F(OptimisticTransactionTest, CheckKeySkipOldMemtable) {
+  const int kAttemptHistoryMemtable = 0;
+  const int kAttemptImmMemTable = 1;
+  for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
+       attempt++) {
+    options.max_write_buffer_number_to_maintain = 3;
+    Reopen();
+
+    WriteOptions write_options;
+    ReadOptions read_options;
+    ReadOptions snapshot_read_options;
+    ReadOptions snapshot_read_options2;
+    string value;
+    Status s;
+
+    ASSERT_OK(txn_db->Put(write_options, Slice("foo"), Slice("bar")));
+    ASSERT_OK(txn_db->Put(write_options, Slice("foo2"), Slice("bar")));
+
+    Transaction* txn = txn_db->BeginTransaction(write_options);
+    ASSERT_TRUE(txn != nullptr);
+
+    Transaction* txn2 = txn_db->BeginTransaction(write_options);
+    ASSERT_TRUE(txn2 != nullptr);
+
+    snapshot_read_options.snapshot = txn->GetSnapshot();
+    ASSERT_OK(txn->GetForUpdate(snapshot_read_options, "foo", &value));
+    ASSERT_EQ(value, "bar");
+    ASSERT_OK(txn->Put(Slice("foo"), Slice("bar2")));
+
+    snapshot_read_options2.snapshot = txn2->GetSnapshot();
+    ASSERT_OK(txn2->GetForUpdate(snapshot_read_options2, "foo2", &value));
+    ASSERT_EQ(value, "bar");
+    ASSERT_OK(txn2->Put(Slice("foo2"), Slice("bar2")));
+
+    // txn updates "foo" and txn2 updates "foo2", and now a write is
+    // issued for "foo", which conflicts with txn but not txn2
+    ASSERT_OK(txn_db->Put(write_options, "foo", "bar"));
+
+    if (attempt == kAttemptImmMemTable) {
+      // For the second attempt, hold flush from the beginning. The memtable
+      // will be switched to immutable after calling TEST_SwitchMemtable()
+      // while CheckKey() is called.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"OptimisticTransactionTest.CheckKeySkipOldMemtable",
+            "FlushJob::Start"}});
+      rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+    }
+
+    // Force a memtable flush. The memtable should still be kept
+    FlushOptions flush_ops;
+    if (attempt == kAttemptHistoryMemtable) {
+      ASSERT_OK(txn_db->Flush(flush_ops));
+    } else {
+      assert(attempt == kAttemptImmMemTable);
+      DBImpl* db_impl = static_cast<DBImpl*>(txn_db->GetRootDB());
+      db_impl->TEST_SwitchMemtable();
+    }
+    uint64_t num_imm_mems;
+    ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
+                                       &num_imm_mems));
+    if (attempt == kAttemptHistoryMemtable) {
+      ASSERT_EQ(0, num_imm_mems);
+    } else {
+      assert(attempt == kAttemptImmMemTable);
+      ASSERT_EQ(1, num_imm_mems);
+    }
+
+    // Put something in active memtable
+    ASSERT_OK(txn_db->Put(write_options, Slice("foo3"), Slice("bar")));
+
+    // Create txn3 after flushing; when this transaction is committed,
+    // only the active memtable needs to be checked.
+    Transaction* txn3 = txn_db->BeginTransaction(write_options);
+    ASSERT_TRUE(txn3 != nullptr);
+
+    // Commit both txn and txn2. txn will conflict but txn2 will
+    // pass. In both cases, both memtables are queried.
+    SetPerfLevel(PerfLevel::kEnableCount);
+
+    get_perf_context()->Reset();
+    s = txn->Commit();
+    // We should have checked two memtables
+    ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
+    // txn should fail because of the conflict, even if the memtable
+    // has been flushed, because it is still preserved in history.
+    ASSERT_TRUE(s.IsBusy());
+
+    get_perf_context()->Reset();
+    s = txn2->Commit();
+    // We should have checked two memtables
+    ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
+    ASSERT_TRUE(s.ok());
+
+    txn3->Put(Slice("foo2"), Slice("bar2"));
+    get_perf_context()->Reset();
+    s = txn3->Commit();
+    // txn3 was created after the active memtable was created, so that is
+    // the only memtable to check.
+    ASSERT_EQ(1, get_perf_context()->get_from_memtable_count);
+    ASSERT_TRUE(s.ok());
+
+    TEST_SYNC_POINT("OptimisticTransactionTest.CheckKeySkipOldMemtable");
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+    SetPerfLevel(PerfLevel::kDisable);
+
+    delete txn;
+    delete txn2;
+    delete txn3;
+  }
+}
+
 TEST_F(OptimisticTransactionTest, NoSnapshotTest) {
   WriteOptions write_options;
   ReadOptions read_options;
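The test's memtable-count assertions rely on RocksDB's public perf-context API: under `PerfLevel::kEnableCount`, `get_from_memtable_count` records how many memtables (active, immutable, or history) a read consulted, which is what proves a memtable was or was not skipped. A self-contained sketch of that measurement pattern (the helper name is invented; the counters are thread-local):

    #include <cstdint>
    #include <string>

    #include "rocksdb/db.h"
    #include "rocksdb/perf_context.h"
    #include "rocksdb/perf_level.h"

    // Invented helper: returns how many memtables a single Get() touched,
    // using RocksDB's thread-local perf context.
    uint64_t CountMemtableReads(rocksdb::DB* db, const rocksdb::Slice& key) {
      rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableCount);
      rocksdb::get_perf_context()->Reset();
      std::string value;
      db->Get(rocksdb::ReadOptions(), key, &value);  // status ignored in sketch
      uint64_t n = rocksdb::get_perf_context()->get_from_memtable_count;
      rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);
      return n;
    }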
diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc
index 407feaaa88a754db57eda68f87bb064f4ca31089..ba3b75e15bf7d8d1beb62dce9373b42c9838edd9 100644
--- a/utilities/transactions/transaction_util.cc
+++ b/utilities/transactions/transaction_util.cc
@@ -52,6 +52,12 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
                                  const std::string& key, bool cache_only,
                                  ReadCallback* snap_checker,
                                  SequenceNumber min_uncommitted) {
+  // When `min_uncommitted` is provided, keys are not always committed
+  // in sequence number order, and `snap_checker` is needed to check whether
+  // a specific sequence number in the database is visible to the
+  // transaction, so `snap_checker` must be provided.
+  assert(min_uncommitted == kMaxSequenceNumber || snap_checker != nullptr);
+
   Status result;
   bool need_to_read_sst = false;
@@ -100,8 +106,19 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
   SequenceNumber seq = kMaxSequenceNumber;
   bool found_record_for_key = false;
 
+  // When min_uncommitted == kMaxSequenceNumber, writes are committed in
+  // sequence number order, so only keys larger than `snap_seq` can cause
+  // a conflict.
+  // When min_uncommitted != kMaxSequenceNumber, keys lower than
+  // min_uncommitted will not trigger conflicts, while keys larger than
+  // min_uncommitted might, so those have to be read from the DB and passed
+  // to `snap_checker` to decide. Therefore only keys lower than
+  // min_uncommitted can be skipped.
+  SequenceNumber lower_bound_seq =
+      (min_uncommitted == kMaxSequenceNumber) ? snap_seq : min_uncommitted;
   Status s = db_impl->GetLatestSequenceForKey(sv, key, !need_to_read_sst,
-                                              &seq, &found_record_for_key);
+                                              lower_bound_seq, &seq,
+                                              &found_record_for_key);
 
   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
     result = s;
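The choice of `lower_bound_seq` above is the core of the optimization, restated here as a standalone function with stand-in types (a sketch, not patch code): with in-order commits the snapshot itself is a safe skip bound, while with out-of-order commits only `min_uncommitted` is, since a sequence number between the two may belong to a prepared write whose visibility only `snap_checker` can judge.

    #include <cstdint>
    #include <limits>

    // Stand-ins local to this sketch.
    using SequenceNumber = uint64_t;
    constexpr SequenceNumber kNoMinUncommitted =
        std::numeric_limits<SequenceNumber>::max();  // plays kMaxSequenceNumber

    // Oldest sequence number that conflict checking still has to look at.
    SequenceNumber ConflictCheckLowerBound(SequenceNumber snap_seq,
                                           SequenceNumber min_uncommitted) {
      if (min_uncommitted == kNoMinUncommitted) {
        // Commits are in sequence order: nothing at or below the snapshot
        // can conflict, so data older than snap_seq may be skipped wholesale.
        return snap_seq;
      }
      // Commits may be out of order: anything between min_uncommitted and
      // snap_seq could be a prepared write whose visibility only the
      // snap_checker can judge, so only data below min_uncommitted is safe
      // to skip.
      return min_uncommitted;
    }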
diff --git a/utilities/transactions/transaction_util.h b/utilities/transactions/transaction_util.h
index 0fe0e87d862b0e848e3bdb13479ecb81869d202d..1d910134b6694e91e3be0f4af2efc919fab29958 100644
--- a/utilities/transactions/transaction_util.h
+++ b/utilities/transactions/transaction_util.h
@@ -50,6 +50,9 @@ class TransactionUtil {
   // SST files. This will make it more likely this function will
   // return an error if it is unable to determine if there are any conflicts.
   //
+  // See the comment of CheckKey() for an explanation of `snap_seq`,
+  // `snap_checker` and `min_uncommitted`.
+  //
   // Returns OK on success, BUSY if there is a conflicting write, or other error
   // status for any unexpected errors.
   static Status CheckKeyForConflicts(
@@ -72,6 +75,14 @@
       bool cache_only);
 
  private:
+  // If `snap_checker` == nullptr, writes are always committed in sequence
+  // number order. Any write to `key` with seq <= `snap_seq` does not
+  // conflict, and any write with seq > `snap_seq` triggers a conflict.
+  // If `snap_checker` != nullptr, writes may not commit in sequence number
+  // order. In this case `min_uncommitted` is a lower bound:
+  //   seq < `min_uncommitted`: no conflict
+  //   seq > `snap_seq`: conflict
+  //   `min_uncommitted` <= seq <= `snap_seq`: call `snap_checker` to decide
   static Status CheckKey(DBImpl* db_impl, SuperVersion* sv,
                          SequenceNumber earliest_seq, SequenceNumber snap_seq,
                          const std::string& key, bool cache_only,
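The three cases in the private comment above map directly onto a small decision function; a sketch with stand-in types (not patch code, modeling only the `snap_checker != nullptr` mode and omitting the not-found path that the real CheckKey() also handles):

    #include <cstdint>

    // Stand-in type local to this sketch.
    using SequenceNumber = uint64_t;

    enum class Conflict { kNone, kConflict, kAskSnapChecker };

    // `seq` is the latest sequence number found for the key under validation.
    Conflict ClassifyWrite(SequenceNumber seq, SequenceNumber snap_seq,
                           SequenceNumber min_uncommitted) {
      if (seq > snap_seq) {
        return Conflict::kConflict;  // newer than the snapshot: never visible
      }
      if (seq < min_uncommitted) {
        return Conflict::kNone;  // committed before the snapshot was taken
      }
      // min_uncommitted <= seq <= snap_seq: commit order decides visibility,
      // which only the transaction's snap_checker can resolve.
      return Conflict::kAskSnapChecker;
    }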
diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index e62b8344169acb9e57f370279837f605fe8424dc..88f4ea032a9a183451921fd138fca759c2211aa9 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -761,6 +761,147 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
   MaybeUpdateOldCommitMapTestWithNext(p, c, s, ns, false);
 }
 
+// Trigger the condition where some old memtables are skipped when doing
+// TransactionUtil::CheckKey(), and make sure the result is still correct.
+TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
+  const int kAttemptHistoryMemtable = 0;
+  const int kAttemptImmMemTable = 1;
+  for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
+       attempt++) {
+    options.max_write_buffer_number_to_maintain = 3;
+    ReOpen();
+
+    WriteOptions write_options;
+    ReadOptions read_options;
+    TransactionOptions txn_options;
+    txn_options.set_snapshot = true;
+    string value;
+    Status s;
+
+    ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
+    ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
+
+    Transaction* txn = db->BeginTransaction(write_options, txn_options);
+    ASSERT_TRUE(txn != nullptr);
+    ASSERT_OK(txn->SetName("txn"));
+
+    Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
+    ASSERT_TRUE(txn2 != nullptr);
+    ASSERT_OK(txn2->SetName("txn2"));
+
+    // This transaction is created to cause a potential conflict.
+    Transaction* txn_x = db->BeginTransaction(write_options);
+    ASSERT_OK(txn_x->SetName("txn_x"));
+    ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3")));
+    ASSERT_OK(txn_x->Prepare());
+
+    // Create snapshots after the prepare, but there should still
+    // be a conflict when trying to read "foo".
+
+    if (attempt == kAttemptImmMemTable) {
+      // For the second attempt, hold flush from the beginning. The memtable
+      // will be switched to immutable after calling TEST_SwitchMemtable()
+      // while CheckKey() is called.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
+            "FlushJob::Start"}});
+      rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+    }
+
+    // Force a memtable flush. The memtable should still be kept
+    FlushOptions flush_ops;
+    if (attempt == kAttemptHistoryMemtable) {
+      ASSERT_OK(db->Flush(flush_ops));
+    } else {
+      assert(attempt == kAttemptImmMemTable);
+      DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
+      db_impl->TEST_SwitchMemtable();
+    }
+    uint64_t num_imm_mems;
+    ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
+                                   &num_imm_mems));
+    if (attempt == kAttemptHistoryMemtable) {
+      ASSERT_EQ(0, num_imm_mems);
+    } else {
+      assert(attempt == kAttemptImmMemTable);
+      ASSERT_EQ(1, num_imm_mems);
+    }
+
+    // Put something in active memtable
+    ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar")));
+
+    // Create txn3 after flushing, but this transaction also needs to
+    // check all memtables because they contain uncommitted data.
+    Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
+    ASSERT_TRUE(txn3 != nullptr);
+    ASSERT_OK(txn3->SetName("txn3"));
+
+    // Commit the pending write
+    ASSERT_OK(txn_x->Commit());
+
+    // Commit txn, txn2 and txn3. txn and txn3 will conflict but txn2 will
+    // pass. In all cases, both memtables are queried.
+    SetPerfLevel(PerfLevel::kEnableCount);
+    get_perf_context()->Reset();
+    ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy());
+    // We should have checked two memtables, active and either immutable
+    // or history memtable, depending on the test case.
+    ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
+
+    get_perf_context()->Reset();
+    ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy());
+    // We should have checked two memtables, active and either immutable
+    // or history memtable, depending on the test case.
+    ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
+
+    get_perf_context()->Reset();
+    ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value));
+    ASSERT_EQ(value, "bar");
+    // We should have checked two memtables, and since there is no
+    // conflict, another Get() will be made to fetch the data from
+    // the DB. If it is in an immutable memtable, two extra memtable reads
+    // will be issued. If it is not (in history), only one will
+    // be made, which is to the active memtable.
+    if (attempt == kAttemptHistoryMemtable) {
+      ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
+    } else {
+      assert(attempt == kAttemptImmMemTable);
+      ASSERT_EQ(4, get_perf_context()->get_from_memtable_count);
+    }
+
+    Transaction* txn4 = db->BeginTransaction(write_options, txn_options);
+    ASSERT_TRUE(txn4 != nullptr);
+    ASSERT_OK(txn4->SetName("txn4"));
+    get_perf_context()->Reset();
+    ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value));
+    if (attempt == kAttemptHistoryMemtable) {
+      // The active memtable will be checked in snapshot validation and when
+      // getting the value.
+      ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
+    } else {
+      // Only the active memtable will be checked in snapshot validation, but
+      // both the active and immutable memtables will be queried when
+      // getting the value.
+      assert(attempt == kAttemptImmMemTable);
+      ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
+    }
+
+    ASSERT_OK(txn2->Commit());
+    ASSERT_OK(txn4->Commit());
+
+    TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+    SetPerfLevel(PerfLevel::kDisable);
+
+    delete txn;
+    delete txn2;
+    delete txn3;
+    delete txn4;
+    delete txn_x;
+  }
+}
+
 // Reproduce the bug with two snapshots with the same sequence number and test
 // that the release of the first snapshot will not affect the reads by the
 // other snapshot