diff --git a/src/base/rrdb_types.cpp b/src/base/rrdb_types.cpp index c940a2b754787d35741d8ed242e88b0d82b0b63f..53ac7c70333405c4ede31578057a3a688362fd60 100644 --- a/src/base/rrdb_types.cpp +++ b/src/base/rrdb_types.cpp @@ -4608,9 +4608,9 @@ hotkey_detect_request::~hotkey_detect_request() throw() {} void hotkey_detect_request::__set_type(const hotkey_type::type val) { this->type = val; } -void hotkey_detect_request::__set_operation(const hotkey_detect_action::type val) +void hotkey_detect_request::__set_action(const hotkey_detect_action::type val) { - this->operation = val; + this->action = val; } uint32_t hotkey_detect_request::read(::apache::thrift::protocol::TProtocol *iprot) @@ -4646,8 +4646,8 @@ uint32_t hotkey_detect_request::read(::apache::thrift::protocol::TProtocol *ipro if (ftype == ::apache::thrift::protocol::T_I32) { int32_t ecast135; xfer += iprot->readI32(ecast135); - this->operation = (hotkey_detect_action::type)ecast135; - this->__isset.operation = true; + this->action = (hotkey_detect_action::type)ecast135; + this->__isset.action = true; } else { xfer += iprot->skip(ftype); } @@ -4674,8 +4674,8 @@ uint32_t hotkey_detect_request::write(::apache::thrift::protocol::TProtocol *opr xfer += oprot->writeI32((int32_t)this->type); xfer += oprot->writeFieldEnd(); - xfer += oprot->writeFieldBegin("operation", ::apache::thrift::protocol::T_I32, 2); - xfer += oprot->writeI32((int32_t)this->operation); + xfer += oprot->writeFieldBegin("action", ::apache::thrift::protocol::T_I32, 2); + xfer += oprot->writeI32((int32_t)this->action); xfer += oprot->writeFieldEnd(); xfer += oprot->writeFieldStop(); @@ -4687,33 +4687,33 @@ void swap(hotkey_detect_request &a, hotkey_detect_request &b) { using ::std::swap; swap(a.type, b.type); - swap(a.operation, b.operation); + swap(a.action, b.action); swap(a.__isset, b.__isset); } hotkey_detect_request::hotkey_detect_request(const hotkey_detect_request &other136) { type = other136.type; - operation = other136.operation; + action = other136.action; __isset = other136.__isset; } hotkey_detect_request::hotkey_detect_request(hotkey_detect_request &&other137) { type = std::move(other137.type); - operation = std::move(other137.operation); + action = std::move(other137.action); __isset = std::move(other137.__isset); } hotkey_detect_request &hotkey_detect_request::operator=(const hotkey_detect_request &other138) { type = other138.type; - operation = other138.operation; + action = other138.action; __isset = other138.__isset; return *this; } hotkey_detect_request &hotkey_detect_request::operator=(hotkey_detect_request &&other139) { type = std::move(other139.type); - operation = std::move(other139.operation); + action = std::move(other139.action); __isset = std::move(other139.__isset); return *this; } @@ -4723,7 +4723,7 @@ void hotkey_detect_request::printTo(std::ostream &out) const out << "hotkey_detect_request("; out << "type=" << to_string(type); out << ", " - << "operation=" << to_string(operation); + << "action=" << to_string(action); out << ")"; } diff --git a/src/idl/rrdb.thrift b/src/idl/rrdb.thrift index 9b874df61fa754d3e17607f640ba0be011bfea03..fdf05f844a402d23b239dccd905a4b2d0369f2f2 100644 --- a/src/idl/rrdb.thrift +++ b/src/idl/rrdb.thrift @@ -293,7 +293,7 @@ struct duplicate_response struct hotkey_detect_request { 1: hotkey_type type - 2: hotkey_detect_action operation + 2: hotkey_detect_action action } struct hotkey_detect_response { diff --git a/src/include/rrdb/rrdb_types.h b/src/include/rrdb/rrdb_types.h index 2c6a210389414934144ad3b7136b0b63b78f859a..14683ac283cc739931cbf4b93f5b3bed70bafdf9 100644 --- a/src/include/rrdb/rrdb_types.h +++ b/src/include/rrdb/rrdb_types.h @@ -1967,9 +1967,9 @@ inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj typedef struct _hotkey_detect_request__isset { - _hotkey_detect_request__isset() : type(false), operation(false) {} + _hotkey_detect_request__isset() : type(false), action(false) {} bool type : 1; - bool operation : 1; + bool action : 1; } _hotkey_detect_request__isset; class hotkey_detect_request @@ -1979,25 +1979,23 @@ public: hotkey_detect_request(hotkey_detect_request &&); hotkey_detect_request &operator=(const hotkey_detect_request &); hotkey_detect_request &operator=(hotkey_detect_request &&); - hotkey_detect_request() : type((hotkey_type::type)0), operation((hotkey_detect_action::type)0) - { - } + hotkey_detect_request() : type((hotkey_type::type)0), action((hotkey_detect_action::type)0) {} virtual ~hotkey_detect_request() throw(); hotkey_type::type type; - hotkey_detect_action::type operation; + hotkey_detect_action::type action; _hotkey_detect_request__isset __isset; void __set_type(const hotkey_type::type val); - void __set_operation(const hotkey_detect_action::type val); + void __set_action(const hotkey_detect_action::type val); bool operator==(const hotkey_detect_request &rhs) const { if (!(type == rhs.type)) return false; - if (!(operation == rhs.operation)) + if (!(action == rhs.action)) return false; return true; } diff --git a/src/server/hotspot_partition_calculator.cpp b/src/server/hotspot_partition_calculator.cpp index 865814e7507d4da9ebb74a7aae1b636286e0bc45..839092d0197a990114f458d11fcdae2e58bcc409 100644 --- a/src/server/hotspot_partition_calculator.cpp +++ b/src/server/hotspot_partition_calculator.cpp @@ -21,6 +21,13 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include "pegasus_read_service.h" namespace pegasus { namespace server { @@ -98,5 +105,52 @@ void hotspot_partition_calculator::data_analyse() } } +// TODO:(TangYanzhao) call this function to start hotkey detection +/*static*/ void hotspot_partition_calculator::send_hotkey_detect_request( + const std::string &app_name, + const uint64_t partition_index, + const dsn::apps::hotkey_type::type hotkey_type, + const dsn::apps::hotkey_detect_action::type action) +{ + auto request = std::make_unique(); + request->type = hotkey_type; + request->action = action; + ddebug_f("{} {} hotkey detection in {}.{}", + (action == dsn::apps::hotkey_detect_action::STOP) ? "Stop" : "Start", + (hotkey_type == dsn::apps::hotkey_type::WRITE) ? "write" : "read", + app_name, + partition_index); + dsn::rpc_address meta_server; + meta_server.assign_group("meta-servers"); + std::vector meta_servers; + replica_helper::load_meta_servers(meta_servers); + for (const auto &address : meta_servers) { + meta_server.group_address()->add(address); + } + auto cluster_name = dsn::replication::get_current_cluster_name(); + auto resolver = partition_resolver::get_resolver(cluster_name, meta_servers, app_name.c_str()); + dsn::task_tracker tracker; + detect_hotkey_rpc rpc( + std::move(request), RPC_DETECT_HOTKEY, std::chrono::seconds(10), partition_index); + rpc.call(resolver, + &tracker, + [app_name, partition_index](dsn::error_code error) { + if (error != dsn::ERR_OK) { + derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}", + app_name, + partition_index, + error.to_string()); + } + }) + ->wait(); + if (rpc.response().err != dsn::ERR_OK) { + derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{} {}", + app_name, + partition_index, + rpc.response().err, + rpc.response().err_hint); + } +} + } // namespace server } // namespace pegasus diff --git a/src/server/hotspot_partition_calculator.h b/src/server/hotspot_partition_calculator.h index c950ebe82fbb42f33b757f71dac5045e11a1972a..8b13718b7a73782f702aca1336fc2bf8cc87bc45 100644 --- a/src/server/hotspot_partition_calculator.h +++ b/src/server/hotspot_partition_calculator.h @@ -37,6 +37,10 @@ public: void data_aggregate(const std::vector &partitions); // analyse the saved data to find hotspot partition void data_analyse(); + static void send_hotkey_detect_request(const std::string &app_name, + const uint64_t partition_index, + const dsn::apps::hotkey_type::type hotkey_type, + const dsn::apps::hotkey_detect_action::type action); private: const std::string _app_name; diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h index 67b6a8027f27654d946345e3dffb8d53e4dd3282..fe66637770e2eb3d351d958d90c509ef83f8dba5 100644 --- a/src/server/pegasus_read_service.h +++ b/src/server/pegasus_read_service.h @@ -32,6 +32,8 @@ typedef ::dsn::rpc_holder<::dsn::blob, dsn::apps::ttl_response> ttl_rpc; typedef ::dsn::rpc_holder<::dsn::apps::get_scanner_request, dsn::apps::scan_response> get_scanner_rpc; typedef ::dsn::rpc_holder<::dsn::apps::scan_request, dsn::apps::scan_response> scan_rpc; +typedef ::dsn::rpc_holder<::dsn::apps::hotkey_detect_request, dsn::apps::hotkey_detect_response> + detect_hotkey_rpc; class pegasus_read_service : public dsn::replication::replication_app_base, public dsn::replication::storage_serverlet