提交 2a6d0cde 编写于 作者: S sdong

Ignore stale logs while restarting DBs

Summary:
Stale log files can be deleted out of order. This can happen for various reasons. One of the reason is that no data is ever inserted to a column family and we have an optimization to update its log number, but not all the old log files are cleaned up (the case shown in the unit tests added). It can also happen when we simply delete multiple log files out of order.

This causes data corruption because we simply increase seqID after processing the next row and we may end up with writing data with smaller seqID than what is already flushed to memtables.

In DB recovery, for the oldest files we are replaying, if there it contains no data for any column family, we ignore the sequence IDs in the file.

Test Plan: Add two unit tests that fail without the fix.

Reviewers: IslamAbdelRahman, igor, yiwu

Reviewed By: yiwu

Subscribers: hermanlee4, yoshinorim, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60891
上级 ee8bf2e4
......@@ -501,6 +501,135 @@ TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) {
}
}
class FlushEmptyCFTestWithParam : public ColumnFamilyTest,
public testing::WithParamInterface<bool> {
public:
FlushEmptyCFTestWithParam() { allow_2pc_ = GetParam(); }
// Required if inheriting from testing::WithParamInterface<>
static void SetUpTestCase() {}
static void TearDownTestCase() {}
bool allow_2pc_;
};
TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
db_options_.env = fault_env.get();
db_options_.allow_2pc = allow_2pc_;
Open();
CreateColumnFamilies({"one", "two"});
// Generate log file A.
ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
Reopen();
// Log file A is not dropped after reopening because default column family's
// min log number is 0.
// It flushes to SST file X
ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
// Current log file is file B now. While flushing, a new log file C is created
// and is set to current. Boths' min log number is set to file C in memory, so
// after flushing file B is deleted. At the same time, the min log number of
// default CF is not written to manifest. Log file A still remains.
// Flushed to SST file Y.
Flush(1);
Flush(0);
ASSERT_OK(Put(1, "bar", "v3")); // seqID 4
ASSERT_OK(Put(1, "foo", "v4")); // seqID 5
// Preserve file system state up to here to simulate a crash condition.
fault_env->SetFilesystemActive(false);
std::vector<std::string> names;
for (auto name : names_) {
if (name != "") {
names.push_back(name);
}
}
Close();
fault_env->ResetState();
// Before opening, there are four files:
// Log file A contains seqID 1
// Log file C contains seqID 4, 5
// SST file X contains seqID 1
// SST file Y contains seqID 2, 3
// Min log number:
// default CF: 0
// CF one, two: C
// When opening the DB, all the seqID should be preserved.
Open(names, {});
ASSERT_EQ("v4", Get(1, "foo"));
ASSERT_EQ("v3", Get(1, "bar"));
Close();
db_options_.env = env_;
}
TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
db_options_.env = fault_env.get();
db_options_.allow_2pc = allow_2pc_;
Open();
CreateColumnFamilies({"one", "two"});
// Generate log file A.
ASSERT_OK(Put(1, "foo", "v1")); // seqID 1
Reopen();
// Log file A is not dropped after reopening because default column family's
// min log number is 0.
// It flushes to SST file X
ASSERT_OK(Put(1, "foo", "v1")); // seqID 2
ASSERT_OK(Put(1, "bar", "v2")); // seqID 3
// Current log file is file B now. While flushing, a new log file C is created
// and is set to current. Both CFs' min log number is set to file C so after
// flushing file B is deleted. Log file A still remains.
// Flushed to SST file Y.
Flush(1);
ASSERT_OK(Put(0, "bar", "v2")); // seqID 4
ASSERT_OK(Put(2, "bar", "v2")); // seqID 5
ASSERT_OK(Put(1, "bar", "v3")); // seqID 6
// Flushing all column families. This forces all CFs' min log to current. This
// is written to the manifest file. Log file C is cleared.
Flush(0);
Flush(1);
Flush(2);
// Write to log file D
ASSERT_OK(Put(1, "bar", "v4")); // seqID 7
ASSERT_OK(Put(1, "bar", "v5")); // seqID 8
// Preserve file system state up to here to simulate a crash condition.
fault_env->SetFilesystemActive(false);
std::vector<std::string> names;
for (auto name : names_) {
if (name != "") {
names.push_back(name);
}
}
Close();
fault_env->ResetState();
// Before opening, there are two logfiles:
// Log file A contains seqID 1
// Log file D contains seqID 7, 8
// Min log number:
// default CF: D
// CF one, two: D
// When opening the DB, log file D should be replayed using the seqID
// specified in the file.
Open(names, {});
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("v5", Get(1, "bar"));
Close();
db_options_.env = env_;
}
INSTANTIATE_TEST_CASE_P(FlushEmptyCFTestWithParam, FlushEmptyCFTestWithParam,
::testing::Bool());
TEST_F(ColumnFamilyTest, AddDrop) {
Open();
CreateColumnFamilies({"one", "two", "three"});
......
......@@ -1478,9 +1478,11 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
}
recovered_sequence = sequence;
bool no_prev_seq = true;
if (*next_sequence == kMaxSequenceNumber) {
*next_sequence = sequence;
} else {
no_prev_seq = false;
WriteBatchInternal::SetSequence(&batch, *next_sequence);
}
......@@ -1563,10 +1565,18 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// insert. We don't want to fail the whole write batch in that case --
// we just ignore the update.
// That's why we set ignore missing column families to true
bool has_valid_writes = false;
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
log_number, this, false /* concurrent_memtable_writes */,
next_sequence);
next_sequence, &has_valid_writes);
// If it is the first log file and there is no column family updated
// after replaying the file, this file may be a stale file. We ignore
// sequence IDs from the file. Otherwise, if a newer stale log file that
// has been deleted, the sequenceID may be wrong.
if (no_prev_seq && !has_valid_writes) {
*next_sequence = kMaxSequenceNumber;
}
MaybeIgnoreError(&status);
if (!status.ok()) {
// We are treating this as a failure while reading since we read valid
......@@ -1575,7 +1585,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
continue;
}
if (!read_only) {
if (has_valid_writes && !read_only) {
// we can do this because this is called before client has access to the
// DB and there is only a single thread operating on DB
ColumnFamilyData* cfd;
......
......@@ -694,6 +694,7 @@ class MemTableInserter : public WriteBatch::Handler {
uint64_t log_number_ref_;
DBImpl* db_;
const bool concurrent_memtable_writes_;
bool* has_valid_writes_;
typedef std::map<MemTable*, MemTablePostProcessInfo> MemPostInfoMap;
MemPostInfoMap mem_post_info_map_;
// current recovered transaction we are rebuilding (recovery)
......@@ -704,7 +705,8 @@ class MemTableInserter : public WriteBatch::Handler {
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes)
bool concurrent_memtable_writes,
bool* has_valid_writes = nullptr)
: sequence_(sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
......@@ -713,6 +715,7 @@ class MemTableInserter : public WriteBatch::Handler {
log_number_ref_(0),
db_(reinterpret_cast<DBImpl*>(db)),
concurrent_memtable_writes_(concurrent_memtable_writes),
has_valid_writes_(has_valid_writes),
rebuilding_trx_(nullptr) {
assert(cf_mems_);
}
......@@ -756,6 +759,10 @@ class MemTableInserter : public WriteBatch::Handler {
return false;
}
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
if (log_number_ref_ > 0) {
cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
}
......@@ -976,6 +983,9 @@ class MemTableInserter : public WriteBatch::Handler {
// we are now iterating through a prepared section
rebuilding_trx_ = new WriteBatch();
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
} else {
// in non-recovery we ignore prepare markers
// and insert the values directly. making sure we have a
......@@ -1029,6 +1039,9 @@ class MemTableInserter : public WriteBatch::Handler {
if (s.ok()) {
db_->DeleteRecoveredTransaction(name.ToString());
}
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
}
} else {
// in non recovery we simply ignore this tag
......@@ -1112,16 +1125,15 @@ Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
return s;
}
Status WriteBatchInternal::InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t log_number, DB* db,
bool concurrent_memtable_writes,
SequenceNumber* last_seq_used) {
Status WriteBatchInternal::InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
uint64_t log_number, DB* db, bool concurrent_memtable_writes,
SequenceNumber* last_seq_used, bool* has_valid_writes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables,
flush_scheduler, ignore_missing_column_families,
log_number, db, concurrent_memtable_writes);
log_number, db, concurrent_memtable_writes,
has_valid_writes);
Status s = batch->Iterate(&inserter);
if (last_seq_used != nullptr) {
*last_seq_used = inserter.get_final_sequence();
......
......@@ -160,7 +160,8 @@ class WriteBatchInternal {
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
bool concurrent_memtable_writes = false,
SequenceNumber* last_seq_used = nullptr);
SequenceNumber* last_seq_used = nullptr,
bool* has_valid_writes = nullptr);
static Status InsertInto(WriteThread::Writer* writer,
ColumnFamilyMemTables* memtables,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册