提交 04a2631d 编写于 作者: M Maysam Yabandeh 提交者: Facebook Github Bot

WritePrepared: handle adding prepare before max_evicted_seq_ (#5025)

Summary:
The patch fixes an improbable race condition between AddPrepared from one write queue and AdvanceMaxEvictedSeq from another queue. In this scenario AddPrepared finds prepare_seq lower than max and adding to PrepareHeap as usual while AdvanceMaxEvictedSeq has finished checking PrepareHeap against the future max. Thus when AdvanceMaxEvictedSeq finishes off by updating the max_evicted_seq_, PrepareHeap ends up with a prepared_seq lower than it which breaks the PrepareHeap contract. The fix is that in AddPrepared we check against the future_max_evicted_seq_ instead, which is update before AdvanceMaxEvictedSeq acquire prepare_mutex_ and looks into PrepareHeap.
A unit test added to test for the failure scenario. The code is also refactored a bit to remove the duplicate code between AdvanceMaxEvictedSeq and AddPrepared.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5025

Differential Revision: D14249028

Pulled By: maysamyabandeh

fbshipit-source-id: 072ea56663f40359662c05fafa6ac524417b0622
上级 703f1375
......@@ -2929,6 +2929,81 @@ TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Test when we add a prepared seq when the max_evicted_seq_ already goes beyond
// that. The test focuses on a race condition between AddPrepared and
// AdvanceMaxEvictedSeq functions.
TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
if (!options.two_write_queues) {
// This test is only for two write queues
return;
}
const size_t snapshot_cache_bits = 7; // same as default
// 1 entry to advance max after the 2nd commit
const size_t commit_cache_bits = 0;
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
std::string some_value("value_some");
std::string uncommitted_value("value_uncommitted");
// Prepare two uncommitted transactions
Transaction* txn1 =
db->BeginTransaction(WriteOptions(), TransactionOptions());
ASSERT_OK(txn1->SetName("xid1"));
ASSERT_OK(txn1->Put(Slice("key1"), some_value));
ASSERT_OK(txn1->Prepare());
Transaction* txn2 =
db->BeginTransaction(WriteOptions(), TransactionOptions());
ASSERT_OK(txn2->SetName("xid2"));
ASSERT_OK(txn2->Put(Slice("key2"), some_value));
ASSERT_OK(txn2->Prepare());
// Start the txn here so the other thread could get its id
Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
ASSERT_OK(txn->SetName("xid"));
ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
// t1) Insert prepared entry, t2) commit other entires to advance max
// evicted sec and finish checking the existing prepared entires, t1)
// AddPrepared, t2) update max_evicted_seq_
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"AddPrepared::begin:pause", "AddPreparedBeforeMax::read_thread:start"},
{"AdvanceMaxEvictedSeq::update_max:pause", "AddPrepared::begin:resume"},
{"AddPrepared::end", "AdvanceMaxEvictedSeq::update_max:resume"},
});
SyncPoint::GetInstance()->EnableProcessing();
rocksdb::port::Thread write_thread([&]() { ASSERT_OK(txn->Prepare()); });
rocksdb::port::Thread read_thread([&]() {
TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
// Publish seq number with a commit
ASSERT_OK(txn1->Commit());
// Since the commit cache size is one the 2nd commit evict the 1st one and
// invokes AdcanceMaxEvictedSeq
ASSERT_OK(txn2->Commit());
ReadOptions roptions;
PinnableSlice value;
// The snapshot should not see the uncommitted value from write_thread
auto snap = db->GetSnapshot();
ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
// This is the scenario that we test for
ASSERT_GT(wp_db->max_evicted_seq_, txn->GetId());
roptions.snapshot = snap;
auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value);
ASSERT_TRUE(s.IsNotFound());
db->ReleaseSnapshot(snap);
});
read_thread.join();
write_thread.join();
delete txn1;
delete txn2;
ASSERT_OK(txn->Commit());
delete txn;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// When an old prepared entry gets committed, there is a gap between the time
// that it is published and when it is cleaned up from old_prepared_. This test
// stresses such cases.
......
......@@ -392,22 +392,49 @@ void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
void WritePreparedTxnDB::CheckPreparedAgainstMax(SequenceNumber new_max) {
prepared_mutex_.AssertHeld();
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ and save an expensive, synchronized
// lookup from a shared set. delayed_prepared_ is expected to be empty in
// normal cases.
ROCKS_LOG_DETAILS(
info_log_,
"CheckPreparedAgainstMax prepared_txns_.empty() %d top: %" PRIu64,
prepared_txns_.empty(),
prepared_txns_.empty() ? 0 : prepared_txns_.top());
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
ROCKS_LOG_WARN(info_log_,
"prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
" new_max=%" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()),
to_be_popped, new_max);
prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release);
}
}
void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing with max %" PRIu64,
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Preparing with max %" PRIu64,
seq, max_evicted_seq_.load());
TEST_SYNC_POINT("AddPrepared::begin:pause");
TEST_SYNC_POINT("AddPrepared::begin:resume");
WriteLock wl(&prepared_mutex_);
if (UNLIKELY(seq <= max_evicted_seq_)) {
prepared_txns_.push(seq);
auto new_max = future_max_evicted_seq_.load();
if (UNLIKELY(seq <= new_max)) {
// This should not happen in normal case
ROCKS_LOG_ERROR(
info_log_,
"Added prepare_seq is not larger than max_evicted_seq_: %" PRIu64
" <= %" PRIu64,
seq, max_evicted_seq_.load());
delayed_prepared_.insert(seq);
delayed_prepared_empty_.store(false, std::memory_order_release);
} else {
prepared_txns_.push(seq);
seq, new_max);
CheckPreparedAgainstMax(new_max);
}
TEST_SYNC_POINT("AddPrepared::end");
}
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
......@@ -557,29 +584,10 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
updated_future_max, new_max, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
};
// When max_evicted_seq_ advances, move older entries from prepared_txns_
// to delayed_prepared_. This guarantees that if a seq is lower than max,
// then it is not in prepared_txns_ ans save an expensive, synchronized
// lookup from a shared set. delayed_prepared_ is expected to be empty in
// normal cases.
{
WriteLock wl(&prepared_mutex_);
ROCKS_LOG_DETAILS(
info_log_,
"AdvanceMaxEvictedSeq prepared_txns_.empty() %d top: %" PRIu64,
prepared_txns_.empty(),
prepared_txns_.empty() ? 0 : prepared_txns_.top());
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
auto to_be_popped = prepared_txns_.top();
delayed_prepared_.insert(to_be_popped);
ROCKS_LOG_WARN(info_log_,
"prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
" new_max=%" PRIu64 " oldmax=%" PRIu64,
static_cast<uint64_t>(delayed_prepared_.size()),
to_be_popped, new_max, prev_max);
prepared_txns_.pop();
delayed_prepared_empty_.store(false, std::memory_order_release);
}
CheckPreparedAgainstMax(new_max);
}
// With each change to max_evicted_seq_ fetch the live snapshots behind it.
......@@ -609,6 +617,8 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
}
}
auto updated_prev_max = prev_max;
TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:pause");
TEST_SYNC_POINT("AdvanceMaxEvictedSeq::update_max:resume");
while (updated_prev_max < new_max &&
!max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
std::memory_order_acq_rel,
......
......@@ -324,6 +324,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Add the transaction with prepare sequence seq to the prepared list.
// Note: must be called serially with increasing seq on each call.
void AddPrepared(uint64_t seq);
// Check if any of the prepared txns are less than new max_evicted_seq_. Must
// be called with prepared_mutex_ write locked.
void CheckPreparedAgainstMax(SequenceNumber new_max);
// Remove the transaction with prepare sequence seq from the prepared list
void RemovePrepared(const uint64_t seq, const size_t batch_cnt = 1);
// Add the transaction with prepare sequence prepare_seq and commit sequence
......@@ -443,28 +446,29 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
const ColumnFamilyOptions& cf_options) override;
private:
friend class WritePreparedCommitEntryPreReleaseCallback;
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
friend class WritePreparedTransactionTest_CommitMapTest_Test;
friend class
WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
friend class WritePreparedTransactionTestBase;
friend class PreparedHeap_BasicsTest_Test;
friend class PreparedHeap_EmptyAtTheEnd_Test;
friend class PreparedHeap_Concurrent_Test;
friend class PreparedHeap_EmptyAtTheEnd_Test;
friend class SnapshotConcurrentAccessTest_SnapshotConcurrentAccessTest_Test;
friend class WritePreparedCommitEntryPreReleaseCallback;
friend class WritePreparedTransactionTestBase;
friend class WritePreparedTxn;
friend class WritePreparedTxnDBMock;
friend class WritePreparedTransactionTest_AddPreparedBeforeMax_Test;
friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
friend class
WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
friend class WritePreparedTransactionTest_AdvanceSeqByOne_Test;
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test;
friend class WritePreparedTransactionTest_CleanupSnapshotEqualToMax_Test;
friend class
WritePreparedTransactionTest_ConflictDetectionAfterRecoveryTest_Test;
friend class WritePreparedTransactionTest_CommitMapTest_Test;
friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
friend class WritePreparedTransactionTest_IsInSnapshotTest_Test;
friend class WritePreparedTransactionTest_NewSnapshotLargerThanMax_Test;
friend class WritePreparedTransactionTest_MaxCatchupWithNewSnapshot_Test;
friend class
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册