提交 d6b9b3b8 编写于 作者: M Maysam Yabandeh 提交者: Facebook Github Bot

Enhance transaction_test_util with delays (#4970)

Summary:
Enhance ::Insert and ::Verify test functions to add artificial delay between prepare and commit, and take snapshot and reads respectively.  A future PR will make use of these to improve stress tests to test against long-running transactions as well as long-running backup jobs. Also randomly sets set_snapshot to false for inserters to skip setting the snapshot in the initialization phase and let the snapshot be taken later explicitly.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4970

Differential Revision: D14031342

Pulled By: maysamyabandeh

fbshipit-source-id: b52b453751f0b25b81b23c48892bc1d152464cab
上级 576d2d6c
......@@ -21,6 +21,10 @@
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "db/dbformat.h"
#include "db/snapshot_impl.h"
#include "util/logging.h"
#include "util/random.h"
#include "util/string_util.h"
......@@ -28,13 +32,15 @@ namespace rocksdb {
RandomTransactionInserter::RandomTransactionInserter(
Random64* rand, const WriteOptions& write_options,
const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets)
const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets,
const uint64_t cmt_delay_ms, const uint64_t first_id)
: rand_(rand),
write_options_(write_options),
read_options_(read_options),
num_keys_(num_keys),
num_sets_(num_sets),
txn_id_(0) {}
txn_id_(first_id),
cmt_delay_ms_(cmt_delay_ms) {}
RandomTransactionInserter::~RandomTransactionInserter() {
if (txn_ != nullptr) {
......@@ -51,17 +57,18 @@ bool RandomTransactionInserter::TransactionDBInsert(
std::hash<std::thread::id> hasher;
char name[64];
snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%d",
snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%" PRIu64,
hasher(std::this_thread::get_id()), txn_id_++);
assert(strlen(name) < 64 - 1);
txn_->SetName(name);
assert(txn_->SetName(name).ok());
bool take_snapshot = rand_->OneIn(2);
// Take a snapshot if set_snapshot was not set or with 50% change otherwise
bool take_snapshot = txn_->GetSnapshot() == nullptr || rand_->OneIn(2);
if (take_snapshot) {
txn_->SetSnapshot();
read_options_.snapshot = txn_->GetSnapshot();
}
auto res = DoInsert(nullptr, txn_, false);
auto res = DoInsert(db, txn_, false);
if (take_snapshot) {
read_options_.snapshot = nullptr;
}
......@@ -74,7 +81,7 @@ bool RandomTransactionInserter::OptimisticTransactionDBInsert(
optimistic_txn_ =
db->BeginTransaction(write_options_, txn_options, optimistic_txn_);
return DoInsert(nullptr, optimistic_txn_, true);
return DoInsert(db, optimistic_txn_, true);
}
bool RandomTransactionInserter::DBInsert(DB* db) {
......@@ -178,20 +185,39 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
}
bytes_inserted_ += key.size() + sum.size();
}
ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
"Insert (%s) %s snap: %" PRIu64 " key:%s value: %" PRIu64
"+%" PRIu64 "=%" PRIu64,
txn->GetName().c_str(), s.ToString().c_str(),
txn->GetSnapshot()->GetSequenceNumber(), full_key.c_str(),
int_value, incr, int_value + incr);
}
if (s.ok()) {
if (txn != nullptr) {
if (!is_optimistic && !rand_->OneIn(10)) {
// also try commit without prpare
bool with_prepare = !is_optimistic && !rand_->OneIn(10);
if (with_prepare) {
// Also try commit without prepare
s = txn->Prepare();
assert(s.ok());
ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
"Prepare of %" PRIu64 " %s (%s)", txn->GetId(),
s.ToString().c_str(), txn->GetName().c_str());
db->GetDBOptions().env->SleepForMicroseconds(
static_cast<int>(cmt_delay_ms_ * 1000));
}
if (!rand_->OneIn(20)) {
s = txn->Commit();
assert(!with_prepare || s.ok());
ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
"Commit of %" PRIu64 " %s (%s)", txn->GetId(),
s.ToString().c_str(), txn->GetName().c_str());
} else {
// Also try 5% rollback
s = txn->Rollback();
ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
"Rollback %" PRIu64 " %s %s", txn->GetId(),
txn->GetName().c_str(), s.ToString().c_str());
assert(s.ok());
}
assert(is_optimistic || s.ok());
......@@ -226,6 +252,8 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
}
}
} else {
ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, "Error %s for txn %s",
s.ToString().c_str(), txn->GetName().c_str());
if (txn != nullptr) {
assert(txn->Rollback().ok());
}
......@@ -246,7 +274,11 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
// Verify that the sum of the keys in each set are equal
Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
uint64_t num_keys_per_set,
bool take_snapshot, Random64* rand) {
bool take_snapshot, Random64* rand,
uint64_t delay_ms) {
// delay_ms is the delay between taking a snapshot and doing the reads. It
// emulates reads from a long-running backup job.
assert(delay_ms == 0 || take_snapshot);
uint64_t prev_total = 0;
uint32_t prev_i = 0;
bool prev_assigned = false;
......@@ -254,6 +286,8 @@ Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
ReadOptions roptions;
if (take_snapshot) {
roptions.snapshot = db->GetSnapshot();
db->GetDBOptions().env->SleepForMicroseconds(
static_cast<int>(delay_ms * 1000));
}
std::vector<uint16_t> set_vec(num_sets);
......@@ -271,7 +305,9 @@ Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
// Use either point lookup or iterator. Point lookups are slower so we use
// it less often.
if (num_keys_per_set != 0 && rand && rand->OneIn(10)) { // use point lookup
const bool use_point_lookup =
num_keys_per_set != 0 && rand && rand->OneIn(10);
if (use_point_lookup) {
ReadOptions read_options;
for (uint64_t k = 0; k < num_keys_per_set; k++) {
std::string dont_care;
......@@ -299,17 +335,37 @@ Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
value.ToString().c_str());
return Status::Corruption();
}
ROCKS_LOG_DEBUG(
db->GetDBOptions().info_log,
"VerifyRead at %" PRIu64 " (%" PRIu64 "): %.*s value: %" PRIu64,
roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul,
roptions.snapshot
? ((SnapshotImpl*)roptions.snapshot)->min_uncommitted_
: 0ul,
key.size(), key.data(), int_value);
total += int_value;
}
delete iter;
}
if (prev_assigned && total != prev_total) {
db->GetDBOptions().info_log->Flush();
fprintf(stdout,
"RandomTransactionVerify found inconsistent totals. "
"Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 " \n",
prev_i, prev_total, set_i, total);
"RandomTransactionVerify found inconsistent totals using "
"pointlookup? %d "
"Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64
" at snapshot %" PRIu64 "\n",
use_point_lookup, prev_i, prev_total, set_i, total,
roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
fflush(stdout);
return Status::Corruption();
} else {
ROCKS_LOG_DEBUG(
db->GetDBOptions().info_log,
"RandomTransactionVerify pass pointlookup? %d total: %" PRIu64
" snap: %" PRIu64,
use_point_lookup, total,
roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
}
prev_total = total;
prev_i = set_i;
......
......@@ -35,10 +35,13 @@ class RandomTransactionInserter {
public:
// num_keys is the number of keys in each set.
// num_sets is the number of sets of keys.
// cmt_delay_ms is the delay between prepare (if there is any) and commit
// first_id is the id of the first transaction
explicit RandomTransactionInserter(
Random64* rand, const WriteOptions& write_options = WriteOptions(),
const ReadOptions& read_options = ReadOptions(), uint64_t num_keys = 1000,
uint16_t num_sets = 3);
uint16_t num_sets = 3, const uint64_t cmt_delay_ms = 0,
const uint64_t first_id = 0);
~RandomTransactionInserter();
......@@ -76,7 +79,8 @@ class RandomTransactionInserter {
// Returns OK if Invariant is true.
static Status Verify(DB* db, uint16_t num_sets, uint64_t num_keys_per_set = 0,
bool take_snapshot = false, Random64* rand = nullptr);
bool take_snapshot = false, Random64* rand = nullptr,
uint64_t delay_ms = 0);
// Returns the status of the previous Insert operation
Status GetLastStatus() { return last_status_; }
......@@ -116,7 +120,9 @@ class RandomTransactionInserter {
Transaction* txn_ = nullptr;
Transaction* optimistic_txn_ = nullptr;
std::atomic<int> txn_id_;
uint64_t txn_id_;
// The delay between ::Prepare and ::Commit
const uint64_t cmt_delay_ms_;
bool DoInsert(DB* db, Transaction* txn, bool is_optimistic);
};
......
......@@ -4999,7 +4999,9 @@ Status TransactionStressTestInserter(TransactionDB* db,
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
txn_options.set_snapshot = true;
// Inside the inserter we might also retake the snapshot. We do both since two
// separte functions are engaged for each.
txn_options.set_snapshot = _rand.OneIn(2);
RandomTransactionInserter inserter(&_rand, write_options, read_options,
num_keys_per_set,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册