提交 0f4244fe 编写于 作者: M Maysam Yabandeh 提交者: Facebook Github Bot

WritePrepared: Improve stress tests with slow threads (#4974)

Summary:
The transaction stress tests, stress a high concurrency scenario. In WritePrepared/WriteUnPrepared we need to also stress the scenarios where an inserting/reading transaction is very slow. This would stress the corner cases that the caching is not sufficient and other slower data structures are engaged. To emulate such cases we make use of slow inserter/verifier threads and also reduce the size of cache data structures.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4974

Differential Revision: D14143070

Pulled By: maysamyabandeh

fbshipit-source-id: 81eb674678faf9fae0f654cd60ebcc74e26aeee7
上级 bcdc8c8b
......@@ -102,6 +102,7 @@ struct TransactionDBOptions {
friend class WritePreparedTxnDB;
friend class WritePreparedTransactionTestBase;
friend class MySQLStyleTransactionTest;
};
struct TransactionOptions {
......
......@@ -55,3 +55,7 @@ inline const char* RocksLogShorterFileName(const char* file)
#define ROCKS_LOG_BUFFER_MAX_SZ(LOG_BUF, MAX_LOG_SIZE, FMT, ...) \
rocksdb::LogToBuffer(LOG_BUF, MAX_LOG_SIZE, ROCKS_LOG_PREPEND_FILE_LINE(FMT), \
RocksLogShorterFileName(__FILE__), ##__VA_ARGS__)
#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \
; // due to overhead by default skip such lines
// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)
......@@ -655,6 +655,9 @@ WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
if (snapshot != nullptr) {
ROCKS_LOG_DETAILS(dbimpl_->immutable_db_options().info_log,
"ReleaseSnapshot %" PRIu64 " Set",
snapshot->GetSequenceNumber());
db->ReleaseSnapshot(snapshot);
}
}
......
......@@ -66,12 +66,16 @@ INSTANTIATE_TEST_CASE_P(
#ifndef ROCKSDB_VALGRIND_RUN
INSTANTIATE_TEST_CASE_P(
MySQLStyleTransactionTest, MySQLStyleTransactionTest,
::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED),
std::make_tuple(false, true, WRITE_COMMITTED),
std::make_tuple(false, false, WRITE_PREPARED),
std::make_tuple(false, true, WRITE_PREPARED),
std::make_tuple(false, false, WRITE_UNPREPARED),
std::make_tuple(false, true, WRITE_UNPREPARED)));
::testing::Values(std::make_tuple(false, false, WRITE_COMMITTED, false),
std::make_tuple(false, true, WRITE_COMMITTED, false),
std::make_tuple(false, false, WRITE_PREPARED, false),
std::make_tuple(false, false, WRITE_PREPARED, true),
std::make_tuple(false, true, WRITE_PREPARED, false),
std::make_tuple(false, true, WRITE_PREPARED, true),
std::make_tuple(false, false, WRITE_UNPREPARED, false),
std::make_tuple(false, false, WRITE_UNPREPARED, true),
std::make_tuple(false, true, WRITE_UNPREPARED, false),
std::make_tuple(false, true, WRITE_UNPREPARED, true)));
#endif // ROCKSDB_VALGRIND_RUN
TEST_P(TransactionTest, DoubleEmptyWrite) {
......@@ -4990,10 +4994,14 @@ TEST_P(TransactionStressTest, ExpiredTransactionDataRace1) {
#ifndef ROCKSDB_VALGRIND_RUN
namespace {
// cmt_delay_ms is the delay between prepare and commit
// first_id is the id of the first transaction
Status TransactionStressTestInserter(TransactionDB* db,
const size_t num_transactions,
const size_t num_sets,
const size_t num_keys_per_set) {
const size_t num_keys_per_set,
const uint64_t cmt_delay_ms = 0,
const uint64_t first_id = 0) {
size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
Random64 _rand(seed);
WriteOptions write_options;
......@@ -5003,9 +5011,9 @@ Status TransactionStressTestInserter(TransactionDB* db,
// separte functions are engaged for each.
txn_options.set_snapshot = _rand.OneIn(2);
RandomTransactionInserter inserter(&_rand, write_options, read_options,
num_keys_per_set,
static_cast<uint16_t>(num_sets));
RandomTransactionInserter inserter(
&_rand, write_options, read_options, num_keys_per_set,
static_cast<uint16_t>(num_sets), cmt_delay_ms, first_id);
for (size_t t = 0; t < num_transactions; t++) {
bool success = inserter.TransactionDBInsert(db, txn_options);
......@@ -5017,7 +5025,8 @@ Status TransactionStressTestInserter(TransactionDB* db,
// Make sure at least some of the transactions succeeded. It's ok if
// some failed due to write-conflicts.
if (inserter.GetFailureCount() > num_transactions / 2) {
if (num_transactions != 1 &&
inserter.GetFailureCount() > num_transactions / 2) {
return Status::TryAgain("Too many transactions failed! " +
std::to_string(inserter.GetFailureCount()) + " / " +
std::to_string(num_transactions));
......@@ -5035,6 +5044,8 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
ReOpenNoDelete();
const size_t num_workers = 4; // worker threads count
const size_t num_checkers = 2; // checker threads count
const size_t num_slow_checkers = 2; // checker threads emulating backups
const size_t num_slow_workers = 1; // slow worker threads count
const size_t num_transactions_per_thread = 10000;
const uint16_t num_sets = 3;
const size_t num_keys_per_set = 100;
......@@ -5060,6 +5071,28 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
ASSERT_OK(s);
}
};
std::function<void()> call_slow_checker = [&] {
size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
Random64 rand(seed);
// Verify that data is consistent
while (finished < num_workers) {
uint64_t delay_ms = rand.Uniform(100) + 1;
Status s = RandomTransactionInserter::Verify(
db, num_sets, num_keys_per_set, TAKE_SNAPSHOT, &rand, delay_ms);
ASSERT_OK(s);
}
};
std::function<void()> call_slow_inserter = [&] {
size_t seed = std::hash<std::thread::id>()(std::this_thread::get_id());
Random64 rand(seed);
uint64_t id = 0;
// Verify that data is consistent
while (finished < num_workers) {
uint64_t delay_ms = rand.Uniform(500) + 1;
ASSERT_OK(TransactionStressTestInserter(db, 1, num_sets, num_keys_per_set,
delay_ms, id++));
}
};
for (uint32_t i = 0; i < num_workers; i++) {
threads.emplace_back(call_inserter);
......@@ -5067,6 +5100,14 @@ TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
for (uint32_t i = 0; i < num_checkers; i++) {
threads.emplace_back(call_checker);
}
if (with_slow_threads_) {
for (uint32_t i = 0; i < num_slow_checkers; i++) {
threads.emplace_back(call_slow_checker);
}
for (uint32_t i = 0; i < num_slow_workers; i++) {
threads.emplace_back(call_slow_inserter);
}
}
// Wait for all threads to finish
for (auto& t : threads) {
......
......@@ -448,6 +448,31 @@ class TransactionTest : public TransactionTestBase,
class TransactionStressTest : public TransactionTest {};
class MySQLStyleTransactionTest : public TransactionTest {};
class MySQLStyleTransactionTest
: public TransactionTestBase,
virtual public ::testing::WithParamInterface<
std::tuple<bool, bool, TxnDBWritePolicy, bool>> {
public:
MySQLStyleTransactionTest()
: TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
std::get<2>(GetParam())),
with_slow_threads_(std::get<3>(GetParam())) {
if (with_slow_threads_ &&
(txn_db_options.write_policy == WRITE_PREPARED ||
txn_db_options.write_policy == WRITE_UNPREPARED)) {
// The corner case with slow threads involves the caches filling
// over which would not happen even with artifial delays. To help
// such cases to show up we lower the size of the cache-related data
// structures.
txn_db_options.wp_snapshot_cache_bits = 1;
txn_db_options.wp_commit_cache_bits = 10;
EXPECT_OK(ReOpen());
}
};
protected:
// Also emulate slow threads by addin artiftial delays
const bool with_slow_threads_;
};
} // namespace rocksdb
......@@ -34,10 +34,6 @@
namespace rocksdb {
#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \
; // due to overhead by default skip such lines
// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__)
// A PessimisticTransactionDB that writes data to DB after prepare phase of 2PC.
// In this way some data in the DB might not be committed. The DB provides
// mechanisms to tell such data apart from committed data.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册