diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 6af9ac08588e66bb9940ba95fe3dced4c7fddb93..8e05f7aa579a6721689c2556fa0fbd56f9ce305a 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -2696,6 +2696,80 @@ TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) { delete iter; } +// Committing a delayed prepared has two non-atomic steps: update commit cache, +// remove seq from delayed_prepared_. The read in IsInSnapshot also involves two +// non-atomic steps of checking these two data structures. This test breaks each +// in the middle to ensure correctness in spite of non-atomic execution. +// Note: This test is limited to the case where snapshot is larger than the +// max_evicted_seq_. +TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfOldPrepared) { + const size_t snapshot_cache_bits = 7; // same as default + const size_t commit_cache_bits = 3; // 8 entries + for (auto split_read : {true, false}) { + DestroyAndReopenWithExtraOptions(snapshot_cache_bits, commit_cache_bits); + // Fill up the commit cache + std::string init_value("value1"); + for (int i = 0; i < 10; i++) { + db->Put(WriteOptions(), Slice("key1"), Slice(init_value)); + } + // Prepare a transaction but do not commit it + Transaction* txn = + db->BeginTransaction(WriteOptions(), TransactionOptions()); + ASSERT_OK(txn->SetName("xid")); + ASSERT_OK(txn->Put(Slice("key1"), Slice("value2"))); + ASSERT_OK(txn->Prepare()); + // Commit a bunch of entries to advance max evicted seq and make the + // prepared a delayed prepared + for (int i = 0; i < 10; i++) { + db->Put(WriteOptions(), Slice("key3"), Slice("value3")); + } + // The snapshot should not see the delayed prepared entry + auto snap = db->GetSnapshot(); + + if (split_read) { + // split right after reading from the commit cache + 
rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause", + "AtomicCommitOfOldPrepared:Commit:before"}, + {"AtomicCommitOfOldPrepared:Commit:after", + "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}}); + } else { // split commit + // split right before removing from delayed_prepared_ + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"WritePreparedTxnDB::RemovePrepared:pause", + "AtomicCommitOfOldPrepared:Read:before"}, + {"AtomicCommitOfOldPrepared:Read:after", + "WritePreparedTxnDB::RemovePrepared:resume"}}); + } + SyncPoint::GetInstance()->EnableProcessing(); + + rocksdb::port::Thread commit_thread([&]() { + TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Commit:before"); + ASSERT_OK(txn->Commit()); + TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Commit:after"); + delete txn; + }); + + rocksdb::port::Thread read_thread([&]() { + TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Read:before"); + ReadOptions roptions; + roptions.snapshot = snap; + PinnableSlice value; + auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value); + ASSERT_OK(s); + // It should not see the commit of delayed prepared + ASSERT_TRUE(value == init_value); + TEST_SYNC_POINT("AtomicCommitOfOldPrepared:Read:after"); + db->ReleaseSnapshot(snap); + }); + + read_thread.join(); + commit_thread.join(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + } +} + // When an old prepared entry gets committed, there is a gap between the time // that it is published and when it is cleaned up from old_prepared_. This test // stresses such cases. 
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index a86e3b00421a99b591573283b67615b6697c33cc..ab1c3490e52d372e21a6f769f432f15720edb42d 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -489,6 +489,8 @@ void WritePreparedTxnDB::RemovePrepared(const uint64_t prepare_seq, TEST_SYNC_POINT_CALLBACK( "RemovePrepared:Start", const_cast<void*>(reinterpret_cast<const void*>(&prepare_seq))); + TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:pause"); + TEST_SYNC_POINT("WritePreparedTxnDB::RemovePrepared:resume"); ROCKS_LOG_DETAILS(info_log_, "RemovePrepared %" PRIu64 " cnt: %" ROCKSDB_PRIszt, prepare_seq, batch_cnt); @@ -544,9 +546,8 @@ bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq, void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max, const SequenceNumber& new_max) { ROCKS_LOG_DETAILS(info_log_, - "AdvanceMaxEvictedSeq overhead %" PRIu64 - " => %" PRIu64 prev_max, - new_max); + "AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64, + prev_max, new_max); // Declare the intention before getting snapshot from the DB. This helps a // concurrent GetSnapshot to wait to catch up with future_max_evicted_seq_ if // it has not already. Otherwise the new snapshot is when we ask DB for diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 60fa28f627f0dbec85e99efd74d04ec49750ef73..b2ca89ae8247ed47e28eb1abb37893ac79bc3e99 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -153,10 +153,21 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { prep_seq, snapshot_seq, 1, min_uncommitted); return true; } + // Commit of delayed prepared has two non-atomic steps: add to commit cache, + // remove from delayed prepared. Our reads from these two are also + // non-atomic. 
By looking into commit cache first thus we might find the + prep_seq neither in commit cache nor in delayed_prepared_. To fix that i) + we check if there was any delayed prepared BEFORE looking into commit + cache, ii) if there was, we complete the search steps to be these: i) + commit cache, ii) delayed prepared, iii) commit cache again. In this way if + the first query to commit cache missed the commit, the 2nd will catch it. + bool was_empty = delayed_prepared_empty_.load(std::memory_order_acquire); auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE; CommitEntry64b dont_care; CommitEntry cached; bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); + TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause"); + TEST_SYNC_POINT("WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"); if (exist && prep_seq == cached.prep_seq) { // It is committed and also not evicted from commit cache ROCKS_LOG_DETAILS( @@ -178,12 +189,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { prep_seq, snapshot_seq, 0); return false; } - if (!delayed_prepared_empty_.load(std::memory_order_acquire)) { + if (!was_empty) { // We should not normally reach here WPRecordTick(TXN_PREPARE_MUTEX_OVERHEAD); ReadLock rl(&prepared_mutex_); - ROCKS_LOG_WARN(info_log_, "prepared_mutex_ overhead %" PRIu64, - static_cast<uint64_t>(delayed_prepared_.size())); + ROCKS_LOG_WARN(info_log_, + "prepared_mutex_ overhead %" PRIu64 " for %" PRIu64, + static_cast<uint64_t>(delayed_prepared_.size()), prep_seq); if (delayed_prepared_.find(prep_seq) != delayed_prepared_.end()) { // This is the order: 1) delayed_prepared_commits_ update, 2) publish 3) // delayed_prepared_ clean up. So check if it is the case of a late @@ -204,6 +216,16 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { snapshot_seq <= it->second); return it->second <= snapshot_seq; } + } else { + // 2nd query to commit cache. Refer to was_empty comment above. 
+ exist = GetCommitEntry(indexed_seq, &dont_care, &cached); + if (exist && prep_seq == cached.prep_seq) { + ROCKS_LOG_DETAILS( + info_log_, + "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32, + prep_seq, snapshot_seq, cached.commit_seq <= snapshot_seq); + return cached.commit_seq <= snapshot_seq; + } + } } } // When advancing max_evicted_seq_, we move older entries from prepared to