diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 2655d30a9e9d8c36395699f5a1557399537a2b15..c934b50b1f31a5abefb96653d72f9e10b1972c21 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -220,7 +220,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
 
   bool need_log_sync = write_options.sync;
   bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
-  if (!two_write_queues_ || !disable_memtable) {
+  assert(!two_write_queues_ || !disable_memtable);
+  {
     // With concurrent writes we do preprocess only in the write thread that
     // also does write to memtable to avoid sync issue on shared data structure
     // with the other thread
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 1d9423e0d875f55574ceb814c7ffe03b4ba357a5..07068555b2ff7b4ff2211e761fc65beb51963d51 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -776,7 +776,7 @@ bool WriteBatchInternal::IsLatestPersistentState(const WriteBatch* b) {
   return b->is_latest_persistent_state_;
 }
 
-void WriteBatchInternal::SetAsLastestPersistentState(WriteBatch* b) {
+void WriteBatchInternal::SetAsLatestPersistentState(WriteBatch* b) {
   b->is_latest_persistent_state_ = true;
 }
 
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index fa863a1d62c118b82570aa4f2f01d52eab45e4e7..abb8825859067ab9891cf9be2e515fa0db124118 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -214,7 +214,7 @@ class WriteBatchInternal {
 
   // This write batch includes the latest state that should be persisted. Such
   // state meant to be used only during recovery.
-  static void SetAsLastestPersistentState(WriteBatch* b);
+  static void SetAsLatestPersistentState(WriteBatch* b);
   static bool IsLatestPersistentState(const WriteBatch* b);
 };
 
diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc
index 5f666b280137e496b134c461ffc0e8150c2a3709..4c6b826161d5dd5b1a59d106e2c43fcd51a46d2c 100644
--- a/utilities/transactions/write_prepared_txn.cc
+++ b/utilities/transactions/write_prepared_txn.cc
@@ -158,7 +158,7 @@ Status WritePreparedTxn::CommitInternal() {
     // When not writing to memtable, we can still cache the latest write batch.
     // The cached batch will be written to memtable in WriteRecoverableState
     // during FlushMemTable
-    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
+    WriteBatchInternal::SetAsLatestPersistentState(working_batch);
   }
 
   auto prepare_seq = GetId();
@@ -236,14 +236,6 @@ Status WritePreparedTxn::CommitInternal() {
                           NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           &update_commit_map_with_aux_batch);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
-  if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues)) {
-    if (s.ok()) {
-      // Note: RemovePrepared should be called after WriteImpl that publishsed
-      // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
-      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
-    }
-    wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
-  }  // else RemovePrepared is called from within PreReleaseCallback
   return s;
 }
 
diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc
index 167d2e80cfb93ed8dc4dbc086b7d87f9fde11979..b853e95462420cd85c6d2b5f0c88e2ff00b7a551 100644
--- a/utilities/transactions/write_prepared_txn_db.cc
+++ b/utilities/transactions/write_prepared_txn_db.cc
@@ -508,24 +508,22 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
                       prev_max, max_evicted_seq);
       AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
     }
-    // After each eviction from commit cache, check if the commit entry should
-    // be kept around because it overlaps with a live snapshot.
-    CheckAgainstSnapshots(evicted);
     if (UNLIKELY(!delayed_prepared_empty_.load(std::memory_order_acquire))) {
       WriteLock wl(&prepared_mutex_);
-      for (auto dp : delayed_prepared_) {
-        if (dp == evicted.prep_seq) {
-          // This is a rare case that txn is committed but prepared_txns_ is not
-          // cleaned up yet. Refer to delayed_prepared_commits_ definition for
-          // why it should be kept updated.
-          delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
-          ROCKS_LOG_DEBUG(info_log_,
-                          "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
-                          evicted.prep_seq, evicted.commit_seq);
-          break;
-        }
+      auto dp_iter = delayed_prepared_.find(evicted.prep_seq);
+      if (dp_iter != delayed_prepared_.end()) {
+        // This is a rare case that txn is committed but prepared_txns_ is not
+        // cleaned up yet. Refer to delayed_prepared_commits_ definition for
+        // why it should be kept updated.
+        delayed_prepared_commits_[evicted.prep_seq] = evicted.commit_seq;
+        ROCKS_LOG_DEBUG(info_log_,
+                        "delayed_prepared_commits_[%" PRIu64 "]=%" PRIu64,
+                        evicted.prep_seq, evicted.commit_seq);
       }
     }
+    // After each eviction from commit cache, check if the commit entry should
+    // be kept around because it overlaps with a live snapshot.
+    CheckAgainstSnapshots(evicted);
   }
   bool succ =
       ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h
index 375ae76d5a5e0e80fc19bd2544584437c90e8384..bd704232376ffd47459c733619d81440f0156f6a 100644
--- a/utilities/transactions/write_prepared_txn_db.h
+++ b/utilities/transactions/write_prepared_txn_db.h
@@ -1080,6 +1080,8 @@ SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,
     *min =
         static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;
     *max = static_cast_with_check<const SnapshotImpl>(snapshot)->number_;
+    // A duplicate of the check in EnhanceSnapshot().
+    assert(*min <= *max + 1);
     return kBackedByDBSnapshot;
   } else {
     *min = SmallestUnCommittedSeq();
diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc
index c6740374ffafb13436feba37d77399c558e60e7b..302d4aeb00a83b633ca3d6df1e9f8f3ecbb22654 100644
--- a/utilities/transactions/write_unprepared_txn.cc
+++ b/utilities/transactions/write_unprepared_txn.cc
@@ -553,7 +553,7 @@ Status WriteUnpreparedTxn::CommitInternal() {
     // When not writing to memtable, we can still cache the latest write batch.
     // The cached batch will be written to memtable in WriteRecoverableState
     // during FlushMemTable
-    WriteBatchInternal::SetAsLastestPersistentState(working_batch);
+    WriteBatchInternal::SetAsLatestPersistentState(working_batch);
   }
 
   const bool includes_data = !empty && !for_recovery;
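Note on the write_prepared_txn_db.cc hunk above: delayed_prepared_ is an ordered set of prepared sequence numbers, so the evicted entry can be located with a single set lookup instead of the previous linear range-for scan, and CheckAgainstSnapshots(evicted) is deferred until after delayed_prepared_commits_ has been updated. The standalone sketch below illustrates only the lookup pattern; the container types and variable names are loose stand-ins for the RocksDB members (which the real code guards with prepared_mutex_ and logs via ROCKS_LOG_DEBUG), not the actual implementation.

#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <map>
#include <set>

int main() {
  // Hypothetical stand-ins for the WritePreparedTxnDB members of the same
  // names; locking and logging from the real code are omitted here.
  std::set<uint64_t> delayed_prepared{10, 42, 77};
  std::map<uint64_t, uint64_t> delayed_prepared_commits;

  const uint64_t evicted_prep_seq = 42;
  const uint64_t evicted_commit_seq = 100;

  // Old shape: O(n) range-for over the set with an early break.
  // New shape: a single O(log n) ordered-set lookup, as in the patch.
  auto dp_iter = delayed_prepared.find(evicted_prep_seq);
  if (dp_iter != delayed_prepared.end()) {
    // Rare case: the txn committed while still tracked as delayed-prepared.
    delayed_prepared_commits[evicted_prep_seq] = evicted_commit_seq;
  }

  std::printf("delayed_prepared_commits[%" PRIu64 "]=%" PRIu64 "\n",
              evicted_prep_seq, delayed_prepared_commits[evicted_prep_seq]);
  return 0;
}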