From 5234fc1b703fa9c6810a1908b172f279b38e8d19 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Tue, 2 Apr 2019 15:14:41 -0700 Subject: [PATCH] Mark logs with prepare in PreReleaseCallback (#5121) Summary: In the prepare phase of 2PC, the db promises to remember the prepared data, for possible future commits. To fulfill the promise the prepared data must be persisted in the WAL so that they could be recovered after a crash. The log that contains a prepare batch that is not committed yet is marked so that it is not garbage collected before the transaction commits or rolls back. The bug was that the write to the log file and the mark of the file were not atomic, and WAL gc could have happened before the WAL log was actually marked. This patch moves the marking logic to PreReleaseCallback so that the WAL gc logic that joins both write threads would see the WAL write and WAL mark atomically. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5121 Differential Revision: D14665210 Pulled By: maysamyabandeh fbshipit-source-id: 1d66aeb1c66a296cb4899a5a20c4d40c59e4b534 --- HISTORY.md | 3 ++ db/db_impl_write.cc | 11 +++-- db/pre_release_callback.h | 4 +- db/write_callback_test.cc | 4 +- .../transactions/pessimistic_transaction.cc | 48 +++++++++++++------ utilities/transactions/write_prepared_txn.cc | 14 ++++-- .../transactions/write_prepared_txn_db.cc | 12 +++-- .../transactions/write_prepared_txn_db.h | 34 ++++++++----- .../transactions/write_unprepared_txn.cc | 20 ++------ .../transactions/write_unprepared_txn_db.cc | 7 ++- .../transactions/write_unprepared_txn_db.h | 10 ++-- 11 files changed, 99 insertions(+), 68 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 593a74324..6c5464d50 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,9 @@ ### Unreleased ### New Features * When reading from option file/string/map, customized comparators and/or merge operators can be filled according to object registry. 
+### Public API Change +### Bug Fixes +* Fix a bug in 2PC where a sequence of txn prepare, memtable flush, and crash could result in losing the prepared transaction. ## 6.1.0 (3/27/2019) ### New Features diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 26f4073bb..8d318f621 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -311,8 +311,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } writer->sequence = next_sequence; if (writer->pre_release_callback) { - Status ws = writer->pre_release_callback->Callback(writer->sequence, - disable_memtable); + Status ws = writer->pre_release_callback->Callback( + writer->sequence, disable_memtable, writer->log_used); if (!ws.ok()) { status = ws; break; @@ -649,8 +649,8 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); const bool DISABLE_MEMTABLE = true; - Status ws = writer->pre_release_callback->Callback(writer->sequence, - DISABLE_MEMTABLE); + Status ws = writer->pre_release_callback->Callback( + writer->sequence, DISABLE_MEMTABLE, writer->log_used); if (!ws.ok()) { status = ws; break; @@ -993,8 +993,9 @@ Status DBImpl::WriteRecoverableState() { const bool DISABLE_MEMTABLE = true; for (uint64_t sub_batch_seq = seq + 1; sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) { + uint64_t const no_log_num = 0; status = recoverable_state_pre_release_callback_->Callback( - sub_batch_seq, !DISABLE_MEMTABLE); + sub_batch_seq, !DISABLE_MEMTABLE, no_log_num); } } if (status.ok()) { diff --git a/db/pre_release_callback.h b/db/pre_release_callback.h index 09564ba33..f91ef1b27 100644 --- a/db/pre_release_callback.h +++ b/db/pre_release_callback.h @@ -26,7 +26,9 @@ class PreReleaseCallback { // released. // is_mem_disabled is currently used for debugging purposes to assert that // the callback is done from the right write queue. 
- virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) = 0; + // If non-zero, log_number indicates the WAL log to which we wrote. + virtual Status Callback(SequenceNumber seq, bool is_mem_disabled, + uint64_t log_number) = 0; }; } // namespace rocksdb diff --git a/db/write_callback_test.cc b/db/write_callback_test.cc index fdc70198f..cb880560e 100644 --- a/db/write_callback_test.cc +++ b/db/write_callback_test.cc @@ -295,8 +295,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) { public: PublishSeqCallback(DBImpl* db_impl_in) : db_impl_(db_impl_in) {} - Status Callback(SequenceNumber last_seq, - bool /*not used*/) override { + Status Callback(SequenceNumber last_seq, bool /*not used*/, + uint64_t) override { db_impl_->SetLastPublishedSequence(last_seq); return Status::OK(); } diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index b78fb9d9f..042ae95ab 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -195,22 +195,14 @@ Status PessimisticTransaction::Prepare() { } if (can_prepare) { - bool wal_already_marked = false; txn_state_.store(AWAITING_PREPARE); // transaction can't expire after preparation expiration_time_ = 0; - if (log_number_ > 0) { - assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); - wal_already_marked = true; - } + assert(log_number_ == 0 || + txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED); s = PrepareInternal(); if (s.ok()) { - assert(log_number_ != 0); - if (!wal_already_marked) { - dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( - log_number_); - } txn_state_.store(PREPARED); } } else if (txn_state_ == LOCKS_STOLEN) { @@ -232,10 +224,38 @@ Status WriteCommittedTxn::PrepareInternal() { WriteOptions write_options = write_options_; write_options.disableWAL = false; WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_); 
- Status s = - db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(), - /*callback*/ nullptr, &log_number_, /*log_ref*/ 0, - /* disable_memtable*/ true); + class MarkLogCallback : public PreReleaseCallback { + public: + MarkLogCallback(DBImpl* db, bool two_write_queues) + : db_(db), two_write_queues_(two_write_queues) { + (void)two_write_queues_; // to silence unused private field warning + } + virtual Status Callback(SequenceNumber, bool is_mem_disabled, + uint64_t log_number) override { +#ifdef NDEBUG + (void)is_mem_disabled; +#endif + assert(log_number != 0); + assert(!two_write_queues_ || is_mem_disabled); // implies the 2nd queue + db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number); + return Status::OK(); + } + + private: + DBImpl* db_; + bool two_write_queues_; + } mark_log_callback(db_impl_, + db_impl_->immutable_db_options().two_write_queues); + + WriteCallback* const kNoWriteCallback = nullptr; + const uint64_t kRefNoLog = 0; + const bool kDisableMemtable = true; + SequenceNumber* const KIgnoreSeqUsed = nullptr; + const size_t kNoBatchCount = 0; + Status s = db_impl_->WriteImpl( + write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback, + &log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount, + &mark_log_callback); return s; } diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 98eee11f7..4100925c5 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -84,6 +84,7 @@ Status WritePreparedTxn::PrepareInternal() { WriteOptions write_options = write_options_; write_options.disableWAL = false; const bool WRITE_AFTER_COMMIT = true; + const bool kFirstPrepareBatch = true; WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, !WRITE_AFTER_COMMIT); // For each duplicate key we account for a new sub-batch @@ -92,8 +93,8 @@ Status WritePreparedTxn::PrepareInternal() 
{ // prepared entries to PreparedHeap and hence enables an optimization. Refer to // SmallestUnCommittedSeq for more details. AddPreparedCallback add_prepared_callback( - wpt_db_, prepare_batch_cnt_, - db_impl_->immutable_db_options().two_write_queues); + wpt_db_, db_impl_, prepare_batch_cnt_, + db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch); const bool DISABLE_MEMTABLE = true; uint64_t seq_used = kMaxSequenceNumber; Status s = db_impl_->WriteImpl( @@ -152,9 +153,10 @@ Status WritePreparedTxn::CommitInternal() { WritePreparedCommitEntryPreReleaseCallback update_commit_map( wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt); // This is to call AddPrepared on CommitTimeWriteBatch + const bool kFirstPrepareBatch = true; AddPreparedCallback add_prepared_callback( - wpt_db_, commit_batch_cnt, - db_impl_->immutable_db_options().two_write_queues); + wpt_db_, db_impl_, commit_batch_cnt, + db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch); PreReleaseCallback* pre_release_callback; if (do_one_write) { pre_release_callback = &update_commit_map; @@ -321,6 +323,7 @@ Status WritePreparedTxn::RollbackInternal() { const uint64_t NO_REF_LOG = 0; uint64_t seq_used = kMaxSequenceNumber; const size_t ONE_BATCH = 1; + const bool kFirstPrepareBatch = true; // We commit the rolled back prepared batches. Although this is // counter-intuitive, i) it is safe to do so, since the prepared batches are // already canceled out by the rollback batch, ii) adding the commit entry to @@ -329,7 +332,8 @@ Status WritePreparedTxn::RollbackInternal() { // with a live snapshot around so that the live snapshot properly skips the // entry even if its prepare seq is lower than max_evicted_seq_. 
AddPreparedCallback add_prepared_callback( - wpt_db_, ONE_BATCH, db_impl_->immutable_db_options().two_write_queues); + wpt_db_, db_impl_, ONE_BATCH, + db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch); WritePreparedCommitEntryPreReleaseCallback update_commit_map( wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH); PreReleaseCallback* pre_release_callback; diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 2430615c0..5364a9e05 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -64,10 +64,9 @@ Status WritePreparedTxnDB::Initialize( public: explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db) : db_(db) {} - Status Callback(SequenceNumber commit_seq, bool is_mem_disabled) override { -#ifdef NDEBUG - (void)is_mem_disabled; -#endif + Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled __attribute__((__unused__)), + uint64_t) override { assert(!is_mem_disabled); db_->AddCommitted(commit_seq, commit_seq); return Status::OK(); @@ -163,11 +162,14 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig, const uint64_t no_log_ref = 0; uint64_t seq_used = kMaxSequenceNumber; const size_t ZERO_PREPARES = 0; + const bool kSeperatePrepareCommitBatches = true; // Since this is not 2pc, there is no need for AddPrepared but having it in // the PreReleaseCallback enables an optimization. Refer to // SmallestUnCommittedSeq for more details. 
AddPreparedCallback add_prepared_callback( - this, batch_cnt, db_impl_->immutable_db_options().two_write_queues); + this, db_impl_, batch_cnt, + db_impl_->immutable_db_options().two_write_queues, + !kSeperatePrepareCommitBatches); WritePreparedCommitEntryPreReleaseCallback update_commit_map( this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt); PreReleaseCallback* pre_release_callback; diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index 52d07a791..10d1dbf60 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -751,30 +751,40 @@ class WritePreparedTxnReadCallback : public ReadCallback { class AddPreparedCallback : public PreReleaseCallback { public: - AddPreparedCallback(WritePreparedTxnDB* db, size_t sub_batch_cnt, - bool two_write_queues) + AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl, + size_t sub_batch_cnt, bool two_write_queues, + bool first_prepare_batch) : db_(db), + db_impl_(db_impl), sub_batch_cnt_(sub_batch_cnt), - two_write_queues_(two_write_queues) { + two_write_queues_(two_write_queues), + first_prepare_batch_(first_prepare_batch) { (void)two_write_queues_; // to silence unused private field warning } virtual Status Callback(SequenceNumber prepare_seq, - bool is_mem_disabled) override { -#ifdef NDEBUG - (void)is_mem_disabled; -#endif + bool is_mem_disabled __attribute__((__unused__)), + uint64_t log_number) override { // Always Prepare from the main queue assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue for (size_t i = 0; i < sub_batch_cnt_; i++) { db_->AddPrepared(prepare_seq + i); } + if (first_prepare_batch_) { + assert(log_number != 0); + db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( + log_number); + } return Status::OK(); } private: WritePreparedTxnDB* db_; + DBImpl* db_impl_; size_t sub_batch_cnt_; bool two_write_queues_; + // It is 2PC and this is the 
first prepare batch. Always the case in 2PC + // unless it is WriteUnPrepared. + bool first_prepare_batch_; }; class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { @@ -800,10 +810,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { } virtual Status Callback(SequenceNumber commit_seq, - bool is_mem_disabled) override { -#ifdef NDEBUG - (void)is_mem_disabled; -#endif + bool is_mem_disabled __attribute__((__unused__)), + uint64_t) override { // Always commit from the 2nd queue assert(!db_impl_->immutable_db_options().two_write_queues || is_mem_disabled); @@ -884,8 +892,8 @@ class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback { assert(prep_batch_cnt_ > 0); } - virtual Status Callback(SequenceNumber commit_seq, - bool is_mem_disabled) override { + Status Callback(SequenceNumber commit_seq, bool is_mem_disabled, + uint64_t) override { // Always commit from the 2nd queue assert(is_mem_disabled); // implies the 2nd queue assert(db_impl_->immutable_db_options().two_write_queues); diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index 027fe7368..731460eda 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -155,22 +155,10 @@ Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family, Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() { const bool kPrepared = true; Status s; - - bool needs_mark = (log_number_ == 0); - if (max_write_batch_size_ != 0 && write_batch_.GetDataSize() > max_write_batch_size_) { assert(GetState() != PREPARED); s = FlushWriteBatchToDB(!kPrepared); - if (s.ok()) { - assert(log_number_ > 0); - // This is done to prevent WAL files after log_number_ from being - // deleted, because they could potentially contain unprepared batches. 
- if (needs_mark) { - dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection( - log_number_); - } - } } return s; } @@ -198,6 +186,7 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { WriteOptions write_options = write_options_; write_options.disableWAL = false; const bool WRITE_AFTER_COMMIT = true; + const bool first_prepare_batch = log_number_ == 0; // MarkEndPrepare will change Noop marker to the appropriate marker. WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_, !WRITE_AFTER_COMMIT, !prepared); @@ -210,8 +199,8 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) { // prepared entries to PreparedHeap and hence enables an optimization. Refer // to SmallestUnCommittedSeq for more details. AddPreparedCallback add_prepared_callback( - wpt_db_, prepare_batch_cnt_, - db_impl_->immutable_db_options().two_write_queues); + wpt_db_, db_impl_, prepare_batch_cnt_, + db_impl_->immutable_db_options().two_write_queues, first_prepare_batch); const bool DISABLE_MEMTABLE = true; uint64_t seq_used = kMaxSequenceNumber; // log_number_ should refer to the oldest log containing uncommitted data @@ -334,7 +323,8 @@ Status WriteUnpreparedTxn::CommitInternal() { explicit PublishSeqPreReleaseCallback(DBImpl* db_impl) : db_impl_(db_impl) {} Status Callback(SequenceNumber seq, - bool is_mem_disabled __attribute__((__unused__))) override { + bool is_mem_disabled __attribute__((__unused__)), + uint64_t) override { assert(is_mem_disabled); assert(db_impl_->immutable_db_options().two_write_queues); db_impl_->SetLastPublishedSequence(seq); diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 8c6709321..55ca2b3ea 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -194,10 +194,9 @@ Status WriteUnpreparedTxnDB::Initialize( public: explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* 
db) : db_(db) {} - Status Callback(SequenceNumber commit_seq, bool is_mem_disabled) override { -#ifdef NDEBUG - (void)is_mem_disabled; -#endif + Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled __attribute__((__unused__)), + uint64_t) override { assert(!is_mem_disabled); db_->AddCommitted(commit_seq, commit_seq); return Status::OK(); diff --git a/utilities/transactions/write_unprepared_txn_db.h b/utilities/transactions/write_unprepared_txn_db.h index dfba4c136..4b4e31e1b 100644 --- a/utilities/transactions/write_unprepared_txn_db.h +++ b/utilities/transactions/write_unprepared_txn_db.h @@ -59,8 +59,9 @@ class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback { assert(unprep_seqs.size() > 0); } - virtual Status Callback(SequenceNumber commit_seq, bool is_mem_disabled - __attribute__((__unused__))) override { + virtual Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled __attribute__((__unused__)), + uint64_t) override { const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1) ? commit_seq : commit_seq + data_batch_cnt_ - 1; @@ -122,8 +123,9 @@ class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback { assert(db_impl_->immutable_db_options().two_write_queues); } - virtual Status Callback(SequenceNumber commit_seq, bool is_mem_disabled - __attribute__((__unused__))) override { + virtual Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled __attribute__((__unused__)), + uint64_t) override { assert(is_mem_disabled); // implies the 2nd queue const uint64_t last_commit_seq = commit_seq; db_->AddCommitted(rollback_seq_, last_commit_seq); -- GitLab