diff --git a/include/rocksdb/utilities/optimistic_transaction_db.h b/include/rocksdb/utilities/optimistic_transaction_db.h index 518bc610c6da2968febade0da6da0eb330b4b7ed..28b24083e297db1806e39dead84bc80a1fc7f2da 100644 --- a/include/rocksdb/utilities/optimistic_transaction_db.h +++ b/include/rocksdb/utilities/optimistic_transaction_db.h @@ -11,6 +11,7 @@ #include "rocksdb/comparator.h" #include "rocksdb/db.h" +#include "rocksdb/utilities/stackable_db.h" namespace rocksdb { @@ -30,7 +31,7 @@ struct OptimisticTransactionOptions { const Comparator* cmp = BytewiseComparator(); }; -class OptimisticTransactionDB { +class OptimisticTransactionDB : public StackableDB { public: // Open an OptimisticTransactionDB similar to DB::Open(). static Status Open(const Options& options, const std::string& dbname, @@ -57,18 +58,12 @@ class OptimisticTransactionDB { OptimisticTransactionOptions(), Transaction* old_txn = nullptr) = 0; - // Return the underlying Database that was opened - virtual DB* GetBaseDB() = 0; + OptimisticTransactionDB(const OptimisticTransactionDB&) = delete; + void operator=(const OptimisticTransactionDB&) = delete; protected: // To Create an OptimisticTransactionDB, call Open() - explicit OptimisticTransactionDB(DB* /*db*/) {} - OptimisticTransactionDB() {} - - private: - // No copying allowed - OptimisticTransactionDB(const OptimisticTransactionDB&); - void operator=(const OptimisticTransactionDB&); + explicit OptimisticTransactionDB(DB* db) : StackableDB(db) {} }; } // namespace rocksdb diff --git a/utilities/transactions/optimistic_transaction_db_impl.h b/utilities/transactions/optimistic_transaction_db_impl.h index 48f838057719f9287836f4fb65217a68dc3ca90d..49240a98c2d9cb0ea82b5346f5beab7b00f4b842 100644 --- a/utilities/transactions/optimistic_transaction_db_impl.h +++ b/utilities/transactions/optimistic_transaction_db_impl.h @@ -15,11 +15,13 @@ namespace rocksdb { class OptimisticTransactionDBImpl : public OptimisticTransactionDB { public: explicit OptimisticTransactionDBImpl(DB* db, bool take_ownership = true) - : OptimisticTransactionDB(db), db_(db), db_owner_(take_ownership) {} + : OptimisticTransactionDB(db), db_owner_(take_ownership) {} ~OptimisticTransactionDBImpl() { + // Prevent this stackable from destroying + // base db if (!db_owner_) { - db_.release(); + db_ = nullptr; } } @@ -27,11 +29,9 @@ class OptimisticTransactionDBImpl : public OptimisticTransactionDB { const OptimisticTransactionOptions& txn_options, Transaction* old_txn) override; - DB* GetBaseDB() override { return db_.get(); } - private: - std::unique_ptr db_; - bool db_owner_; + + bool db_owner_; void ReinitializeTransaction(Transaction* txn, const WriteOptions& write_options, diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index f627f0e0955f4a71e65b5cdeeff88138d10bdb11..e9fdc60194080d9b05286f70dbd400c08f5723aa 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -26,7 +26,6 @@ namespace rocksdb { class OptimisticTransactionTest : public testing::Test { public: OptimisticTransactionDB* txn_db; - DB* db; string dbname; Options options; @@ -54,7 +53,6 @@ private: Status s = OptimisticTransactionDB::Open(options, dbname, &txn_db); assert(s.ok()); assert(txn_db != nullptr); - db = txn_db->GetBaseDB(); } }; @@ -64,8 +62,8 @@ TEST_F(OptimisticTransactionTest, SuccessTest) { string value; Status s; - db->Put(write_options, Slice("foo"), Slice("bar")); - db->Put(write_options, Slice("foo2"), Slice("bar")); + txn_db->Put(write_options, Slice("foo"), Slice("bar")); + txn_db->Put(write_options, Slice("foo2"), Slice("bar")); Transaction* txn = txn_db->BeginTransaction(write_options); ASSERT_TRUE(txn); @@ -81,7 +79,7 @@ TEST_F(OptimisticTransactionTest, SuccessTest) { s = txn->Commit(); ASSERT_OK(s); - db->Get(read_options, "foo", &value); + txn_db->Get(read_options, "foo", &value); ASSERT_EQ(value, "bar2"); delete txn; @@ -93,8 +91,8 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest) { string value; Status s; - db->Put(write_options, "foo", "bar"); - db->Put(write_options, "foo2", "bar"); + txn_db->Put(write_options, "foo", "bar"); + txn_db->Put(write_options, "foo2", "bar"); Transaction* txn = txn_db->BeginTransaction(write_options); ASSERT_TRUE(txn); @@ -102,10 +100,10 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest) { txn->Put("foo", "bar2"); // This Put outside of a transaction will conflict with the previous write - s = db->Put(write_options, "foo", "barz"); + s = txn_db->Put(write_options, "foo", "barz"); ASSERT_OK(s); - s = db->Get(read_options, "foo", &value); + s = txn_db->Get(read_options, "foo", &value); ASSERT_EQ(value, "barz"); ASSERT_EQ(1, txn->GetNumKeys()); @@ -113,9 +111,9 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest) { ASSERT_TRUE(s.IsBusy()); // Txn should not commit // Verify that transaction did not write anything - db->Get(read_options, "foo", &value); + txn_db->Get(read_options, "foo", &value); ASSERT_EQ(value, "barz"); - db->Get(read_options, "foo2", &value); + txn_db->Get(read_options, "foo2", &value); ASSERT_EQ(value, "bar"); delete txn; @@ -128,29 +126,29 @@ TEST_F(OptimisticTransactionTest, WriteConflictTest2) { string value; Status s; - db->Put(write_options, "foo", "bar"); - db->Put(write_options, "foo2", "bar"); + txn_db->Put(write_options, "foo", "bar"); + txn_db->Put(write_options, "foo2", "bar"); txn_options.set_snapshot = true; Transaction* txn = txn_db->BeginTransaction(write_options, txn_options); ASSERT_TRUE(txn); // This Put outside of a transaction will conflict with a later write - s = db->Put(write_options, "foo", "barz"); + s = txn_db->Put(write_options, "foo", "barz"); ASSERT_OK(s); txn->Put("foo", "bar2"); // Conflicts with write done after snapshot taken - s = db->Get(read_options, "foo", &value); + s = txn_db->Get(read_options, "foo", &value); ASSERT_EQ(value, "barz"); s = txn->Commit(); ASSERT_TRUE(s.IsBusy()); // Txn should not commit // Verify that transaction did not write anything - db->Get(read_options, "foo", &value); + txn_db->Get(read_options, "foo", &value); ASSERT_EQ(value, "barz"); - db->Get(read_options, "foo2", &value); + txn_db->Get(read_options, "foo2", &value); ASSERT_EQ(value, "bar"); delete txn; @@ -163,8 +161,8 @@ TEST_F(OptimisticTransactionTest, ReadConflictTest) { string value; Status s; - db->Put(write_options, "foo", "bar"); - db->Put(write_options, "foo2", "bar"); + txn_db->Put(write_options, "foo", "bar"); + txn_db->Put(write_options, "foo2", "bar"); txn_options.set_snapshot = true; Transaction* txn = txn_db->BeginTransaction(write_options, txn_options); @@ -177,10 +175,10 @@ TEST_F(OptimisticTransactionTest, ReadConflictTest) { ASSERT_EQ(value, "bar"); // This Put outside of a transaction will conflict with the previous read - s = db->Put(write_options, "foo", "barz"); + s = txn_db->Put(write_options, "foo", "barz"); ASSERT_OK(s); - s = db->Get(read_options, "foo", &value); + s = txn_db->Get(read_options, "foo", &value); ASSERT_EQ(value, "barz"); s = txn->Commit(); @@ -221,8 +219,8 @@ TEST_F(OptimisticTransactionTest, FlushTest) { string value; Status s; - db->Put(write_options, Slice("foo"), Slice("bar")); - db->Put(write_options, Slice("foo2"), Slice("bar")); + txn_db->Put(write_options, Slice("foo"), Slice("bar")); + txn_db->Put(write_options, Slice("foo2"), Slice("bar")); Transaction* txn = txn_db->BeginTransaction(write_options); ASSERT_TRUE(txn); @@ -238,18 +236,18 @@ TEST_F(OptimisticTransactionTest, FlushTest) { ASSERT_EQ(value, "bar2"); // Put a random key so we have a memtable to flush - s = db->Put(write_options, "dummy", "dummy"); + s = txn_db->Put(write_options, "dummy", "dummy"); ASSERT_OK(s); // force a memtable flush FlushOptions flush_ops; - db->Flush(flush_ops); + txn_db->Flush(flush_ops); s = txn->Commit(); // txn should commit since the flushed table is still in MemtableList History ASSERT_OK(s); - db->Get(read_options, "foo", &value); + txn_db->Get(read_options, "foo", &value); ASSERT_EQ(value, "bar2"); delete txn; @@ -261,8 +259,8 @@ TEST_F(OptimisticTransactionTest, FlushTest2) { string value; Status s; - db->Put(write_options, Slice("foo"), Slice("bar")); - db->Put(write_options, Slice("foo2"), Slice("bar")); + txn_db->Put(write_options, Slice("foo"), Slice("bar")); + txn_db->Put(write_options, Slice("foo2"), Slice("bar")); Transaction* txn = txn_db->BeginTransaction(write_options); ASSERT_TRUE(txn); @@ -278,33 +276,33 @@ TEST_F(OptimisticTransactionTest, FlushTest2) { ASSERT_EQ(value, "bar2"); // Put a random key so we have a MemTable to flush - s = db->Put(write_options, "dummy", "dummy"); + s = txn_db->Put(write_options, "dummy", "dummy"); ASSERT_OK(s); // force a memtable flush FlushOptions flush_ops; - db->Flush(flush_ops); + txn_db->Flush(flush_ops); // Put a random key so we have a MemTable to flush - s = db->Put(write_options, "dummy", "dummy2"); + s = txn_db->Put(write_options, "dummy", "dummy2"); ASSERT_OK(s); // force a memtable flush - db->Flush(flush_ops); + txn_db->Flush(flush_ops); - s = db->Put(write_options, "dummy", "dummy3"); + s = txn_db->Put(write_options, "dummy", "dummy3"); ASSERT_OK(s); // force a memtable flush // Since our test db has max_write_buffer_number=2, this flush will cause // the first memtable to get purged from the MemtableList history. - db->Flush(flush_ops); + txn_db->Flush(flush_ops); s = txn->Commit(); // txn should not commit since MemTableList History is not large enough ASSERT_TRUE(s.IsTryAgain()); - db->Get(read_options, "foo", &value); + txn_db->Get(read_options, "foo", &value); ASSERT_EQ(value, "bar"); delete txn; @@ -316,13 +314,13 @@ TEST_F(OptimisticTransactionTest, NoSnapshotTest) { string value; Status s; - db->Put(write_options, "AAA", "bar"); + txn_db->Put(write_options, "AAA", "bar"); Transaction* txn = txn_db->BeginTransaction(write_options); ASSERT_TRUE(txn); // Modify key after transaction start - db->Put(write_options, "AAA", "bar1"); + txn_db->Put(write_options, "AAA", "bar1"); // Read and write without a snapshot txn->GetForUpdate(read_options, "AAA", &value); @@ -345,14 +343,14 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) { string value; Status s; - db->Put(write_options, "AAA", "bar"); - db->Put(write_options, "BBB", "bar"); - db->Put(write_options, "CCC", "bar"); + txn_db->Put(write_options, "AAA", "bar"); + txn_db->Put(write_options, "BBB", "bar"); + txn_db->Put(write_options, "CCC", "bar"); Transaction* txn = txn_db->BeginTransaction(write_options); ASSERT_TRUE(txn); - db->Put(write_options, "AAA", "bar1"); + txn_db->Put(write_options, "AAA", "bar1"); // Read and write without a snapshot txn->GetForUpdate(read_options, "AAA", &value); @@ -360,7 +358,7 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) { txn->Put("AAA", "bar2"); // Modify BBB before snapshot is taken - db->Put(write_options, "BBB", "bar1"); + txn_db->Put(write_options, "BBB", "bar1"); txn->SetSnapshot(); snapshot_read_options.snapshot = txn->GetSnapshot(); @@ -370,7 +368,7 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) { ASSERT_EQ(value, "bar1"); txn->Put("BBB", "bar2"); - db->Put(write_options, "CCC", "bar1"); + txn_db->Put(write_options, "CCC", "bar1"); // Set a new snapshot txn->SetSnapshot(); @@ -391,26 +389,26 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) { ASSERT_OK(s); ASSERT_EQ(value, "bar2"); - s = db->Get(read_options, "AAA", &value); + s = txn_db->Get(read_options, "AAA", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar1"); - s = db->Get(read_options, "BBB", &value); + s = txn_db->Get(read_options, "BBB", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar1"); - s = db->Get(read_options, "CCC", &value); + s = txn_db->Get(read_options, "CCC", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar1"); s = txn->Commit(); ASSERT_OK(s); - s = db->Get(read_options, "AAA", &value); + s = txn_db->Get(read_options, "AAA", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); - s = db->Get(read_options, "BBB", &value); + s = txn_db->Get(read_options, "BBB", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); - s = db->Get(read_options, "CCC", &value); + s = txn_db->Get(read_options, "CCC", &value); ASSERT_OK(s); ASSERT_EQ(value, "bar2"); @@ -419,8 +417,8 @@ TEST_F(OptimisticTransactionTest, MultipleSnapshotTest) { txn = txn_db->BeginTransaction(write_options); // Potentially conflicting writes - db->Put(write_options, "ZZZ", "zzz"); - db->Put(write_options, "XXX", "xxx"); + txn_db->Put(write_options, "ZZZ", "zzz"); + txn_db->Put(write_options, "XXX", "xxx"); txn->SetSnapshot(); @@ -457,9 +455,9 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { ColumnFamilyOptions cf_options; // Create 2 new column families - s = db->CreateColumnFamily(cf_options, "CFA", &cfa); + s = txn_db->CreateColumnFamily(cf_options, "CFA", &cfa); ASSERT_OK(s); - s = db->CreateColumnFamily(cf_options, "CFB", &cfb); + s = txn_db->CreateColumnFamily(cf_options, "CFB", &cfb); ASSERT_OK(s); delete cfa; @@ -482,7 +480,6 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { &txn_db); ASSERT_OK(s); assert(txn_db != nullptr); - db = txn_db->GetBaseDB(); Transaction* txn = txn_db->BeginTransaction(write_options); ASSERT_TRUE(txn); @@ -499,9 +496,9 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { batch.Put("foo", "foo"); batch.Put(handles[1], "AAA", "bar"); batch.Put(handles[1], "AAAZZZ", "bar"); - s = db->Write(write_options, &batch); + s = txn_db->Write(write_options, &batch); ASSERT_OK(s); - db->Delete(write_options, handles[1], "AAAZZZ"); + txn_db->Delete(write_options, handles[1], "AAAZZZ"); // These keys do no conflict with existing writes since they're in // different column families @@ -516,9 +513,9 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { // Txn should commit s = txn->Commit(); ASSERT_OK(s); - s = db->Get(read_options, "AAA", &value); + s = txn_db->Get(read_options, "AAA", &value); ASSERT_TRUE(s.IsNotFound()); - s = db->Get(read_options, handles[2], "AAAZZZ", &value); + s = txn_db->Get(read_options, handles[2], "AAAZZZ", &value); ASSERT_EQ(value, "barbar"); Slice key_slices[3] = {Slice("AAA"), Slice("ZZ"), Slice("Z")}; @@ -534,7 +531,7 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { // Verify txn did not commit s = txn2->Commit(); ASSERT_TRUE(s.IsBusy()); - s = db->Get(read_options, handles[1], "AAAZZZ", &value); + s = txn_db->Get(read_options, handles[1], "AAAZZZ", &value); ASSERT_EQ(value, "barbar"); delete txn; @@ -572,11 +569,11 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { // Txn should commit s = txn->Commit(); ASSERT_OK(s); - s = db->Get(read_options, handles[2], "ZZZ", &value); + s = txn_db->Get(read_options, handles[2], "ZZZ", &value); ASSERT_TRUE(s.IsNotFound()); // Put a key which will conflict with the next txn using the previous snapshot - db->Put(write_options, handles[2], "foo", "000"); + txn_db->Put(write_options, handles[2], "foo", "000"); results = txn2->MultiGetForUpdate(snapshot_read_options, multiget_cfh, multiget_keys, &values); @@ -592,9 +589,9 @@ TEST_F(OptimisticTransactionTest, ColumnFamiliesTest) { s = txn2->Commit(); ASSERT_TRUE(s.IsBusy()); - s = db->DropColumnFamily(handles[1]); + s = txn_db->DropColumnFamily(handles[1]); ASSERT_OK(s); - s = db->DropColumnFamily(handles[2]); + s = txn_db->DropColumnFamily(handles[2]); ASSERT_OK(s); delete txn; @@ -611,7 +608,7 @@ TEST_F(OptimisticTransactionTest, EmptyTest) { string value; Status s; - s = db->Put(write_options, "aaa", "aaa"); + s = txn_db->Put(write_options, "aaa", "aaa"); ASSERT_OK(s); Transaction* txn = txn_db->BeginTransaction(write_options); @@ -636,7 +633,7 @@ TEST_F(OptimisticTransactionTest, EmptyTest) { s = txn->GetForUpdate(read_options, "aaa", &value); ASSERT_EQ(value, "aaa"); - s = db->Put(write_options, "aaa", "xxx"); + s = txn_db->Put(write_options, "aaa", "xxx"); s = txn->Commit(); ASSERT_TRUE(s.IsBusy()); delete txn; @@ -799,7 +796,7 @@ TEST_F(OptimisticTransactionTest, LostUpdate) { delete txn1; delete txn2; - s = db->Get(read_options, "1", &value); + s = txn_db->Get(read_options, "1", &value); ASSERT_OK(s); ASSERT_EQ(value, "8"); } @@ -814,7 +811,7 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) { Transaction* txn = txn_db->BeginTransaction(write_options); txn->PutUntracked("untracked", "0"); txn->Rollback(); - s = db->Get(read_options, "untracked", &value); + s = txn_db->Get(read_options, "untracked", &value); ASSERT_TRUE(s.IsNotFound()); delete txn; @@ -827,13 +824,13 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) { // Write to the untracked key outside of the transaction and verify // it doesn't prevent the transaction from committing. - s = db->Put(write_options, "untracked", "x"); + s = txn_db->Put(write_options, "untracked", "x"); ASSERT_OK(s); s = txn->Commit(); ASSERT_OK(s); - s = db->Get(read_options, "untracked", &value); + s = txn_db->Get(read_options, "untracked", &value); ASSERT_TRUE(s.IsNotFound()); delete txn; @@ -844,12 +841,12 @@ TEST_F(OptimisticTransactionTest, UntrackedWrites) { // Write to tracked key outside of the transaction and verify that the // untracked keys are not written when the commit fails. - s = db->Delete(write_options, "tracked"); + s = txn_db->Delete(write_options, "tracked"); s = txn->Commit(); ASSERT_TRUE(s.IsBusy()); - s = db->Get(read_options, "untracked", &value); + s = txn_db->Get(read_options, "untracked", &value); ASSERT_TRUE(s.IsNotFound()); delete txn; @@ -863,19 +860,19 @@ TEST_F(OptimisticTransactionTest, IteratorTest) { Status s; // Write some keys to the db - s = db->Put(write_options, "A", "a"); + s = txn_db->Put(write_options, "A", "a"); ASSERT_OK(s); - s = db->Put(write_options, "G", "g"); + s = txn_db->Put(write_options, "G", "g"); ASSERT_OK(s); - s = db->Put(write_options, "F", "f"); + s = txn_db->Put(write_options, "F", "f"); ASSERT_OK(s); - s = db->Put(write_options, "C", "c"); + s = txn_db->Put(write_options, "C", "c"); ASSERT_OK(s); - s = db->Put(write_options, "D", "d"); + s = txn_db->Put(write_options, "D", "d"); ASSERT_OK(s); Transaction* txn = txn_db->BeginTransaction(write_options); @@ -898,10 +895,10 @@ TEST_F(OptimisticTransactionTest, IteratorTest) { const Snapshot* snapshot = txn->GetSnapshot(); // Write some keys to the db after the snapshot - s = db->Put(write_options, "BB", "xx"); + s = txn_db->Put(write_options, "BB", "xx"); ASSERT_OK(s); - s = db->Put(write_options, "C", "xx"); + s = txn_db->Put(write_options, "C", "xx"); ASSERT_OK(s); read_options.snapshot = snapshot; @@ -995,7 +992,7 @@ TEST_F(OptimisticTransactionTest, SavepointTest) { s = txn->Commit(); ASSERT_OK(s); - s = db->Get(read_options, "B", &value); + s = txn_db->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_EQ("b", value); @@ -1107,28 +1104,28 @@ TEST_F(OptimisticTransactionTest, SavepointTest) { s = txn->Commit(); ASSERT_OK(s); - s = db->Get(read_options, "F", &value); + s = txn_db->Get(read_options, "F", &value); ASSERT_OK(s); ASSERT_EQ("f", value); - s = db->Get(read_options, "G", &value); + s = txn_db->Get(read_options, "G", &value); ASSERT_TRUE(s.IsNotFound()); - s = db->Get(read_options, "A", &value); + s = txn_db->Get(read_options, "A", &value); ASSERT_OK(s); ASSERT_EQ("aa", value); - s = db->Get(read_options, "B", &value); + s = txn_db->Get(read_options, "B", &value); ASSERT_OK(s); ASSERT_EQ("b", value); - s = db->Get(read_options, "C", &value); + s = txn_db->Get(read_options, "C", &value); ASSERT_TRUE(s.IsNotFound()); - s = db->Get(read_options, "D", &value); + s = txn_db->Get(read_options, "D", &value); ASSERT_TRUE(s.IsNotFound()); - s = db->Get(read_options, "E", &value); + s = txn_db->Get(read_options, "E", &value); ASSERT_TRUE(s.IsNotFound()); delete txn; @@ -1141,7 +1138,7 @@ TEST_F(OptimisticTransactionTest, UndoGetForUpdateTest) { string value; Status s; - db->Put(write_options, "A", ""); + txn_db->Put(write_options, "A", ""); Transaction* txn1 = txn_db->BeginTransaction(write_options); ASSERT_TRUE(txn1); @@ -1350,7 +1347,7 @@ TEST_F(OptimisticTransactionTest, OptimisticTransactionStressTest) { } // Verify that data is consistent - Status s = RandomTransactionInserter::Verify(db, num_sets); + Status s = RandomTransactionInserter::Verify(txn_db, num_sets); ASSERT_OK(s); }