diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index ec27d3e45b98e1699efd1b508d08d809dcebcafe..792f34730037cb5e3c8da9940685bb8db0c6001b 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -197,37 +197,53 @@ TEST_P(TransactionTest, AssumeExclusiveTracked) { // This test clarifies the contract of ValidateSnapshot TEST_P(TransactionTest, ValidateSnapshotTest) { - for (bool with_2pc : {true, false}) { - ASSERT_OK(ReOpen()); - WriteOptions write_options; - ReadOptions read_options; - std::string value; + for (bool with_flush : {true}) { + for (bool with_2pc : {true}) { + ASSERT_OK(ReOpen()); + WriteOptions write_options; + ReadOptions read_options; + std::string value; - assert(db != nullptr); - Transaction* txn1 = - db->BeginTransaction(write_options, TransactionOptions()); - ASSERT_TRUE(txn1); - ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1"))); - if (with_2pc) { - ASSERT_OK(txn1->SetName("xid1")); - ASSERT_OK(txn1->Prepare()); - } + assert(db != nullptr); + Transaction* txn1 = + db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn1); + ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1"))); + if (with_2pc) { + ASSERT_OK(txn1->SetName("xid1")); + ASSERT_OK(txn1->Prepare()); + } - Transaction* txn2 = - db->BeginTransaction(write_options, TransactionOptions()); - ASSERT_TRUE(txn2); - txn2->SetSnapshot(); + if (with_flush) { + auto db_impl = reinterpret_cast(db->GetRootDB()); + db_impl->TEST_FlushMemTable(true); + // Make sure the flushed memtable is not kept in memory + int max_memtable_in_history = + std::max(options.max_write_buffer_number, + options.max_write_buffer_number_to_maintain) + + 1; + for (int i = 0; i < max_memtable_in_history; i++) { + db->Put(write_options, Slice("key"), Slice("value")); + db_impl->TEST_FlushMemTable(true); + } + } - ASSERT_OK(txn1->Commit()); - delete txn1; + Transaction* txn2 = + 
db->BeginTransaction(write_options, TransactionOptions()); + ASSERT_TRUE(txn2); + txn2->SetSnapshot(); - auto pes_txn2 = dynamic_cast(txn2); - // Test the simple case where the key is not tracked yet - auto trakced_seq = kMaxSequenceNumber; - auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo", - &trakced_seq); - ASSERT_TRUE(s.IsBusy()); - delete txn2; + ASSERT_OK(txn1->Commit()); + delete txn1; + + auto pes_txn2 = dynamic_cast(txn2); + // Test the simple case where the key is not tracked yet + auto trakced_seq = kMaxSequenceNumber; + auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo", + &trakced_seq); + ASSERT_TRUE(s.IsBusy()); + delete txn2; + } } } diff --git a/utilities/transactions/transaction_util.cc b/utilities/transactions/transaction_util.cc index 1d511880bcb36c161e9a9491199322fc9e8735c5..ec6f7e60ae2f35559dd7b79ca9c165ebbb78ceb7 100644 --- a/utilities/transactions/transaction_util.cc +++ b/utilities/transactions/transaction_util.cc @@ -24,7 +24,8 @@ namespace rocksdb { Status TransactionUtil::CheckKeyForConflicts( DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key, - SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker) { + SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker, + SequenceNumber min_uncommitted) { Status result; auto cfh = reinterpret_cast(column_family); @@ -41,7 +42,7 @@ Status TransactionUtil::CheckKeyForConflicts( db_impl->GetEarliestMemTableSequenceNumber(sv, true); result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, cache_only, - snap_checker); + snap_checker, min_uncommitted); db_impl->ReturnAndCleanupSuperVersion(cfd, sv); } @@ -53,7 +54,8 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, SequenceNumber earliest_seq, SequenceNumber snap_seq, const std::string& key, bool cache_only, - ReadCallback* snap_checker) { + ReadCallback* snap_checker, + SequenceNumber min_uncommitted) { Status result; bool need_to_read_sst = 
false; @@ -75,7 +77,9 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv, "countain a long enough history to check write at SequenceNumber: ", ToString(snap_seq)); } - } else if (snap_seq < earliest_seq) { + } else if (snap_seq < earliest_seq || min_uncommitted <= earliest_seq) { + // Use <= for min_uncommitted since earliest_seq is actually the largest seq + // before this memtable was created need_to_read_sst = true; if (cache_only) { diff --git a/utilities/transactions/transaction_util.h b/utilities/transactions/transaction_util.h index 7377874e6d6a44a8ec4e93570d3355cd9ca18281..0fe0e87d862b0e848e3bdb13479ecb81869d202d 100644 --- a/utilities/transactions/transaction_util.h +++ b/utilities/transactions/transaction_util.h @@ -10,6 +10,7 @@ #include #include +#include "db/dbformat.h" #include "db/read_callback.h" #include "rocksdb/db.h" @@ -51,11 +52,11 @@ class TransactionUtil { // // Returns OK on success, BUSY if there is a conflicting write, or other error // status for any unexpected errors. 
- static Status CheckKeyForConflicts(DBImpl* db_impl, - ColumnFamilyHandle* column_family, - const std::string& key, - SequenceNumber snap_seq, bool cache_only, - ReadCallback* snap_checker = nullptr); + static Status CheckKeyForConflicts( + DBImpl* db_impl, ColumnFamilyHandle* column_family, + const std::string& key, SequenceNumber snap_seq, bool cache_only, + ReadCallback* snap_checker = nullptr, + SequenceNumber min_uncommitted = kMaxSequenceNumber); // For each key,SequenceNumber pair in the TransactionKeyMap, this function // will verify there have been no writes to the key in the db since that @@ -74,7 +75,8 @@ class TransactionUtil { static Status CheckKey(DBImpl* db_impl, SuperVersion* sv, SequenceNumber earliest_seq, SequenceNumber snap_seq, const std::string& key, bool cache_only, - ReadCallback* snap_checker = nullptr); + ReadCallback* snap_checker = nullptr, + SequenceNumber min_uncommitted = kMaxSequenceNumber); }; } // namespace rocksdb diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 8e05f7aa579a6721689c2556fa0fbd56f9ce305a..a13086a4b1ae4cb4a925fe58b05c6db1ae888883 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -2772,7 +2772,7 @@ TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfOldPrepared) { // When an old prepared entry gets committed, there is a gap between the time // that it is published and when it is cleaned up from old_prepared_. This test -// stresses such cacese. +// stresses such cases. 
TEST_P(WritePreparedTransactionTest, CommitOfOldPrepared) { const size_t snapshot_cache_bits = 7; // same as default for (const size_t commit_cache_bits : {0, 2, 3}) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index a212d137081cc3e94cde5f3c74178497b42224a4..b2ba7be7ee6e0b649c1cc9d7532f286a55fba0a4 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -405,7 +405,7 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family, WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted); return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(), snap_seq, false /* cache_only */, - &snap_checker); + &snap_checker, min_uncommitted); } void WritePreparedTxn::SetSnapshot() {