diff --git a/db/db_impl.cc b/db/db_impl.cc index 2628278563be750c796aeec90351b84d76d9547c..bab63ac90028f6d294fe41c76fa3b62b8133f442 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2145,10 +2145,9 @@ Status DBImpl::DeleteFile(std::string name) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionAndScheduleWork( - cfd, &job_context.superversion_context, - *cfd->GetLatestMutableCFOptions(), - FlushReason::kDeleteFiles); + InstallSuperVersionAndScheduleWork(cfd, &job_context.superversion_context, + *cfd->GetLatestMutableCFOptions(), + FlushReason::kDeleteFiles); } FindObsoleteFiles(&job_context, false); } // lock released here @@ -2230,10 +2229,9 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family, status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionAndScheduleWork( - cfd, &job_context.superversion_context, - *cfd->GetLatestMutableCFOptions(), - FlushReason::kDeleteFiles); + InstallSuperVersionAndScheduleWork(cfd, &job_context.superversion_context, + *cfd->GetLatestMutableCFOptions(), + FlushReason::kDeleteFiles); } for (auto* deleted_file : deleted_files) { deleted_file->being_compacted = false; @@ -2861,8 +2859,7 @@ Status DBImpl::IngestExternalFile( &mutex_, directories_.GetDbDir()); } if (status.ok()) { - InstallSuperVersionAndScheduleWork(cfd, &sv_context, - *mutable_cf_options, + InstallSuperVersionAndScheduleWork(cfd, &sv_context, *mutable_cf_options, FlushReason::kExternalFileIngestion); } diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 3e2e21d160d3220b3c6dcaca09671fbec95b8100..ca14b84432da4b0e9c143609a91327fcdc1a8d02 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -655,7 +655,7 @@ Status DBImpl::CompactFilesImpl( if (status.ok()) { 
InstallSuperVersionAndScheduleWork( c->column_family_data(), &job_context->superversion_context, - *c->mutable_cf_options(), FlushReason::kManualCompaction); + *c->mutable_cf_options(), FlushReason::kManualCompaction); } c->ReleaseCompactionFiles(s); @@ -2044,8 +2044,7 @@ bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) { void DBImpl::InstallSuperVersionAndScheduleWork( ColumnFamilyData* cfd, SuperVersionContext* sv_context, - const MutableCFOptions& mutable_cf_options, - FlushReason flush_reason) { + const MutableCFOptions& mutable_cf_options, FlushReason flush_reason) { mutex_.AssertHeld(); // Update max_total_in_memory_state_ diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 28aaf0812c818a28565e03a01165b4b6d1a7043d..0c3085b5e31c7d0b6fda40fc1ec5e0cfcaf4e46d 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -1366,8 +1366,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, new_mem->Ref(); cfd->SetMemtable(new_mem); InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, - mutable_cf_options, - flush_reason); + mutable_cf_options, flush_reason); if (two_write_queues_) { nonmem_write_thread_.ExitUnbatched(&nonmem_w); } diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 60c5fcf4c4a876ff1b17e54be36d670a516dd9a1..cfba5a7a00d55dd03aea5804bf0f5a60757081e0 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -930,6 +930,44 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) { } } +// This tests that transactions with duplicate keys perform correctly after +// max_evicted_seq advances past their prepared sequence numbers. This will not +// be the case if, for example, the txn does not add the prepared seq for the +// second sub-batch to the PrepareHeap structure. 
+TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicatesTest) {
+  WriteOptions write_options;
+  TransactionOptions txn_options;
+  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
+  ASSERT_OK(txn0->SetName("xid"));
+  ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
+  ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));  // duplicate key
+  ASSERT_OK(txn0->Prepare());
+
+  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
+  // Ensure that all the prepared sequence numbers will be removed from the
+  // PrepareHeap.
+  SequenceNumber new_max = wp_db->COMMIT_CACHE_SIZE;
+  wp_db->AdvanceMaxEvictedSeq(0, new_max);
+
+  ReadOptions ropt;
+  PinnableSlice pinnable_val;
+  auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
+  ASSERT_TRUE(s.IsNotFound());  // prepared but uncommitted: must stay hidden
+  delete txn0;
+
+  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
+  wp_db->TEST_Crash();
+  ReOpenNoDelete();
+  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);  // refresh after reopen
+  wp_db->AdvanceMaxEvictedSeq(0, new_max);
+  s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
+  ASSERT_TRUE(s.IsNotFound());  // still hidden after reopen
+
+  txn0 = db->GetTransactionByName("xid");
+  ASSERT_OK(txn0->Rollback());
+  delete txn0;
+}
+
 TEST_P(WritePreparedTransactionTest, SeqAdvanceConcurrentTest) {
   // Given the sequential run of txns, with this timeout we should never see a
   // deadlock nor a timeout unless we have a key conflict, which should be
diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc
index f6aef7fe2dbbf7c984057a5d9226737c09e06f51..9a8022ae15aa55e8036100d0c37a0a48be789ca5 100644
--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -83,7 +83,9 @@ Status WritePreparedTxn::PrepareInternal() {
   // callback otherwise there is a non-zero chance of max dvancing prepare_seq
   // and readers assume the data as committed.
if (s.ok()) { - wpt_db_->AddPrepared(prepare_seq); + for (size_t i = 0; i < prepare_batch_cnt_; i++) { + wpt_db_->AddPrepared(prepare_seq + i); + } } return s; } @@ -293,7 +295,9 @@ Status WritePreparedTxn::RollbackInternal() { if (do_one_write) { // Mark the txn as rolled back uint64_t& rollback_seq = seq_used; - wpt_db_->RollbackPrepared(GetId(), rollback_seq); + for (size_t i = 0; i < prepare_batch_cnt_; i++) { + wpt_db_->RollbackPrepared(GetId() + i, rollback_seq); + } return s; } // else do the 2nd write for commit uint64_t& prepare_seq = seq_used; @@ -318,7 +322,9 @@ Status WritePreparedTxn::RollbackInternal() { // Mark the txn as rolled back uint64_t& rollback_seq = seq_used; if (s.ok()) { - wpt_db_->RollbackPrepared(GetId(), rollback_seq); + for (size_t i = 0; i < prepare_batch_cnt_; i++) { + wpt_db_->RollbackPrepared(GetId() + i, rollback_seq); + } } return s; diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 52c05ea71d54fd37b7b3d14a64538202dea3deac..48f185682ceb2f82750d3c5b22ca68bcf930cf6c 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -36,7 +36,10 @@ Status WritePreparedTxnDB::Initialize( assert(dbimpl != nullptr); auto rtxns = dbimpl->recovered_transactions(); for (auto rtxn : rtxns) { - AddPrepared(rtxn.second->seq_); + auto cnt = rtxn.second->batch_cnt_ ? 
rtxn.second->batch_cnt_ : 1; + for (size_t i = 0; i < cnt; i++) { + AddPrepared(rtxn.second->seq_ + i); + } } SequenceNumber prev_max = max_evicted_seq_; SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber(); diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index a3d2ab8fc20264dad7a3128ccfc639aadc3275c1..cb1eac1e997c302a51120c588bb14b6c7e07549d 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -241,6 +241,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class PreparedHeap_Concurrent_Test; friend class WritePreparedTxnDBMock; friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; + friend class + WritePreparedTransactionTest_AdvanceMaxEvictedSeqWithDuplicatesTest_Test; friend class WritePreparedTransactionTest_BasicRecoveryTest_Test; friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test; friend class WritePreparedTransactionTest_OldCommitMapGC_Test;