提交 9dc29414 编写于 作者: L Lei Jin

add checksum for backup files

Summary: Keep checksum of each backuped file in meta file. When it restores these files, compute their checksum on the fly and compare against what is in the meta file. Fail the restore process if checksum mismatch.

Test Plan: unit test

Reviewers: haobo, igor, sdong, kailiu

Reviewed By: igor

CC: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D15381
上级 90f29ccb
......@@ -10,6 +10,7 @@
#include "utilities/backupable_db.h"
#include "db/filename.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "rocksdb/transaction_log.h"
#define __STDC_FORMAT_MACROS
......@@ -48,12 +49,22 @@ class BackupEngine {
void DeleteBackupsNewerThan(uint64_t sequence_number);
private:
struct FileInfo {
FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
: refs(0), filename(fname), size(sz), checksum_value(checksum) {}
int refs;
const std::string filename;
const uint64_t size;
uint32_t checksum_value;
};
class BackupMeta {
public:
BackupMeta(const std::string& meta_filename,
std::unordered_map<std::string, int>* file_refs, Env* env)
std::unordered_map<std::string, FileInfo>* file_infos, Env* env)
: timestamp_(0), size_(0), meta_filename_(meta_filename),
file_refs_(file_refs), env_(env) {}
file_infos_(file_infos), env_(env) {}
~BackupMeta() {}
......@@ -73,7 +84,8 @@ class BackupEngine {
return sequence_number_;
}
void AddFile(const std::string& filename, uint64_t size);
Status AddFile(const FileInfo& file_info);
void Delete();
bool Empty() {
......@@ -96,7 +108,7 @@ class BackupEngine {
std::string const meta_filename_;
// files with relative paths (without "/" prefix!!)
std::vector<std::string> files_;
std::unordered_map<std::string, int>* file_refs_;
std::unordered_map<std::string, FileInfo>* file_infos_;
Env* env_;
static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
......@@ -141,6 +153,7 @@ class BackupEngine {
Env* dst_env,
bool sync,
uint64_t* size = nullptr,
uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0);
// if size_limit == 0, there is no size limit, copy everything
Status BackupFile(BackupID backup_id,
......@@ -149,15 +162,21 @@ class BackupEngine {
const std::string& src_dir,
const std::string& src_fname, // starts with "/"
uint64_t size_limit = 0);
Status CalculateChecksum(const std::string& src,
Env* src_env,
uint64_t size_limit,
uint32_t* checksum_value);
// Will delete all the files we don't need anymore
// If full_scan == true, it will do the full scan of files/ directory
// and delete all the files that are not referenced from backuped_file_refs_
// and delete all the files that are not referenced from backuped_file_infos__
void GarbageCollection(bool full_scan);
// backup state data
BackupID latest_backup_id_;
std::map<BackupID, BackupMeta> backups_;
std::unordered_map<std::string, int> backuped_file_refs_;
std::unordered_map<std::string, FileInfo> backuped_file_infos_;
std::vector<BackupID> obsolete_backups_;
std::atomic<bool> stop_backup_;
......@@ -198,7 +217,7 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
assert(backups_.find(backup_id) == backups_.end());
backups_.insert(std::make_pair(
backup_id, BackupMeta(GetBackupMetaFile(backup_id),
&backuped_file_refs_, backup_env_)));
&backuped_file_infos_, backup_env_)));
}
if (options_.destroy_old_data) { // Destory old data
......@@ -302,7 +321,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
assert(backups_.find(new_backup_id) == backups_.end());
auto ret = backups_.insert(std::make_pair(
new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id),
&backuped_file_refs_, backup_env_)));
&backuped_file_infos_, backup_env_)));
assert(ret.second == true);
auto& new_backup = ret.first->second;
new_backup.RecordTimestamp();
......@@ -478,10 +497,19 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id,
"/" + dst;
Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false);
uint32_t checksum_value;
s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
nullptr /* size */, &checksum_value);
if (!s.ok()) {
break;
}
const auto iter = backuped_file_infos_.find(file);
assert(iter != backuped_file_infos_.end());
if (iter->second.checksum_value != checksum_value) {
s = Status::Corruption("Checksum check failed");
break;
}
}
Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str());
......@@ -555,6 +583,7 @@ Status BackupEngine::CopyFile(const std::string& src,
Env* dst_env,
bool sync,
uint64_t* size,
uint32_t* checksum_value,
uint64_t size_limit) {
Status s;
unique_ptr<WritableFile> dst_file;
......@@ -564,6 +593,9 @@ Status BackupEngine::CopyFile(const std::string& src,
if (size != nullptr) {
*size = 0;
}
if (checksum_value != nullptr) {
*checksum_value = 0;
}
// Check if size limit is set. if not, set it to very big number
if (size_limit == 0) {
......@@ -589,12 +621,19 @@ Status BackupEngine::CopyFile(const std::string& src,
copy_file_buffer_size_ : size_limit;
s = src_file->Read(buffer_to_read, &data, buf.get());
size_limit -= data.size();
if (!s.ok()) {
return s;
}
if (size != nullptr) {
*size += data.size();
}
if (s.ok()) {
s = dst_file->Append(data);
if (checksum_value != nullptr) {
*checksum_value = crc32c::Extend(*checksum_value, data.data(),
data.size());
}
s = dst_file->Append(data);
} while (s.ok() && data.size() > 0 && size_limit > 0);
if (s.ok() && sync) {
......@@ -629,9 +668,15 @@ Status BackupEngine::BackupFile(BackupID backup_id,
// if it's shared, we also need to check if it exists -- if it does,
// no need to copy it again
uint32_t checksum_value = 0;
if (shared && backup_env_->FileExists(dst_path)) {
backup_env_->GetFileSize(dst_path, &size); // Ignore error
Log(options_.info_log, "%s already present", src_fname.c_str());
Log(options_.info_log, "%s already present, calculate checksum",
src_fname.c_str());
s = CalculateChecksum(src_dir + src_fname,
db_env_,
size_limit,
&checksum_value);
} else {
Log(options_.info_log, "Copying %s", src_fname.c_str());
s = CopyFile(src_dir + src_fname,
......@@ -640,22 +685,63 @@ Status BackupEngine::BackupFile(BackupID backup_id,
backup_env_,
options_.sync,
&size,
&checksum_value,
size_limit);
if (s.ok() && shared) {
s = backup_env_->RenameFile(dst_path_tmp, dst_path);
}
}
if (s.ok()) {
backup->AddFile(dst_relative, size);
s = backup->AddFile(FileInfo(dst_relative, size, checksum_value));
}
return s;
}
Status BackupEngine::CalculateChecksum(const std::string& src,
Env* src_env,
uint64_t size_limit,
uint32_t* checksum_value) {
*checksum_value = 0;
if (size_limit == 0) {
size_limit = std::numeric_limits<uint64_t>::max();
}
EnvOptions env_options;
env_options.use_mmap_writes = false;
std::unique_ptr<SequentialFile> src_file;
Status s = src_env->NewSequentialFile(src, &src_file, env_options);
if (!s.ok()) {
return s;
}
std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
Slice data;
do {
if (stop_backup_.load(std::memory_order_acquire)) {
return Status::Incomplete("Backup stopped");
}
size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
copy_file_buffer_size_ : size_limit;
s = src_file->Read(buffer_to_read, &data, buf.get());
if (!s.ok()) {
return s;
}
size_limit -= data.size();
*checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
} while (data.size() > 0 && size_limit > 0);
return s;
}
void BackupEngine::GarbageCollection(bool full_scan) {
Log(options_.info_log, "Starting garbage collection");
std::vector<std::string> to_delete;
for (auto& itr : backuped_file_refs_) {
if (itr.second == 0) {
for (auto& itr : backuped_file_infos_) {
if (itr.second.refs == 0) {
Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
s.ToString().c_str());
......@@ -663,7 +749,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
}
}
for (auto& td : to_delete) {
backuped_file_refs_.erase(td);
backuped_file_infos_.erase(td);
}
if (!full_scan) {
// take care of private dirs -- if full_scan == true, then full_scan will
......@@ -686,7 +772,7 @@ void BackupEngine::GarbageCollection(bool full_scan) {
for (auto& child : shared_children) {
std::string rel_fname = GetSharedFileRel(child);
// if it's not refcounted, delete it
if (backuped_file_refs_.find(rel_fname) == backuped_file_refs_.end()) {
if (backuped_file_infos_.find(rel_fname) == backuped_file_infos_.end()) {
// this might be a directory, but DeleteFile will just fail in that
// case, so we're good
Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
......@@ -731,23 +817,34 @@ void BackupEngine::GarbageCollection(bool full_scan) {
// ------- BackupMeta class --------
void BackupEngine::BackupMeta::AddFile(const std::string& filename,
uint64_t size) {
size_ += size;
files_.push_back(filename);
auto itr = file_refs_->find(filename);
if (itr == file_refs_->end()) {
file_refs_->insert(std::make_pair(filename, 1));
Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) {
size_ += file_info.size;
files_.push_back(file_info.filename);
auto itr = file_infos_->find(file_info.filename);
if (itr == file_infos_->end()) {
auto ret = file_infos_->insert({file_info.filename, file_info});
if (ret.second) {
ret.first->second.refs = 1;
} else {
// if this happens, something is seriously wrong
return Status::Corruption("In memory metadata insertion error");
}
} else {
++itr->second; // increase refcount if already present
if (itr->second.checksum_value != file_info.checksum_value) {
return Status::Corruption("Checksum mismatch for existing backup file");
}
++itr->second.refs; // increase refcount if already present
}
return Status::OK();
}
void BackupEngine::BackupMeta::Delete() {
for (auto& file : files_) {
auto itr = file_refs_->find(file);
assert(itr != file_refs_->end());
--(itr->second); // decrease refcount
for (const auto& file : files_) {
auto itr = file_infos_->find(file);
assert(itr != file_infos_->end());
--(itr->second.refs); // decrease refcount
}
files_.clear();
// delete meta file
......@@ -759,8 +856,8 @@ void BackupEngine::BackupMeta::Delete() {
// <timestamp>
// <seq number>
// <number of files>
// <file1>
// <file2>
// <file1> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
// ...
// TODO: maybe add checksum?
Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
......@@ -790,18 +887,40 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) {
sscanf(data.data(), "%u%n", &num_files, &bytes_read);
data.remove_prefix(bytes_read + 1); // +1 for '\n'
std::vector<std::pair<std::string, uint64_t>> files;
std::vector<FileInfo> files;
for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
std::string filename = GetSliceUntil(&data, '\n').ToString();
auto line = GetSliceUntil(&data, '\n');
std::string filename = GetSliceUntil(&line, ' ').ToString();
uint64_t size;
s = env_->GetFileSize(backup_dir + "/" + filename, &size);
files.push_back(std::make_pair(filename, size));
if (line.empty()) {
return Status::Corruption("File checksum is missing");
}
uint32_t checksum_value = 0;
if (line.starts_with("crc32 ")) {
line.remove_prefix(6);
sscanf(line.data(), "%u", &checksum_value);
if (memcmp(line.data(), std::to_string(checksum_value).c_str(),
line.size() - 1) != 0) {
return Status::Corruption("Invalid checksum value");
}
} else {
return Status::Corruption("Unknown checksum type");
}
files.emplace_back(filename, size, checksum_value);
}
if (s.ok()) {
for (auto file : files) {
AddFile(file.first, file.second);
for (const auto& file_info : files) {
s = AddFile(file_info);
if (!s.ok()) {
break;
}
}
}
......@@ -825,8 +944,13 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
sequence_number_);
len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size());
for (size_t i = 0; i < files_.size(); ++i) {
len += snprintf(buf.get() + len, buf_size - len, "%s\n", files_[i].c_str());
for (const auto& file : files_) {
const auto& iter = file_infos_->find(file);
assert(iter != file_infos_->end());
// use crc32 for now, switch to something else if needed
len += snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n",
file.c_str(), iter->second.checksum_value);
}
s = backup_meta_file->Append(Slice(buf.get(), (size_t)len));
......
......@@ -154,7 +154,6 @@ class TestEnv : public EnvWrapper {
Status NewSequentialFile(const std::string& f,
unique_ptr<SequentialFile>* r,
const EnvOptions& options) {
opened_files_.push_back(f);
if (dummy_sequential_file_) {
r->reset(new TestEnv::DummySequentialFile());
return Status::OK();
......@@ -165,6 +164,7 @@ class TestEnv : public EnvWrapper {
Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
const EnvOptions& options) {
written_files_.push_back(f);
if (limit_written_files_ <= 0) {
return Status::IOError("Sorry, can't do this");
}
......@@ -172,14 +172,14 @@ class TestEnv : public EnvWrapper {
return EnvWrapper::NewWritableFile(f, r, options);
}
void AssertOpenedFiles(std::vector<std::string>& should_have_opened) {
sort(should_have_opened.begin(), should_have_opened.end());
sort(opened_files_.begin(), opened_files_.end());
ASSERT_TRUE(opened_files_ == should_have_opened);
void AssertWrittenFiles(std::vector<std::string>& should_have_written) {
sort(should_have_written.begin(), should_have_written.end());
sort(written_files_.begin(), written_files_.end());
ASSERT_TRUE(written_files_ == should_have_written);
}
void ClearOpenedFiles() {
opened_files_.clear();
void ClearWrittenFiles() {
written_files_.clear();
}
void SetLimitWrittenFiles(uint64_t limit) {
......@@ -192,7 +192,7 @@ class TestEnv : public EnvWrapper {
private:
bool dummy_sequential_file_ = false;
std::vector<std::string> opened_files_;
std::vector<std::string> written_files_;
uint64_t limit_written_files_ = 1000000;
}; // TestEnv
......@@ -239,6 +239,46 @@ class FileManager : public EnvWrapper {
return s;
}
Status CorruptChecksum(const std::string& fname, bool appear_valid) {
std::string metadata;
Status s = ReadFileToString(this, fname, &metadata);
if (!s.ok()) {
return s;
}
s = DeleteFile(fname);
if (!s.ok()) {
return s;
}
std::vector<int64_t> positions;
auto pos = metadata.find(" crc32 ");
if (pos == std::string::npos) {
return Status::Corruption("checksum not found");
}
do {
positions.push_back(pos);
pos = metadata.find(" crc32 ", pos + 6);
} while (pos != std::string::npos);
pos = positions[rnd_.Next() % positions.size()];
if (metadata.size() < pos + 7) {
return Status::Corruption("bad CRC32 checksum value");
}
if (appear_valid) {
if (metadata[pos + 8] == '\n') {
// single digit value, safe to insert one more digit
metadata.insert(pos + 8, 1, '0');
} else {
metadata.erase(pos + 8, 1);
}
} else {
metadata[pos + 7] = 'a';
}
return WriteToFile(fname, metadata);
}
Status WriteToFile(const std::string& fname, const std::string& data) {
unique_ptr<WritableFile> file;
EnvOptions env_options;
......@@ -249,6 +289,7 @@ class FileManager : public EnvWrapper {
}
return file->Append(Slice(data));
}
private:
Random rnd_;
}; // FileManager
......@@ -412,30 +453,43 @@ TEST(BackupableDBTest, NoDoubleCopy) {
// should write 5 DB files + LATEST_BACKUP + one meta file
test_backup_env_->SetLimitWrittenFiles(7);
test_db_env_->ClearOpenedFiles();
test_backup_env_->ClearWrittenFiles();
test_db_env_->SetLimitWrittenFiles(0);
dummy_db_->live_files_ = { "/00010.sst", "/00011.sst",
"/CURRENT", "/MANIFEST-01" };
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
ASSERT_OK(db_->CreateNewBackup(false));
std::vector<std::string> should_have_openened = dummy_db_->live_files_;
should_have_openened.push_back("/00011.log");
AppendPath(dbname_, should_have_openened);
test_db_env_->AssertOpenedFiles(should_have_openened);
std::vector<std::string> should_have_written = {
"/shared/00010.sst.tmp",
"/shared/00011.sst.tmp",
"/private/1.tmp/CURRENT",
"/private/1.tmp/MANIFEST-01",
"/private/1.tmp/00011.log",
"/meta/1.tmp",
"/LATEST_BACKUP.tmp"
};
AppendPath(dbname_ + "_backup", should_have_written);
test_backup_env_->AssertWrittenFiles(should_have_written);
// should write 4 new DB files + LATEST_BACKUP + one meta file
// should not write/copy 00010.sst, since it's already there!
test_backup_env_->SetLimitWrittenFiles(6);
test_db_env_->ClearOpenedFiles();
test_backup_env_->ClearWrittenFiles();
dummy_db_->live_files_ = { "/00010.sst", "/00015.sst",
"/CURRENT", "/MANIFEST-01" };
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
ASSERT_OK(db_->CreateNewBackup(false));
// should not open 00010.sst - it's already there
should_have_openened = { "/00015.sst", "/CURRENT",
"/MANIFEST-01", "/00011.log" };
AppendPath(dbname_, should_have_openened);
test_db_env_->AssertOpenedFiles(should_have_openened);
should_have_written = {
"/shared/00015.sst.tmp",
"/private/2.tmp/CURRENT",
"/private/2.tmp/MANIFEST-01",
"/private/2.tmp/00011.log",
"/meta/2.tmp",
"/LATEST_BACKUP.tmp"
};
AppendPath(dbname_ + "_backup", should_have_written);
test_backup_env_->AssertWrittenFiles(should_have_written);
ASSERT_OK(db_->DeleteBackup(1));
ASSERT_EQ(true,
......@@ -463,6 +517,8 @@ TEST(BackupableDBTest, NoDoubleCopy) {
// 3. Corrupted backup meta file or missing backuped file - we should
// not be able to open that backup, but all other backups should be
// fine
// 4. Corrupted checksum value - if the checksum is not a valid uint32_t,
// db open should fail, otherwise, it aborts during the restore process.
TEST(BackupableDBTest, CorruptionsTest) {
const int keys_iteration = 5000;
Random rnd(6);
......@@ -519,12 +575,29 @@ TEST(BackupableDBTest, CorruptionsTest) {
CloseRestoreDB();
ASSERT_TRUE(!s.ok());
// new backup should be 4!
// --------- case 4. corrupted checksum value ----
ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false));
// checksum of backup 3 is an invalid value, this can be detected at
// db open time, and it reverts to the previous backup automatically
AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5);
// checksum of the backup 2 appears to be valid, this can cause checksum
// mismatch and abort restore process
ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true));
ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2"));
OpenRestoreDB();
ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2"));
s = restore_db_->RestoreDBFromBackup(2, dbname_, dbname_);
ASSERT_TRUE(!s.ok());
ASSERT_OK(restore_db_->DeleteBackup(2));
CloseRestoreDB();
AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5);
// new backup should be 2!
OpenBackupableDB();
FillDB(db_.get(), keys_iteration * 3, keys_iteration * 4);
FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2);
ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2)));
CloseBackupableDB();
AssertBackupConsistency(4, 0, keys_iteration * 4, keys_iteration * 5);
AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5);
}
// open DB, write, close DB, backup, restore, repeat
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册