diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index e01ea0a8c9168956f1c0b782791e7264a8ac17c9..fe5fd1b3af8fb13a3a7ca4c6e0b13269df74b4b7 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -125,7 +125,7 @@ WriteCommittedTxn::WriteCommittedTxn(TransactionDB* txn_db, const TransactionOptions& txn_options) : PessimisticTransaction(txn_db, write_options, txn_options){}; -Status WriteCommittedTxn::CommitBatch(WriteBatch* batch) { +Status PessimisticTransaction::CommitBatch(WriteBatch* batch) { TransactionKeyMap keys_to_unlock; Status s = LockBatch(batch, &keys_to_unlock); diff --git a/utilities/transactions/pessimistic_transaction.h b/utilities/transactions/pessimistic_transaction.h index 40d1414157f63d66ddbd1012b54ade76737abcbe..057eaf88aca4b762b34c8f6e5e66e42711af0d5b 100644 --- a/utilities/transactions/pessimistic_transaction.h +++ b/utilities/transactions/pessimistic_transaction.h @@ -52,7 +52,7 @@ class PessimisticTransaction : public TransactionBaseImpl { // It is basically Commit without going through Prepare phase. The write batch // is also directly provided instead of expecting txn to gradually batch the // transactions writes to an internal write batch. - virtual Status CommitBatch(WriteBatch* batch) = 0; + Status CommitBatch(WriteBatch* batch); Status Rollback() override = 0; @@ -191,8 +191,6 @@ class WriteCommittedTxn : public PessimisticTransaction { virtual ~WriteCommittedTxn() {} - Status CommitBatch(WriteBatch* batch) override; - Status Rollback() override; private: diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 8ad7f9540d06d587b0043515d5e79e9e143d1ca9..db23e0b8e99e02db7fbdec56a8140a46e004fb1a 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -541,8 +541,9 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, } } auto indexed_seq = prep_seq % COMMIT_CACHE_SIZE; + CommitEntry64b dont_care; CommitEntry cached; - bool exist = GetCommitEntry(indexed_seq, &cached); + bool exist = GetCommitEntry(indexed_seq, &dont_care, &cached); if (!exist) { // It is not committed, so it must be still prepared return false; @@ -599,8 +600,9 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, ROCKS_LOG_DEBUG(info_log_, "Txn %" PRIu64 " Committing with %" PRIu64, prepare_seq, commit_seq); auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE; + CommitEntry64b evicted_64b; CommitEntry evicted; - bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted); + bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted); if (to_be_evicted) { auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); if (prev_max < evicted.commit_seq) { @@ -613,7 +615,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, CheckAgainstSnapshots(evicted); } bool succ = - ExchangeCommitEntry(indexed_seq, evicted, {prepare_seq, commit_seq}); + ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq}); if (!succ) { // A very rare event, in which the commit entry is updated before we do. // Here we apply a very simple solution of retrying. @@ -636,34 +638,32 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, } bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq, + CommitEntry64b* entry_64b, CommitEntry* entry) { - // TODO(myabandeh): implement lock-free commit_cache_ - ReadLock rl(&commit_cache_mutex_); - *entry = commit_cache_[indexed_seq]; - return (entry->commit_seq != 0); // initialized + *entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire); + bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT); + return valid; } bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq, const CommitEntry& new_entry, CommitEntry* evicted_entry) { - // TODO(myabandeh): implement lock-free commit_cache_ - WriteLock wl(&commit_cache_mutex_); - *evicted_entry = commit_cache_[indexed_seq]; - commit_cache_[indexed_seq] = new_entry; - return (evicted_entry->commit_seq != 0); // initialized + CommitEntry64b new_entry_64b(new_entry, FORMAT); + CommitEntry64b evicted_entry_64b = commit_cache_[indexed_seq].exchange( + new_entry_64b, std::memory_order_acq_rel); + bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT); + return valid; } bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq, - const CommitEntry& expected_entry, + CommitEntry64b& expected_entry_64b, const CommitEntry& new_entry) { - // TODO(myabandeh): implement lock-free commit_cache_ - WriteLock wl(&commit_cache_mutex_); - auto& evicted_entry = commit_cache_[indexed_seq]; - if (evicted_entry.prep_seq != expected_entry.prep_seq) { - return false; - } - commit_cache_[indexed_seq] = new_entry; - return true; + auto& atomic_entry = commit_cache_[indexed_seq]; + CommitEntry64b new_entry_64b(new_entry, FORMAT); + bool succ = atomic_entry.compare_exchange_strong( + expected_entry_64b, new_entry_64b, std::memory_order_acq_rel, + std::memory_order_acquire); + return succ; } void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, @@ -700,10 +700,9 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, if (update_snapshots) { UpdateSnapshots(snapshots, new_snapshots_version); } - // TODO(myabandeh): check if it worked with relaxed ordering while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak( - prev_max, new_max, std::memory_order_release, - std::memory_order_acquire)) { + prev_max, new_max, std::memory_order_acq_rel, + std::memory_order_relaxed)) { }; } diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 903b135f2296440d37798dc21d1b28f25f8e1ebf..d4bb68faab6cbb48ed56a7c511b1d1d1b2d14ba8 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -105,16 +105,6 @@ class PessimisticTransactionDB : public TransactionDB { std::vector GetDeadlockInfoBuffer() override; void SetDeadlockInfoBufferSize(uint32_t target_size) override; - struct CommitEntry { - uint64_t prep_seq; - uint64_t commit_seq; - CommitEntry() : prep_seq(0), commit_seq(0) {} - CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {} - bool operator==(const CommitEntry& rhs) const { - return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq; - } - }; - protected: void ReinitializeTransaction( Transaction* txn, const WriteOptions& write_options, @@ -170,21 +160,27 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { public: explicit WritePreparedTxnDB( DB* db, const TransactionDBOptions& txn_db_options, - size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE, - size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE) + size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS, + size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS) : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_SIZE(snapshot_cache_size), - COMMIT_CACHE_SIZE(commit_cache_size) { + SNAPSHOT_CACHE_BITS(snapshot_cache_bits), + SNAPSHOT_CACHE_SIZE(static_cast(1ull << SNAPSHOT_CACHE_BITS)), + COMMIT_CACHE_BITS(commit_cache_bits), + COMMIT_CACHE_SIZE(static_cast(1ull << COMMIT_CACHE_BITS)), + FORMAT(COMMIT_CACHE_BITS) { init(txn_db_options); } explicit WritePreparedTxnDB( StackableDB* db, const TransactionDBOptions& txn_db_options, - size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE, - size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE) + size_t snapshot_cache_bits = DEF_SNAPSHOT_CACHE_BITS, + size_t commit_cache_bits = DEF_COMMIT_CACHE_BITS) : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_SIZE(snapshot_cache_size), - COMMIT_CACHE_SIZE(commit_cache_size) { + SNAPSHOT_CACHE_BITS(snapshot_cache_bits), + SNAPSHOT_CACHE_SIZE(static_cast(1ull << SNAPSHOT_CACHE_BITS)), + COMMIT_CACHE_BITS(commit_cache_bits), + COMMIT_CACHE_SIZE(static_cast(1ull << COMMIT_CACHE_BITS)), + FORMAT(COMMIT_CACHE_BITS) { init(txn_db_options); } @@ -203,6 +199,87 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // commit_seq to the commit map void AddCommitted(uint64_t prepare_seq, uint64_t commit_seq); + struct CommitEntry { + uint64_t prep_seq; + uint64_t commit_seq; + CommitEntry() : prep_seq(0), commit_seq(0) {} + CommitEntry(uint64_t ps, uint64_t cs) : prep_seq(ps), commit_seq(cs) {} + bool operator==(const CommitEntry& rhs) const { + return prep_seq == rhs.prep_seq && commit_seq == rhs.commit_seq; + } + }; + + struct CommitEntry64bFormat { + explicit CommitEntry64bFormat(size_t index_bits) + : INDEX_BITS(index_bits), + PREP_BITS(static_cast(64 - PAD_BITS - INDEX_BITS)), + COMMIT_BITS(static_cast(64 - PREP_BITS)), + COMMIT_FILTER(static_cast((1ull << COMMIT_BITS) - 1)) {} + // Number of higher bits of a sequence number that is not used. They are + // used to encode the value type, ... + const size_t PAD_BITS = static_cast(8); + // Number of lower bits from prepare seq that can be skipped as they are + // implied by the index of the entry in the array + const size_t INDEX_BITS; + // Number of bits we use to encode the prepare seq + const size_t PREP_BITS; + // Number of bits we use to encode the commit seq. + const size_t COMMIT_BITS; + // Filter to encode/decode commit seq + const uint64_t COMMIT_FILTER; + }; + + // Prepare Seq (64 bits) = PAD ... PAD PREP PREP ... PREP INDEX INDEX ... + // INDEX Detal Seq (64 bits) = 0 0 0 0 0 0 0 0 0 0 0 0 DELTA DELTA ... + // DELTA DELTA Encoded Value = PREP PREP .... PREP PREP DELTA DELTA + // ... DELTA DELTA PAD: first bits of a seq that is reserved for tagging and + // hence ignored PREP/INDEX: the used bits in a prepare seq number INDEX: the + // bits that do not have to be encoded (will be provided externally) DELTA: + // prep seq - commit seq + 1 Number of DELTA bits should be equal to number of + // index bits + PADs + struct CommitEntry64b { + constexpr CommitEntry64b() noexcept : rep_(0) {} + + CommitEntry64b(const CommitEntry& entry, const CommitEntry64bFormat& format) + : CommitEntry64b(entry.prep_seq, entry.commit_seq, format) {} + + CommitEntry64b(const uint64_t ps, const uint64_t cs, + const CommitEntry64bFormat& format) { + assert(ps < static_cast( + (1ull << (format.PREP_BITS + format.INDEX_BITS)))); + assert(ps <= cs); + uint64_t delta = cs - ps + 1; // make initialized delta always >= 1 + // zero is reserved for uninitialized entries + assert(0 < delta); + assert(delta < static_cast((1ull << format.COMMIT_BITS))); + rep_ = (ps << format.PAD_BITS) & ~format.COMMIT_FILTER; + rep_ = rep_ | delta; + } + + // Return false if the entry is empty + bool Parse(const uint64_t indexed_seq, CommitEntry* entry, + const CommitEntry64bFormat& format) { + uint64_t delta = rep_ & format.COMMIT_FILTER; + // zero is reserved for uninitialized entries + assert(delta < static_cast((1ull << format.COMMIT_BITS))); + if (delta == 0) { + return false; // initialized entry would have non-zero delta + } + + assert(indexed_seq < static_cast((1ull << format.INDEX_BITS))); + uint64_t prep_up = rep_ & ~format.COMMIT_FILTER; + prep_up >>= format.PAD_BITS; + const uint64_t& prep_low = indexed_seq; + entry->prep_seq = prep_up | prep_low; + + entry->commit_seq = entry->prep_seq + delta - 1; + return true; + } + + private: + uint64_t rep_; + }; + private: friend class WritePreparedTransactionTest_IsInSnapshotTest_Test; friend class WritePreparedTransactionTest_CheckAgainstSnapshotsTest_Test; @@ -220,8 +297,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast(1)); snapshot_cache_ = unique_ptr[]>( new std::atomic[SNAPSHOT_CACHE_SIZE] {}); - commit_cache_ = - unique_ptr(new CommitEntry[COMMIT_CACHE_SIZE]{}); + commit_cache_ = unique_ptr[]>( + new std::atomic[COMMIT_CACHE_SIZE] {}); } // A heap with the amortized O(1) complexity for erase. It uses one extra heap @@ -263,7 +340,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // Get the commit entry with index indexed_seq from the commit table. It // returns true if such entry exists. - bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry* entry); + bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b, + CommitEntry* entry); // Rewrite the entry with the index indexed_seq in the commit table with the // commit entry . If the rewrite results into eviction, @@ -275,7 +353,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // commit entry new_entry only if the existing entry matches the // expected_entry. Returns false otherwise. bool ExchangeCommitEntry(const uint64_t indexed_seq, - const CommitEntry& expected_entry, + CommitEntry64b& expected_entry, const CommitEntry& new_entry); // Increase max_evicted_seq_ from the previous value prev_max to the new @@ -325,7 +403,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for // each entry. In x86_64 architecture such reads are compiled to simple read // instructions. 128 entries - static const size_t DEF_SNAPSHOT_CACHE_SIZE = static_cast(1 << 7); + static const size_t DEF_SNAPSHOT_CACHE_BITS = static_cast(7); + const size_t SNAPSHOT_CACHE_BITS; const size_t SNAPSHOT_CACHE_SIZE; unique_ptr[]> snapshot_cache_; // 2nd list for storing snapshots. The list sorted in ascending order. @@ -339,11 +418,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // prepared_mutex_. PreparedHeap prepared_txns_; // 10m entry, 80MB size - static const size_t DEF_COMMIT_CACHE_SIZE = static_cast(1 << 21); + static const size_t DEF_COMMIT_CACHE_BITS = static_cast(21); + const size_t COMMIT_CACHE_BITS; const size_t COMMIT_CACHE_SIZE; + const CommitEntry64bFormat FORMAT; // commit_cache_ must be initialized to zero to tell apart an empty index from // a filled one. Thread-safety is provided with commit_cache_mutex_. - unique_ptr commit_cache_; + unique_ptr[]> commit_cache_; // The largest evicted *commit* sequence number from the commit_cache_ std::atomic max_evicted_seq_ = {}; // Advance max_evicted_seq_ by this value each time it needs an update. The diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 8abae6fa80dd9701e32ff7b34b59c772780da713..9e167722c26f330d3dd362e7fd7cfef6bc17546a 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -40,7 +40,9 @@ using std::string; namespace rocksdb { -using CommitEntry = PessimisticTransactionDB::CommitEntry; +using CommitEntry = WritePreparedTxnDB::CommitEntry; +using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b; +using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat; TEST(PreparedHeap, BasicsTest) { WritePreparedTxnDB::PreparedHeap heap; @@ -106,6 +108,49 @@ TEST(PreparedHeap, BasicsTest) { ASSERT_TRUE(heap.empty()); } +TEST(CommitEntry64b, BasicTest) { + const size_t INDEX_BITS = static_cast(21); + const size_t INDEX_SIZE = static_cast(1ull << INDEX_BITS); + const CommitEntry64bFormat FORMAT(static_cast(INDEX_BITS)); + + // zero-initialized CommitEntry64b should inidcate an empty entry + CommitEntry64b empty_entry64b; + uint64_t empty_index = 11ul; + CommitEntry empty_entry; + bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT); + ASSERT_FALSE(ok); + + // the zero entry is reserved for un-initialized entries + const size_t MAX_COMMIT = (1 << FORMAT.COMMIT_BITS) - 1 - 1; + // Samples over the numbers that are covered by that many index bits + std::array is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}}; + // Samples over the numbers that are covered by that many commit bits + std::array ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}}; + // Iterate over prepare numbers that have i) cover all bits of a sequence + // number, and ii) include some bits that fall into the range of index or + // commit bits + for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) { + for (uint64_t i : is) { + for (uint64_t d : ds) { + uint64_t p = base + i + d; + for (uint64_t c : {p, p + d / 2, p + d}) { + uint64_t index = p % INDEX_SIZE; + CommitEntry before(p, c), after; + CommitEntry64b entry64b(before, FORMAT); + ok = entry64b.Parse(index, &after, FORMAT); + ASSERT_TRUE(ok); + if (!(before == after)) { + printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64 + " c %" PRIu64 " index %" PRIu64 "\n", + base, i, d, p, c, index); + } + ASSERT_EQ(before, after); + } + } + } + } +} + class WritePreparedTxnDBMock : public WritePreparedTxnDB { public: WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt) @@ -255,33 +300,35 @@ TEST_P(WritePreparedTransactionTest, CommitMapTest) { ASSERT_FALSE(evicted); // Should be able to read the same value - bool found = wp_db->GetCommitEntry(c.prep_seq % size, &e); + CommitEntry64b dont_care; + bool found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e); ASSERT_TRUE(found); ASSERT_EQ(c, e); // Should be able to distinguish between overlapping entries - found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &e); + found = wp_db->GetCommitEntry((c.prep_seq + size) % size, &dont_care, &e); ASSERT_TRUE(found); ASSERT_NE(c.prep_seq + size, e.prep_seq); // Should be able to detect non-existent entry - found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &e); - ASSERT_EQ(e.commit_seq, 0); + found = wp_db->GetCommitEntry((c.prep_seq + 1) % size, &dont_care, &e); ASSERT_FALSE(found); // Reject an invalid exchange - CommitEntry e2 = {c.prep_seq + size, c.commit_seq}; - bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2, e); + CommitEntry e2 = {c.prep_seq + size, c.commit_seq + size}; + CommitEntry64b e2_64b(e2, wp_db->FORMAT); + bool exchanged = wp_db->ExchangeCommitEntry(e2.prep_seq % size, e2_64b, e); ASSERT_FALSE(exchanged); // check whether it did actually reject that - found = wp_db->GetCommitEntry(e2.prep_seq % size, &e); + found = wp_db->GetCommitEntry(e2.prep_seq % size, &dont_care, &e); ASSERT_TRUE(found); ASSERT_EQ(c, e); // Accept a valid exchange + CommitEntry64b c_64b(c, wp_db->FORMAT); CommitEntry e3 = {c.prep_seq + size, c.commit_seq + size + 1}; - exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c, e3); + exchanged = wp_db->ExchangeCommitEntry(c.prep_seq % size, c_64b, e3); ASSERT_TRUE(exchanged); // check whether it did actually accepted that - found = wp_db->GetCommitEntry(c.prep_seq % size, &e); + found = wp_db->GetCommitEntry(c.prep_seq % size, &dont_care, &e); ASSERT_TRUE(found); ASSERT_EQ(e3, e); @@ -290,7 +337,7 @@ TEST_P(WritePreparedTransactionTest, CommitMapTest) { evicted = wp_db->AddCommitEntry(e4.prep_seq % size, e4, &e); ASSERT_TRUE(evicted); ASSERT_EQ(e3, e); - found = wp_db->GetCommitEntry(e4.prep_seq % size, &e); + found = wp_db->GetCommitEntry(e4.prep_seq % size, &dont_care, &e); ASSERT_TRUE(found); ASSERT_EQ(e4, e); } @@ -333,11 +380,15 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) { } TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) { - std::vector snapshots = {100l, 200l, 300l, 400l, - 500l, 600l, 700l}; + std::vector snapshots = {100l, 200l, 300l, 400l, 500l, + 600l, 700l, 800l, 900l}; + const size_t snapshot_cache_bits = 2; + // Safety check to express the intended size in the test. Can be adjusted if + // the snapshots lists changed. + assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size()); DBImpl* mock_db = new DBImpl(options, dbname); - std::unique_ptr wp_db(new WritePreparedTxnDBMock( - mock_db, txn_db_options, snapshots.size() / 2)); + std::unique_ptr wp_db( + new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits)); SequenceNumber version = 1000l; ASSERT_EQ(0, wp_db->snapshots_total_); wp_db->UpdateSnapshots(snapshots, version); @@ -345,9 +396,9 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) { // seq numbers are chosen so that we have two of them between each two // snapshots. If the diff of two consecuitive seq is more than 5, there is a // snapshot between them. - std::vector seqs = {50l, 55l, 150l, 155l, 250l, 255l, - 350l, 355l, 450l, 455l, 550l, 555l, - 650l, 655l, 750l, 755l}; + std::vector seqs = {50l, 55l, 150l, 155l, 250l, 255l, 350l, + 355l, 450l, 455l, 550l, 555l, 650l, 655l, + 750l, 755l, 850l, 855l, 950l, 955l}; assert(seqs.size() > 1); for (size_t i = 0; i < seqs.size() - 1; i++) { wp_db->old_commit_map_empty_ = true; // reset @@ -374,12 +425,16 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) { // in the methods must also be added. const std::vector snapshots = {10l, 20l, 30l, 40l, 50l, 60l, 70l, 80l, 90l, 100l}; + const size_t snapshot_cache_bits = 2; + // Safety check to express the intended size in the test. Can be adjusted if + // the snapshots lists changed. + assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size()); SequenceNumber version = 1000l; // Choose the cache size so that the new snapshot list could replace all the // existing items in the cache and also have some overflow. DBImpl* mock_db = new DBImpl(options, dbname); - std::unique_ptr wp_db(new WritePreparedTxnDBMock( - mock_db, txn_db_options, (snapshots.size() - 2) / 2)); + std::unique_ptr wp_db( + new WritePreparedTxnDBMock(mock_db, txn_db_options, snapshot_cache_bits)); // Add up to 2 items that do not fit into the cache for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + 2; old_size++) { @@ -435,6 +490,7 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) { } } } + printf("\n"); } #endif @@ -502,9 +558,9 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { WriteOptions wo; // Use small commit cache to trigger lots of eviction and fast advance of // max_evicted_seq_ - const size_t commit_cache_size = 8; + const size_t commit_cache_bits = 3; // Same for snapshot cache size - const size_t snapshot_cache_size = 5; + const size_t snapshot_cache_bits = 2; // Take some preliminary snapshots first. This is to stress the data structure // that holds the old snapshots as it will be designed to be efficient when @@ -538,7 +594,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { std::set committed_before; DBImpl* mock_db = new DBImpl(options, dbname); std::unique_ptr wp_db(new WritePreparedTxnDBMock( - mock_db, txn_db_options, snapshot_cache_size, commit_cache_size)); + mock_db, txn_db_options, snapshot_cache_bits, commit_cache_bits)); // We continue until max advances a bit beyond the snapshot. while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) { // do prepare for a transaction diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 243c91e2d66fcf4b633bd803a047e64b047da502..263f5a99b1f69f61cfab62e31eb3fca1d2539438 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -41,12 +41,6 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options, pinnable_val, &callback); } -Status WritePreparedTxn::CommitBatch(WriteBatch* /* unused */) { - // TODO(myabandeh) Implement this - throw std::runtime_error("CommitBatch not Implemented"); - return Status::OK(); -} - Status WritePreparedTxn::PrepareInternal() { WriteOptions write_options = write_options_; write_options.disableWAL = false; @@ -97,6 +91,8 @@ Status WritePreparedTxn::CommitInternal() { auto s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, log_number_, disable_memtable, &seq_used); uint64_t& commit_seq = seq_used; + // TODO(myabandeh): Reject a commit request if AddCommitted cannot encode + // commit_seq. This happens if prep_seq <<< commit_seq. wpt_db_->AddCommitted(prepare_seq_, commit_seq); return s; } diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index a2fe2ae4b759546f132a40f603f8afd377d82d05..14d7dcc25ab788c92ffdd5355c9ffbb82ee8ee04 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -50,8 +50,6 @@ class WritePreparedTxn : public PessimisticTransaction { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; - Status CommitBatch(WriteBatch* batch) override; - Status Rollback() override; private: