Commit 3e417a66, authored by Maysam Yabandeh, committed by Facebook Github Bot

WritePrepared Txn: AddPrepared for all sub-batches

Summary:
Currently AddPrepared is called only for the first sub-batch when a write batch contains duplicate keys. This can cause a problem if the transaction takes too long to commit: the sequence number of the first sub-batch is moved to old_prepared_, but the sequence numbers of the later sub-batches are not. The patch fixes this by calling AddPrepared for all sub-batches.
Closes https://github.com/facebook/rocksdb/pull/3651

Differential Revision: D7388635

Pulled By: maysamyabandeh

fbshipit-source-id: 0ccd80c150d9bc42fe955e49ddb9d7ca353067b4
Parent: d382ae7d
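For context, the sketch below is not RocksDB code: PrepareTrackerSketch, its prepared_ set, and RegisterPrepared are hypothetical stand-ins for the real PreparedHeap and PrepareInternal(). It only illustrates the reasoning behind the patch: each sub-batch of a prepared write batch consumes its own sequence number starting at prepare_seq, so registering only the first one leaves the later sequence numbers untracked once max_evicted_seq_ advances past them.

// Minimal sketch, assuming hypothetical names; not the RocksDB implementation.
#include <cstddef>
#include <cstdint>
#include <set>

struct PrepareTrackerSketch {
  std::set<uint64_t> prepared_;  // stand-in for WritePreparedTxnDB's PreparedHeap
  void AddPrepared(uint64_t seq) { prepared_.insert(seq); }
};

// A write batch with duplicate keys is split into `prepare_batch_cnt`
// sub-batches, and each sub-batch consumes one sequence number starting at
// `prepare_seq`. Registering only prepare_seq (the old behavior) loses track
// of prepare_seq + 1 .. prepare_seq + prepare_batch_cnt - 1 once
// max_evicted_seq_ moves past them; the patch registers every one of them.
void RegisterPrepared(PrepareTrackerSketch* tracker, uint64_t prepare_seq,
                      size_t prepare_batch_cnt) {
  for (size_t i = 0; i < prepare_batch_cnt; i++) {
    tracker->AddPrepared(prepare_seq + i);  // one entry per sub-batch
  }
}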
@@ -2145,10 +2145,9 @@ Status DBImpl::DeleteFile(std::string name) {
     status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                     &edit, &mutex_, directories_.GetDbDir());
     if (status.ok()) {
-      InstallSuperVersionAndScheduleWork(
-          cfd, &job_context.superversion_context,
-          *cfd->GetLatestMutableCFOptions(),
-          FlushReason::kDeleteFiles);
+      InstallSuperVersionAndScheduleWork(cfd, &job_context.superversion_context,
+                                         *cfd->GetLatestMutableCFOptions(),
+                                         FlushReason::kDeleteFiles);
     }
     FindObsoleteFiles(&job_context, false);
   }  // lock released here
@@ -2230,10 +2229,9 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
     status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                     &edit, &mutex_, directories_.GetDbDir());
     if (status.ok()) {
-      InstallSuperVersionAndScheduleWork(
-          cfd, &job_context.superversion_context,
-          *cfd->GetLatestMutableCFOptions(),
-          FlushReason::kDeleteFiles);
+      InstallSuperVersionAndScheduleWork(cfd, &job_context.superversion_context,
+                                         *cfd->GetLatestMutableCFOptions(),
+                                         FlushReason::kDeleteFiles);
     }
     for (auto* deleted_file : deleted_files) {
       deleted_file->being_compacted = false;
@@ -2861,8 +2859,7 @@ Status DBImpl::IngestExternalFile(
                                       &mutex_, directories_.GetDbDir());
     }
     if (status.ok()) {
-      InstallSuperVersionAndScheduleWork(cfd, &sv_context,
-                                         *mutable_cf_options,
+      InstallSuperVersionAndScheduleWork(cfd, &sv_context, *mutable_cf_options,
                                          FlushReason::kExternalFileIngestion);
     }
@@ -655,7 +655,7 @@ Status DBImpl::CompactFilesImpl(
   if (status.ok()) {
     InstallSuperVersionAndScheduleWork(
         c->column_family_data(), &job_context->superversion_context,
         *c->mutable_cf_options(), FlushReason::kManualCompaction);
   }
   c->ReleaseCompactionFiles(s);
@@ -2044,8 +2044,7 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
 void DBImpl::InstallSuperVersionAndScheduleWork(
     ColumnFamilyData* cfd, SuperVersionContext* sv_context,
-    const MutableCFOptions& mutable_cf_options,
-    FlushReason flush_reason) {
+    const MutableCFOptions& mutable_cf_options, FlushReason flush_reason) {
   mutex_.AssertHeld();
   // Update max_total_in_memory_state_
@@ -1366,8 +1366,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
   new_mem->Ref();
   cfd->SetMemtable(new_mem);
   InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
-                                     mutable_cf_options,
-                                     flush_reason);
+                                     mutable_cf_options, flush_reason);
   if (two_write_queues_) {
     nonmem_write_thread_.ExitUnbatched(&nonmem_w);
   }
@@ -930,6 +930,44 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
   }
 }
+
+// This tests that transactions with duplicate keys perform correctly after max
+// is advancing their prepared sequence numbers. This will not be the case if
+// for example the txn does not add the prepared seq for the second sub-batch to
+// the PrepareHeap structure.
+TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
+  WriteOptions write_options;
+  TransactionOptions txn_options;
+  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+  ASSERT_OK(txn0->SetName("xid"));
+  ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
+  ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
+  ASSERT_OK(txn0->Prepare());
+
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  // Ensure that all the prepared sequence numbers will be removed from the
+  // PrepareHeap.
+  SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE;
+  wp_db->AdvanceMaxEvictedSeq(0, new_max);
+
+  ReadOptions ropt;
+  PinnableSlice pinnable_val;
+  auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
+  ASSERT_TRUE(s.IsNotFound());
+  delete txn0;
+
+  wp_db->db_impl_->FlushWAL(true);
+  wp_db->TEST_Crash();
+  ReOpenNoDelete();
+  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  wp_db->AdvanceMaxEvictedSeq(0, new_max);
+  s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
+  ASSERT_TRUE(s.IsNotFound());
+  txn0 = db->GetTransactionByName("xid");
+  ASSERT_OK(txn0->Rollback());
+  delete txn0;
+}
+
 TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
   // Given the sequential run of txns, with this timeout we should never see a
   // deadlock nor a timeout unless we have a key conflict, which should be
@@ -83,7 +83,9 @@ Status WritePreparedTxn::PrepareInternal() {
   // callback otherwise there is a non-zero chance of max dvancing prepare_seq
   // and readers assume the data as committed.
   if (s.ok()) {
-    wpt_db_->AddPrepared(prepare_seq);
+    for (size_t i = 0; i < prepare_batch_cnt_; i++) {
+      wpt_db_->AddPrepared(prepare_seq + i);
+    }
   }
   return s;
 }
@@ -293,7 +295,9 @@ Status WritePreparedTxn::RollbackInternal() {
   if (do_one_write) {
     // Mark the txn as rolled back
     uint64_t& rollback_seq = seq_used;
-    wpt_db_->RollbackPrepared(GetId(), rollback_seq);
+    for (size_t i = 0; i < prepare_batch_cnt_; i++) {
+      wpt_db_->RollbackPrepared(GetId() + i, rollback_seq);
+    }
     return s;
   }  // else do the 2nd write for commit
   uint64_t& prepare_seq = seq_used;
@@ -318,7 +322,9 @@ Status WritePreparedTxn::RollbackInternal() {
   // Mark the txn as rolled back
   uint64_t& rollback_seq = seq_used;
   if (s.ok()) {
-    wpt_db_->RollbackPrepared(GetId(), rollback_seq);
+    for (size_t i = 0; i < prepare_batch_cnt_; i++) {
+      wpt_db_->RollbackPrepared(GetId() + i, rollback_seq);
+    }
   }
   return s;
@@ -36,7 +36,10 @@ Status WritePreparedTxnDB::Initialize(
   assert(dbimpl != nullptr);
   auto rtxns = dbimpl->recovered_transactions();
   for (auto rtxn : rtxns) {
-    AddPrepared(rtxn.second->seq_);
+    auto cnt = rtxn.second->batch_cnt_ ? rtxn.second->batch_cnt_ : 1;
+    for (size_t i = 0; i < cnt; i++) {
+      AddPrepared(rtxn.second->seq_ + i);
+    }
   }
   SequenceNumber prev_max = max_evicted_seq_;
   SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
@@ -241,6 +241,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
   friend class PreparedHeap_Concurrent_Test;
   friend class WritePreparedTxnDBMock;
   friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
+  friend class
+      WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test;
   friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
   friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
   friend class WritePreparedTransactionTest_OldCommitMapGC_Test;