提交 8a9fca26 编写于 作者: I Igor Canadi

Better error handling in BackupEngine

Summary:
Couple of changes here:
* NewBackupEngine() and NewReadOnlyBackupEngine() are now removed. They were deprecated since RocksDB 3.8. Changing these to new functions should be pretty straight-forward. As a followup, I'll fix all fbcode callsights
* Instead of initializing backup engine in the constructor, we initialize it in a separate function now. That way, we can catch all errors and return appropriate status code.
* We catch all errors during initializations and return them to the client properly.
* Added new tests to backupable_db_test, to make sure that we can't open BackupEngine when there are Env errors.
* Transitioned backupable_db_test to use BackupEngine rather than BackupableDB. From the two available APIs, judging by the current use-cases, it looks like BackupEngine API won. It's much more flexible since it doesn't require StackableDB.

Test Plan: Added a new unit test to backupable_db_test

Reviewers: yhchiang, sdong, AaronFeldman

Reviewed By: AaronFeldman

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D41925
上级 18d5e1bf
......@@ -2,9 +2,10 @@
## Unreleased
### Public API changes
### Public API Changes
* Deprecated WriteOptions::timeout_hint_us. We no longer support write timeout. If you really need this option, talk to us and we might consider returning it.
* Deprecated purge_redundant_kvs_while_flush option.
* Removed BackupEngine::NewBackupEngine() and NewReadOnlyBackupEngine() that were deprecated in RocksDB 3.8. Please use BackupEngine::Open() instead.
## 3.12.0 (7/2/2015)
### New Features
......
......@@ -178,10 +178,6 @@ class BackupEngineReadOnly {
public:
virtual ~BackupEngineReadOnly() {}
static BackupEngineReadOnly* NewReadOnlyBackupEngine(
Env* db_env, const BackupableDBOptions& options)
__attribute__((deprecated("Please use Open() instead")));
static Status Open(Env* db_env, const BackupableDBOptions& options,
BackupEngineReadOnly** backup_engine_ptr);
......@@ -208,10 +204,6 @@ class BackupEngine {
public:
virtual ~BackupEngine() {}
static BackupEngine* NewBackupEngine(Env* db_env,
const BackupableDBOptions& options)
__attribute__((deprecated("Please use Open() instead")));
static Status Open(Env* db_env,
const BackupableDBOptions& options,
BackupEngine** backup_engine_ptr);
......@@ -272,6 +264,7 @@ class BackupableDB : public StackableDB {
private:
BackupEngine* backup_engine_;
Status status_;
};
// Use this class to access information about backups and restore from them
......@@ -317,6 +310,7 @@ class RestoreBackupableDB {
private:
BackupEngine* backup_engine_;
Status status_;
};
} // namespace rocksdb
......
......@@ -144,6 +144,8 @@ class BackupEngineImpl : public BackupEngine {
restore_options);
}
Status Initialize();
private:
void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
......@@ -192,7 +194,7 @@ class BackupEngineImpl : public BackupEngine {
Status AddFile(std::shared_ptr<FileInfo> file_info);
void Delete(bool delete_meta = true);
Status Delete(bool delete_meta = true);
bool Empty() {
return files_.empty();
......@@ -419,6 +421,7 @@ class BackupEngineImpl : public BackupEngine {
}
};
bool initialized_;
channel<CopyWorkItem> files_to_copy_;
std::vector<std::thread> threads_;
......@@ -459,58 +462,83 @@ class BackupEngineImpl : public BackupEngine {
BackupStatistics backup_statistics_;
};
BackupEngine* BackupEngine::NewBackupEngine(
Env* db_env, const BackupableDBOptions& options) {
return new BackupEngineImpl(db_env, options);
}
Status BackupEngine::Open(Env* env,
const BackupableDBOptions& options,
Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
BackupEngine** backup_engine_ptr) {
*backup_engine_ptr = new BackupEngineImpl(env, options);
std::unique_ptr<BackupEngineImpl> backup_engine(
new BackupEngineImpl(env, options));
auto s = backup_engine->Initialize();
if (!s.ok()) {
*backup_engine_ptr = nullptr;
return s;
}
*backup_engine_ptr = backup_engine.release();
return Status::OK();
}
BackupEngineImpl::BackupEngineImpl(Env* db_env,
const BackupableDBOptions& options,
bool read_only)
: stop_backup_(false),
: initialized_(false),
stop_backup_(false),
options_(options),
db_env_(db_env),
backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
copy_file_buffer_size_(kDefaultCopyFileBufferSize),
read_only_(read_only) {
read_only_(read_only) {}
BackupEngineImpl::~BackupEngineImpl() {
files_to_copy_.sendEof();
for (auto& t : threads_) {
t.join();
}
LogFlush(options_.info_log);
}
Status BackupEngineImpl::Initialize() {
assert(!initialized_);
initialized_ = true;
if (read_only_) {
Log(options_.info_log, "Starting read_only backup engine");
}
options_.Dump(options_.info_log);
if (!read_only_) {
// create all the dirs we need
backup_env_->CreateDirIfMissing(GetAbsolutePath());
backup_env_->NewDirectory(GetAbsolutePath(), &backup_directory_);
// gather the list of directories that we need to create
std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
directories;
directories.emplace_back(GetAbsolutePath(), &backup_directory_);
if (options_.share_table_files) {
if (options_.share_files_with_checksum) {
backup_env_->CreateDirIfMissing(GetAbsolutePath(
GetSharedFileWithChecksumRel()));
backup_env_->NewDirectory(GetAbsolutePath(
GetSharedFileWithChecksumRel()), &shared_directory_);
directories.emplace_back(
GetAbsolutePath(GetSharedFileWithChecksumRel()),
&shared_directory_);
} else {
backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel()));
backup_env_->NewDirectory(GetAbsolutePath(GetSharedFileRel()),
directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
&shared_directory_);
}
}
backup_env_->CreateDirIfMissing(GetAbsolutePath(GetPrivateDirRel()));
backup_env_->NewDirectory(GetAbsolutePath(GetPrivateDirRel()),
directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
&private_directory_);
backup_env_->CreateDirIfMissing(GetBackupMetaDir());
backup_env_->NewDirectory(GetBackupMetaDir(), &meta_directory_);
directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
// create all the dirs we need
for (const auto& d : directories) {
auto s = backup_env_->CreateDirIfMissing(d.first);
if (s.ok()) {
s = backup_env_->NewDirectory(d.first, d.second);
}
if (!s.ok()) {
return s;
}
}
}
std::vector<std::string> backup_meta_files;
backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
{
auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
if (!s.ok()) {
return s;
}
}
// create backups_ structure
for (auto& file : backup_meta_files) {
if (file == "." || file == "..") {
......@@ -521,10 +549,10 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
sscanf(file.c_str(), "%u", &backup_id);
if (backup_id == 0 || file != rocksdb::ToString(backup_id)) {
if (!read_only_) {
Log(options_.info_log, "Unrecognized meta file %s, deleting",
file.c_str());
// invalid file name, delete that
backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
Log(options_.info_log, "Unrecognized meta file %s, deleting -- %s",
file.c_str(), s.ToString().c_str());
}
continue;
}
......@@ -540,8 +568,13 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
Log(options_.info_log,
"Backup Engine started with destroy_old_data == true, deleting all "
"backups");
PurgeOldBackups(0);
(void) GarbageCollect();
auto s = PurgeOldBackups(0);
if (s.ok()) {
s = GarbageCollect();
}
if (!s.ok()) {
return s;
}
// start from beginning
latest_backup_id_ = 0;
} else { // Load data from storage
......@@ -586,19 +619,27 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
later_ids.push_back(itr->first);
}
for (auto id : later_ids) {
Status s;
if (!read_only_) {
DeleteBackup(id);
s = DeleteBackup(id);
} else {
auto backup = backups_.find(id);
// We just found it couple of lines earlier!
assert(backup != backups_.end());
backup->second->Delete(false);
s = backup->second->Delete(false);
backups_.erase(backup);
}
if (!s.ok()) {
Log(options_.info_log, "Failed deleting backup %" PRIu32 " -- %s", id,
s.ToString().c_str());
}
}
if (!read_only_) {
PutLatestBackupFileContents(latest_backup_id_); // Ignore errors
auto s = PutLatestBackupFileContents(latest_backup_id_);
if (!s.ok()) {
return s;
}
}
// set up threads perform copies from files_to_copy_ in the background
......@@ -622,17 +663,12 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
}
Log(options_.info_log, "Initialized BackupEngine");
}
BackupEngineImpl::~BackupEngineImpl() {
files_to_copy_.sendEof();
for (int i = 0; i < options_.max_background_operations; i++) {
threads_[i].join();
}
LogFlush(options_.info_log);
return Status::OK();
}
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
assert(initialized_);
if (options_.max_background_operations > 1 &&
options_.backup_rate_limit != 0) {
return Status::InvalidArgument(
......@@ -838,6 +874,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
}
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
assert(initialized_);
assert(!read_only_);
Log(options_.info_log, "Purging old backups, keeping %u",
num_backups_to_keep);
......@@ -848,24 +885,34 @@ Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
itr++;
}
for (auto backup_id : to_delete) {
DeleteBackup(backup_id);
auto s = DeleteBackup(backup_id);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
assert(initialized_);
assert(!read_only_);
Log(options_.info_log, "Deleting backup %u", backup_id);
auto backup = backups_.find(backup_id);
if (backup != backups_.end()) {
backup->second->Delete();
auto s = backup->second->Delete();
if (!s.ok()) {
return s;
}
backups_.erase(backup);
} else {
auto corrupt = corrupt_backups_.find(backup_id);
if (corrupt == corrupt_backups_.end()) {
return Status::NotFound("Backup not found");
}
corrupt->second.second->Delete();
auto s = corrupt->second.second->Delete();
if (!s.ok()) {
return s;
}
corrupt_backups_.erase(corrupt);
}
......@@ -892,6 +939,7 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
}
void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
assert(initialized_);
backup_info->reserve(backups_.size());
for (auto& backup : backups_) {
if (!backup.second->Empty()) {
......@@ -906,6 +954,7 @@ void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
void
BackupEngineImpl::GetCorruptedBackups(
std::vector<BackupID>* corrupt_backup_ids) {
assert(initialized_);
corrupt_backup_ids->reserve(corrupt_backups_.size());
for (auto& backup : corrupt_backups_) {
corrupt_backup_ids->push_back(backup.first);
......@@ -915,6 +964,7 @@ BackupEngineImpl::GetCorruptedBackups(
Status BackupEngineImpl::RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options) {
assert(initialized_);
if (options_.max_background_operations > 1 &&
options_.restore_rate_limit != 0) {
return Status::InvalidArgument(
......@@ -1351,8 +1401,13 @@ Status BackupEngineImpl::GarbageCollect() {
// delete obsolete shared files
std::vector<std::string> shared_children;
backup_env_->GetChildren(GetAbsolutePath(GetSharedFileRel()),
{
auto s = backup_env_->GetChildren(GetAbsolutePath(GetSharedFileRel()),
&shared_children);
if (!s.ok()) {
return s;
}
}
for (auto& child : shared_children) {
std::string rel_fname = GetSharedFileRel(child);
auto child_itr = backuped_file_infos_.find(rel_fname);
......@@ -1362,17 +1417,21 @@ Status BackupEngineImpl::GarbageCollect() {
// this might be a directory, but DeleteFile will just fail in that
// case, so we're good
Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
if (s.ok()) {
Log(options_.info_log, "Deleted %s", rel_fname.c_str());
}
Log(options_.info_log, "Deleting %s -- %s", rel_fname.c_str(),
s.ToString().c_str());
backuped_file_infos_.erase(rel_fname);
}
}
// delete obsolete private files
std::vector<std::string> private_children;
backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
{
auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
&private_children);
if (!s.ok()) {
return s;
}
}
for (auto& child : private_children) {
BackupID backup_id = 0;
bool tmp_dir = child.find(".tmp") != std::string::npos;
......@@ -1389,14 +1448,12 @@ Status BackupEngineImpl::GarbageCollect() {
backup_env_->GetChildren(full_private_path, &subchildren);
for (auto& subchild : subchildren) {
Status s = backup_env_->DeleteFile(full_private_path + subchild);
if (s.ok()) {
Log(options_.info_log, "Deleted %s",
(full_private_path + subchild).c_str());
}
Log(options_.info_log, "Deleting %s -- %s",
(full_private_path + subchild).c_str(), s.ToString().c_str());
}
// finally delete the private dir
Status s = backup_env_->DeleteDir(full_private_path);
Log(options_.info_log, "Deleted dir %s -- %s", full_private_path.c_str(),
Log(options_.info_log, "Deleting dir %s -- %s", full_private_path.c_str(),
s.ToString().c_str());
}
......@@ -1432,16 +1489,18 @@ Status BackupEngineImpl::BackupMeta::AddFile(
return Status::OK();
}
void BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
Status s;
for (const auto& file : files_) {
--file->refs; // decrease refcount
}
files_.clear();
// delete meta file
if (delete_meta) {
env_->DeleteFile(meta_filename_);
if (delete_meta && env_->FileExists(meta_filename_)) {
s = env_->DeleteFile(meta_filename_);
}
timestamp_ = 0;
return s;
}
// each backup meta file is of the format:
......@@ -1607,58 +1666,75 @@ class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
restore_options);
}
Status Initialize() { return backup_engine_->Initialize(); }
private:
std::unique_ptr<BackupEngineImpl> backup_engine_;
};
BackupEngineReadOnly* BackupEngineReadOnly::NewReadOnlyBackupEngine(
Env* db_env, const BackupableDBOptions& options) {
if (options.destroy_old_data) {
assert(false);
return nullptr;
}
return new BackupEngineReadOnlyImpl(db_env, options);
}
Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options,
BackupEngineReadOnly** backup_engine_ptr) {
if (options.destroy_old_data) {
assert(false);
return Status::InvalidArgument(
"Can't destroy old data with ReadOnly BackupEngine");
}
*backup_engine_ptr = new BackupEngineReadOnlyImpl(env, options);
std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
new BackupEngineReadOnlyImpl(env, options));
auto s = backup_engine->Initialize();
if (!s.ok()) {
*backup_engine_ptr = nullptr;
return s;
}
*backup_engine_ptr = backup_engine.release();
return Status::OK();
}
// --- BackupableDB methods --------
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
: StackableDB(db),
backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {}
: StackableDB(db) {
auto backup_engine_impl = new BackupEngineImpl(db->GetEnv(), options);
status_ = backup_engine_impl->Initialize();
backup_engine_ = backup_engine_impl;
}
BackupableDB::~BackupableDB() {
delete backup_engine_;
}
Status BackupableDB::CreateNewBackup(bool flush_before_backup) {
if (!status_.ok()) {
return status_;
}
return backup_engine_->CreateNewBackup(this, flush_before_backup);
}
void BackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
if (!status_.ok()) {
return;
}
backup_engine_->GetBackupInfo(backup_info);
}
void
BackupableDB::GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) {
if (!status_.ok()) {
return;
}
backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}
Status BackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
if (!status_.ok()) {
return status_;
}
return backup_engine_->PurgeOldBackups(num_backups_to_keep);
}
Status BackupableDB::DeleteBackup(BackupID backup_id) {
if (!status_.ok()) {
return status_;
}
return backup_engine_->DeleteBackup(backup_id);
}
......@@ -1667,14 +1743,20 @@ void BackupableDB::StopBackup() {
}
Status BackupableDB::GarbageCollect() {
if (!status_.ok()) {
return status_;
}
return backup_engine_->GarbageCollect();
}
// --- RestoreBackupableDB methods ------
RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
const BackupableDBOptions& options)
: backup_engine_(new BackupEngineImpl(db_env, options)) {}
const BackupableDBOptions& options) {
auto backup_engine_impl = new BackupEngineImpl(db_env, options);
status_ = backup_engine_impl->Initialize();
backup_engine_ = backup_engine_impl;
}
RestoreBackupableDB::~RestoreBackupableDB() {
delete backup_engine_;
......@@ -1682,17 +1764,26 @@ RestoreBackupableDB::~RestoreBackupableDB() {
void
RestoreBackupableDB::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
if (!status_.ok()) {
return;
}
backup_engine_->GetBackupInfo(backup_info);
}
void RestoreBackupableDB::GetCorruptedBackups(
std::vector<BackupID>* corrupt_backup_ids) {
if (!status_.ok()) {
return;
}
backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}
Status RestoreBackupableDB::RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options) {
if (!status_.ok()) {
return status_;
}
return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
restore_options);
}
......@@ -1700,19 +1791,31 @@ Status RestoreBackupableDB::RestoreDBFromBackup(
Status RestoreBackupableDB::RestoreDBFromLatestBackup(
const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options) {
if (!status_.ok()) {
return status_;
}
return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
restore_options);
}
Status RestoreBackupableDB::PurgeOldBackups(uint32_t num_backups_to_keep) {
if (!status_.ok()) {
return status_;
}
return backup_engine_->PurgeOldBackups(num_backups_to_keep);
}
Status RestoreBackupableDB::DeleteBackup(BackupID backup_id) {
if (!status_.ok()) {
return status_;
}
return backup_engine_->DeleteBackup(backup_id);
}
Status RestoreBackupableDB::GarbageCollect() {
if (!status_.ok()) {
return status_;
}
return backup_engine_->GarbageCollect();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册