提交 438f22bc 编写于 作者: S Siying Dong 提交者: Facebook Github Bot

Fix bug of Checkpoint loses recent transactions with 2PC

Summary:
If 2PC is enabled, checkpoint may not copy previous log files that contain uncommitted prepare records. In this diff we keep those files.
Closes https://github.com/facebook/rocksdb/pull/1724

Differential Revision: D4368319

Pulled By: siying

fbshipit-source-id: cc2c746
上级 335981d4
......@@ -740,6 +740,34 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
}
}
uint64_t DBImpl::MinLogNumberToKeep() {
uint64_t log_number = versions_->MinLogNumber();
if (allow_2pc()) {
// if are 2pc we must consider logs containing prepared
// sections of outstanding transactions.
//
// We must check min logs with outstanding prep before we check
// logs referneces by memtables because a log referenced by the
// first data structure could transition to the second under us.
//
// TODO(horuff): iterating over all column families under db mutex.
// should find more optimial solution
auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) {
log_number = min_log_in_prep_heap;
}
auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) {
log_number = min_log_refed_by_mem;
}
}
return log_number;
}
// * Returns the list of live files in 'sst_live'
// If it's doing full scan:
// * Returns the list of all files in the filesystem in
......@@ -798,32 +826,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->manifest_file_number = versions_->manifest_file_number();
job_context->pending_manifest_file_number =
versions_->pending_manifest_file_number();
job_context->log_number = versions_->MinLogNumber();
if (allow_2pc()) {
// if are 2pc we must consider logs containing prepared
// sections of outstanding transactions.
//
// We must check min logs with outstanding prep before we check
// logs referneces by memtables because a log referenced by the
// first data structure could transition to the second under us.
//
// TODO(horuff): iterating over all column families under db mutex.
// should find more optimial solution
auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
if (min_log_in_prep_heap != 0 &&
min_log_in_prep_heap < job_context->log_number) {
job_context->log_number = min_log_in_prep_heap;
}
auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
if (min_log_refed_by_mem != 0 &&
min_log_refed_by_mem < job_context->log_number) {
job_context->log_number = min_log_refed_by_mem;
}
}
job_context->log_number = MinLogNumberToKeep();
job_context->prev_log_number = versions_->prev_log_number();
......
......@@ -380,6 +380,8 @@ class DBImpl : public DB {
// schedule a purge
void ScheduleBgLogWriterClose(JobContext* job_context);
uint64_t MinLogNumberToKeep();
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than
......
......@@ -221,6 +221,7 @@ static const std::string num_live_versions = "num-live-versions";
static const std::string current_version_number =
"current-super-version-number";
static const std::string estimate_live_data_size = "estimate-live-data-size";
static const std::string min_log_number_to_keep = "min-log-number-to-keep";
static const std::string base_level = "base-level";
static const std::string total_sst_files_size = "total-sst-files-size";
static const std::string estimate_pending_comp_bytes =
......@@ -285,6 +286,8 @@ const std::string DB::Properties::kCurrentSuperVersionNumber =
rocksdb_prefix + current_version_number;
const std::string DB::Properties::kEstimateLiveDataSize =
rocksdb_prefix + estimate_live_data_size;
const std::string DB::Properties::kMinLogNumberToKeep =
rocksdb_prefix + min_log_number_to_keep;
const std::string DB::Properties::kTotalSstFilesSize =
rocksdb_prefix + total_sst_files_size;
const std::string DB::Properties::kBaseLevel = rocksdb_prefix + base_level;
......@@ -368,6 +371,8 @@ const std::unordered_map<std::string, DBPropertyInfo>
nullptr}},
{DB::Properties::kEstimateLiveDataSize,
{true, nullptr, &InternalStats::HandleEstimateLiveDataSize, nullptr}},
{DB::Properties::kMinLogNumberToKeep,
{false, nullptr, &InternalStats::HandleMinLogNumberToKeep, nullptr}},
{DB::Properties::kBaseLevel,
{false, nullptr, &InternalStats::HandleBaseLevel, nullptr}},
{DB::Properties::kTotalSstFilesSize,
......@@ -705,6 +710,12 @@ bool InternalStats::HandleEstimateLiveDataSize(uint64_t* value, DBImpl* db,
return true;
}
bool InternalStats::HandleMinLogNumberToKeep(uint64_t* value, DBImpl* db,
Version* version) {
*value = db->MinLogNumberToKeep();
return true;
}
void InternalStats::DumpDBStats(std::string* value) {
char buf[1000];
// DB-level stats, only available from default column family
......
......@@ -401,6 +401,7 @@ class InternalStats {
Version* version);
bool HandleEstimateLiveDataSize(uint64_t* value, DBImpl* db,
Version* version);
bool HandleMinLogNumberToKeep(uint64_t* value, DBImpl* db, Version* version);
// Total number of background errors encountered. Every time a flush task
// or compaction task fails, this counter is incremented. The failure can
......
......@@ -500,6 +500,10 @@ class DB {
// live data in bytes.
static const std::string kEstimateLiveDataSize;
// "rocksdb.min-log-number-to-keep" - return the minmum log number of the
// log files that should be kept.
static const std::string kMinLogNumberToKeep;
// "rocksdb.total-sst-files-size" - returns total size (bytes) of all SST
// files.
// WARNING: may slow down online queries if there are too many files.
......@@ -565,6 +569,7 @@ class DB {
// "rocksdb.num-live-versions"
// "rocksdb.current-super-version-number"
// "rocksdb.estimate-live-data-size"
// "rocksdb.min-log-number-to-keep"
// "rocksdb.total-sst-files-size"
// "rocksdb.base-level"
// "rocksdb.estimate-pending-compaction-bytes"
......
......@@ -62,6 +62,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
bool allow_2pc = db_->GetDBOptions().allow_2pc;
uint64_t min_log_num = port::kMaxUint64;
uint64_t sequence_number = db_->GetLatestSequenceNumber();
bool same_fs = true;
VectorLogPtr live_wal_files;
......@@ -78,6 +80,35 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
if (s.ok()) {
// this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size);
if (s.ok() && allow_2pc) {
// If 2PC is enabled, we need to get minimum log number after the flush.
// Need to refetch the live files to recapture the snapshot.
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
&min_log_num)) {
db_->EnableFileDeletions(false);
return Status::InvalidArgument(
"2PC enabled but cannot fine the min log number to keep.");
}
// We need to refetch live files with flush to handle this case:
// A previous 000001.log contains the prepare record of transaction tnx1.
// The current log file is 000002.log, and sequence_number points to this
// file.
// After calling GetLiveFiles(), 000003.log is created.
// Then tnx1 is committed. The commit record is written to 000003.log.
// Now we fetch min_log_num, which will be 3.
// Then only 000002.log and 000003.log will be copied, and 000001.log will
// be skipped. 000003.log contains commit message of tnx1, but we don't
// have respective prepare record for it.
// In order to avoid this situation, we need to force flush to make sure
// all transactions commited before getting min_log_num will be flushed
// to SST files.
// We cannot get min_log_num before calling the GetLiveFiles() for the
// first time, because if we do that, all the logs files will be included,
// far more than needed.
s = db_->GetLiveFiles(live_files, &manifest_file_size, /* flush */ true);
}
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
}
......@@ -156,7 +187,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
// that has changes after the last flush.
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(live_wal_files[i]->StartSequence() >= sequence_number)) {
(live_wal_files[i]->StartSequence() >= sequence_number ||
live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) {
Log(db_->GetOptions().info_log, "Copying %s",
live_wal_files[i]->PathName().c_str());
......
......@@ -21,6 +21,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/xfunc.h"
......@@ -390,6 +391,120 @@ TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing) {
snapshotDB = nullptr;
}
TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing2PC) {
const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot";
const std::string dbname = test::TmpDir() + "/transaction_testdb";
ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions()));
ASSERT_OK(DestroyDB(dbname, CurrentOptions()));
env_->DeleteDir(kSnapshotName);
env_->DeleteDir(dbname);
Close();
Options options = CurrentOptions();
// allow_2pc is implicitly set with tx prepare
// options.allow_2pc = true;
TransactionDBOptions txn_db_options;
TransactionDB* txdb;
Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb);
assert(s.ok());
ColumnFamilyHandle* cfa;
ColumnFamilyHandle* cfb;
ColumnFamilyOptions cf_options;
ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFA", &cfa));
WriteOptions write_options;
// Insert something into CFB so lots of log files will be kept
// before creating the checkpoint.
ASSERT_OK(txdb->CreateColumnFamily(cf_options, "CFB", &cfb));
ASSERT_OK(txdb->Put(write_options, cfb, "", ""));
ReadOptions read_options;
std::string value;
TransactionOptions txn_options;
Transaction* txn = txdb->BeginTransaction(write_options, txn_options);
s = txn->SetName("xid");
ASSERT_OK(s);
ASSERT_EQ(txdb->GetTransactionByName("xid"), txn);
s = txn->Put(Slice("foo"), Slice("bar"));
s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa"));
ASSERT_OK(s);
// Writing prepare into middle of first WAL, then flush WALs many times
for (int i = 1; i <= 100000; i++) {
Transaction* tx = txdb->BeginTransaction(write_options, txn_options);
ASSERT_OK(tx->SetName("x"));
ASSERT_OK(tx->Put(Slice(std::to_string(i)), Slice("val")));
ASSERT_OK(tx->Put(cfa, Slice("aaa"), Slice("111")));
ASSERT_OK(tx->Prepare());
ASSERT_OK(tx->Commit());
if (i % 10000 == 0) {
txdb->Flush(FlushOptions());
}
if (i == 88888) {
ASSERT_OK(txn->Prepare());
}
}
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"},
{"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit",
"CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread t([&]() {
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(txdb, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName));
delete checkpoint;
});
TEST_SYNC_POINT("DBTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit");
ASSERT_OK(txn->Commit());
TEST_SYNC_POINT(
"DBTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit");
t.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// No more than two logs files should exist.
std::vector<std::string> files;
env_->GetChildren(kSnapshotName, &files);
int num_log_files = 0;
for (auto& file : files) {
uint64_t num;
FileType type;
WalFileType log_type;
if (ParseFileName(file, &num, &type, &log_type) && type == kLogFile) {
num_log_files++;
}
}
// One flush after preapare + one outstanding file before checkpoint + one log
// file generated after checkpoint.
ASSERT_LE(num_log_files, 3);
TransactionDB* snapshotDB;
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions()));
column_families.push_back(
ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
column_families.push_back(
ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
std::vector<rocksdb::ColumnFamilyHandle*> cf_handles;
ASSERT_OK(TransactionDB::Open(options, txn_db_options, kSnapshotName,
column_families, &cf_handles, &snapshotDB));
ASSERT_OK(snapshotDB->Get(read_options, "foo", &value));
ASSERT_EQ(value, "bar");
ASSERT_OK(snapshotDB->Get(read_options, cf_handles[1], "foocfa", &value));
ASSERT_EQ(value, "barcfa");
delete cfa;
delete cfb;
delete cf_handles[0];
delete cf_handles[1];
delete cf_handles[2];
delete snapshotDB;
snapshotDB = nullptr;
}
} // namespace rocksdb
int main(int argc, char** argv) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册