未验证 提交 c5dbab27 编写于 作者: Q QinZuoyan 提交者: GitHub

server: improve capacity unit calculation (#339)

上级 3c59ffdb
Subproject commit 7e6af96f463f9b1100a9f3765858191d931228aa
Subproject commit b8bfc1ce5c6066e337048554daea24f085f16322
......@@ -274,16 +274,19 @@ falcon_path = /v1/push
[pegasus.collector]
cluster = onebox
available_detect_app = @APP_NAME@
available_detect_alert_script_dir = ./package/bin
available_detect_alert_email_address =
available_detect_interval_seconds = 3
available_detect_alert_fail_count = 30
available_detect_timeout = 5000
app_stat_interval_seconds = 10
cu_stat_app = stat
cu_fetch_interval_seconds = 8
usage_stat_app = @APP_NAME@
capacity_unit_fetch_interval_seconds = 8
storage_size_fetch_interval_seconds = 3600
[pegasus.clusters]
onebox = @LOCAL_IP@:34601,@LOCAL_IP@:34602,@LOCAL_IP@:34603
......
......@@ -285,16 +285,19 @@
[pegasus.collector]
cluster = %{cluster.name}
available_detect_app = temp
available_detect_alert_script_dir = ./package/bin
available_detect_alert_email_address =
available_detect_interval_seconds = 3
available_detect_alert_fail_count = 30
available_detect_timeout = 5000
app_stat_interval_seconds = 10
cu_stat_app = stat
cu_fetch_interval_seconds = 8
usage_stat_app = stat
capacity_unit_fetch_interval_seconds = 8
storage_size_fetch_interval_seconds = 3600
[pegasus.clusters]
%{cluster.name} = %{meta.server.list}
......
......@@ -25,7 +25,12 @@ namespace pegasus {
namespace server {
DEFINE_TASK_CODE(LPC_PEGASUS_APP_STAT_TIMER, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE(LPC_PEGASUS_CU_STAT_TIMER, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE(LPC_PEGASUS_CAPACITY_UNIT_STAT_TIMER,
TASK_PRIORITY_COMMON,
::dsn::THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE(LPC_PEGASUS_STORAGE_SIZE_STAT_TIMER,
TASK_PRIORITY_COMMON,
::dsn::THREAD_POOL_DEFAULT)
info_collector::info_collector()
{
......@@ -49,22 +54,40 @@ info_collector::info_collector()
10, // default value 10s
"app stat interval seconds");
_cu_stat_app = dsn_config_get_value_string(
"pegasus.collector", "cu_stat_app", "", "app for recording capacity unit info");
dassert(!_cu_stat_app.empty(), "");
_usage_stat_app = dsn_config_get_value_string(
"pegasus.collector", "usage_stat_app", "", "app for recording usage statistics");
dassert(!_usage_stat_app.empty(), "");
// initialize the _client.
if (!pegasus_client_factory::initialize(nullptr)) {
dassert(false, "Initialize the pegasus client failed");
}
_client = pegasus_client_factory::get_client(_cluster_name.c_str(), _cu_stat_app.c_str());
_client = pegasus_client_factory::get_client(_cluster_name.c_str(), _usage_stat_app.c_str());
dassert(_client != nullptr, "Initialize the client failed");
_result_writer = dsn::make_unique<result_writer>(_client);
_cu_fetch_interval_seconds =
_capacity_unit_fetch_interval_seconds =
(uint32_t)dsn_config_get_value_uint64("pegasus.collector",
"cu_fetch_interval_seconds",
"capacity_unit_fetch_interval_seconds",
8, // default value 8s
"capacity unit fetch interval seconds");
// _capacity_unit_retry_wait_seconds is in range of [1, 10]
_capacity_unit_retry_wait_seconds =
std::min(10u, std::max(1u, _capacity_unit_fetch_interval_seconds / 10));
// _capacity_unit_retry_max_count is in range of [0, 3]
_capacity_unit_retry_max_count =
std::min(3u, _capacity_unit_fetch_interval_seconds / _capacity_unit_retry_wait_seconds);
_storage_size_fetch_interval_seconds =
(uint32_t)dsn_config_get_value_uint64("pegasus.collector",
"storage_size_fetch_interval_seconds",
3600, // default value 1h
"storage size fetch interval seconds");
// _storage_size_retry_wait_seconds is in range of [1, 60]
_storage_size_retry_wait_seconds =
std::min(60u, std::max(1u, _storage_size_fetch_interval_seconds / 10));
// _storage_size_retry_max_count is in range of [0, 3]
_storage_size_retry_max_count =
std::min(3u, _storage_size_fetch_interval_seconds / _storage_size_retry_wait_seconds);
}
info_collector::~info_collector()
......@@ -85,13 +108,21 @@ void info_collector::start()
0,
std::chrono::minutes(1));
_cu_stat_timer_task =
::dsn::tasking::enqueue_timer(LPC_PEGASUS_CU_STAT_TIMER,
&_tracker,
[this] { on_capacity_unit_stat(); },
std::chrono::seconds(_cu_fetch_interval_seconds),
0,
std::chrono::minutes(1));
_capacity_unit_stat_timer_task = ::dsn::tasking::enqueue_timer(
LPC_PEGASUS_CAPACITY_UNIT_STAT_TIMER,
&_tracker,
[this] { on_capacity_unit_stat(_capacity_unit_retry_max_count); },
std::chrono::seconds(_capacity_unit_fetch_interval_seconds),
0,
std::chrono::minutes(1));
_storage_size_stat_timer_task = ::dsn::tasking::enqueue_timer(
LPC_PEGASUS_STORAGE_SIZE_STAT_TIMER,
&_tracker,
[this] { on_storage_size_stat(_storage_size_retry_max_count); },
std::chrono::seconds(_storage_size_fetch_interval_seconds),
0,
std::chrono::minutes(1));
}
void info_collector::stop() { _tracker.cancel_outstanding_tasks(); }
......@@ -230,38 +261,75 @@ info_collector::AppStatCounters *info_collector::get_app_counters(const std::str
return counters;
}
void info_collector::on_capacity_unit_stat()
void info_collector::on_capacity_unit_stat(int remaining_retry_count)
{
ddebug("start to stat capacity unit");
ddebug("start to stat capacity unit, remaining_retry_count = %d", remaining_retry_count);
std::vector<node_capacity_unit_stat> nodes_stat;
if (!get_capacity_unit_stat(&_shell_context, nodes_stat)) {
derror("get capacity unit stat failed");
if (remaining_retry_count > 0) {
dwarn("get capacity unit stat failed, remaining_retry_count = %d, "
"wait %u seconds to retry",
remaining_retry_count,
_capacity_unit_retry_wait_seconds);
::dsn::tasking::enqueue(LPC_PEGASUS_CAPACITY_UNIT_STAT_TIMER,
&_tracker,
[=] { on_capacity_unit_stat(remaining_retry_count - 1); },
0,
std::chrono::seconds(_capacity_unit_retry_wait_seconds));
} else {
derror("get capacity unit stat failed, remaining_retry_count = 0, no retry anymore");
}
return;
}
for (auto elem : nodes_stat) {
if (!has_capacity_unit_updated(elem.node_address, elem.timestamp)) {
for (node_capacity_unit_stat &elem : nodes_stat) {
if (elem.node_address.empty() || elem.timestamp.empty() ||
!has_capacity_unit_updated(elem.node_address, elem.timestamp)) {
dinfo("recent read/write capacity unit value of node %s has not updated",
elem.node_address.c_str());
continue;
}
_result_writer->set_result(elem.timestamp, elem.node_address, elem.dump_to_json());
_result_writer->set_result(elem.timestamp, "cu@" + elem.node_address, elem.dump_to_json());
}
}
bool info_collector::has_capacity_unit_updated(const std::string &node_address,
const std::string &timestamp)
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_cu_update_info_lock);
auto find = _cu_update_info.find(node_address);
if (find == _cu_update_info.end()) {
_cu_update_info[node_address] = timestamp;
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_capacity_unit_update_info_lock);
auto find = _capacity_unit_update_info.find(node_address);
if (find == _capacity_unit_update_info.end()) {
_capacity_unit_update_info[node_address] = timestamp;
return true;
}
if (timestamp > find->second) {
_cu_update_info[node_address] = timestamp;
find->second = timestamp;
return true;
}
return false;
}
void info_collector::on_storage_size_stat(int remaining_retry_count)
{
ddebug("start to stat storage size, remaining_retry_count = %d", remaining_retry_count);
app_storage_size_stat st_stat;
if (!get_storage_size_stat(&_shell_context, st_stat)) {
if (remaining_retry_count > 0) {
dwarn("get storage size stat failed, remaining_retry_count = %d, "
"wait %u seconds to retry",
remaining_retry_count,
_storage_size_retry_wait_seconds);
::dsn::tasking::enqueue(LPC_PEGASUS_STORAGE_SIZE_STAT_TIMER,
&_tracker,
[=] { on_storage_size_stat(remaining_retry_count - 1); },
0,
std::chrono::seconds(_storage_size_retry_wait_seconds));
} else {
derror("get storage size stat failed, remaining_retry_count = 0, no retry anymore");
}
return;
}
_result_writer->set_result(st_stat.timestamp, "ss", st_stat.dump_to_json());
}
} // namespace server
} // namespace pegasus
......@@ -66,9 +66,11 @@ public:
void on_app_stat();
AppStatCounters *get_app_counters(const std::string &app_name);
void on_capacity_unit_stat();
void on_capacity_unit_stat(int remaining_retry_count);
bool has_capacity_unit_updated(const std::string &node_address, const std::string &timestamp);
void on_storage_size_stat(int remaining_retry_count);
private:
dsn::task_tracker _tracker;
::dsn::rpc_address _meta_servers;
......@@ -79,17 +81,23 @@ private:
::dsn::utils::ex_lock_nr _app_stat_counter_lock;
std::map<std::string, AppStatCounters *> _app_stat_counters;
// app for recording read/write cu.
std::string _cu_stat_app;
// app for recording usage statistics, including read/write capacity unit and storage size.
std::string _usage_stat_app;
// client to access server.
pegasus_client *_client;
// for writing cu stat result
std::unique_ptr<result_writer> _result_writer;
uint32_t _cu_fetch_interval_seconds;
::dsn::task_ptr _cu_stat_timer_task;
::dsn::utils::ex_lock_nr _cu_update_info_lock;
uint32_t _capacity_unit_fetch_interval_seconds;
uint32_t _capacity_unit_retry_wait_seconds;
uint32_t _capacity_unit_retry_max_count;
::dsn::task_ptr _capacity_unit_stat_timer_task;
uint32_t _storage_size_fetch_interval_seconds;
uint32_t _storage_size_retry_wait_seconds;
uint32_t _storage_size_retry_max_count;
::dsn::task_ptr _storage_size_stat_timer_task;
::dsn::utils::ex_lock_nr _capacity_unit_update_info_lock;
// mapping 'node address' --> 'last updated timestamp'
std::map<std::string, string> _cu_update_info;
std::map<std::string, string> _capacity_unit_update_info;
};
} // namespace server
} // namespace pegasus
......@@ -20,13 +20,13 @@ void result_writer::set_result(const std::string &hash_key,
if (err != PERR_OK) {
int new_try_count = try_count - 1;
if (new_try_count > 0) {
derror("set_result fail, hash_key = %s, sort_key = %s, value = %s, "
"error = %s, left_try_count = %d, try again after 1 minute",
hash_key.c_str(),
sort_key.c_str(),
value.c_str(),
_client->get_error_string(err),
new_try_count);
dwarn("set_result fail, hash_key = %s, sort_key = %s, value = %s, "
"error = %s, left_try_count = %d, try again after 1 minute",
hash_key.c_str(),
sort_key.c_str(),
value.c_str(),
_client->get_error_string(err),
new_try_count);
::dsn::tasking::enqueue(
LPC_WRITE_RESULT,
&_tracker,
......
......@@ -667,13 +667,12 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_dat
}
::dsn::command command;
command.cmd = "perf-counters";
command.cmd = "perf-counters-by-substr";
char tmp[256];
if (app_name.empty()) {
sprintf(tmp, ".*@.*");
} else {
sprintf(tmp, ".*@%d\\..*", app_info->app_id);
}
if (app_name.empty())
sprintf(tmp, "@");
else
sprintf(tmp, "@%d.", app_info->app_id);
command.arguments.emplace_back(tmp);
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
......@@ -760,25 +759,21 @@ struct node_capacity_unit_stat
// timestamp when node perf_counter_info has updated.
std::string timestamp;
std::string node_address;
// mapping app_name --> (read_cu, write_cu)
std::map<std::string, std::pair<int64_t, int64_t>> cu_value_by_app;
// mapping: app_id --> (read_cu, write_cu)
std::map<int32_t, std::pair<int64_t, int64_t>> cu_value_by_app;
std::string dump_to_json() const
{
std::map<int32_t, std::vector<int64_t>> values;
for (auto &kv : cu_value_by_app) {
auto &pair = kv.second;
if (pair.first != 0 || pair.second != 0)
values.emplace(kv.first, std::vector<int64_t>{pair.first, pair.second});
}
std::stringstream out;
rapidjson::OStreamWrapper wrapper(out);
dsn::json::JsonWriter writer(wrapper);
writer.StartObject();
for (const auto &elem : cu_value_by_app) {
auto cu_tuple = elem.second;
if (cu_tuple.first == 0 && cu_tuple.second == 0)
continue;
char tuple_str[50];
sprintf(tuple_str, "[%ld,%ld]", cu_tuple.first, cu_tuple.second);
dsn::json::json_encode(writer, elem.first);
dsn::json::json_encode(writer, tuple_str);
}
writer.EndObject();
dsn::json::json_encode(writer, values);
return out.str();
}
};
......@@ -786,19 +781,15 @@ struct node_capacity_unit_stat
inline bool get_capacity_unit_stat(shell_context *sc,
std::vector<node_capacity_unit_stat> &nodes_stat)
{
std::vector<::dsn::app_info> apps;
std::vector<node_desc> nodes;
if (!get_apps_and_nodes(sc, apps, nodes))
if (!fill_nodes(sc, "replica-server", nodes)) {
derror("get replica server node list failed");
return false;
std::map<int32_t, std::string> app_name_map;
for (auto elem : apps)
app_name_map.emplace(elem.app_id, elem.app_name);
}
::dsn::command command;
command.cmd = "perf-counters";
char tmp[256];
sprintf(tmp, ".*\\*recent\\..*\\.cu@.*");
command.arguments.emplace_back(tmp);
command.cmd = "perf-counters-by-substr";
command.arguments.emplace_back(".cu@");
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
......@@ -806,34 +797,110 @@ inline bool get_capacity_unit_stat(shell_context *sc,
for (int i = 0; i < nodes.size(); ++i) {
dsn::rpc_address node_addr = nodes[i].address;
dsn::perf_counter_info info;
if (!decode_node_perf_counter_info(node_addr, results[i], info))
return false;
if (!decode_node_perf_counter_info(node_addr, results[i], info)) {
dwarn("decode perf counter from node(%s) failed, just ignore it",
node_addr.to_string());
continue;
}
nodes_stat[i].timestamp = info.timestamp_str;
nodes_stat[i].node_address = node_addr.to_string();
for (dsn::perf_counter_metric &m : info.counters) {
int32_t app_id, partition_index;
int32_t app_id, pidx;
std::string counter_name;
bool parse_ret =
parse_app_pegasus_perf_counter_name(m.name, app_id, partition_index, counter_name);
dassert(parse_ret, "name = %s", m.name.c_str());
if (app_name_map.find(app_id) == app_name_map.end())
continue;
std::string app_name = app_name_map[app_id];
bool r = parse_app_pegasus_perf_counter_name(m.name, app_id, pidx, counter_name);
dassert(r, "name = %s", m.name.c_str());
if (counter_name == "recent.read.cu") {
if (nodes_stat[i].cu_value_by_app.find(app_name) ==
nodes_stat[i].cu_value_by_app.end()) {
nodes_stat[i].cu_value_by_app.emplace(app_name, std::make_pair(0, 0));
}
nodes_stat[i].cu_value_by_app[app_name].first += (int64_t)m.value;
}
if (counter_name == "recent.write.cu") {
if (nodes_stat[i].cu_value_by_app.find(app_name) ==
nodes_stat[i].cu_value_by_app.end()) {
nodes_stat[i].cu_value_by_app.emplace(app_name, std::make_pair(0, 0));
}
nodes_stat[i].cu_value_by_app[app_name].second += (int64_t)m.value;
nodes_stat[i].cu_value_by_app[app_id].first += (int64_t)m.value;
} else if (counter_name == "recent.write.cu") {
nodes_stat[i].cu_value_by_app[app_id].second += (int64_t)m.value;
}
}
}
return true;
}
struct app_storage_size_stat
{
// timestamp when this stat is generated.
std::string timestamp;
// mapping: app_id --> [app_partition_count, stat_partition_count, storage_size_in_mb]
std::map<int32_t, std::vector<int64_t>> st_value_by_app;
std::string dump_to_json() const
{
std::stringstream out;
rapidjson::OStreamWrapper wrapper(out);
dsn::json::JsonWriter writer(wrapper);
dsn::json::json_encode(writer, st_value_by_app);
return out.str();
}
};
inline bool get_storage_size_stat(shell_context *sc, app_storage_size_stat &st_stat)
{
std::vector<::dsn::app_info> apps;
std::vector<node_desc> nodes;
if (!get_apps_and_nodes(sc, apps, nodes)) {
derror("get apps and nodes failed");
return false;
}
std::map<int32_t, std::vector<dsn::partition_configuration>> app_partitions;
if (!get_app_partitions(sc, apps, app_partitions)) {
derror("get app partitions failed");
return false;
}
for (auto &kv : app_partitions) {
auto &v = kv.second;
for (auto &c : v) {
// use partition_flags to record if this partition's storage size is calculated,
// because `app_partitions' is a temporary variable, so we can re-use partition_flags.
c.partition_flags = 0;
}
}
::dsn::command command;
command.cmd = "perf-counters-by-prefix";
command.arguments.emplace_back("replica*app.pegasus*disk.storage.sst(MB)");
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
for (int i = 0; i < nodes.size(); ++i) {
dsn::rpc_address node_addr = nodes[i].address;
dsn::perf_counter_info info;
if (!decode_node_perf_counter_info(node_addr, results[i], info)) {
dwarn("decode perf counter from node(%s) failed, just ignore it",
node_addr.to_string());
continue;
}
for (dsn::perf_counter_metric &m : info.counters) {
int32_t app_id_x, partition_index_x;
std::string counter_name;
bool parse_ret = parse_app_pegasus_perf_counter_name(
m.name, app_id_x, partition_index_x, counter_name);
dassert(parse_ret, "name = %s", m.name.c_str());
if (counter_name != "disk.storage.sst(MB)")
continue;
auto find = app_partitions.find(app_id_x);
if (find == app_partitions.end()) // app id not found
continue;
dsn::partition_configuration &pc = find->second[partition_index_x];
if (pc.primary != node_addr) // not primary replica
continue;
if (pc.partition_flags != 0) // already calculated
continue;
pc.partition_flags = 1;
int64_t app_partition_count = find->second.size();
auto st_it = st_stat.st_value_by_app
.emplace(app_id_x, std::vector<int64_t>{app_partition_count, 0, 0})
.first;
st_it->second[1]++; // stat_partition_count
st_it->second[2] += m.value; // storage_size_in_mb
}
}
char buf[20];
dsn::utils::time_ms_to_date_time(dsn_now_ms(), buf, sizeof(buf));
st_stat.timestamp = buf;
return true;
}
......@@ -176,12 +176,13 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
}
::dsn::command command;
command.cmd = "perf-counters";
command.arguments.push_back(".*memused.res(MB)");
command.arguments.push_back(".*rdb.block_cache.memory_usage");
command.arguments.push_back(".*disk.available.total.ratio");
command.arguments.push_back(".*disk.available.min.ratio");
command.arguments.push_back(".*@.*");
command.cmd = "perf-counters-by-prefix";
command.arguments.push_back("replica*server*memused.res(MB)");
command.arguments.push_back("replica*app.pegasus*rdb.block_cache.memory_usage");
command.arguments.push_back("replica*eon.replica_stub*disk.available.total.ratio");
command.arguments.push_back("replica*eon.replica_stub*disk.available.min.ratio");
command.arguments.push_back("replica*app.pegasus*rdb.memtable.memory_usage");
command.arguments.push_back("replica*app.pegasus*rdb.index_and_filter_blocks.memory_usage");
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
......@@ -210,25 +211,19 @@ bool ls_nodes(command_executor *e, shell_context *sc, arguments args)
}
list_nodes_helper &h = tmp_it->second;
for (dsn::perf_counter_metric &m : info.counters) {
if (m.name == "replica*server*memused.res(MB)")
h.memused_res_mb = m.value;
else if (m.name == "replica*app.pegasus*rdb.block_cache.memory_usage")
h.block_cache_bytes = m.value;
else if (m.name == "replica*eon.replica_stub*disk.available.total.ratio")
h.disk_available_total_ratio = m.value;
else if (m.name == "replica*eon.replica_stub*disk.available.min.ratio")
h.disk_available_min_ratio = m.value;
else {
int32_t app_id_x, partition_index_x;
std::string counter_name;
bool parse_ret = parse_app_pegasus_perf_counter_name(
m.name, app_id_x, partition_index_x, counter_name);
dassert(parse_ret, "name = %s", m.name.c_str());
if (counter_name == "rdb.memtable.memory_usage")
h.mem_tbl_bytes += m.value;
else if (counter_name == "rdb.index_and_filter_blocks.memory_usage")
h.mem_idx_bytes += m.value;
}
if (m.name.find("memused.res(MB)") != std::string::npos)
h.memused_res_mb += m.value;
else if (m.name.find("rdb.block_cache.memory_usage") != std::string::npos)
h.block_cache_bytes += m.value;
else if (m.name.find("disk.available.total.ratio") != std::string::npos)
h.disk_available_total_ratio += m.value;
else if (m.name.find("disk.available.min.ratio") != std::string::npos)
h.disk_available_min_ratio += m.value;
else if (m.name.find("rdb.memtable.memory_usage") != std::string::npos)
h.mem_tbl_bytes += m.value;
else if (m.name.find("rdb.index_and_filter_blocks.memory_usage") !=
std::string::npos)
h.mem_idx_bytes += m.value;
}
}
}
......
......@@ -201,9 +201,11 @@ bool app_disk(command_executor *e, shell_context *sc, arguments args)
}
::dsn::command command;
command.cmd = "perf-counters";
command.cmd = "perf-counters-by-prefix";
char tmp[256];
sprintf(tmp, ".*\\*app\\.pegasus\\*disk\\.storage\\.sst.*@%d\\..*", app_id);
sprintf(tmp, "replica*app.pegasus*disk.storage.sst(MB)@%d.", app_id);
command.arguments.push_back(tmp);
sprintf(tmp, "replica*app.pegasus*disk.storage.sst.count@%d.", app_id);
command.arguments.push_back(tmp);
std::vector<std::pair<bool, std::string>> results;
call_remote_command(sc, nodes, command, results);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册