diff --git a/conf/pika.conf b/conf/pika.conf index a1209b7c5855dffc3bd83830112d0c3494eecdfe..0e84f48617d6c0a97e9761d324e7585e5e1048f4 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -410,3 +410,7 @@ default-slot-num : 1024 # blob-num-shard-bits default -1, the number of bits from cache keys to be use as shard id. # The cache will be sharded into 2^blob-num-shard-bits shards. # blob-num-shard-bits : -1 + +# Rsync Rate limiting configuration +throttle-bytes-per-second : 307200000 +max-rsync-parallel-num : 4 \ No newline at end of file diff --git a/include/pika_admin.h b/include/pika_admin.h index 24d55a475f0e6521220230037c27aded24af5fd0..839ce4abe2fab06507f89d46b710939a9d3c5f13 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -450,16 +450,16 @@ class HelloCmd : public Cmd { }; class DiskRecoveryCmd : public Cmd { -public: - DiskRecoveryCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} - void Do(std::shared_ptr slot = nullptr) override; - void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; - void Merge() override{}; - Cmd* Clone() override { return new DiskRecoveryCmd(*this); } - -private: - void DoInitial() override; - std::map background_errors_; + public: + DiskRecoveryCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {} + void Do(std::shared_ptr slot = nullptr) override; + void Split(std::shared_ptr slot, const HintKeys& hint_keys) override{}; + void Merge() override{}; + Cmd* Clone() override { return new DiskRecoveryCmd(*this); } + + private: + void DoInitial() override; + std::map background_errors_; }; #ifdef WITH_COMMAND_DOCS diff --git a/include/pika_conf.h b/include/pika_conf.h index 94361f878ff1a88a88919654c44cb8765153f7a0..0ff6001ce48111e0d5c085a551697d4ea8c3abd0 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -328,6 +328,15 @@ class PikaConf : public pstd::BaseConf { int64_t blob_cache() { return blob_cache_; } int64_t blob_num_shard_bits() { return blob_num_shard_bits_; } + // Rsync Rate limiting configuration + int throttle_bytes_per_second() { + std::shared_lock l(rwlock_); + return throttle_bytes_per_second_; + } + int max_rsync_parallel_num() { + std::shared_lock l(rwlock_); + return max_rsync_parallel_num_; + } // Immutable config items, we don't use lock. bool daemonize() { return daemonize_; } std::string pidfile() { return pidfile_; } @@ -544,6 +553,19 @@ class PikaConf : public pstd::BaseConf { log_level_ = value; } + // Rsync Rate limiting configuration + void SetThrottleBytesPerSecond(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("throttle-bytes-per-second", std::to_string(value)); + throttle_bytes_per_second_ = value; + } + + void SetMaxRsyncParallelNum(const int value) { + std::lock_guard l(rwlock_); + TryPushDiffCommands("max-rsync-parallel-num", std::to_string(value)); + max_rsync_parallel_num_ = value; + } + pstd::Status DBSlotsSanityCheck(const std::string& db_name, const std::set& slot_ids, bool is_add); pstd::Status AddDBSlots(const std::string& db_name, const std::set& slot_ids); @@ -666,6 +688,10 @@ class PikaConf : public pstd::BaseConf { std::unique_ptr local_meta_; std::shared_mutex rwlock_; + + // Rsync Rate limiting configuration + int throttle_bytes_per_second_ = 307200000; + int max_rsync_parallel_num_ = 4; }; #endif diff --git a/include/rsync_client.h b/include/rsync_client.h index 94eb35d862c1030700d18948686d3d693af156ad..3b2aa94d8b17d3aac813b2f1fd1a4dfca5bdaea8 100644 --- a/include/rsync_client.h +++ b/include/rsync_client.h @@ -29,6 +29,8 @@ #include "include/throttle.h" #include "rsync_service.pb.h" +extern std::unique_ptr g_pika_conf; + const std::string kDumpMetaFileName = "DUMP_META_DATA"; const std::string kUuidPrefix = "snapshot-uuid:"; @@ -50,6 +52,7 @@ public: void* ThreadMain() override; void Copy(const std::set& file_set, int index); bool Init(); + int GetParallelNum(); Status Start(); Status Stop(); bool IsRunning() { @@ -93,9 +96,9 @@ private: std::condition_variable cond_; std::mutex mu_; - std::unique_ptr throttle_; std::string master_ip_; int master_port_; + int parallel_num_; }; class RsyncWriter { diff --git a/include/throttle.h b/include/throttle.h index d80f43aaca79b6f905653d0775ca0cd1ce4c4117..cb0dc7d63888d088300248a82d4f0cbbab9a4339 100644 --- a/include/throttle.h +++ b/include/throttle.h @@ -8,6 +8,9 @@ #include #include "pstd/include/pstd_mutex.h" +#include "pika_conf.h" + +extern std::unique_ptr g_pika_conf; namespace rsync { class Throttle { @@ -17,6 +20,10 @@ class Throttle { ~Throttle(); size_t ThrottledByThroughput(size_t bytes); void ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t elaspe_time_us); + static Throttle& GetInstance() { + static Throttle instance(g_pika_conf->throttle_bytes_per_second(), 10); + return instance; + } private: std::atomic throttle_throughput_bytes_ = 100 * 1024 * 1024; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 53cdd64741b026ce5d3be7777ad7a06a0bf9b90d..c876592295e2687a8ac1c0b559fd82a5343fbd90 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1836,6 +1836,17 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "slave-read-only"); EncodeString(&config_body, g_pika_conf->slave_read_only() ? "yes" : "no"); } + if (pstd::stringmatch(pattern.data(), "throttle-bytes-per-second", 1) != 0) { + elements += 2; + EncodeString(&config_body, "throttle-bytes-per-second"); + EncodeNumber(&config_body, g_pika_conf->throttle_bytes_per_second()); + } + + if (pstd::stringmatch(pattern.data(), "max-rsync-parallel-num", 1) != 0) { + elements += 2; + EncodeString(&config_body, "max-rsync-parallel-num"); + EncodeNumber(&config_body, g_pika_conf->max_rsync_parallel_num()); + } std::stringstream resp; resp << "*" << std::to_string(elements) << "\r\n" << config_body; @@ -1879,6 +1890,8 @@ void ConfigCmd::ConfigSet(std::string& ret) { EncodeString(&ret, "write-buffer-size"); EncodeString(&ret, "max-write-buffer-num"); EncodeString(&ret, "arena-block-size"); + EncodeString(&ret, "throttle-bytes-per-second"); + EncodeString(&ret, "max-rsync-parallel-num"); return; } long int ival; @@ -2161,6 +2174,20 @@ void ConfigCmd::ConfigSet(std::string& ret) { } g_pika_conf->SetArenaBlockSize(static_cast(ival)); ret = "+OK\r\n"; + } else if (set_item == "throttle-bytes-per-second") { + if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) { + ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'throttle-bytes-per-second'\r\n"; + return; + } + g_pika_conf->SetThrottleBytesPerSecond(static_cast(ival)); + ret = "+OK\r\n"; + } else if (set_item == "max-rsync-parallel-num") { + if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival > kMaxRsyncParallelNum) { + ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-rsync-parallel-num'\r\n"; + return; + } + g_pika_conf->SetMaxRsyncParallelNum(static_cast(ival)); + ret = "+OK\r\n"; } else { ret = "-ERR Unsupported CONFIG parameter: " + set_item + "\r\n"; } diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 7d5fcfde9299452b03d56703609f856290327386..92a679e78241eec8231daa239f4499046d1ae220 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -601,6 +601,17 @@ int PikaConf::Load() { GetConfInt64("blob-num-shard-bits", &blob_num_shard_bits_); return ret; + + // throttle-bytes-per-second + GetConfInt("throttle-bytes-per-second", &throttle_bytes_per_second_); + if (throttle_bytes_per_second_ <= 0) { + throttle_bytes_per_second_ = 307200000; + } + + GetConfInt("max-rsync-parallel-num", &max_rsync_parallel_num_); + if (max_rsync_parallel_num_ <= 0) { + max_rsync_parallel_num_ = 4; + } } void PikaConf::TryPushDiffCommands(const std::string& command, const std::string& value) { @@ -641,6 +652,8 @@ int PikaConf::ConfigRewrite() { SetConfInt64("manually-resume-interval", resume_check_interval_); SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_); SetConfInt("slave-priority", slave_priority_); + SetConfInt("throttle-bytes-per-second", throttle_bytes_per_second_); + SetConfInt("max-rsync-parallel-num", max_rsync_parallel_num_); SetConfInt("sync-window-size", sync_window_size_.load()); SetConfInt("consensus-level", consensus_level_.load()); SetConfInt("replication-num", replication_num_.load()); diff --git a/src/rsync_client.cc b/src/rsync_client.cc index 46363e9384ba0a1a6ce7513bb8eebd49fd49980f..ac9fd9cefab1e4afcedfe55d2fa70420ba4b4c82 100644 --- a/src/rsync_client.cc +++ b/src/rsync_client.cc @@ -18,18 +18,17 @@ using namespace RsyncService; extern PikaServer* g_pika_server; const int kFlushIntervalUs = 10 * 1000 * 1000; -const int kThrottleBytesPerSecond = 300 << 20; const int kBytesPerRequest = 4 << 20; const int kThrottleCheckCycle = 10; namespace rsync { RsyncClient::RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id) : snapshot_uuid_(""), dir_(dir), db_name_(db_name), slot_id_(slot_id), - state_(IDLE), max_retries_(10), master_ip_(""), master_port_(0) { + state_(IDLE), max_retries_(10), master_ip_(""), master_port_(0), + parallel_num_(g_pika_conf->max_rsync_parallel_num()) { wo_mgr_.reset(new WaitObjectManager()); client_thread_ = std::make_unique(3000, 60, wo_mgr_.get()); - work_threads_.resize(kMaxRsyncParallelNum); - throttle_.reset(new Throttle(kThrottleBytesPerSecond, kThrottleCheckCycle)); + work_threads_.resize(GetParallelNum()); finished_work_cnt_.store(0); } @@ -83,13 +82,13 @@ void* RsyncClient::ThreadMain() { Status s = Status::OK(); LOG(INFO) << "RsyncClient begin to copy remote files"; - std::vector > file_vec(kMaxRsyncParallelNum); + std::vector > file_vec(GetParallelNum()); int index = 0; for (const auto& file : file_set_) { - file_vec[index++ % kMaxRsyncParallelNum].insert(file); + file_vec[index++ % GetParallelNum()].insert(file); } - for (int i = 0; i < kMaxRsyncParallelNum; i++) { + for (int i = 0; i < GetParallelNum(); i++) { work_threads_[i] = std::move(std::thread(&RsyncClient::Copy, this, file_vec[i], i)); } @@ -126,12 +125,12 @@ void* RsyncClient::ThreadMain() { outfile.flush(); meta_rep.clear(); - if (finished_work_cnt_.load() == kMaxRsyncParallelNum) { + if (finished_work_cnt_.load() == GetParallelNum()) { break; } } - for (int i = 0; i < kMaxRsyncParallelNum; i++) { + for (int i = 0; i < GetParallelNum(); i++) { work_threads_[i].join(); } finished_work_cnt_.store(0); @@ -161,7 +160,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { break; } size_t copy_file_begin_time = pstd::NowMicros(); - size_t count = throttle_->ThrottledByThroughput(kBytesPerRequest); + size_t count = Throttle::GetInstance().ThrottledByThroughput(kBytesPerRequest); if (count == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(1000 / kThrottleCheckCycle)); continue; @@ -200,7 +199,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { size_t ret_count = resp->file_resp().count(); size_t elaspe_time_us = pstd::NowMicros() - copy_file_begin_time; - throttle_->ReturnUnusedThroughput(count, ret_count, elaspe_time_us); + Throttle::GetInstance().ReturnUnusedThroughput(count, ret_count, elaspe_time_us); if (resp->code() != RsyncService::kOk) { //TODO: handle different error @@ -492,5 +491,9 @@ std::string RsyncClient::GetLocalMetaFilePath() { return db_path + kDumpMetaFileName; } +int RsyncClient::GetParallelNum() { + return parallel_num_; +} + } // end namespace rsync