未验证 提交 6cb413c3 编写于 作者: M Mixficsol 提交者: GitHub

rsync rate limiting configuration (#1926)

Co-authored-by: Nwuxianrong <wuxianrong@360.cn>
上级 4a39505f
......@@ -410,3 +410,7 @@ default-slot-num : 1024
# blob-num-shard-bits default -1, the number of bits from cache keys to be use as shard id.
# The cache will be sharded into 2^blob-num-shard-bits shards.
# blob-num-shard-bits : -1
# Rsync Rate limiting configuration
throttle-bytes-per-second : 307200000
max-rsync-parallel-num : 4
\ No newline at end of file
......@@ -450,16 +450,16 @@ class HelloCmd : public Cmd {
};
class DiskRecoveryCmd : public Cmd {
public:
DiskRecoveryCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new DiskRecoveryCmd(*this); }
private:
void DoInitial() override;
std::map<std::string, uint64_t> background_errors_;
public:
DiskRecoveryCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new DiskRecoveryCmd(*this); }
private:
void DoInitial() override;
std::map<std::string, uint64_t> background_errors_;
};
#ifdef WITH_COMMAND_DOCS
......
......@@ -328,6 +328,15 @@ class PikaConf : public pstd::BaseConf {
int64_t blob_cache() { return blob_cache_; }
int64_t blob_num_shard_bits() { return blob_num_shard_bits_; }
// Rsync Rate limiting configuration
int throttle_bytes_per_second() {
std::shared_lock l(rwlock_);
return throttle_bytes_per_second_;
}
int max_rsync_parallel_num() {
std::shared_lock l(rwlock_);
return max_rsync_parallel_num_;
}
// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
......@@ -544,6 +553,19 @@ class PikaConf : public pstd::BaseConf {
log_level_ = value;
}
// Rsync Rate limiting configuration
void SetThrottleBytesPerSecond(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("throttle-bytes-per-second", std::to_string(value));
throttle_bytes_per_second_ = value;
}
void SetMaxRsyncParallelNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("max-rsync-parallel-num", std::to_string(value));
max_rsync_parallel_num_ = value;
}
pstd::Status DBSlotsSanityCheck(const std::string& db_name, const std::set<uint32_t>& slot_ids,
bool is_add);
pstd::Status AddDBSlots(const std::string& db_name, const std::set<uint32_t>& slot_ids);
......@@ -666,6 +688,10 @@ class PikaConf : public pstd::BaseConf {
std::unique_ptr<PikaMeta> local_meta_;
std::shared_mutex rwlock_;
// Rsync Rate limiting configuration
int throttle_bytes_per_second_ = 307200000;
int max_rsync_parallel_num_ = 4;
};
#endif
......@@ -29,6 +29,8 @@
#include "include/throttle.h"
#include "rsync_service.pb.h"
extern std::unique_ptr<PikaConf> g_pika_conf;
const std::string kDumpMetaFileName = "DUMP_META_DATA";
const std::string kUuidPrefix = "snapshot-uuid:";
......@@ -50,6 +52,7 @@ public:
void* ThreadMain() override;
void Copy(const std::set<std::string>& file_set, int index);
bool Init();
int GetParallelNum();
Status Start();
Status Stop();
bool IsRunning() {
......@@ -93,9 +96,9 @@ private:
std::condition_variable cond_;
std::mutex mu_;
std::unique_ptr<Throttle> throttle_;
std::string master_ip_;
int master_port_;
int parallel_num_;
};
class RsyncWriter {
......
......@@ -8,6 +8,9 @@
#include <atomic>
#include "pstd/include/pstd_mutex.h"
#include "pika_conf.h"
extern std::unique_ptr<PikaConf> g_pika_conf;
namespace rsync {
class Throttle {
......@@ -17,6 +20,10 @@ class Throttle {
~Throttle();
size_t ThrottledByThroughput(size_t bytes);
void ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t elaspe_time_us);
static Throttle& GetInstance() {
static Throttle instance(g_pika_conf->throttle_bytes_per_second(), 10);
return instance;
}
private:
std::atomic<size_t> throttle_throughput_bytes_ = 100 * 1024 * 1024;
......
......@@ -1836,6 +1836,17 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, "slave-read-only");
EncodeString(&config_body, g_pika_conf->slave_read_only() ? "yes" : "no");
}
if (pstd::stringmatch(pattern.data(), "throttle-bytes-per-second", 1) != 0) {
elements += 2;
EncodeString(&config_body, "throttle-bytes-per-second");
EncodeNumber(&config_body, g_pika_conf->throttle_bytes_per_second());
}
if (pstd::stringmatch(pattern.data(), "max-rsync-parallel-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "max-rsync-parallel-num");
EncodeNumber(&config_body, g_pika_conf->max_rsync_parallel_num());
}
std::stringstream resp;
resp << "*" << std::to_string(elements) << "\r\n" << config_body;
......@@ -1879,6 +1890,8 @@ void ConfigCmd::ConfigSet(std::string& ret) {
EncodeString(&ret, "write-buffer-size");
EncodeString(&ret, "max-write-buffer-num");
EncodeString(&ret, "arena-block-size");
EncodeString(&ret, "throttle-bytes-per-second");
EncodeString(&ret, "max-rsync-parallel-num");
return;
}
long int ival;
......@@ -2161,6 +2174,20 @@ void ConfigCmd::ConfigSet(std::string& ret) {
}
g_pika_conf->SetArenaBlockSize(static_cast<int>(ival));
ret = "+OK\r\n";
} else if (set_item == "throttle-bytes-per-second") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'throttle-bytes-per-second'\r\n";
return;
}
g_pika_conf->SetThrottleBytesPerSecond(static_cast<int>(ival));
ret = "+OK\r\n";
} else if (set_item == "max-rsync-parallel-num") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival > kMaxRsyncParallelNum) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-rsync-parallel-num'\r\n";
return;
}
g_pika_conf->SetMaxRsyncParallelNum(static_cast<int>(ival));
ret = "+OK\r\n";
} else {
ret = "-ERR Unsupported CONFIG parameter: " + set_item + "\r\n";
}
......
......@@ -601,6 +601,17 @@ int PikaConf::Load() {
GetConfInt64("blob-num-shard-bits", &blob_num_shard_bits_);
return ret;
// throttle-bytes-per-second
GetConfInt("throttle-bytes-per-second", &throttle_bytes_per_second_);
if (throttle_bytes_per_second_ <= 0) {
throttle_bytes_per_second_ = 307200000;
}
GetConfInt("max-rsync-parallel-num", &max_rsync_parallel_num_);
if (max_rsync_parallel_num_ <= 0) {
max_rsync_parallel_num_ = 4;
}
}
void PikaConf::TryPushDiffCommands(const std::string& command, const std::string& value) {
......@@ -641,6 +652,8 @@ int PikaConf::ConfigRewrite() {
SetConfInt64("manually-resume-interval", resume_check_interval_);
SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_);
SetConfInt("slave-priority", slave_priority_);
SetConfInt("throttle-bytes-per-second", throttle_bytes_per_second_);
SetConfInt("max-rsync-parallel-num", max_rsync_parallel_num_);
SetConfInt("sync-window-size", sync_window_size_.load());
SetConfInt("consensus-level", consensus_level_.load());
SetConfInt("replication-num", replication_num_.load());
......
......@@ -18,18 +18,17 @@ using namespace RsyncService;
extern PikaServer* g_pika_server;
const int kFlushIntervalUs = 10 * 1000 * 1000;
const int kThrottleBytesPerSecond = 300 << 20;
const int kBytesPerRequest = 4 << 20;
const int kThrottleCheckCycle = 10;
namespace rsync {
RsyncClient::RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id)
: snapshot_uuid_(""), dir_(dir), db_name_(db_name), slot_id_(slot_id),
state_(IDLE), max_retries_(10), master_ip_(""), master_port_(0) {
state_(IDLE), max_retries_(10), master_ip_(""), master_port_(0),
parallel_num_(g_pika_conf->max_rsync_parallel_num()) {
wo_mgr_.reset(new WaitObjectManager());
client_thread_ = std::make_unique<RsyncClientThread>(3000, 60, wo_mgr_.get());
work_threads_.resize(kMaxRsyncParallelNum);
throttle_.reset(new Throttle(kThrottleBytesPerSecond, kThrottleCheckCycle));
work_threads_.resize(GetParallelNum());
finished_work_cnt_.store(0);
}
......@@ -83,13 +82,13 @@ void* RsyncClient::ThreadMain() {
Status s = Status::OK();
LOG(INFO) << "RsyncClient begin to copy remote files";
std::vector<std::set<std::string> > file_vec(kMaxRsyncParallelNum);
std::vector<std::set<std::string> > file_vec(GetParallelNum());
int index = 0;
for (const auto& file : file_set_) {
file_vec[index++ % kMaxRsyncParallelNum].insert(file);
file_vec[index++ % GetParallelNum()].insert(file);
}
for (int i = 0; i < kMaxRsyncParallelNum; i++) {
for (int i = 0; i < GetParallelNum(); i++) {
work_threads_[i] = std::move(std::thread(&RsyncClient::Copy, this, file_vec[i], i));
}
......@@ -126,12 +125,12 @@ void* RsyncClient::ThreadMain() {
outfile.flush();
meta_rep.clear();
if (finished_work_cnt_.load() == kMaxRsyncParallelNum) {
if (finished_work_cnt_.load() == GetParallelNum()) {
break;
}
}
for (int i = 0; i < kMaxRsyncParallelNum; i++) {
for (int i = 0; i < GetParallelNum(); i++) {
work_threads_[i].join();
}
finished_work_cnt_.store(0);
......@@ -161,7 +160,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
break;
}
size_t copy_file_begin_time = pstd::NowMicros();
size_t count = throttle_->ThrottledByThroughput(kBytesPerRequest);
size_t count = Throttle::GetInstance().ThrottledByThroughput(kBytesPerRequest);
if (count == 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000 / kThrottleCheckCycle));
continue;
......@@ -200,7 +199,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
size_t ret_count = resp->file_resp().count();
size_t elaspe_time_us = pstd::NowMicros() - copy_file_begin_time;
throttle_->ReturnUnusedThroughput(count, ret_count, elaspe_time_us);
Throttle::GetInstance().ReturnUnusedThroughput(count, ret_count, elaspe_time_us);
if (resp->code() != RsyncService::kOk) {
//TODO: handle different error
......@@ -492,5 +491,9 @@ std::string RsyncClient::GetLocalMetaFilePath() {
return db_path + kDumpMetaFileName;
}
int RsyncClient::GetParallelNum() {
return parallel_num_;
}
} // end namespace rsync
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册