diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 7257c9880cf5ae54100f3fadc492227dda232536..af7b7694ddf53a49177d0261ffde5378f7d029e1 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -36,6 +36,27 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(std::make_tuple(false, false, WRITE_UNPREPARED), std::make_tuple(false, true, WRITE_UNPREPARED))); +enum StressAction { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT }; +class WriteUnpreparedStressTest : public WriteUnpreparedTransactionTestBase, + virtual public ::testing::WithParamInterface< + std::tuple> { + public: + WriteUnpreparedStressTest() + : WriteUnpreparedTransactionTestBase(false, std::get<0>(GetParam()), + WRITE_UNPREPARED), + action_(std::get<1>(GetParam())) {} + StressAction action_; +}; + +INSTANTIATE_TEST_CASE_P( + WriteUnpreparedStressTest, WriteUnpreparedStressTest, + ::testing::Values(std::make_tuple(false, NO_SNAPSHOT), + std::make_tuple(false, RO_SNAPSHOT), + std::make_tuple(false, REFRESH_SNAPSHOT), + std::make_tuple(true, NO_SNAPSHOT), + std::make_tuple(true, RO_SNAPSHOT), + std::make_tuple(true, REFRESH_SNAPSHOT))); + TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { // The following tests checks whether reading your own write for // a transaction works for write unprepared, when there are uncommitted @@ -116,7 +137,7 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { } #ifndef ROCKSDB_VALGRIND_RUN -TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWriteStress) { +TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) { // This is a stress test where different threads are writing random keys, and // then before committing or aborting the transaction, it validates to see // that it can read the keys it wrote, and the keys it did not write respect @@ -129,170 +150,167 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWriteStress) { std::default_random_engine rand(static_cast( std::hash()(std::this_thread::get_id()))); - enum Action { NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT }; // Test with // 1. no snapshots set // 2. snapshot set on ReadOptions // 3. snapshot set, and refreshing after every write. - for (Action a : {NO_SNAPSHOT, RO_SNAPSHOT, REFRESH_SNAPSHOT}) { - WriteOptions write_options; - txn_db_options.transaction_lock_timeout = -1; - options.disable_auto_compactions = true; - ReOpen(); + StressAction a = action_; + WriteOptions write_options; + txn_db_options.transaction_lock_timeout = -1; + options.disable_auto_compactions = true; + ReOpen(); - std::vector keys; - for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) { - keys.push_back("k" + ToString(k)); - } - std::shuffle(keys.begin(), keys.end(), rand); - - // This counter will act as a "sequence number" to help us validate - // visibility logic with snapshots. If we had direct access to the seqno of - // snapshots and key/values, then we should directly compare those instead. - std::atomic counter(0); - - std::function stress_thread = [&](int id) { - size_t tid = std::hash()(std::this_thread::get_id()); - Random64 rnd(static_cast(tid)); - - Transaction* txn; - TransactionOptions txn_options; - // batch_size of 1 causes writes to DB for every marker. - txn_options.write_batch_flush_threshold = 1; - ReadOptions read_options; - - for (uint32_t i = 0; i < kNumIter; i++) { - std::set owned_keys(&keys[id * kNumKeys], - &keys[(id + 1) * kNumKeys]); - // Add unowned keys to make the workload more interesting, but this - // increases row lock contention, so just do it sometimes. - if (rnd.OneIn(2)) { - owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]); - } + std::vector keys; + for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) { + keys.push_back("k" + ToString(k)); + } + std::shuffle(keys.begin(), keys.end(), rand); - txn = db->BeginTransaction(write_options, txn_options); - txn->SetName(ToString(id)); - txn->SetSnapshot(); - if (a >= RO_SNAPSHOT) { - read_options.snapshot = txn->GetSnapshot(); - ASSERT_TRUE(read_options.snapshot != nullptr); - } + // This counter will act as a "sequence number" to help us validate + // visibility logic with snapshots. If we had direct access to the seqno of + // snapshots and key/values, then we should directly compare those instead. + std::atomic counter(0); - uint64_t buf[2]; - buf[0] = id; + std::function stress_thread = [&](int id) { + size_t tid = std::hash()(std::this_thread::get_id()); + Random64 rnd(static_cast(tid)); - // When scanning through the database, make sure that all unprepared - // keys have value >= snapshot and all other keys have value < snapshot. - int64_t snapshot_num = counter.fetch_add(1); + Transaction* txn; + TransactionOptions txn_options; + // batch_size of 1 causes writes to DB for every marker. + txn_options.write_batch_flush_threshold = 1; + ReadOptions read_options; + + for (uint32_t i = 0; i < kNumIter; i++) { + std::set owned_keys(&keys[id * kNumKeys], + &keys[(id + 1) * kNumKeys]); + // Add unowned keys to make the workload more interesting, but this + // increases row lock contention, so just do it sometimes. + if (rnd.OneIn(2)) { + owned_keys.insert(keys[rnd.Uniform(kNumKeys * kNumThreads)]); + } - Status s; - for (const auto& key : owned_keys) { - buf[1] = counter.fetch_add(1); - s = txn->Put(key, Slice((const char*)buf, sizeof(buf))); - if (!s.ok()) { - break; - } - if (a == REFRESH_SNAPSHOT) { - txn->SetSnapshot(); - read_options.snapshot = txn->GetSnapshot(); - snapshot_num = counter.fetch_add(1); - } - } + txn = db->BeginTransaction(write_options, txn_options); + txn->SetName(ToString(id)); + txn->SetSnapshot(); + if (a >= RO_SNAPSHOT) { + read_options.snapshot = txn->GetSnapshot(); + ASSERT_TRUE(read_options.snapshot != nullptr); + } + + uint64_t buf[2]; + buf[0] = id; - // Failure is possible due to snapshot validation. In this case, - // rollback and move onto next iteration. + // When scanning through the database, make sure that all unprepared + // keys have value >= snapshot and all other keys have value < snapshot. + int64_t snapshot_num = counter.fetch_add(1); + + Status s; + for (const auto& key : owned_keys) { + buf[1] = counter.fetch_add(1); + s = txn->Put(key, Slice((const char*)buf, sizeof(buf))); if (!s.ok()) { - ASSERT_TRUE(s.IsBusy()); - ASSERT_OK(txn->Rollback()); - delete txn; - continue; + break; } + if (a == REFRESH_SNAPSHOT) { + txn->SetSnapshot(); + read_options.snapshot = txn->GetSnapshot(); + snapshot_num = counter.fetch_add(1); + } + } - auto verify_key = [&owned_keys, &a, &id, &snapshot_num]( - const std::string& key, - const std::string& value) { - if (owned_keys.count(key) > 0) { - ASSERT_EQ(value.size(), 16); - - // Since this key is part of owned_keys, then this key must be - // unprepared by this transaction identified by 'id' - ASSERT_EQ(((int64_t*)value.c_str())[0], id); - if (a == REFRESH_SNAPSHOT) { - // If refresh snapshot is true, then the snapshot is refreshed - // after every Put(), meaning that the current snapshot in - // snapshot_num must be greater than the "seqno" of any keys - // written by the current transaction. - ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num); - } else { - // If refresh snapshot is not on, then the snapshot was taken at - // the beginning of the transaction, meaning all writes must come - // after snapshot_num - ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num); - } - } else if (a >= RO_SNAPSHOT) { - // If this is not an unprepared key, just assert that the key - // "seqno" is smaller than the snapshot seqno. - ASSERT_EQ(value.size(), 16); + // Failure is possible due to snapshot validation. In this case, + // rollback and move onto next iteration. + if (!s.ok()) { + ASSERT_TRUE(s.IsBusy()); + ASSERT_OK(txn->Rollback()); + delete txn; + continue; + } + + auto verify_key = [&owned_keys, &a, &id, &snapshot_num]( + const std::string& key, const std::string& value) { + if (owned_keys.count(key) > 0) { + ASSERT_EQ(value.size(), 16); + + // Since this key is part of owned_keys, then this key must be + // unprepared by this transaction identified by 'id' + ASSERT_EQ(((int64_t*)value.c_str())[0], id); + if (a == REFRESH_SNAPSHOT) { + // If refresh snapshot is true, then the snapshot is refreshed + // after every Put(), meaning that the current snapshot in + // snapshot_num must be greater than the "seqno" of any keys + // written by the current transaction. ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num); + } else { + // If refresh snapshot is not on, then the snapshot was taken at + // the beginning of the transaction, meaning all writes must come + // after snapshot_num + ASSERT_GT(((int64_t*)value.c_str())[1], snapshot_num); } - }; - - // Validate Get()/Next()/Prev(). Do only one of them to save time, and - // reduce lock contention. - switch (rnd.Uniform(3)) { - case 0: // Validate Get() - { - for (const auto& key : keys) { - std::string value; - s = txn->Get(read_options, Slice(key), &value); - if (!s.ok()) { - ASSERT_TRUE(s.IsNotFound()); - ASSERT_EQ(owned_keys.count(key), 0); - } else { - verify_key(key, value); - } - } - break; - } - case 1: // Validate Next() - { - Iterator* iter = txn->GetIterator(read_options); - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - verify_key(iter->key().ToString(), iter->value().ToString()); + } else if (a >= RO_SNAPSHOT) { + // If this is not an unprepared key, just assert that the key + // "seqno" is smaller than the snapshot seqno. + ASSERT_EQ(value.size(), 16); + ASSERT_LT(((int64_t*)value.c_str())[1], snapshot_num); + } + }; + + // Validate Get()/Next()/Prev(). Do only one of them to save time, and + // reduce lock contention. + switch (rnd.Uniform(3)) { + case 0: // Validate Get() + { + for (const auto& key : keys) { + std::string value; + s = txn->Get(read_options, Slice(key), &value); + if (!s.ok()) { + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ(owned_keys.count(key), 0); + } else { + verify_key(key, value); } - delete iter; - break; } - case 2: // Validate Prev() - { - Iterator* iter = txn->GetIterator(read_options); - for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { - verify_key(iter->key().ToString(), iter->value().ToString()); - } - delete iter; - break; + break; + } + case 1: // Validate Next() + { + Iterator* iter = txn->GetIterator(read_options); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + verify_key(iter->key().ToString(), iter->value().ToString()); } - default: - ASSERT_TRUE(false); + delete iter; + break; } - - if (rnd.OneIn(2)) { - ASSERT_OK(txn->Commit()); - } else { - ASSERT_OK(txn->Rollback()); + case 2: // Validate Prev() + { + Iterator* iter = txn->GetIterator(read_options); + for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { + verify_key(iter->key().ToString(), iter->value().ToString()); + } + delete iter; + break; } - delete txn; + default: + ASSERT_TRUE(false); } - }; - std::vector threads; - for (uint32_t i = 0; i < kNumThreads; i++) { - threads.emplace_back(stress_thread, i); + if (rnd.OneIn(2)) { + ASSERT_OK(txn->Commit()); + } else { + ASSERT_OK(txn->Rollback()); + } + delete txn; } + }; - for (auto& t : threads) { - t.join(); - } + std::vector threads; + for (uint32_t i = 0; i < kNumThreads; i++) { + threads.emplace_back(stress_thread, i); + } + + for (auto& t : threads) { + t.join(); } } #endif // ROCKSDB_VALGRIND_RUN