diff --git a/db/db_impl.h b/db/db_impl.h index 3bf8d50f050c076fd3564a4799d3a11ef102b0dc..fc80fe39a8ab4998740820a13c85736ecec0204a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -621,6 +621,9 @@ class DBImpl : public DB { void SetSnapshotChecker(SnapshotChecker* snapshot_checker); + // Not thread-safe. + void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback); + InstrumentedMutex* mutex() { return &mutex_; } Status NewDB(); @@ -1354,6 +1357,10 @@ class DBImpl : public DB { // REQUIRES: mutex held std::unique_ptr snapshot_checker_; + // Callback for when the cached_recoverable_state_ is written to memtable + // Only to be set during initialization + std::unique_ptr recoverable_state_pre_release_callback_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index b4b92567a9cae67a4e55022b103b5782b25c5967..3fb61010a72fc76f9f4b9b490bc2c3bc59555c3a 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -45,6 +45,11 @@ Status DBImpl::SingleDelete(const WriteOptions& write_options, return DB::SingleDelete(write_options, column_family, key); } +void DBImpl::SetRecoverableStatePreReleaseCallback( + PreReleaseCallback* callback) { + recoverable_state_pre_release_callback_.reset(callback); +} + Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { return WriteImpl(write_options, my_batch, nullptr, nullptr); } @@ -976,6 +981,14 @@ Status DBImpl::WriteRecoverableState() { if (two_write_queues_) { log_write_mutex_.Unlock(); } + if (status.ok() && recoverable_state_pre_release_callback_) { + const bool DISABLE_MEMTABLE = true; + for (uint64_t sub_batch_seq = seq + 1; + sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) { + status = recoverable_state_pre_release_callback_->Callback( + sub_batch_seq, !DISABLE_MEMTABLE); + } + } if (status.ok()) { cached_recoverable_state_.Clear(); cached_recoverable_state_empty_ = true; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 9d9d0608258937fac0818b38a610f0dd21933a4d..8b34717ae57b329e7671cec81908f6a4b74f3e52 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -913,6 +913,16 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) { } db_impl->TEST_FlushMemTable(true); + // After flush the recoverable state must be visible + if (cwb4recovery) { + s = db->Get(read_options, "gtid", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "dogs"); + + s = db->Get(read_options, "gtid2", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "cats"); + } // after memtable flush we can now relese the log ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); @@ -1044,6 +1054,10 @@ TEST_P(TransactionTest, TwoPhaseEmptyWriteTest) { if (test_with_empty_wal) { DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); db_impl->TEST_FlushMemTable(true); + // After flush the state must be visible + s = db->Get(read_options, "foo", &value); + ASSERT_OK(s); + ASSERT_EQ(value, "bar"); } db->FlushWAL(true); // kill and reopen to trigger recovery diff --git a/utilities/transactions/write_prepared_txn_db.cc b/utilities/transactions/write_prepared_txn_db.cc index 48f185682ceb2f82750d3c5b22ca68bcf930cf6c..c30ab46fd19ddab4765018f44e8bdbe9efe22f3c 100644 --- a/utilities/transactions/write_prepared_txn_db.cc +++ b/utilities/transactions/write_prepared_txn_db.cc @@ -46,6 +46,24 @@ Status WritePreparedTxnDB::Initialize( AdvanceMaxEvictedSeq(prev_max, last_seq); db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this)); + // A callback to commit a single sub-batch + class CommitSubBatchPreReleaseCallback : public PreReleaseCallback { + public: + explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db) + : db_(db) {} + virtual Status Callback(SequenceNumber commit_seq, + bool is_mem_disabled) override { + assert(!is_mem_disabled); + const bool PREPARE_SKIPPED = true; + db_->AddCommitted(commit_seq, commit_seq, PREPARE_SKIPPED); + return Status::OK(); + } + + private: + WritePreparedTxnDB* db_; + }; + db_impl_->SetRecoverableStatePreReleaseCallback( + new CommitSubBatchPreReleaseCallback(this)); auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices, handles);