未验证 提交 412492d3 编写于 作者: S Smilencer 提交者: GitHub

feat(hotspot): add a function to start hotkey detecting in hotspot_partition_calculator (#601)

上级 e56690c2
......@@ -4608,9 +4608,9 @@ hotkey_detect_request::~hotkey_detect_request() throw() {}
void hotkey_detect_request::__set_type(const hotkey_type::type val) { this->type = val; }
void hotkey_detect_request::__set_operation(const hotkey_detect_action::type val)
void hotkey_detect_request::__set_action(const hotkey_detect_action::type val)
{
this->operation = val;
this->action = val;
}
uint32_t hotkey_detect_request::read(::apache::thrift::protocol::TProtocol *iprot)
......@@ -4646,8 +4646,8 @@ uint32_t hotkey_detect_request::read(::apache::thrift::protocol::TProtocol *ipro
if (ftype == ::apache::thrift::protocol::T_I32) {
int32_t ecast135;
xfer += iprot->readI32(ecast135);
this->operation = (hotkey_detect_action::type)ecast135;
this->__isset.operation = true;
this->action = (hotkey_detect_action::type)ecast135;
this->__isset.action = true;
} else {
xfer += iprot->skip(ftype);
}
......@@ -4674,8 +4674,8 @@ uint32_t hotkey_detect_request::write(::apache::thrift::protocol::TProtocol *opr
xfer += oprot->writeI32((int32_t)this->type);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("operation", ::apache::thrift::protocol::T_I32, 2);
xfer += oprot->writeI32((int32_t)this->operation);
xfer += oprot->writeFieldBegin("action", ::apache::thrift::protocol::T_I32, 2);
xfer += oprot->writeI32((int32_t)this->action);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
......@@ -4687,33 +4687,33 @@ void swap(hotkey_detect_request &a, hotkey_detect_request &b)
{
using ::std::swap;
swap(a.type, b.type);
swap(a.operation, b.operation);
swap(a.action, b.action);
swap(a.__isset, b.__isset);
}
hotkey_detect_request::hotkey_detect_request(const hotkey_detect_request &other136)
{
type = other136.type;
operation = other136.operation;
action = other136.action;
__isset = other136.__isset;
}
hotkey_detect_request::hotkey_detect_request(hotkey_detect_request &&other137)
{
type = std::move(other137.type);
operation = std::move(other137.operation);
action = std::move(other137.action);
__isset = std::move(other137.__isset);
}
hotkey_detect_request &hotkey_detect_request::operator=(const hotkey_detect_request &other138)
{
type = other138.type;
operation = other138.operation;
action = other138.action;
__isset = other138.__isset;
return *this;
}
hotkey_detect_request &hotkey_detect_request::operator=(hotkey_detect_request &&other139)
{
type = std::move(other139.type);
operation = std::move(other139.operation);
action = std::move(other139.action);
__isset = std::move(other139.__isset);
return *this;
}
......@@ -4723,7 +4723,7 @@ void hotkey_detect_request::printTo(std::ostream &out) const
out << "hotkey_detect_request(";
out << "type=" << to_string(type);
out << ", "
<< "operation=" << to_string(operation);
<< "action=" << to_string(action);
out << ")";
}
......
......@@ -293,7 +293,7 @@ struct duplicate_response
struct hotkey_detect_request {
1: hotkey_type type
2: hotkey_detect_action operation
2: hotkey_detect_action action
}
struct hotkey_detect_response {
......
......@@ -1967,9 +1967,9 @@ inline std::ostream &operator<<(std::ostream &out, const duplicate_response &obj
typedef struct _hotkey_detect_request__isset
{
_hotkey_detect_request__isset() : type(false), operation(false) {}
_hotkey_detect_request__isset() : type(false), action(false) {}
bool type : 1;
bool operation : 1;
bool action : 1;
} _hotkey_detect_request__isset;
class hotkey_detect_request
......@@ -1979,25 +1979,23 @@ public:
hotkey_detect_request(hotkey_detect_request &&);
hotkey_detect_request &operator=(const hotkey_detect_request &);
hotkey_detect_request &operator=(hotkey_detect_request &&);
hotkey_detect_request() : type((hotkey_type::type)0), operation((hotkey_detect_action::type)0)
{
}
hotkey_detect_request() : type((hotkey_type::type)0), action((hotkey_detect_action::type)0) {}
virtual ~hotkey_detect_request() throw();
hotkey_type::type type;
hotkey_detect_action::type operation;
hotkey_detect_action::type action;
_hotkey_detect_request__isset __isset;
void __set_type(const hotkey_type::type val);
void __set_operation(const hotkey_detect_action::type val);
void __set_action(const hotkey_detect_action::type val);
bool operator==(const hotkey_detect_request &rhs) const
{
if (!(type == rhs.type))
return false;
if (!(operation == rhs.operation))
if (!(action == rhs.action))
return false;
return true;
}
......
......@@ -21,6 +21,13 @@
#include <math.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
#include <dsn/tool-api/rpc_address.h>
#include <dsn/tool-api/group_address.h>
#include <dsn/utility/error_code.h>
#include <rrdb/rrdb_types.h>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/tool-api/task_tracker.h>
#include "pegasus_read_service.h"
namespace pegasus {
namespace server {
......@@ -98,5 +105,52 @@ void hotspot_partition_calculator::data_analyse()
}
}
// TODO:(TangYanzhao) call this function to start hotkey detection
/*static*/ void hotspot_partition_calculator::send_hotkey_detect_request(
const std::string &app_name,
const uint64_t partition_index,
const dsn::apps::hotkey_type::type hotkey_type,
const dsn::apps::hotkey_detect_action::type action)
{
auto request = std::make_unique<dsn::apps::hotkey_detect_request>();
request->type = hotkey_type;
request->action = action;
ddebug_f("{} {} hotkey detection in {}.{}",
(action == dsn::apps::hotkey_detect_action::STOP) ? "Stop" : "Start",
(hotkey_type == dsn::apps::hotkey_type::WRITE) ? "write" : "read",
app_name,
partition_index);
dsn::rpc_address meta_server;
meta_server.assign_group("meta-servers");
std::vector<dsn::rpc_address> meta_servers;
replica_helper::load_meta_servers(meta_servers);
for (const auto &address : meta_servers) {
meta_server.group_address()->add(address);
}
auto cluster_name = dsn::replication::get_current_cluster_name();
auto resolver = partition_resolver::get_resolver(cluster_name, meta_servers, app_name.c_str());
dsn::task_tracker tracker;
detect_hotkey_rpc rpc(
std::move(request), RPC_DETECT_HOTKEY, std::chrono::seconds(10), partition_index);
rpc.call(resolver,
&tracker,
[app_name, partition_index](dsn::error_code error) {
if (error != dsn::ERR_OK) {
derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
app_name,
partition_index,
error.to_string());
}
})
->wait();
if (rpc.response().err != dsn::ERR_OK) {
derror_f("Hotkey detect rpc sending failed, in {}.{}, error_hint:{} {}",
app_name,
partition_index,
rpc.response().err,
rpc.response().err_hint);
}
}
} // namespace server
} // namespace pegasus
......@@ -37,6 +37,10 @@ public:
void data_aggregate(const std::vector<row_data> &partitions);
// analyse the saved data to find hotspot partition
void data_analyse();
static void send_hotkey_detect_request(const std::string &app_name,
const uint64_t partition_index,
const dsn::apps::hotkey_type::type hotkey_type,
const dsn::apps::hotkey_detect_action::type action);
private:
const std::string _app_name;
......
......@@ -32,6 +32,8 @@ typedef ::dsn::rpc_holder<::dsn::blob, dsn::apps::ttl_response> ttl_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::get_scanner_request, dsn::apps::scan_response>
get_scanner_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::scan_request, dsn::apps::scan_response> scan_rpc;
typedef ::dsn::rpc_holder<::dsn::apps::hotkey_detect_request, dsn::apps::hotkey_detect_response>
detect_hotkey_rpc;
class pegasus_read_service : public dsn::replication::replication_app_base,
public dsn::replication::storage_serverlet<pegasus_read_service>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册