提交 5234fc1b 编写于 作者: M Maysam Yabandeh 提交者: Facebook Github Bot

Mark logs with prepare in PreReleaseCallback (#5121)

Summary:
In prepare phase of 2PC, the db promises to remember the prepared data, for possible future commits. To fulfill the promise the prepared data must be persisted in the WAL so that they could be recovered after a crash. The log that contains a prepare batch that is not committed yet, is marked so that it is not garbage collected before the transaction commits/rollbacks. The bug was that the write to the log file and the mark of the file was not atomic, and WAL gc could have happened before the WAL log is actually marked. This patch moves the marking logic to PreReleaseCallback so that the WAL gc logic that joins both write threads would see the WAL write and WAL mark atomically.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5121

Differential Revision: D14665210

Pulled By: maysamyabandeh

fbshipit-source-id: 1d66aeb1c66a296cb4899a5a20c4d40c59e4b534
上级 26015f3b
......@@ -3,6 +3,9 @@
### Unreleased
### New Features
* When reading from option file/string/map, customized comparators and/or merge operators can be filled according to object registry.
### Public API Change
### Bug Fixes
* Fix a bug in 2PC where a sequence of txn prepare, memtable flush, and crash could result in losing the prepared transaction.
## 6.1.0 (3/27/2019)
### New Features
......
......@@ -311,8 +311,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
writer->sequence = next_sequence;
if (writer->pre_release_callback) {
Status ws = writer->pre_release_callback->Callback(writer->sequence,
disable_memtable);
Status ws = writer->pre_release_callback->Callback(
writer->sequence, disable_memtable, writer->log_used);
if (!ws.ok()) {
status = ws;
break;
......@@ -649,8 +649,8 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
if (!writer->CallbackFailed() && writer->pre_release_callback) {
assert(writer->sequence != kMaxSequenceNumber);
const bool DISABLE_MEMTABLE = true;
Status ws = writer->pre_release_callback->Callback(writer->sequence,
DISABLE_MEMTABLE);
Status ws = writer->pre_release_callback->Callback(
writer->sequence, DISABLE_MEMTABLE, writer->log_used);
if (!ws.ok()) {
status = ws;
break;
......@@ -993,8 +993,9 @@ Status DBImpl::WriteRecoverableState() {
const bool DISABLE_MEMTABLE = true;
for (uint64_t sub_batch_seq = seq + 1;
sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
uint64_t const no_log_num = 0;
status = recoverable_state_pre_release_callback_->Callback(
sub_batch_seq, !DISABLE_MEMTABLE);
sub_batch_seq, !DISABLE_MEMTABLE, no_log_num);
}
}
if (status.ok()) {
......
......@@ -26,7 +26,9 @@ class PreReleaseCallback {
// released.
// is_mem_disabled is currently used for debugging purposes to assert that
// the callback is done from the right write queue.
virtual Status Callback(SequenceNumber seq, bool is_mem_disabled) = 0;
// If non-zero, log_number indicates the WAL log to which we wrote.
virtual Status Callback(SequenceNumber seq, bool is_mem_disabled,
uint64_t log_number) = 0;
};
} // namespace rocksdb
......@@ -295,8 +295,8 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
public:
PublishSeqCallback(DBImpl* db_impl_in)
: db_impl_(db_impl_in) {}
Status Callback(SequenceNumber last_seq,
bool /*not used*/) override {
Status Callback(SequenceNumber last_seq, bool /*not used*/,
uint64_t) override {
db_impl_->SetLastPublishedSequence(last_seq);
return Status::OK();
}
......
......@@ -195,22 +195,14 @@ Status PessimisticTransaction::Prepare() {
}
if (can_prepare) {
bool wal_already_marked = false;
txn_state_.store(AWAITING_PREPARE);
// transaction can't expire after preparation
expiration_time_ = 0;
if (log_number_ > 0) {
assert(txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
wal_already_marked = true;
}
assert(log_number_ == 0 ||
txn_db_impl_->GetTxnDBOptions().write_policy == WRITE_UNPREPARED);
s = PrepareInternal();
if (s.ok()) {
assert(log_number_ != 0);
if (!wal_already_marked) {
dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
log_number_);
}
txn_state_.store(PREPARED);
}
} else if (txn_state_ == LOCKS_STOLEN) {
......@@ -232,10 +224,38 @@ Status WriteCommittedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_);
Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log_ref*/ 0,
/* disable_memtable*/ true);
class MarkLogCallback : public PreReleaseCallback {
public:
MarkLogCallback(DBImpl* db, bool two_write_queues)
: db_(db), two_write_queues_(two_write_queues) {
(void)two_write_queues_; // to silence unused private field warning
}
virtual Status Callback(SequenceNumber, bool is_mem_disabled,
uint64_t log_number) override {
#ifdef NDEBUG
(void)is_mem_disabled;
#endif
assert(log_number != 0);
assert(!two_write_queues_ || is_mem_disabled); // implies the 2nd queue
db_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(log_number);
return Status::OK();
}
private:
DBImpl* db_;
bool two_write_queues_;
} mark_log_callback(db_impl_,
db_impl_->immutable_db_options().two_write_queues);
WriteCallback* const kNoWriteCallback = nullptr;
const uint64_t kRefNoLog = 0;
const bool kDisableMemtable = true;
SequenceNumber* const KIgnoreSeqUsed = nullptr;
const size_t kNoBatchCount = 0;
Status s = db_impl_->WriteImpl(
write_options, GetWriteBatch()->GetWriteBatch(), kNoWriteCallback,
&log_number_, kRefNoLog, kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
&mark_log_callback);
return s;
}
......
......@@ -84,6 +84,7 @@ Status WritePreparedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
const bool WRITE_AFTER_COMMIT = true;
const bool kFirstPrepareBatch = true;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
!WRITE_AFTER_COMMIT);
// For each duplicate key we account for a new sub-batch
......@@ -92,8 +93,8 @@ Status WritePreparedTxn::PrepareInternal() {
// prepared entries to PreparedHeap and hence enables an optimization. Refer to
// SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback(
wpt_db_, prepare_batch_cnt_,
db_impl_->immutable_db_options().two_write_queues);
wpt_db_, db_impl_, prepare_batch_cnt_,
db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
Status s = db_impl_->WriteImpl(
......@@ -152,9 +153,10 @@ Status WritePreparedTxn::CommitInternal() {
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
// This is to call AddPrepared on CommitTimeWriteBatch
const bool kFirstPrepareBatch = true;
AddPreparedCallback add_prepared_callback(
wpt_db_, commit_batch_cnt,
db_impl_->immutable_db_options().two_write_queues);
wpt_db_, db_impl_, commit_batch_cnt,
db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
PreReleaseCallback* pre_release_callback;
if (do_one_write) {
pre_release_callback = &update_commit_map;
......@@ -321,6 +323,7 @@ Status WritePreparedTxn::RollbackInternal() {
const uint64_t NO_REF_LOG = 0;
uint64_t seq_used = kMaxSequenceNumber;
const size_t ONE_BATCH = 1;
const bool kFirstPrepareBatch = true;
// We commit the rolled back prepared batches. Although this is
// counter-intuitive, i) it is safe to do so, since the prepared batches are
// already canceled out by the rollback batch, ii) adding the commit entry to
......@@ -329,7 +332,8 @@ Status WritePreparedTxn::RollbackInternal() {
// with a live snapshot around so that the live snapshot properly skips the
// entry even if its prepare seq is lower than max_evicted_seq_.
AddPreparedCallback add_prepared_callback(
wpt_db_, ONE_BATCH, db_impl_->immutable_db_options().two_write_queues);
wpt_db_, db_impl_, ONE_BATCH,
db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
PreReleaseCallback* pre_release_callback;
......
......@@ -64,10 +64,9 @@ Status WritePreparedTxnDB::Initialize(
public:
explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
: db_(db) {}
Status Callback(SequenceNumber commit_seq, bool is_mem_disabled) override {
#ifdef NDEBUG
(void)is_mem_disabled;
#endif
Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
assert(!is_mem_disabled);
db_->AddCommitted(commit_seq, commit_seq);
return Status::OK();
......@@ -163,11 +162,14 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
const uint64_t no_log_ref = 0;
uint64_t seq_used = kMaxSequenceNumber;
const size_t ZERO_PREPARES = 0;
const bool kSeperatePrepareCommitBatches = true;
// Since this is not 2pc, there is no need for AddPrepared but having it in
// the PreReleaseCallback enables an optimization. Refer to
// SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback(
this, batch_cnt, db_impl_->immutable_db_options().two_write_queues);
this, db_impl_, batch_cnt,
db_impl_->immutable_db_options().two_write_queues,
!kSeperatePrepareCommitBatches);
WritePreparedCommitEntryPreReleaseCallback update_commit_map(
this, db_impl_, kMaxSequenceNumber, ZERO_PREPARES, batch_cnt);
PreReleaseCallback* pre_release_callback;
......
......@@ -751,30 +751,40 @@ class WritePreparedTxnReadCallback : public ReadCallback {
class AddPreparedCallback : public PreReleaseCallback {
public:
AddPreparedCallback(WritePreparedTxnDB* db, size_t sub_batch_cnt,
bool two_write_queues)
AddPreparedCallback(WritePreparedTxnDB* db, DBImpl* db_impl,
size_t sub_batch_cnt, bool two_write_queues,
bool first_prepare_batch)
: db_(db),
db_impl_(db_impl),
sub_batch_cnt_(sub_batch_cnt),
two_write_queues_(two_write_queues) {
two_write_queues_(two_write_queues),
first_prepare_batch_(first_prepare_batch) {
(void)two_write_queues_; // to silence unused private field warning
}
virtual Status Callback(SequenceNumber prepare_seq,
bool is_mem_disabled) override {
#ifdef NDEBUG
(void)is_mem_disabled;
#endif
bool is_mem_disabled __attribute__((__unused__)),
uint64_t log_number) override {
// Always Prepare from the main queue
assert(!two_write_queues_ || !is_mem_disabled); // implies the 1st queue
for (size_t i = 0; i < sub_batch_cnt_; i++) {
db_->AddPrepared(prepare_seq + i);
}
if (first_prepare_batch_) {
assert(log_number != 0);
db_impl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
log_number);
}
return Status::OK();
}
private:
WritePreparedTxnDB* db_;
DBImpl* db_impl_;
size_t sub_batch_cnt_;
bool two_write_queues_;
// It is 2PC and this is the first prepare batch. Always the case in 2PC
// unless it is WriteUnPrepared.
bool first_prepare_batch_;
};
class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
......@@ -800,10 +810,8 @@ class WritePreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
}
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled) override {
#ifdef NDEBUG
(void)is_mem_disabled;
#endif
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
// Always commit from the 2nd queue
assert(!db_impl_->immutable_db_options().two_write_queues ||
is_mem_disabled);
......@@ -884,8 +892,8 @@ class WritePreparedRollbackPreReleaseCallback : public PreReleaseCallback {
assert(prep_batch_cnt_ > 0);
}
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled) override {
Status Callback(SequenceNumber commit_seq, bool is_mem_disabled,
uint64_t) override {
// Always commit from the 2nd queue
assert(is_mem_disabled); // implies the 2nd queue
assert(db_impl_->immutable_db_options().two_write_queues);
......
......@@ -155,22 +155,10 @@ Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
const bool kPrepared = true;
Status s;
bool needs_mark = (log_number_ == 0);
if (max_write_batch_size_ != 0 &&
write_batch_.GetDataSize() > max_write_batch_size_) {
assert(GetState() != PREPARED);
s = FlushWriteBatchToDB(!kPrepared);
if (s.ok()) {
assert(log_number_ > 0);
// This is done to prevent WAL files after log_number_ from being
// deleted, because they could potentially contain unprepared batches.
if (needs_mark) {
dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
log_number_);
}
}
}
return s;
}
......@@ -198,6 +186,7 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
const bool WRITE_AFTER_COMMIT = true;
const bool first_prepare_batch = log_number_ == 0;
// MarkEndPrepare will change Noop marker to the appropriate marker.
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(), name_,
!WRITE_AFTER_COMMIT, !prepared);
......@@ -210,8 +199,8 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
// prepared entries to PreparedHeap and hence enables an optimization. Refer
// to SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback(
wpt_db_, prepare_batch_cnt_,
db_impl_->immutable_db_options().two_write_queues);
wpt_db_, db_impl_, prepare_batch_cnt_,
db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
// log_number_ should refer to the oldest log containing uncommitted data
......@@ -334,7 +323,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
explicit PublishSeqPreReleaseCallback(DBImpl* db_impl)
: db_impl_(db_impl) {}
Status Callback(SequenceNumber seq,
bool is_mem_disabled __attribute__((__unused__))) override {
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
assert(is_mem_disabled);
assert(db_impl_->immutable_db_options().two_write_queues);
db_impl_->SetLastPublishedSequence(seq);
......
......@@ -194,10 +194,9 @@ Status WriteUnpreparedTxnDB::Initialize(
public:
explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
: db_(db) {}
Status Callback(SequenceNumber commit_seq, bool is_mem_disabled) override {
#ifdef NDEBUG
(void)is_mem_disabled;
#endif
Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
assert(!is_mem_disabled);
db_->AddCommitted(commit_seq, commit_seq);
return Status::OK();
......
......@@ -59,8 +59,9 @@ class WriteUnpreparedCommitEntryPreReleaseCallback : public PreReleaseCallback {
assert(unprep_seqs.size() > 0);
}
virtual Status Callback(SequenceNumber commit_seq, bool is_mem_disabled
__attribute__((__unused__))) override {
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
const uint64_t last_commit_seq = LIKELY(data_batch_cnt_ <= 1)
? commit_seq
: commit_seq + data_batch_cnt_ - 1;
......@@ -122,8 +123,9 @@ class WriteUnpreparedRollbackPreReleaseCallback : public PreReleaseCallback {
assert(db_impl_->immutable_db_options().two_write_queues);
}
virtual Status Callback(SequenceNumber commit_seq, bool is_mem_disabled
__attribute__((__unused__))) override {
virtual Status Callback(SequenceNumber commit_seq,
bool is_mem_disabled __attribute__((__unused__)),
uint64_t) override {
assert(is_mem_disabled); // implies the 2nd queue
const uint64_t last_commit_seq = commit_seq;
db_->AddCommitted(rollback_seq_, last_commit_seq);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册