提交 60beefd6 编写于 作者: M Maysam Yabandeh 提交者: Facebook Github Bot

WritePrepared Txn: Advance seq one per batch

Summary:
By default the seq number in DB is increased once per written key. WritePrepared txns requires the seq to be increased once per the entire batch so that the seq would be used as the prepare timestamp by which the transaction is identified. Also we need to increase seq for the commit marker since it would give a unique id to the commit timestamp of transactions.

Two unit tests are added to verify our understanding of how the seq should be increased. The recovery path requires much more work and is left to another patch.
Closes https://github.com/facebook/rocksdb/pull/2885

Differential Revision: D5837843

Pulled By: maysamyabandeh

fbshipit-source-id: a08960b93d727e1cf438c254d0c2636fb133cc1c
上级 c57050b7
......@@ -191,7 +191,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
refitting_level_(false),
opened_successfully_(false),
concurrent_prepare_(options.concurrent_prepare),
manual_wal_flush_(options.manual_wal_flush) {
manual_wal_flush_(options.manual_wal_flush),
seq_per_batch_(options.seq_per_batch) {
env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache.
......
......@@ -772,7 +772,7 @@ class DBImpl : public DB {
Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
uint64_t* log_used, SequenceNumber* last_sequence,
int total_count);
size_t seq_inc);
// Used by WriteImpl to update bg_error_ if paranoid check is enabled.
void WriteCallbackStatusCheck(const Status& status);
......@@ -1267,6 +1267,7 @@ class DBImpl : public DB {
// 2PC these are the writes at Prepare phase.
const bool concurrent_prepare_;
const bool manual_wal_flush_;
const bool seq_per_batch_;
};
extern Options SanitizeOptions(const std::string& db,
......
......@@ -674,7 +674,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
log_number, this, false /* concurrent_memtable_writes */,
next_sequence, &has_valid_writes);
next_sequence, &has_valid_writes, seq_per_batch_);
MaybeIgnoreError(&status);
if (!status.ok()) {
// We are treating this as a failure while reading since we read valid
......
......@@ -68,6 +68,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
return Status::NotSupported(
"pipelined_writes is not compatible with seq_per_batch");
}
Status status;
if (write_options.low_pri) {
......@@ -184,7 +188,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// more than once to a particular key.
bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
write_group.size > 1;
int total_count = 0;
size_t total_count = 0;
uint64_t total_byte_size = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
......@@ -197,6 +201,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
}
}
size_t seq_inc = seq_per_batch_ ? write_group.size : total_count;
const bool concurrent_update = concurrent_prepare_;
// Update stats while we are an exclusive group leader, so we know
......@@ -238,15 +243,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// LastToBeWrittenSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
total_count);
seq_inc);
} else {
// Otherwise we inc seq number for memtable writes
last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count);
last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc);
}
}
assert(last_sequence != kMaxSequenceNumber);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += total_count;
last_sequence += seq_inc;
if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time);
......@@ -255,12 +260,16 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*recovery_log_number*/, this);
0 /*recovery_log_number*/, this, parallel, seq_per_batch_);
} else {
SequenceNumber next_sequence = current_sequence;
for (auto* writer : write_group) {
if (writer->ShouldWriteToMemtable()) {
writer->sequence = next_sequence;
}
if (seq_per_batch_) {
next_sequence++;
} else if (writer->ShouldWriteToMemtable()) {
next_sequence += WriteBatchInternal::Count(writer->batch);
}
}
......@@ -281,9 +290,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*concurrent_memtable_writes*/);
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
}
if (seq_used != nullptr) {
*seq_used = w.sequence;
}
}
}
......@@ -427,7 +436,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
memtable_write_group.status = WriteBatchInternal::InsertInto(
memtable_write_group, w.sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this);
0 /*log_number*/, this, seq_per_batch_);
versions_->SetLastSequence(memtable_write_group.last_sequence);
write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
}
......@@ -521,12 +530,16 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
PERF_TIMER_GUARD(write_wal_time);
// LastToBeWrittenSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
0 /*total_count*/);
size_t seq_inc = seq_per_batch_ ? write_group.size : 0 /*total_count*/;
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
auto curr_seq = last_sequence + 1;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
writer->sequence = curr_seq;
}
if (seq_per_batch_) {
curr_seq++;
} else if (writer->CheckCallback(this)) {
curr_seq += WriteBatchInternal::Count(writer->batch);
}
}
......@@ -778,7 +791,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
uint64_t* log_used,
SequenceNumber* last_sequence,
int total_count) {
size_t seq_inc) {
Status status;
WriteBatch tmp_batch;
......@@ -796,7 +809,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
writer->log_used = logfile_number_;
}
}
*last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count);
*last_sequence = versions_->FetchAddLastToBeWrittenSequence(seq_inc);
auto sequence = *last_sequence + 1;
WriteBatchInternal::SetSequence(merged_batch, sequence);
......
......@@ -144,7 +144,7 @@ WriteBatch::WriteBatch(const WriteBatch& src)
max_bytes_(src.max_bytes_),
rep_(src.rep_) {}
WriteBatch::WriteBatch(WriteBatch&& src)
WriteBatch::WriteBatch(WriteBatch&& src) noexcept
: save_points_(std::move(src.save_points_)),
wal_term_point_(std::move(src.wal_term_point_)),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
......@@ -366,6 +366,7 @@ Status WriteBatch::Iterate(Handler* handler) const {
input.remove_prefix(WriteBatchInternal::kHeader);
Slice key, value, blob, xid;
bool first_tag = true;
int found = 0;
Status s;
while (s.ok() && !input.empty() && handler->Continue()) {
......@@ -438,10 +439,12 @@ Status WriteBatch::Iterate(Handler* handler) const {
handler->MarkRollback(xid);
break;
case kTypeNoop:
handler->MarkNoop(first_tag);
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
first_tag = false;
}
if (!s.ok()) {
return s;
......@@ -838,6 +841,9 @@ class MemTableInserter : public WriteBatch::Handler {
PostMapType mem_post_info_map_;
// current recovered transaction we are rebuilding (recovery)
WriteBatch* rebuilding_trx_;
// Increase seq number once per each write batch. Otherwise increase it once
// per key.
bool seq_per_batch_;
MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_);
......@@ -848,26 +854,27 @@ class MemTableInserter : public WriteBatch::Handler {
return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
}
public:
public:
// cf_mems should not be shared with concurrent inserters
MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes,
bool* has_valid_writes = nullptr)
: sequence_(_sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
ignore_missing_column_families_(ignore_missing_column_families),
recovering_log_number_(recovering_log_number),
log_number_ref_(0),
db_(reinterpret_cast<DBImpl*>(db)),
concurrent_memtable_writes_(concurrent_memtable_writes),
post_info_created_(false),
has_valid_writes_(has_valid_writes),
rebuilding_trx_(nullptr) {
assert(cf_mems_);
MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes,
bool* has_valid_writes = nullptr, bool seq_per_batch = false)
: sequence_(_sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
ignore_missing_column_families_(ignore_missing_column_families),
recovering_log_number_(recovering_log_number),
log_number_ref_(0),
db_(reinterpret_cast<DBImpl*>(db)),
concurrent_memtable_writes_(concurrent_memtable_writes),
post_info_created_(false),
has_valid_writes_(has_valid_writes),
rebuilding_trx_(nullptr),
seq_per_batch_(seq_per_batch) {
assert(cf_mems_);
}
~MemTableInserter() {
......@@ -880,6 +887,12 @@ public:
MemTableInserter(const MemTableInserter&) = delete;
MemTableInserter& operator=(const MemTableInserter&) = delete;
void MaybeAdvanceSeq(bool batch_boundry = false) {
if (batch_boundry == seq_per_batch_) {
sequence_++;
}
}
void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
SequenceNumber sequence() const { return sequence_; }
......@@ -944,7 +957,7 @@ public:
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
MaybeAdvanceSeq();
return seek_status;
}
......@@ -998,7 +1011,7 @@ public:
// Since all Puts are logged in trasaction logs (if enabled), always bump
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
sequence_++;
MaybeAdvanceSeq();
CheckMemtableFull();
return Status::OK();
}
......@@ -1008,7 +1021,7 @@ public:
MemTable* mem = cf_mems_->GetMemTable();
mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_,
get_post_process_info(mem));
sequence_++;
MaybeAdvanceSeq();
CheckMemtableFull();
return Status::OK();
}
......@@ -1022,7 +1035,7 @@ public:
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
MaybeAdvanceSeq();
return seek_status;
}
......@@ -1038,7 +1051,7 @@ public:
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
MaybeAdvanceSeq();
return seek_status;
}
......@@ -1056,7 +1069,7 @@ public:
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
MaybeAdvanceSeq();
return seek_status;
}
if (db_ != nullptr) {
......@@ -1086,7 +1099,7 @@ public:
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
MaybeAdvanceSeq();
return seek_status;
}
......@@ -1154,7 +1167,7 @@ public:
mem->Add(sequence_, kTypeMerge, key, value);
}
sequence_++;
MaybeAdvanceSeq();
CheckMemtableFull();
return Status::OK();
}
......@@ -1190,11 +1203,6 @@ public:
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
} else {
// in non-recovery we ignore prepare markers
// and insert the values directly. making sure we have a
// log for each insertion to reference.
assert(log_number_ref_ > 0);
}
return Status::OK();
......@@ -1211,9 +1219,23 @@ public:
rebuilding_trx_ = nullptr;
} else {
assert(rebuilding_trx_ == nullptr);
assert(log_number_ref_ > 0);
}
const bool batch_boundry = true;
MaybeAdvanceSeq(batch_boundry);
return Status::OK();
}
Status MarkNoop(bool first_tag) override {
// A hack in pessimistic transaction could result into a noop at the start
// of the write batch, that should be ignored.
if (!first_tag) {
// In the absence of Prepare markers, a kTypeNoop tag indicates the end of
// a batch. This happens when write batch commits skipping the prepare
// phase.
const bool batch_boundry = true;
MaybeAdvanceSeq(batch_boundry);
}
return Status::OK();
}
......@@ -1238,6 +1260,8 @@ public:
// all insertes must reference this trx log number
log_number_ref_ = trx->log_number_;
s = trx->batch_->Iterate(this);
// TODO(myabandeh): In WritePrepared txn, a commit marker should
// reference the log that contains the prepare marker.
log_number_ref_ = 0;
if (s.ok()) {
......@@ -1248,8 +1272,15 @@ public:
}
}
} else {
// in non recovery we simply ignore this tag
// TODO(myabandeh): In WritePrepared txn, a commit marker should
// reference the log that contains the prepare marker. This is to be able
// to reconsutrct the prepared list after recovery.
// TODO(myabandeh): In WritePrepared txn, we do not reach here since
// disable_memtable is set for commit.
assert(log_number_ref_ > 0);
}
const bool batch_boundry = true;
MaybeAdvanceSeq(batch_boundry);
return s;
}
......@@ -1288,16 +1319,15 @@ public:
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables has been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache
Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group,
SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t recovery_log_number, DB* db,
bool concurrent_memtable_writes) {
Status WriteBatchInternal::InsertInto(
WriteThread::WriteGroup& write_group, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
bool concurrent_memtable_writes, bool seq_per_batch) {
MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, recovery_log_number,
db, concurrent_memtable_writes);
db, concurrent_memtable_writes,
nullptr /*has_valid_writes*/, seq_per_batch);
for (auto w : write_group) {
if (!w->ShouldWriteToMemtable()) {
continue;
......@@ -1337,13 +1367,14 @@ Status WriteBatchInternal::InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
uint64_t log_number, DB* db, bool concurrent_memtable_writes,
SequenceNumber* last_seq_used, bool* has_valid_writes) {
SequenceNumber* next_seq, bool* has_valid_writes, bool seq_per_batch) {
MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
ignore_missing_column_families, log_number, db,
concurrent_memtable_writes, has_valid_writes);
concurrent_memtable_writes, has_valid_writes,
seq_per_batch);
Status s = batch->Iterate(&inserter);
if (last_seq_used != nullptr) {
*last_seq_used = inserter.sequence();
if (next_seq != nullptr) {
*next_seq = inserter.sequence();
}
if (concurrent_memtable_writes) {
inserter.PostProcess();
......
......@@ -157,18 +157,20 @@ class WriteBatchInternal {
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false);
bool concurrent_memtable_writes = false,
bool seq_per_batch = false);
// Convenience form of InsertInto when you have only one batch
// last_seq_used returns the last sequnce number used in a MemTable insert
// next_seq returns the seq after last sequnce number used in MemTable insert
static Status InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,
SequenceNumber* last_seq_used = nullptr,
bool* has_valid_writes = nullptr);
SequenceNumber* next_seq = nullptr,
bool* has_valid_writes = nullptr,
bool seq_per_batch = false);
static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
......
......@@ -299,6 +299,10 @@ namespace {
seen += "MarkEndPrepare(" + xid.ToString() + ")";
return Status::OK();
}
virtual Status MarkNoop(bool first_tag) override {
seen += "MarkNoop(" + std::string(first_tag ? "true" : "false") + ")";
return Status::OK();
}
virtual Status MarkCommit(const Slice& xid) override {
seen += "MarkCommit(" + xid.ToString() + ")";
return Status::OK();
......
......@@ -899,6 +899,16 @@ struct DBOptions {
// relies on manual invocation of FlushWAL to write the WAL buffer to its
// file.
bool manual_wal_flush = false;
// Increase the sequence number after writing each batch, whether memtable is
// disabled for that or not. Otherwise the sequence number is increased after
// writing each key into memtable. This implies that when memtable_disable is
// set, the seq is not increased at all.
//
// Default: false
// Note: This option is experimental and meant to be used only for internal
// projects.
bool seq_per_batch = false;
};
// Options to control the behavior of a database (passed to DB::Open)
......
......@@ -244,6 +244,10 @@ class WriteBatch : public WriteBatchBase {
return Status::InvalidArgument("MarkEndPrepare() handler not defined.");
}
virtual Status MarkNoop(bool first_tag) {
return Status::InvalidArgument("MarkNoop() handler not defined.");
}
virtual Status MarkRollback(const Slice& xid) {
return Status::InvalidArgument(
"MarkRollbackPrepare() handler not defined.");
......@@ -303,7 +307,7 @@ class WriteBatch : public WriteBatchBase {
explicit WriteBatch(const std::string& rep);
WriteBatch(const WriteBatch& src);
WriteBatch(WriteBatch&& src);
WriteBatch(WriteBatch&& src) noexcept;
WriteBatch& operator=(const WriteBatch& src);
WriteBatch& operator=(WriteBatch&& src);
......
......@@ -88,7 +88,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
avoid_flush_during_recovery(options.avoid_flush_during_recovery),
allow_ingest_behind(options.allow_ingest_behind),
concurrent_prepare(options.concurrent_prepare),
manual_wal_flush(options.manual_wal_flush) {
manual_wal_flush(options.manual_wal_flush),
seq_per_batch(options.seq_per_batch) {
}
void ImmutableDBOptions::Dump(Logger* log) const {
......@@ -229,6 +230,7 @@ void ImmutableDBOptions::Dump(Logger* log) const {
concurrent_prepare);
ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d",
manual_wal_flush);
ROCKS_LOG_HEADER(log, " Options.seq_per_batch: %d", seq_per_batch);
}
MutableDBOptions::MutableDBOptions()
......
......@@ -81,6 +81,7 @@ struct ImmutableDBOptions {
bool allow_ingest_behind;
bool concurrent_prepare;
bool manual_wal_flush;
bool seq_per_batch;
};
struct MutableDBOptions {
......
......@@ -354,7 +354,11 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"manual_wal_flush",
{offsetof(struct DBOptions, manual_wal_flush), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, manual_wal_flush)}}};
offsetof(struct ImmutableDBOptions, manual_wal_flush)}},
{"seq_per_batch",
{offsetof(struct DBOptions, seq_per_batch), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, seq_per_batch)}}};
// offset_of is used to get the offset of a class data member
// ex: offset_of(&ColumnFamilyOptions::num_levels)
......
......@@ -283,7 +283,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"avoid_flush_during_shutdown=false;"
"allow_ingest_behind=false;"
"concurrent_prepare=false;"
"manual_wal_flush=false;",
"manual_wal_flush=false;"
"seq_per_batch=false;",
new_options));
ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),
......
......@@ -76,6 +76,8 @@ PessimisticTransactionDB::PessimisticTransactionDB(
PessimisticTransactionDB::~PessimisticTransactionDB() {
while (!transactions_.empty()) {
delete transactions_.begin()->second;
// TODO(myabandeh): this seems to be an unsafe approach as it is not quite
// clear whether delete would also remove the entry from transactions_.
}
}
......@@ -196,6 +198,9 @@ Status TransactionDB::Open(
std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
std::vector<size_t> compaction_enabled_cf_indices;
DBOptions db_options_2pc = db_options;
if (txn_db_options.write_policy == WRITE_PREPARED) {
db_options_2pc.seq_per_batch = true;
}
PrepareWrap(&db_options_2pc, &column_families_copy,
&compaction_enabled_cf_indices);
s = DB::Open(db_options_2pc, dbname, column_families_copy, handles, &db);
......
......@@ -4640,6 +4640,90 @@ TEST_P(TransactionTest, MemoryLimitTest) {
delete txn;
}
// This test clarfies the existing expectation from the sequence number
// algorithm. It could detect mistakes in updating the code but it is not
// necessarily the one acceptable way. If the algorithm is legitimately changed,
// this unit test should be updated as well.
TEST_P(TransactionTest, SeqAdvanceTest) {
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
auto seq = db_impl->GetLatestSequenceNumber();
auto exp_seq = seq;
// Test DB's internal txn. It involves no prepare phase nor a commit marker.
WriteOptions wopts;
auto s = db->Put(wopts, "key", "value");
// Consume one seq per key
exp_seq++;
ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Doing it twice might detect some bugs
s = db->Put(wopts, "key", "value");
exp_seq++;
ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Testing directly writing a write batch. Functionality-wise it is equivalent
// to commit without prepare.
WriteBatch wb;
wb.Put("k1", "v1");
wb.Put("k2", "v2");
wb.Put("k3", "v3");
s = db->Write(wopts, &wb);
// One seq per key.
exp_seq += 3;
ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// A full 2pc txn that also involves a commit marker.
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
s = txn->Put(Slice("foo"), Slice("bar"));
s = txn->Put(Slice("foo2"), Slice("bar2"));
s = txn->Put(Slice("foo3"), Slice("bar3"));
s = txn->Put(Slice("foo4"), Slice("bar4"));
s = txn->Put(Slice("foo5"), Slice("bar5"));
ASSERT_OK(s);
s = txn->Prepare();
ASSERT_OK(s);
// Consume one seq per key
exp_seq += 5;
s = txn->Commit();
ASSERT_OK(s);
s = db->Put(wopts, "key", "value");
exp_seq++;
ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Commit without prepare. It shoudl write to DB without a commit marker.
txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid2");
ASSERT_OK(s);
s = txn->Put(Slice("foo"), Slice("bar"));
s = txn->Put(Slice("foo2"), Slice("bar2"));
s = txn->Put(Slice("foo3"), Slice("bar3"));
s = txn->Put(Slice("foo4"), Slice("bar4"));
s = txn->Put(Slice("foo5"), Slice("bar5"));
ASSERT_OK(s);
s = txn->Commit();
ASSERT_OK(s);
// One seq per key
exp_seq += 5;
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
pdb->UnregisterTransaction(txn);
delete txn;
}
} // namespace rocksdb
int main(int argc, char** argv) {
......
......@@ -552,6 +552,96 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
}
}
// This test clarfies the existing expectation from the sequence number
// algorithm. It could detect mistakes in updating the code but it is not
// necessarily the one acceptable way. If the algorithm is legitimately changed,
// this unit test should be updated as well.
TEST_P(WritePreparedTransactionTest, SeqAdvanceTest) {
auto pdb = reinterpret_cast<PessimisticTransactionDB*>(db);
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
auto seq = db_impl->GetLatestSequenceNumber();
auto exp_seq = seq;
// Test DB's internal txn. It involves no prepare phase nor a commit marker.
WriteOptions wopts;
auto s = db->Put(wopts, "key", "value");
// Consume one seq per batch
exp_seq++;
ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Doing it twice might detect some bugs
s = db->Put(wopts, "key", "value");
exp_seq++;
ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Testing directly writing a write batch. Functionality-wise it is equivalent
// to commit without prepare.
WriteBatch wb;
wb.Put("k1", "v1");
wb.Put("k2", "v2");
wb.Put("k3", "v3");
s = pdb->Write(wopts, &wb);
// Consume one seq per batch
exp_seq++;
ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// A full 2pc txn that also involves a commit marker.
TransactionOptions txn_options;
WriteOptions write_options;
Transaction* txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
s = txn->Put(Slice("foo"), Slice("bar"));
s = txn->Put(Slice("foo2"), Slice("bar2"));
s = txn->Put(Slice("foo3"), Slice("bar3"));
s = txn->Put(Slice("foo4"), Slice("bar4"));
s = txn->Put(Slice("foo5"), Slice("bar5"));
ASSERT_OK(s);
s = txn->Prepare();
ASSERT_OK(s);
// Consume one seq per batch
exp_seq++;
s = txn->Commit();
ASSERT_OK(s);
// Consume one seq per commit marker
exp_seq++;
// Since commit marker does not write to memtable, the last seq number is not
// updated immedaitely. But the advance should be visible after the next
// write.
s = db->Put(wopts, "key", "value");
// Consume one seq per batch
exp_seq++;
ASSERT_OK(s);
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
// Commit without prepare. It shoudl write to DB without a commit marker.
txn = db->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid2");
ASSERT_OK(s);
s = txn->Put(Slice("foo"), Slice("bar"));
s = txn->Put(Slice("foo2"), Slice("bar2"));
s = txn->Put(Slice("foo3"), Slice("bar3"));
s = txn->Put(Slice("foo4"), Slice("bar4"));
s = txn->Put(Slice("foo5"), Slice("bar5"));
ASSERT_OK(s);
s = txn->Commit();
ASSERT_OK(s);
// Consume one seq per batch
exp_seq++;
seq = db_impl->GetLatestSequenceNumber();
ASSERT_EQ(exp_seq, seq);
pdb->UnregisterTransaction(txn);
delete txn;
}
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
// snapshot, max_committed_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
......
......@@ -46,11 +46,12 @@ Status WritePreparedTxn::PrepareInternal() {
write_options.disableWAL = false;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
const bool disable_memtable = true;
uint64_t seq_used;
uint64_t seq_used = kMaxSequenceNumber;
Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
!disable_memtable, &seq_used);
assert(seq_used != kMaxSequenceNumber);
prepare_seq_ = seq_used;
wpt_db_->AddPrepared(prepare_seq_);
return s;
......@@ -61,6 +62,8 @@ Status WritePreparedTxn::CommitWithoutPrepareInternal() {
}
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch) {
// In the absenese of Prepare markers, use Noop as a batch separator
WriteBatchInternal::InsertNoop(batch);
const bool disable_memtable = true;
const uint64_t no_log_ref = 0;
uint64_t seq_used;
......
......@@ -69,6 +69,7 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
case kTypeLogData:
*type = kLogDataRecord;
break;
case kTypeNoop:
case kTypeBeginPrepareXID:
case kTypeEndPrepareXID:
case kTypeCommitXID:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册