diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index 95a2bc42eece0cb70befab3453b7de3cce6a1418..31aa629fa091ca4da2973df3db07b5aef26551ca 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -589,6 +589,11 @@ bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq, // TODO(myabandeh): read your own writes // TODO(myabandeh): optimize this. This sequence of checks must be correct but // not necessary efficient + if (prep_seq == 0) { + // Compaction will output keys to bottom-level with sequence number 0 if + // it is visible to the earliest snapshot. + return true; + } if (snapshot_seq < prep_seq) { // snapshot_seq < prep_seq <= commit_seq => snapshot_seq < commit_seq return false; diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index bbea84666f4435f67179c8724d4dadc4768e464c..9ea46edb4f325e687ddecf3b19948346d64ae9f1 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -20,6 +20,7 @@ #include "db/db_impl.h" #include "rocksdb/db.h" #include "rocksdb/options.h" +#include "rocksdb/utilities/debug.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "table/mock_table.h" @@ -283,6 +284,45 @@ class WritePreparedTransactionTest : public TransactionTest { } rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } + + // Verify value of keys. + void VerifyKeys(const std::unordered_map& data, + const Snapshot* snapshot = nullptr) { + std::string value; + ReadOptions read_options; + read_options.snapshot = snapshot; + for (auto& kv : data) { + auto s = db->Get(read_options, kv.first, &value); + ASSERT_TRUE(s.ok() || s.IsNotFound()); + if (s.ok()) { + if (kv.second != value) { + printf("key = %s\n", kv.first.c_str()); + } + ASSERT_EQ(kv.second, value); + } else { + ASSERT_EQ(kv.second, "NOT_FOUND"); + } + } + } + + // Verify all versions of keys. + void VerifyInternalKeys(const std::vector& expected_versions) { + std::vector versions; + ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key, + expected_versions.back().user_key, &versions)); + ASSERT_EQ(expected_versions.size(), versions.size()); + for (size_t i = 0; i < versions.size(); i++) { + ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key); + ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence); + ASSERT_EQ(expected_versions[i].type, versions[i].type); + if (versions[i].type != kTypeDeletion && + versions[i].type != kTypeSingleDeletion) { + ASSERT_EQ(expected_versions[i].value, versions[i].value); + } + // Range delete not supported. + assert(expected_versions[i].type != kTypeRangeDeletion); + } + } }; // TODO(myabandeh): enable it for concurrent_prepare @@ -1304,6 +1344,25 @@ TEST_P(WritePreparedTransactionTest, DuplicateKeyTest) { } } +TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) { + ASSERT_OK(db->Put(WriteOptions(), "foo", "bar")); + VerifyKeys({{"foo", "bar"}}); + const Snapshot* snapshot = db->GetSnapshot(); + ASSERT_OK(db->Flush(FlushOptions())); + // Dummy keys to avoid compaction trivially move files and get around actual + // compaction logic. + ASSERT_OK(db->Put(WriteOptions(), "a", "dummy")); + ASSERT_OK(db->Put(WriteOptions(), "z", "dummy")); + ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // Compaction will output keys with sequence number 0, if it is visible to + // earliest snapshot. Make sure IsInSnapshot() report sequence number 0 is + // visible to any snapshot. + VerifyKeys({{"foo", "bar"}}); + VerifyKeys({{"foo", "bar"}}, snapshot); + VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}}); + db->ReleaseSnapshot(snapshot); +} + } // namespace rocksdb int main(int argc, char** argv) {