From 34455e35e22dbe33b89dbf7ae5560dd151f20824 Mon Sep 17 00:00:00 2001 From: Smilencer <527646889@qq.com> Date: Mon, 19 Oct 2020 16:51:45 +0800 Subject: [PATCH] feat(hotkey): implement the RPC handler in hotkey_collector (#621) --- src/server/hotkey_collector.cpp | 70 ++++++++++++++++++- src/server/hotkey_collector.h | 12 ++-- src/server/pegasus_server_impl_init.cpp | 6 +- .../test/capacity_unit_calculator_test.cpp | 4 +- 4 files changed, 82 insertions(+), 10 deletions(-) diff --git a/src/server/hotkey_collector.cpp b/src/server/hotkey_collector.cpp index cb3d1c9..dfde0c5 100644 --- a/src/server/hotkey_collector.cpp +++ b/src/server/hotkey_collector.cpp @@ -17,21 +17,39 @@ #include "hotkey_collector.h" +#include #include #include "base/pegasus_key_schema.h" +#include namespace pegasus { namespace server { -hotkey_collector::hotkey_collector() - : _internal_collector(dsn::make_unique()) +hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type, + dsn::replication::replica_base *r_base) + : replica_base(r_base), + _state(hotkey_collector_state::STOPPED), + _hotkey_type(hotkey_type), + _internal_collector(std::make_shared()) { } -// TODO: (Tangyanzhao) implement these functions void hotkey_collector::handle_rpc(const dsn::replication::detect_hotkey_request &req, dsn::replication::detect_hotkey_response &resp) { + switch (req.action) { + case dsn::replication::detect_action::START: + on_start_detect(resp); + return; + case dsn::replication::detect_action::STOP: + on_stop_detect(resp); + return; + default: + std::string hint = fmt::format("{}: can't find this detect action", req.action); + resp.err = dsn::ERR_INVALID_STATE; + resp.__set_err_hint(hint); + derror_replica(hint); + } } void hotkey_collector::capture_raw_key(const dsn::blob &raw_key, int64_t weight) @@ -49,5 +67,51 @@ void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weigh void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); } +void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response &resp) +{ + auto now_state = _state.load(); + std::string hint; + switch (now_state) { + case hotkey_collector_state::COARSE_DETECTING: + case hotkey_collector_state::FINE_DETECTING: + resp.err = dsn::ERR_INVALID_STATE; + hint = fmt::format("still detecting {} hotkey, state is {}", + dsn::enum_to_string(_hotkey_type), + enum_to_string(now_state)); + dwarn_replica(hint); + return; + case hotkey_collector_state::FINISHED: + resp.err = dsn::ERR_INVALID_STATE; + hint = fmt::format( + "{} hotkey result has been found, you can send a stop rpc to restart hotkey detection", + dsn::enum_to_string(_hotkey_type)); + dwarn_replica(hint); + return; + case hotkey_collector_state::STOPPED: + // TODO: (Tangyanzhao) start coarse detecting + _state.store(hotkey_collector_state::COARSE_DETECTING); + resp.err = dsn::ERR_OK; + hint = fmt::format("starting to detect {} hotkey", dsn::enum_to_string(_hotkey_type)); + ddebug_replica(hint); + return; + default: + hint = "invalid collector state"; + resp.err = dsn::ERR_INVALID_STATE; + resp.__set_err_hint(hint); + derror_replica(hint); + dassert(false, "invalid collector state"); + } +} + +void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp) +{ + _state.store(hotkey_collector_state::STOPPED); + _internal_collector.reset(); + resp.err = dsn::ERR_OK; + std::string hint = + fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type)); + ddebug_replica(hint); +} + } // namespace server } // namespace pegasus diff --git a/src/server/hotkey_collector.h b/src/server/hotkey_collector.h index 5c092dc..c649fac 100644 --- a/src/server/hotkey_collector.h +++ b/src/server/hotkey_collector.h @@ -17,9 +17,9 @@ #pragma once -#include #include #include "hotkey_collector_state.h" +#include namespace pegasus { namespace server { @@ -63,10 +63,11 @@ class internal_collector_base; // | | | Hotkey | // +--------------------+ +----------------------------------------------------+ -class hotkey_collector +class hotkey_collector : public dsn::replication::replica_base { public: - hotkey_collector(); + hotkey_collector(dsn::replication::hotkey_type::type hotkey_type, + dsn::replication::replica_base *r_base); // TODO: (Tangyanzhao) capture_*_key should be consistent with hotspot detection // weight: calculate the weight according to the specific situation void capture_raw_key(const dsn::blob &raw_key, int64_t weight); @@ -76,8 +77,11 @@ public: /*out*/ dsn::replication::detect_hotkey_response &resp); private: - std::unique_ptr _internal_collector; + void on_start_detect(dsn::replication::detect_hotkey_response &resp); + void on_stop_detect(dsn::replication::detect_hotkey_response &resp); std::atomic _state; + const dsn::replication::hotkey_type::type _hotkey_type; + std::shared_ptr _internal_collector; }; class internal_collector_base diff --git a/src/server/pegasus_server_impl_init.cpp b/src/server/pegasus_server_impl_init.cpp index 71c6979..75f0359 100644 --- a/src/server/pegasus_server_impl_init.cpp +++ b/src/server/pegasus_server_impl_init.cpp @@ -43,8 +43,10 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r) _primary_address = dsn::rpc_address(dsn_primary_address()).to_string(); _gpid = get_gpid(); - _read_hotkey_collector = std::make_shared(); - _write_hotkey_collector = std::make_shared(); + _read_hotkey_collector = + std::make_shared(dsn::replication::hotkey_type::READ, this); + _write_hotkey_collector = + std::make_shared(dsn::replication::hotkey_type::WRITE, this); _verbose_log = dsn_config_get_value_bool("pegasus.server", "rocksdb_verbose_log", diff --git a/src/server/test/capacity_unit_calculator_test.cpp b/src/server/test/capacity_unit_calculator_test.cpp index 27b8c91..8964af5 100644 --- a/src/server/test/capacity_unit_calculator_test.cpp +++ b/src/server/test/capacity_unit_calculator_test.cpp @@ -29,7 +29,9 @@ public: explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r) : capacity_unit_calculator( - r, std::make_shared(), std::make_shared()) + r, + std::make_shared(dsn::replication::hotkey_type::READ, this), + std::make_shared(dsn::replication::hotkey_type::WRITE, this)) { } -- GitLab