未验证 提交 e17d5c7a 编写于 作者: S Smilencer 提交者: GitHub

feat(hotkey): collector can be terminated by timeout (#625)

上级 8e8510b4
......@@ -21,16 +21,24 @@
#include <dsn/utility/smart_pointers.h>
#include "base/pegasus_key_schema.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/flags.h>
namespace pegasus {
namespace server {
DSN_DEFINE_int32(
"pegasus.server",
max_seconds_to_detect_hotkey,
150,
"the max time (in seconds) allowed to capture hotkey, will stop if hotkey's not found");
hotkey_collector::hotkey_collector(dsn::replication::hotkey_type::type hotkey_type,
dsn::replication::replica_base *r_base)
: replica_base(r_base),
_state(hotkey_collector_state::STOPPED),
_hotkey_type(hotkey_type),
_internal_collector(std::make_shared<hotkey_empty_data_collector>())
_internal_collector(std::make_shared<hotkey_empty_data_collector>()),
_collector_start_time_second(0)
{
}
......@@ -65,7 +73,18 @@ void hotkey_collector::capture_hash_key(const dsn::blob &hash_key, int64_t weigh
_internal_collector->capture_data(hash_key, weight);
}
void hotkey_collector::analyse_data() { _internal_collector->analyse_data(); }
void hotkey_collector::analyse_data()
{
switch (_state.load()) {
case hotkey_collector_state::COARSE_DETECTING:
if (!terminate_if_timeout()) {
_internal_collector->analyse_data();
}
return;
default:
return;
}
}
void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response &resp)
{
......@@ -88,6 +107,7 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
dwarn_replica(hint);
return;
case hotkey_collector_state::STOPPED:
_collector_start_time_second = dsn_now_s();
// TODO: (Tangyanzhao) start coarse detecting
_state.store(hotkey_collector_state::COARSE_DETECTING);
resp.err = dsn::ERR_OK;
......@@ -105,13 +125,29 @@ void hotkey_collector::on_start_detect(dsn::replication::detect_hotkey_response
void hotkey_collector::on_stop_detect(dsn::replication::detect_hotkey_response &resp)
{
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
terminate();
resp.err = dsn::ERR_OK;
std::string hint =
fmt::format("{} hotkey stopped, cache cleared", dsn::enum_to_string(_hotkey_type));
ddebug_replica(hint);
}
void hotkey_collector::terminate()
{
_state.store(hotkey_collector_state::STOPPED);
_internal_collector.reset();
_collector_start_time_second = 0;
}
bool hotkey_collector::terminate_if_timeout()
{
if (dsn_now_s() >= _collector_start_time_second + FLAGS_max_seconds_to_detect_hotkey) {
ddebug_replica("hotkey collector work time is exhausted but no hotkey has been found");
terminate();
return true;
}
return false;
}
} // namespace server
} // namespace pegasus
......@@ -79,9 +79,13 @@ public:
private:
void on_start_detect(dsn::replication::detect_hotkey_response &resp);
void on_stop_detect(dsn::replication::detect_hotkey_response &resp);
void terminate();
bool terminate_if_timeout();
std::atomic<hotkey_collector_state> _state;
const dsn::replication::hotkey_type::type _hotkey_type;
std::shared_ptr<internal_collector_base> _internal_collector;
uint64_t _collector_start_time_second;
};
class internal_collector_base
......
......@@ -30,8 +30,8 @@ public:
explicit mock_capacity_unit_calculator(dsn::replication::replica_base *r)
: capacity_unit_calculator(
r,
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, this),
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, this))
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::READ, r),
std::make_shared<hotkey_collector>(dsn::replication::hotkey_type::WRITE, r))
{
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册