提交 31a27a36 编写于 作者: D Dmytro Okhonko 提交者: Dmytro Okhonko

Callback for informing backup downloading added

Summary:
In case of huge db backup infromation about progress of downloading would help.
New callback parameter in CreateNewBackup() function will trigger whenever a some amount of data downloaded.
Task: 8057631

Test Plan:
ProgressCallbackDuringBackup test that cover new functionality added to BackupableDBTest tests.
other test succeed as well.

Reviewers: Guenena, benj, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D46575
上级 d93a9f2a
......@@ -92,17 +92,20 @@ struct BackupableDBOptions {
// Default: 1
int max_background_operations;
// During backup user can get callback every time next
// callback_trigger_interval_size bytes being copied.
// Default: 4194304
uint64_t callback_trigger_interval_size;
void Dump(Logger* logger) const;
explicit BackupableDBOptions(const std::string& _backup_dir,
Env* _backup_env = nullptr,
bool _share_table_files = true,
Logger* _info_log = nullptr, bool _sync = true,
bool _destroy_old_data = false,
bool _backup_log_files = true,
uint64_t _backup_rate_limit = 0,
uint64_t _restore_rate_limit = 0,
int _max_background_operations = 1)
explicit BackupableDBOptions(
const std::string& _backup_dir, Env* _backup_env = nullptr,
bool _share_table_files = true, Logger* _info_log = nullptr,
bool _sync = true, bool _destroy_old_data = false,
bool _backup_log_files = true, uint64_t _backup_rate_limit = 0,
uint64_t _restore_rate_limit = 0, int _max_background_operations = 1,
uint64_t _callback_trigger_interval_size = 4 * 1024 * 1024)
: backup_dir(_backup_dir),
backup_env(_backup_env),
share_table_files(_share_table_files),
......@@ -113,7 +116,8 @@ struct BackupableDBOptions {
backup_rate_limit(_backup_rate_limit),
restore_rate_limit(_restore_rate_limit),
share_files_with_checksum(false),
max_background_operations(_max_background_operations) {
max_background_operations(_max_background_operations),
callback_trigger_interval_size(_callback_trigger_interval_size) {
assert(share_table_files || !share_files_with_checksum);
}
};
......@@ -213,7 +217,9 @@ class BackupEngine {
const BackupableDBOptions& options,
BackupEngine** backup_engine_ptr);
virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false) = 0;
virtual Status CreateNewBackup(
DB* db, bool flush_before_backup = false,
std::function<void()> progress_callback = []() {}) = 0;
virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
virtual Status DeleteBackup(BackupID backup_id) = 0;
virtual void StopBackup() = 0;
......
......@@ -89,7 +89,9 @@ class BackupEngineImpl : public BackupEngine {
BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
bool read_only = false);
~BackupEngineImpl();
Status CreateNewBackup(DB* db, bool flush_before_backup = false) override;
Status CreateNewBackup(DB* db, bool flush_before_backup = false,
std::function<void()> progress_callback = []() {
}) override;
Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
Status DeleteBackup(BackupID backup_id) override;
void StopBackup() override {
......@@ -267,15 +269,11 @@ class BackupEngineImpl : public BackupEngine {
Status PutLatestBackupFileContents(uint32_t latest_backup);
// if size_limit == 0, there is no size limit, copy everything
Status CopyFile(const std::string& src,
const std::string& dst,
Env* src_env,
Env* dst_env,
bool sync,
RateLimiter* rate_limiter,
uint64_t* size = nullptr,
uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0);
Status CopyFile(const std::string& src, const std::string& dst, Env* src_env,
Env* dst_env, bool sync, RateLimiter* rate_limiter,
uint64_t* size = nullptr, uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0,
std::function<void()> progress_callback = []() {});
Status CalculateChecksum(const std::string& src,
Env* src_env,
......@@ -296,6 +294,7 @@ class BackupEngineImpl : public BackupEngine {
RateLimiter* rate_limiter;
uint64_t size_limit;
std::promise<CopyResult> result;
std::function<void()> progress_callback;
CopyWorkItem() {}
CopyWorkItem(const CopyWorkItem&) = delete;
......@@ -312,23 +311,22 @@ class BackupEngineImpl : public BackupEngine {
rate_limiter = o.rate_limiter;
size_limit = o.size_limit;
result = std::move(o.result);
progress_callback = std::move(o.progress_callback);
return *this;
}
CopyWorkItem(std::string _src_path,
std::string _dst_path,
Env* _src_env,
Env* _dst_env,
bool _sync,
RateLimiter* _rate_limiter,
uint64_t _size_limit)
CopyWorkItem(std::string _src_path, std::string _dst_path, Env* _src_env,
Env* _dst_env, bool _sync, RateLimiter* _rate_limiter,
uint64_t _size_limit,
std::function<void()> _progress_callback = []() {})
: src_path(std::move(_src_path)),
dst_path(std::move(_dst_path)),
src_env(_src_env),
dst_env(_dst_env),
sync(_sync),
rate_limiter(_rate_limiter),
size_limit(_size_limit) {}
size_limit(_size_limit),
progress_callback(_progress_callback) {}
};
struct BackupAfterCopyWorkItem {
......@@ -388,19 +386,18 @@ class BackupEngineImpl : public BackupEngine {
};
bool initialized_;
std::mutex byte_report_mutex_;
channel<CopyWorkItem> files_to_copy_;
std::vector<std::thread> threads_;
Status AddBackupFileWorkItem(
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish,
BackupID backup_id,
bool shared,
const std::string& src_dir,
const std::string& src_fname, // starts with "/"
RateLimiter* rate_limiter,
uint64_t size_limit = 0,
bool shared_checksum = false);
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& src_fname, // starts with "/"
RateLimiter* rate_limiter, uint64_t size_limit = 0,
bool shared_checksum = false,
std::function<void()> progress_callback = []() {});
// backup state data
BackupID latest_backup_id_;
......@@ -578,15 +575,11 @@ Status BackupEngineImpl::Initialize() {
CopyWorkItem work_item;
while (files_to_copy_.read(work_item)) {
CopyResult result;
result.status = CopyFile(work_item.src_path,
work_item.dst_path,
work_item.src_env,
work_item.dst_env,
work_item.sync,
work_item.rate_limiter,
&result.size,
&result.checksum_value,
work_item.size_limit);
result.status =
CopyFile(work_item.src_path, work_item.dst_path, work_item.src_env,
work_item.dst_env, work_item.sync, work_item.rate_limiter,
&result.size, &result.checksum_value, work_item.size_limit,
work_item.progress_callback);
work_item.result.set_value(std::move(result));
}
});
......@@ -597,7 +590,8 @@ Status BackupEngineImpl::Initialize() {
return Status::OK();
}
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
Status BackupEngineImpl::CreateNewBackup(
DB* db, bool flush_before_backup, std::function<void()> progress_callback) {
assert(initialized_);
assert(!read_only_);
Status s;
......@@ -672,15 +666,12 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
// * if it's kTableFile, then it's shared
// * if it's kDescriptorFile, limit the size to manifest_file_size
s = AddBackupFileWorkItem(
live_dst_paths,
backup_items_to_finish,
new_backup_id,
options_.share_table_files && type == kTableFile,
db->GetName(),
live_files[i],
rate_limiter.get(),
(type == kDescriptorFile) ? manifest_file_size : 0,
options_.share_files_with_checksum && type == kTableFile);
live_dst_paths, backup_items_to_finish, new_backup_id,
options_.share_table_files && type == kTableFile, db->GetName(),
live_files[i], rate_limiter.get(),
(type == kDescriptorFile) ? manifest_file_size : 0,
options_.share_files_with_checksum && type == kTableFile,
progress_callback);
}
// Add a CopyWorkItem to the channel for each WAL file
for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
......@@ -1085,13 +1076,12 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
return s;
}
Status BackupEngineImpl::CopyFile(
const std::string& src,
const std::string& dst, Env* src_env,
Env* dst_env, bool sync,
RateLimiter* rate_limiter, uint64_t* size,
uint32_t* checksum_value,
uint64_t size_limit) {
Status BackupEngineImpl::CopyFile(const std::string& src,
const std::string& dst, Env* src_env,
Env* dst_env, bool sync,
RateLimiter* rate_limiter, uint64_t* size,
uint32_t* checksum_value, uint64_t size_limit,
std::function<void()> progress_callback) {
Status s;
unique_ptr<WritableFile> dst_file;
unique_ptr<SequentialFile> src_file;
......@@ -1125,6 +1115,7 @@ Status BackupEngineImpl::CopyFile(
unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
Slice data;
uint64_t processed_buffer_size = 0;
do {
if (stop_backup_.load(std::memory_order_acquire)) {
return Status::Incomplete("Backup stopped");
......@@ -1149,6 +1140,12 @@ Status BackupEngineImpl::CopyFile(
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW);
}
processed_buffer_size += buffer_to_read;
if (processed_buffer_size > options_.callback_trigger_interval_size) {
processed_buffer_size -= options_.callback_trigger_interval_size;
std::lock_guard<std::mutex> lock(byte_report_mutex_);
progress_callback();
}
} while (s.ok() && data.size() > 0 && size_limit > 0);
if (s.ok() && sync) {
......@@ -1160,15 +1157,12 @@ Status BackupEngineImpl::CopyFile(
// src_fname will always start with "/"
Status BackupEngineImpl::AddBackupFileWorkItem(
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish,
BackupID backup_id,
bool shared,
const std::string& src_dir,
const std::string& src_fname,
RateLimiter* rate_limiter,
uint64_t size_limit,
bool shared_checksum) {
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& src_fname, RateLimiter* rate_limiter,
uint64_t size_limit, bool shared_checksum,
std::function<void()> progress_callback) {
assert(src_fname.size() > 0 && src_fname[0] == '/');
std::string dst_relative = src_fname.substr(1);
std::string dst_relative_tmp;
......@@ -1252,13 +1246,9 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
if (need_to_copy) {
Log(options_.info_log, "Copying %s to %s", src_fname.c_str(),
dst_path_tmp.c_str());
CopyWorkItem copy_work_item(src_dir + src_fname,
dst_path_tmp,
db_env_,
backup_env_,
options_.sync,
rate_limiter,
size_limit);
CopyWorkItem copy_work_item(src_dir + src_fname, dst_path_tmp, db_env_,
backup_env_, options_.sync, rate_limiter,
size_limit, progress_callback);
BackupAfterCopyWorkItem after_copy_work_item(
copy_work_item.result.get_future(),
shared,
......
......@@ -1172,6 +1172,20 @@ TEST_F(BackupableDBTest, ReadOnlyBackupEngine) {
delete db;
}
TEST_F(BackupableDBTest, ProgressCallbackDuringBackup) {
DestroyDB(dbname_, Options());
OpenDBAndBackupEngine(true);
FillDB(db_.get(), 0, 100);
bool is_callback_invoked = false;
ASSERT_OK(backup_engine_->CreateNewBackup(
db_.get(), true,
[&is_callback_invoked]() { is_callback_invoked = true; }));
ASSERT_TRUE(is_callback_invoked);
CloseDBAndBackupEngine();
DestroyDB(dbname_, Options());
}
TEST_F(BackupableDBTest, GarbageCollectionBeforeBackup) {
DestroyDB(dbname_, Options());
OpenDBAndBackupEngine(true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册