提交 10d14693 编写于 作者: M Maysam Yabandeh 提交者: Facebook Github Bot

WritePrepared: fix ValidateSnapshot with long-running txn (#4961)

Summary:
ValidateSnapshot checks if another txn has committed a value to about-to-be-locked key since a particular snapshot. It applies an optimization of looking into only the memtable if snapshot seq is larger than the earliest seq in the memtables. With a long-running txn in WritePrepared, the prepared value might be flushed out to the disk and yet it commits after the snapshot, which breaks this optimization. The patch fixes that by disabling this optimization when the min_uncomitted seq at the time the snapshot was taken is lower than earliest seq in the memtables.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4961

Differential Revision: D14009947

Pulled By: maysamyabandeh

fbshipit-source-id: 1d11679950326f7c4094b433e6b821b729f08850
上级 39fb88f1
......@@ -197,37 +197,53 @@ TEST_P(TransactionTest, AssumeExclusiveTracked) {
// This test clarifies the contract of ValidateSnapshot
TEST_P(TransactionTest, ValidateSnapshotTest) {
for (bool with_2pc : {true, false}) {
ASSERT_OK(ReOpen());
WriteOptions write_options;
ReadOptions read_options;
std::string value;
for (bool with_flush : {true}) {
for (bool with_2pc : {true}) {
ASSERT_OK(ReOpen());
WriteOptions write_options;
ReadOptions read_options;
std::string value;
assert(db != nullptr);
Transaction* txn1 =
db->BeginTransaction(write_options, TransactionOptions());
ASSERT_TRUE(txn1);
ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1")));
if (with_2pc) {
ASSERT_OK(txn1->SetName("xid1"));
ASSERT_OK(txn1->Prepare());
}
assert(db != nullptr);
Transaction* txn1 =
db->BeginTransaction(write_options, TransactionOptions());
ASSERT_TRUE(txn1);
ASSERT_OK(txn1->Put(Slice("foo"), Slice("bar1")));
if (with_2pc) {
ASSERT_OK(txn1->SetName("xid1"));
ASSERT_OK(txn1->Prepare());
}
Transaction* txn2 =
db->BeginTransaction(write_options, TransactionOptions());
ASSERT_TRUE(txn2);
txn2->SetSnapshot();
if (with_flush) {
auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
db_impl->TEST_FlushMemTable(true);
// Make sure the flushed memtable is not kept in memory
int max_memtable_in_history =
std::max(options.max_write_buffer_number,
options.max_write_buffer_number_to_maintain) +
1;
for (int i = 0; i < max_memtable_in_history; i++) {
db->Put(write_options, Slice("key"), Slice("value"));
db_impl->TEST_FlushMemTable(true);
}
}
ASSERT_OK(txn1->Commit());
delete txn1;
Transaction* txn2 =
db->BeginTransaction(write_options, TransactionOptions());
ASSERT_TRUE(txn2);
txn2->SetSnapshot();
auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
// Test the simple case where the key is not tracked yet
auto trakced_seq = kMaxSequenceNumber;
auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
&trakced_seq);
ASSERT_TRUE(s.IsBusy());
delete txn2;
ASSERT_OK(txn1->Commit());
delete txn1;
auto pes_txn2 = dynamic_cast<PessimisticTransaction*>(txn2);
// Test the simple case where the key is not tracked yet
auto trakced_seq = kMaxSequenceNumber;
auto s = pes_txn2->ValidateSnapshot(db->DefaultColumnFamily(), "foo",
&trakced_seq);
ASSERT_TRUE(s.IsBusy());
delete txn2;
}
}
}
......
......@@ -24,7 +24,8 @@ namespace rocksdb {
Status TransactionUtil::CheckKeyForConflicts(
DBImpl* db_impl, ColumnFamilyHandle* column_family, const std::string& key,
SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker) {
SequenceNumber snap_seq, bool cache_only, ReadCallback* snap_checker,
SequenceNumber min_uncommitted) {
Status result;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
......@@ -41,7 +42,7 @@ Status TransactionUtil::CheckKeyForConflicts(
db_impl->GetEarliestMemTableSequenceNumber(sv, true);
result = CheckKey(db_impl, sv, earliest_seq, snap_seq, key, cache_only,
snap_checker);
snap_checker, min_uncommitted);
db_impl->ReturnAndCleanupSuperVersion(cfd, sv);
}
......@@ -53,7 +54,8 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
SequenceNumber earliest_seq,
SequenceNumber snap_seq,
const std::string& key, bool cache_only,
ReadCallback* snap_checker) {
ReadCallback* snap_checker,
SequenceNumber min_uncommitted) {
Status result;
bool need_to_read_sst = false;
......@@ -75,7 +77,9 @@ Status TransactionUtil::CheckKey(DBImpl* db_impl, SuperVersion* sv,
"countain a long enough history to check write at SequenceNumber: ",
ToString(snap_seq));
}
} else if (snap_seq < earliest_seq) {
} else if (snap_seq < earliest_seq || min_uncommitted <= earliest_seq) {
// Use <= for min_uncommitted since earliest_seq is actually the largest sec
// before this memtable was created
need_to_read_sst = true;
if (cache_only) {
......
......@@ -10,6 +10,7 @@
#include <string>
#include <unordered_map>
#include "db/dbformat.h"
#include "db/read_callback.h"
#include "rocksdb/db.h"
......@@ -51,11 +52,11 @@ class TransactionUtil {
//
// Returns OK on success, BUSY if there is a conflicting write, or other error
// status for any unexpected errors.
static Status CheckKeyForConflicts(DBImpl* db_impl,
ColumnFamilyHandle* column_family,
const std::string& key,
SequenceNumber snap_seq, bool cache_only,
ReadCallback* snap_checker = nullptr);
static Status CheckKeyForConflicts(
DBImpl* db_impl, ColumnFamilyHandle* column_family,
const std::string& key, SequenceNumber snap_seq, bool cache_only,
ReadCallback* snap_checker = nullptr,
SequenceNumber min_uncommitted = kMaxSequenceNumber);
// For each key,SequenceNumber pair in the TransactionKeyMap, this function
// will verify there have been no writes to the key in the db since that
......@@ -74,7 +75,8 @@ class TransactionUtil {
static Status CheckKey(DBImpl* db_impl, SuperVersion* sv,
SequenceNumber earliest_seq, SequenceNumber snap_seq,
const std::string& key, bool cache_only,
ReadCallback* snap_checker = nullptr);
ReadCallback* snap_checker = nullptr,
SequenceNumber min_uncommitted = kMaxSequenceNumber);
};
} // namespace rocksdb
......
......@@ -2772,7 +2772,7 @@ TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfOldPrepared) {
// When an old prepared entry gets committed, there is a gap between the time
// that it is published and when it is cleaned up from old_prepared_. This test
// stresses such cacese.
// stresses such cases.
TEST_P(WritePreparedTransactionTest, CommitOfOldPrepared) {
const size_t snapshot_cache_bits = 7; // same as default
for (const size_t commit_cache_bits : {0, 2, 3}) {
......
......@@ -405,7 +405,7 @@ Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted);
return TransactionUtil::CheckKeyForConflicts(db_impl_, cfh, key.ToString(),
snap_seq, false /* cache_only */,
&snap_checker);
&snap_checker, min_uncommitted);
}
void WritePreparedTxn::SetSnapshot() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册