提交 53b88784 编写于 作者: I Igor Canadi

Add throttling to multi-threaded backups

Summary: See internal task t8056182

Test Plan: Added multi-threading in RateLimiter test

Reviewers: benj, AaronFeldman

Reviewed By: AaronFeldman

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D45459
上级 09d982f9
......@@ -28,6 +28,7 @@
#include <algorithm>
#include <vector>
#include <map>
#include <mutex>
#include <sstream>
#include <string>
#include <limits>
......@@ -51,7 +52,10 @@ class BackupRateLimiter {
micros_start_time_(env->NowMicros()),
bytes_since_start_(0) {}
// thread safe
void ReportAndWait(uint64_t bytes_since_last_call) {
std::unique_lock<std::mutex> lk(lock_);
bytes_since_start_ += bytes_since_last_call;
if (bytes_since_start_ < bytes_per_check_) {
// not enough bytes to be rate-limited
......@@ -75,6 +79,7 @@ class BackupRateLimiter {
private:
Env* env_;
std::mutex lock_;
uint64_t max_bytes_per_second_;
uint64_t bytes_per_check_;
uint64_t micros_start_time_;
......@@ -338,9 +343,9 @@ class BackupEngineImpl : public BackupEngine {
CopyWorkItem(const CopyWorkItem&) = delete;
CopyWorkItem& operator=(const CopyWorkItem&) = delete;
CopyWorkItem(CopyWorkItem&& o) { *this = std::move(o); }
CopyWorkItem(CopyWorkItem&& o) noexcept { *this = std::move(o); }
CopyWorkItem& operator=(CopyWorkItem&& o) {
CopyWorkItem& operator=(CopyWorkItem&& o) noexcept {
src_path = std::move(o.src_path);
dst_path = std::move(o.dst_path);
src_env = o.src_env;
......@@ -378,11 +383,11 @@ class BackupEngineImpl : public BackupEngine {
std::string dst_relative;
BackupAfterCopyWorkItem() {}
BackupAfterCopyWorkItem(BackupAfterCopyWorkItem&& o) {
BackupAfterCopyWorkItem(BackupAfterCopyWorkItem&& o) noexcept {
*this = std::move(o);
}
BackupAfterCopyWorkItem& operator=(BackupAfterCopyWorkItem&& o) {
BackupAfterCopyWorkItem& operator=(BackupAfterCopyWorkItem&& o) noexcept {
result = std::move(o.result);
shared = o.shared;
needed_to_copy = o.needed_to_copy;
......@@ -413,11 +418,11 @@ class BackupEngineImpl : public BackupEngine {
RestoreAfterCopyWorkItem(std::future<CopyResult>&& _result,
uint32_t _checksum_value)
: result(std::move(_result)), checksum_value(_checksum_value) {}
RestoreAfterCopyWorkItem(RestoreAfterCopyWorkItem&& o) {
RestoreAfterCopyWorkItem(RestoreAfterCopyWorkItem&& o) noexcept {
*this = std::move(o);
}
RestoreAfterCopyWorkItem& operator=(RestoreAfterCopyWorkItem&& o) {
RestoreAfterCopyWorkItem& operator=(RestoreAfterCopyWorkItem&& o) noexcept {
result = std::move(o.result);
checksum_value = o.checksum_value;
return *this;
......@@ -672,11 +677,6 @@ Status BackupEngineImpl::Initialize() {
Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
assert(initialized_);
if (options_.max_background_operations > 1 &&
options_.backup_rate_limit != 0) {
return Status::InvalidArgument(
"Multi-threaded backups cannot use a backup_rate_limit");
}
assert(!read_only_);
Status s;
std::vector<std::string> live_files;
......@@ -968,11 +968,6 @@ Status BackupEngineImpl::RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options) {
assert(initialized_);
if (options_.max_background_operations > 1 &&
options_.restore_rate_limit != 0) {
return Status::InvalidArgument(
"Multi-threaded restores cannot use a restore_rate_limit");
}
auto corrupt_itr = corrupt_backups_.find(backup_id);
if (corrupt_itr != corrupt_backups_.end()) {
return corrupt_itr->second.first;
......@@ -1146,7 +1141,8 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options));
char file_contents[10];
int len = sprintf(file_contents, "%u\n", latest_backup);
int len =
snprintf(file_contents, sizeof(file_contents), "%u\n", latest_backup);
s = file_writer->Append(Slice(file_contents, len));
if (s.ok() && options_.sync) {
file_writer->Sync(false);
......
......@@ -1037,43 +1037,46 @@ TEST_F(BackupableDBTest, KeepLogFiles) {
}
TEST_F(BackupableDBTest, RateLimiting) {
uint64_t const KB = 1024 * 1024;
size_t const kMicrosPerSec = 1000 * 1000LL;
// iter 0 -- single threaded
// iter 1 -- multi threaded
for (int iter = 0; iter < 2; ++iter) {
uint64_t const KB = 1024 * 1024;
size_t const kMicrosPerSec = 1000 * 1000LL;
std::vector<std::pair<uint64_t, uint64_t>> limits(
{{KB, 5 * KB}, {2 * KB, 3 * KB}});
std::vector<std::pair<uint64_t, uint64_t>> limits(
{{KB, 5 * KB}, {2 * KB, 3 * KB}});
for (const auto& limit : limits) {
// destroy old data
DestroyDB(dbname_, Options());
for (const auto& limit : limits) {
// destroy old data
DestroyDB(dbname_, Options());
backupable_options_->backup_rate_limit = limit.first;
backupable_options_->restore_rate_limit = limit.second;
// rate-limiting backups must be single-threaded
backupable_options_->max_background_operations = 1;
options_.compression = kNoCompression;
OpenDBAndBackupEngine(true);
size_t bytes_written = FillDB(db_.get(), 0, 100000);
auto start_backup = env_->NowMicros();
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
auto backup_time = env_->NowMicros() - start_backup;
auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) /
backupable_options_->backup_rate_limit;
ASSERT_GT(backup_time, 0.8 * rate_limited_backup_time);
backupable_options_->backup_rate_limit = limit.first;
backupable_options_->restore_rate_limit = limit.second;
backupable_options_->max_background_operations = (iter == 0) ? 1 : 10;
options_.compression = kNoCompression;
OpenDBAndBackupEngine(true);
size_t bytes_written = FillDB(db_.get(), 0, 100000);
CloseDBAndBackupEngine();
auto start_backup = env_->NowMicros();
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
auto backup_time = env_->NowMicros() - start_backup;
auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) /
backupable_options_->backup_rate_limit;
ASSERT_GT(backup_time, 0.8 * rate_limited_backup_time);
OpenBackupEngine();
auto start_restore = env_->NowMicros();
ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
auto restore_time = env_->NowMicros() - start_restore;
CloseBackupEngine();
auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) /
backupable_options_->restore_rate_limit;
ASSERT_GT(restore_time, 0.8 * rate_limited_restore_time);
CloseDBAndBackupEngine();
AssertBackupConsistency(0, 0, 100000, 100010);
OpenBackupEngine();
auto start_restore = env_->NowMicros();
ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
auto restore_time = env_->NowMicros() - start_restore;
CloseBackupEngine();
auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) /
backupable_options_->restore_rate_limit;
ASSERT_GT(restore_time, 0.8 * rate_limited_restore_time);
AssertBackupConsistency(0, 0, 100000, 100010);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册