提交 ec6c5383 编写于 作者: M Maysam Yabandeh 提交者: Facebook Github Bot

WritePrepared Txn: end-to-end tests

Summary:
Enable WritePrepared policy for existing transaction tests.
Closes https://github.com/facebook/rocksdb/pull/2972

Differential Revision: D5993614

Pulled By: maysamyabandeh

fbshipit-source-id: d1eb53e2920c4e2a56434bb001231c98426f3509
上级 da29eba4
......@@ -112,7 +112,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*concurrent_memtable_writes*/);
true /*concurrent_memtable_writes*/, seq_per_batch_);
}
if (write_thread_.CompleteParallelMemTableWriter(&w)) {
......@@ -286,7 +286,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.status = WriteBatchInternal::InsertInto(
&w, w.sequence, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/);
this, true /*concurrent_memtable_writes*/, seq_per_batch_);
}
}
if (seq_used != nullptr) {
......
......@@ -1421,8 +1421,8 @@ Status WriteBatchInternal::InsertInto(
nullptr /*has_valid_writes*/, seq_per_batch);
for (auto w : write_group) {
if (!w->ShouldWriteToMemtable()) {
inserter.MaybeAdvanceSeq(true);
w->sequence = inserter.sequence();
inserter.MaybeAdvanceSeq(true);
continue;
}
SetSequence(w->batch, inserter.sequence());
......@@ -1436,17 +1436,16 @@ Status WriteBatchInternal::InsertInto(
return Status::OK();
}
Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t log_number, DB* db,
bool concurrent_memtable_writes) {
Status WriteBatchInternal::InsertInto(
WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch) {
assert(writer->ShouldWriteToMemtable());
MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, log_number, db,
concurrent_memtable_writes);
concurrent_memtable_writes,
nullptr /*has_valid_writes*/, seq_per_batch);
SetSequence(writer->batch, sequence);
inserter.set_log_number_ref(writer->log_ref);
Status s = writer->batch->Iterate(&inserter);
......
......@@ -180,7 +180,8 @@ class WriteBatchInternal {
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false);
bool concurrent_memtable_writes = false,
bool seq_per_batch = false);
static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);
......
......@@ -55,6 +55,7 @@ void TransactionBaseImpl::Reinitialize(DB* db,
const WriteOptions& write_options) {
Clear();
ClearSnapshot();
id_ = 0;
db_ = db;
name_.clear();
log_number_ = 0;
......
......@@ -39,20 +39,23 @@ using std::string;
namespace rocksdb {
// TODO(myabandeh): Instantiate the tests with other write policies
INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(false, false,
WRITE_COMMITTED)));
INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(true, false,
WRITE_COMMITTED)));
// TODO(myabandeh): Instantiate the tests with concurrent_prepare
INSTANTIATE_TEST_CASE_P(
DBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED),
std::make_tuple(false, false, WRITE_PREPARED)));
INSTANTIATE_TEST_CASE_P(
StackableDBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(true, false, WRITE_COMMITTED),
std::make_tuple(true, false, WRITE_PREPARED)));
INSTANTIATE_TEST_CASE_P(
MySQLStyleTransactionTest, MySQLStyleTransactionTest,
::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED),
std::make_tuple(false, true, WRITE_COMMITTED),
std::make_tuple(true, false, WRITE_COMMITTED),
std::make_tuple(true, true, WRITE_COMMITTED)));
std::make_tuple(true, true, WRITE_COMMITTED),
std::make_tuple(false, false, WRITE_PREPARED),
std::make_tuple(true, false, WRITE_PREPARED)));
TEST_P(TransactionTest, DoubleEmptyWrite) {
WriteOptions write_options;
......@@ -784,9 +787,20 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
// heap should not care about prepared section anymore
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
// but now our memtable should be referencing the prep section
ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
// but now our memtable should be referencing the prep section
ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
case WRITE_PREPARED:
case WRITE_UNPREPARED:
// In these modes memtable do not ref the prep sections
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
default:
assert(false);
}
db_impl->TEST_FlushMemTable(true);
......@@ -1096,9 +1110,20 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
// heap should not care about prepared section anymore
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
// but now our memtable should be referencing the prep section
ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
// but now our memtable should be referencing the prep section
ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
case WRITE_PREPARED:
case WRITE_UNPREPARED:
// In these modes memtable do not ref the prep sections
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
default:
assert(false);
}
db_impl->TEST_FlushMemTable(true);
......@@ -1443,9 +1468,20 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
txn2->GetLogNumber());
// we should see txn1s log refernced by the memtables
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
txn1->GetLogNumber());
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
// we should see txn1s log refernced by the memtables
ASSERT_EQ(txn1->GetLogNumber(),
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
case WRITE_PREPARED:
case WRITE_UNPREPARED:
// In these modes memtable do not ref the prep sections
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
default:
assert(false);
}
// flush default cf to crate new log
s = db->Put(wopts, "foo", "bar2");
......@@ -1463,17 +1499,39 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest) {
// heap should not show any logs
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(), 0);
// should show the first txn log
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
txn1->GetLogNumber());
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
// should show the first txn log
ASSERT_EQ(txn1->GetLogNumber(),
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
case WRITE_PREPARED:
case WRITE_UNPREPARED:
// In these modes memtable do not ref the prep sections
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
default:
assert(false);
}
// flush only cfa memtable
s = db_impl->TEST_FlushMemTable(true, cfa);
ASSERT_OK(s);
// should show the first txn log
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
txn2->GetLogNumber());
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
// should show the first txn log
ASSERT_EQ(txn2->GetLogNumber(),
db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
case WRITE_PREPARED:
case WRITE_UNPREPARED:
// In these modes memtable do not ref the prep sections
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break;
default:
assert(false);
}
// flush only cfb memtable
s = db_impl->TEST_FlushMemTable(true, cfb);
......@@ -1545,8 +1603,20 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
ASSERT_OK(s);
ASSERT_GT(db_impl->TEST_LogfileNumber(), prepare_log_no);
ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
// This cf is empty and should ref the latest log
ASSERT_GT(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), db_impl->TEST_LogfileNumber());
break;
case WRITE_PREPARED:
// This cf is not flushed yet and should ref the log that has its data
ASSERT_EQ(cfh_a->cfd()->GetLogNumber(), prepare_log_no);
break;
case WRITE_UNPREPARED:
default:
assert(false);
}
ASSERT_EQ(db_impl->TEST_FindMinLogContainingOutstandingPrep(),
prepare_log_no);
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
......@@ -1555,7 +1625,19 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
s = txn1->Commit();
ASSERT_OK(s);
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), prepare_log_no);
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(),
prepare_log_no);
break;
case WRITE_PREPARED:
case WRITE_UNPREPARED:
// In these modes memtable do not ref the prep sections
ASSERT_EQ(db_impl->TEST_FindMinPrepLogReferencedByMemTable(), 0);
break;
default:
assert(false);
}
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog());
......@@ -1569,8 +1651,19 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
// assert that cfa has a flush requested
ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
// cfb should not be flushed becuse it has no data from LOG A
ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
switch (txn_db_options.write_policy) {
case WRITE_COMMITTED:
// cfb should not be flushed becuse it has no data from LOG A
ASSERT_TRUE(!cfh_b->cfd()->imm()->HasFlushRequested());
break;
case WRITE_PREPARED:
case WRITE_UNPREPARED:
// cfb should be flushed becuse it has prepared data from LOG A
ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());
break;
default:
assert(false);
}
// cfb now has data from LOG A
s = txn2->Commit();
......@@ -2690,8 +2783,12 @@ TEST_P(TransactionTest, UntrackedWrites) {
// Untracked writes should succeed even though key was written after snapshot
s = txn->PutUntracked("untracked", "1");
ASSERT_OK(s);
s = txn->MergeUntracked("untracked", "2");
ASSERT_OK(s);
if (txn_db_options.write_policy != WRITE_PREPARED) {
// WRITE_PREPARED does not currently support dup merge keys.
// TODO(myabandeh): remove this if-then when the support is added
s = txn->MergeUntracked("untracked", "2");
ASSERT_OK(s);
}
s = txn->DeleteUntracked("untracked");
ASSERT_OK(s);
......@@ -4062,6 +4159,11 @@ TEST_P(TransactionTest, SingleDeleteTest) {
}
TEST_P(TransactionTest, MergeTest) {
if (txn_db_options.write_policy == WRITE_PREPARED) {
// WRITE_PREPARED does not currently support dup merge keys.
// TODO(myabandeh): remove this if-then when the support is added
return;
}
WriteOptions write_options;
ReadOptions read_options;
string value;
......
......@@ -113,6 +113,9 @@ class TransactionTest : public ::testing::TestWithParam<
std::vector<ColumnFamilyHandle*> handles;
DB* root_db;
Options options_copy(options);
if (txn_db_options.write_policy == WRITE_PREPARED) {
options_copy.seq_per_batch = true;
}
Status s =
DB::Open(options_copy, dbname, column_families, &handles, &root_db);
if (s.ok()) {
......
......@@ -1513,7 +1513,7 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
// A more complex test to verify compaction/flush should keep keys visible
// to snapshots.
TEST_P(WritePreparedTransactionTest,
DISABLED_CompactionShouldKeepSnapshotVisibleKeysRandomized) {
CompactionShouldKeepSnapshotVisibleKeysRandomized) {
constexpr size_t kNumTransactions = 10;
constexpr size_t kNumIterations = 1000;
......
......@@ -26,14 +26,13 @@ WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
const TransactionOptions& txn_options)
: PessimisticTransaction(txn_db, write_options, txn_options),
wpt_db_(txn_db) {
PessimisticTransaction::Initialize(txn_options);
GetWriteBatch()->DisableDuplicateMergeKeys();
}
Status WritePreparedTxn::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
auto snapshot = GetSnapshot();
auto snapshot = read_options.snapshot;
auto snap_seq =
snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber;
......@@ -95,9 +94,7 @@ Status WritePreparedTxn::CommitInternal() {
// We take the commit-time batch and append the Commit marker.
// The Memtable will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
// TODO(myabandeh): prevent the users from writing to txn after the prepare
// phase
assert(working_batch->Count() == 0);
const bool empty = working_batch->Count() == 0;
WriteBatchInternal::MarkCommit(working_batch, name_);
// any operations appended to this working_batch will be ignored from WAL
......@@ -109,14 +106,21 @@ Status WritePreparedTxn::CommitInternal() {
// a connection between the memtable and its WAL, so there is no need to
// redundantly reference the log that contains the prepared data.
const uint64_t zero_log_number = 0ull;
auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
zero_log_number, disable_memtable, &seq_used);
auto s = db_impl_->WriteImpl(
write_options_, working_batch, nullptr, nullptr, zero_log_number,
empty ? disable_memtable : !disable_memtable, &seq_used);
assert(seq_used != kMaxSequenceNumber);
uint64_t& commit_seq = seq_used;
// TODO(myabandeh): Reject a commit request if AddCommitted cannot encode
// commit_seq. This happens if prep_seq <<< commit_seq.
auto prepare_seq = GetId();
wpt_db_->AddCommitted(prepare_seq, commit_seq);
if (!empty) {
// Commit the data that is accompnaied with the commit marker
// TODO(myabandeh): skip AddPrepared
wpt_db_->AddPrepared(commit_seq);
wpt_db_->AddCommitted(commit_seq, commit_seq);
}
return s;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册