提交 bcd8b7e1 编写于 作者: W WeijieSun 提交者: QinZuoyan

pegasus_counter_updater: move some features to rDSN (#125)

上级 4c8e64f7
Subproject commit 915cba584db8c83c891ab97fe3be71a3540bfec7
Subproject commit a2549457ad26cda0649374809231d94036ed7721
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include <string>
#include <dsn/c/api_utilities.h>
#include <dsn/cpp/json_helper.h>
namespace pegasus {
struct perf_counter_metric
{
std::string name;
std::string type;
double value;
perf_counter_metric() : value(0) {}
perf_counter_metric(const char *n, dsn_perf_counter_type_t t, double v) : name(n), value(v)
{
switch (t) {
case COUNTER_TYPE_NUMBER:
type = "NUMBER";
break;
case COUNTER_TYPE_VOLATILE_NUMBER:
type = "VOLATILE_NUMBER";
break;
case COUNTER_TYPE_RATE:
type = "RATE";
break;
case COUNTER_TYPE_NUMBER_PERCENTILES:
type = "PERCENTILE";
break;
default:
dassert(false, "invalid type(%d)", t);
break;
}
}
DEFINE_JSON_SERIALIZATION(name, type, value)
};
struct perf_counter_info
{
std::string result; // OK or ERROR
int64_t timestamp; // in seconds
std::string timestamp_str;
std::vector<perf_counter_metric> counters;
perf_counter_info() : timestamp(0) {}
DEFINE_JSON_SERIALIZATION(result, timestamp, timestamp_str, counters)
};
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include <iomanip>
#include <dsn/c/api_utilities.h>
#include <dsn/perf_counter/perf_counters.h>
#include "brief_stat.h"
namespace pegasus {
static std::map<std::string, std::string> s_brief_stat_map = {
{"zion*profiler*RPC_RRDB_RRDB_GET.qps", "get_qps"},
{"zion*profiler*RPC_RRDB_RRDB_GET.latency.server", "get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.qps", "multi_get_qps"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server", "multi_get_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_PUT.qps", "put_qps"},
{"zion*profiler*RPC_RRDB_RRDB_PUT.latency.server", "put_p99(ns)"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.qps", "multi_put_qps"},
{"zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server", "multi_put_p99(ns)"},
{"replica*eon.replica_stub*replica(Count)", "serving_replica_count"},
{"replica*eon.replica_stub*opening.replica(Count)", "opening_replica_count"},
{"replica*eon.replica_stub*closing.replica(Count)", "closing_replica_count"},
{"replica*eon.replica_stub*replicas.commit.qps", "commit_throughput"},
{"replica*eon.replica_stub*replicas.learning.count", "learning_count"},
{"replica*app.pegasus*manual.compact.running.count", "manual_compact_running_count"},
{"replica*app.pegasus*manual.compact.enqueue.count", "manual_compact_enqueue_count"},
{"replica*eon.replica_stub*shared.log.size(MB)", "shared_log_size(MB)"},
{"replica*server*memused.virt(MB)", "memused_virt(MB)"},
{"replica*server*memused.res(MB)", "memused_res(MB)"},
{"replica*eon.replica_stub*disk.capacity.total(MB)", "disk_capacity_total(MB)"},
{"replica*eon.replica_stub*disk.available.total.ratio", "disk_available_total_ratio"},
{"replica*eon.replica_stub*disk.available.min.ratio", "disk_available_min_ratio"},
{"replica*eon.replica_stub*disk.available.max.ratio", "disk_available_max_ratio"},
};
std::string get_brief_stat()
{
std::vector<std::string> stat_counters;
for (const auto &kv : s_brief_stat_map) {
stat_counters.push_back(kv.first);
}
std::ostringstream oss;
oss << std::fixed << std::setprecision(0);
bool first_item = true;
dsn::perf_counters::snapshot_iterator iter =
[&oss, &first_item](const dsn::perf_counter_ptr &counter, double value) mutable {
if (!first_item)
oss << ", ";
oss << s_brief_stat_map.find(counter->full_name())->second << "=" << value;
first_item = false;
};
std::vector<bool> match_result;
dsn::perf_counters::instance().query_snapshot(stat_counters, iter, &match_result);
dassert(stat_counters.size() == match_result.size(), "");
for (int i = 0; i < match_result.size(); ++i) {
if (!match_result[i]) {
if (!first_item)
oss << ", ";
oss << stat_counters[i] << "=not_found";
first_item = false;
}
}
return oss.str();
}
}
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include <string>
namespace pegasus {
std::string get_brief_stat();
}
......@@ -5,21 +5,25 @@
#include "pegasus_server_impl.h"
#include "pegasus_service_app.h"
#include "info_collector_app.h"
#include "pegasus_counter_updater.h"
#include "brief_stat.h"
#include <dsn/version.h>
#include <dsn/git_commit.h>
#include <pegasus/version.h>
#include <pegasus/git_commit.h>
#include <dsn/tool_api.h>
#include <dsn/tool-api/command_manager.h>
#include <dsn/dist/replication/replication_service_app.h>
#include <dsn/dist/replication/meta_service_app.h>
#include <cstdio>
#include <cstring>
#include <chrono>
#if defined(__linux__)
#include <sys/types.h>
#include <unistd.h>
#endif
#define STR_I(var) #var
#define STR(var) STR_I(var)
......@@ -29,20 +33,32 @@
#define PEGASUS_BUILD_TYPE STR(DSN_BUILD_TYPE)
#endif
static char const rcsid[] =
"$Version: Pegasus Server " PEGASUS_VERSION " (" PEGASUS_GIT_COMMIT ")"
#if defined(DSN_BUILD_TYPE)
" " STR(DSN_BUILD_TYPE)
#endif
", built with rDSN " DSN_CORE_VERSION " (" DSN_GIT_COMMIT ")"
", built by gcc " STR(__GNUC__) "." STR(__GNUC_MINOR__) "." STR(__GNUC_PATCHLEVEL__)
#if defined(DSN_BUILD_HOSTNAME)
", built on " STR(DSN_BUILD_HOSTNAME)
#endif
", built at " __DATE__ " " __TIME__ " $";
const char *pegasus_server_rcsid() { return rcsid; }
using namespace dsn;
using namespace dsn::replication;
void dsn_app_registration_pegasus()
{
dsn::service::meta_service_app::register_components();
service_app::register_factory<::pegasus::server::pegasus_replication_service_app>("replica");
service_app::register_factory<::pegasus::server::pegasus_meta_service_app>("meta");
service_app::register_factory<::pegasus::server::info_collector_app>("collector");
service_app::register_factory<pegasus::server::pegasus_meta_service_app>("meta");
service_app::register_factory<pegasus::server::pegasus_replication_service_app>("replica");
service_app::register_factory<pegasus::server::info_collector_app>("collector");
pegasus::server::pegasus_server_impl::register_service();
::dsn::command_manager::instance().register_command(
dsn::command_manager::instance().register_command(
{"server-info"},
"server-info - query server information",
"server-info",
......@@ -54,27 +70,13 @@ void dsn_app_registration_pegasus()
<< PEGASUS_BUILD_TYPE << ", Started at " << str;
return oss.str();
});
dsn::command_manager::instance().register_command(
{"server-stat"},
"server-stat - query selected perf counters",
"server-stat",
[](const std::vector<std::string> &args) { return pegasus::get_brief_stat(); });
}
#if defined(__linux__)
#include <pegasus/version.h>
#include <pegasus/git_commit.h>
#include <dsn/version.h>
#include <dsn/git_commit.h>
static char const rcsid[] =
"$Version: Pegasus Server " PEGASUS_VERSION " (" PEGASUS_GIT_COMMIT ")"
#if defined(DSN_BUILD_TYPE)
" " STR(DSN_BUILD_TYPE)
#endif
", built with rDSN " DSN_CORE_VERSION " (" DSN_GIT_COMMIT ")"
", built by gcc " STR(__GNUC__) "." STR(__GNUC_MINOR__) "." STR(__GNUC_PATCHLEVEL__)
#if defined(DSN_BUILD_HOSTNAME)
", built on " STR(DSN_BUILD_HOSTNAME)
#endif
", built at " __DATE__ " " __TIME__ " $";
const char *pegasus_server_rcsid() { return rcsid; }
#endif
int main(int argc, char **argv)
{
for (int i = 1; i < argc; ++i) {
......@@ -90,5 +92,6 @@ int main(int argc, char **argv)
ddebug("pegasus server starting, pid(%d), version(%s)", (int)getpid(), pegasus_server_rcsid());
dsn_app_registration_pegasus();
dsn_run(argc, argv, true);
return 0;
}
......@@ -4,54 +4,26 @@
#include "pegasus_counter_updater.h"
#include <iomanip>
#include <regex>
#include <ios>
#include <iomanip>
#include <iostream>
#include <fstream>
#include <unistd.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/tool-api/command_manager.h>
#include <dsn/cpp/service_app.h>
#include "base/pegasus_utils.h"
#include "base/counter_utils.h"
#include "pegasus_io_service.h"
#if defined(__linux__)
#include <unistd.h>
#include <ios>
#include <iostream>
#include <fstream>
#endif
using namespace ::dsn;
namespace pegasus {
namespace server {
// clang-format off
static const char *s_brief_stat_mapper[] = {
"get_qps", "zion*profiler*RPC_RRDB_RRDB_GET.qps",
"get_p99(ns)", "zion*profiler*RPC_RRDB_RRDB_GET.latency.server",
"multi_get_qps", "zion*profiler*RPC_RRDB_RRDB_MULTI_GET.qps",
"multi_get_p99(ns)", "zion*profiler*RPC_RRDB_RRDB_MULTI_GET.latency.server",
"put_qps", "zion*profiler*RPC_RRDB_RRDB_PUT.qps",
"put_p99(ns)","zion*profiler*RPC_RRDB_RRDB_PUT.latency.server",
"multi_put_qps", "zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.qps",
"multi_put_p99(ns)", "zion*profiler*RPC_RRDB_RRDB_MULTI_PUT.latency.server",
"serving_replica_count", "replica*eon.replica_stub*replica(Count)",
"opening_replica_count", "replica*eon.replica_stub*opening.replica(Count)",
"closing_replica_count", "replica*eon.replica_stub*closing.replica(Count)",
"commit_throughput", "replica*eon.replica_stub*replicas.commit.qps",
"learning_count", "replica*eon.replica_stub*replicas.learning.count",
"manual_compact_running_count", "replica*app.pegasus*manual.compact.running.count",
"manual_compact_enqueue_count", "replica*app.pegasus*manual.compact.enqueue.count",
"shared_log_size(MB)", "replica*eon.replica_stub*shared.log.size(MB)",
"memused_virt(MB)", "replica*server*memused.virt(MB)",
"memused_res(MB)", "replica*server*memused.res(MB)",
"disk_capacity_total(MB)", "replica*eon.replica_stub*disk.capacity.total(MB)",
"disk_available_total_ratio", "replica*eon.replica_stub*disk.available.total.ratio",
"disk_available_min_ratio", "replica*eon.replica_stub*disk.available.min.ratio",
"disk_available_max_ratio", "replica*eon.replica_stub*disk.available.max.ratio"
};
// clang-format on
static void libevent_log(int severity, const char *msg)
{
dsn_log_level_t level;
......@@ -66,88 +38,18 @@ static void libevent_log(int severity, const char *msg)
dlog(level, msg);
}
#if defined(__linux__)
//////////////////////////////////////////////////////////////////////////////
//
// process_mem_usage(double &, double &) - takes two doubles by reference,
// attempts to read the system-dependent data for a process' virtual memory
// size and resident set size, and return the results in KB.
//
// On failure, returns 0.0, 0.0
static void process_mem_usage(double &vm_usage, double &resident_set)
{
using std::ios_base;
using std::ifstream;
using std::string;
vm_usage = 0.0;
resident_set = 0.0;
// 'file' stat seems to give the most reliable results
//
ifstream stat_stream("/proc/self/stat", ios_base::in);
// dummy vars for leading entries in stat that we don't care about
//
string pid, comm, state, ppid, pgrp, session, tty_nr;
string tpgid, flags, minflt, cminflt, majflt, cmajflt;
string utime, stime, cutime, cstime, priority, nice;
string O, itrealvalue, starttime;
// the two fields we want
//
unsigned long vsize;
long rss;
stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >>
minflt >> cminflt >> majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >>
nice >> O >> itrealvalue >> starttime >> vsize >> rss; // don't care about the rest
stat_stream.close();
static long page_size_kb =
sysconf(_SC_PAGE_SIZE) / 1024; // in case x86-64 is configured to use 2MB pages
vm_usage = vsize / 1024.0;
resident_set = rss * page_size_kb;
}
#endif
pegasus_counter_updater::pegasus_counter_updater()
: _local_port(0),
_update_interval_seconds(0),
_last_report_time_ms(0),
_enable_stat(false),
_enable_logging(false),
_enable_falcon(false),
_falcon_port(0),
_brief_stat_count(0),
_last_timestamp(0)
_falcon_port(0)
{
::dsn::command_manager::instance().register_command(
{"server-stat"},
"server-stat - query server statistics",
"server-stat",
[](const std::vector<std::string> &args) {
return ::pegasus::server::pegasus_counter_updater::instance().get_brief_stat();
});
::dsn::command_manager::instance().register_command(
{"perf-counters"},
"perf-counters - query perf counters, supporting filter by POSIX basic regular expressions",
"perf-counters [name-filter]...",
[](const std::vector<std::string> &args) {
return ::pegasus::server::pegasus_counter_updater::instance().get_perf_counters(args);
});
}
pegasus_counter_updater::~pegasus_counter_updater() { stop(); }
void pegasus_counter_updater::stat_intialize()
{
_brief_stat_count = sizeof(s_brief_stat_mapper) / sizeof(char *) / 2;
_brief_stat_value.resize(_brief_stat_count, -1);
}
void pegasus_counter_updater::falcon_initialize()
{
_falcon_host = dsn_config_get_value_string(
......@@ -182,11 +84,6 @@ void pegasus_counter_updater::falcon_initialize()
void pegasus_counter_updater::start()
{
_pfc_memused_virt.init_app_counter(
"server", "memused.virt(MB)", COUNTER_TYPE_NUMBER, "virtual memory usage in MB");
_pfc_memused_res.init_app_counter(
"server", "memused.res(MB)", COUNTER_TYPE_NUMBER, "physical memory usage in MB");
::dsn::utils::auto_write_lock l(_lock);
if (_report_timer != nullptr)
return;
......@@ -212,17 +109,11 @@ void pegasus_counter_updater::start()
_update_interval_seconds);
_last_report_time_ms = dsn_now_ms();
_enable_stat = dsn_config_get_value_bool(
"pegasus.server", "perf_counter_enable_stat", true, "perf_counter_enable_stat");
_enable_logging = dsn_config_get_value_bool(
"pegasus.server", "perf_counter_enable_logging", true, "perf_counter_enable_logging");
_enable_falcon = dsn_config_get_value_bool(
"pegasus.server", "perf_counter_enable_falcon", false, "perf_counter_enable_falcon");
if (_enable_stat) {
stat_intialize();
}
if (_enable_falcon) {
falcon_initialize();
}
......@@ -244,241 +135,56 @@ void pegasus_counter_updater::stop()
}
}
/*static*/
const char *pegasus_counter_updater::perf_counter_type(dsn_perf_counter_type_t t,
perf_counter_target target)
{
static const char *type_string[] = {
"number",
"GAUGE", // falcon type
"volatile_number",
"GAUGE", // falcon type
"rate",
// falcon type
// NOTICE: the "rate" is "GAUGE" coz we don't need falcon to recalculate the rate,
// please ref the falcon manual
"GAUGE",
"number_percentile",
"GAUGE", // falcon type
};
static const int TYPES_COUNT = sizeof(type_string) / sizeof(const char *);
dassert(t * T_TARGET_COUNT + target < TYPES_COUNT, "type (%d)", t);
return type_string[t * T_TARGET_COUNT + target];
}
void pegasus_counter_updater::update_counters_to_falcon(
const std::vector<dsn::perf_counter_ptr> &counters,
const std::vector<double> &values,
int64_t timestamp)
void pegasus_counter_updater::update_counters_to_falcon(const std::string &result,
int64_t timestamp)
{
ddebug("update counters to falcon with timestamp = %" PRId64, timestamp);
std::stringstream falcon_data;
falcon_data << "[";
for (unsigned int i = 0; i < values.size(); ++i) {
const dsn::perf_counter_ptr &ptr = counters[i];
_falcon_metric.metric = ptr->full_name();
_falcon_metric.timestamp = timestamp;
_falcon_metric.value = values[i];
_falcon_metric.counterType = perf_counter_type(ptr->type(), T_FALCON);
_falcon_metric.encode_json_state(falcon_data);
if (i + 1 < values.size())
falcon_data << ",";
}
falcon_data << "]";
const std::string &result = falcon_data.str();
http_post_request(
_falcon_host, _falcon_port, _falcon_path, "application/x-www-form-urlencoded", result);
}
void pegasus_counter_updater::logging_counters(const std::vector<dsn::perf_counter_ptr> &counters,
const std::vector<double> &values)
{
std::stringstream logging;
logging << "logging perf counter(name, type, value):" << std::endl;
logging << std::fixed << std::setprecision(2);
for (unsigned int i = 0; i < values.size(); ++i) {
const dsn::perf_counter_ptr &ptr = counters[i];
logging << "[" << ptr->full_name() << ", " << perf_counter_type(ptr->type(), T_DEBUG)
<< ", " << values[i] << "]" << std::endl;
}
ddebug(logging.str().c_str());
}
struct cmp_str
{
bool operator()(const char *a, const char *b) { return std::strcmp(a, b) < 0; }
};
void pegasus_counter_updater::update_brief_stat(const std::vector<dsn::perf_counter_ptr> &counters,
const std::vector<double> &values)
{
std::map<const char *, int, cmp_str> pref_name_to_index;
for (int i = 0; i < counters.size(); ++i) {
pref_name_to_index[counters[i]->full_name()] = i;
}
for (int i = 0; i < _brief_stat_count; ++i) {
const char *perf_name = s_brief_stat_mapper[i * 2 + 1];
auto find = pref_name_to_index.find(perf_name);
if (find != pref_name_to_index.end()) {
_brief_stat_value[i] = values[find->second];
}
}
}
std::string pegasus_counter_updater::get_brief_stat()
{
std::ostringstream oss;
if (_enable_stat) {
oss << std::fixed << std::setprecision(0);
for (int i = 0; i < _brief_stat_count; ++i) {
if (i != 0)
oss << ", ";
const char *stat_name = s_brief_stat_mapper[i * 2];
oss << stat_name << "=" << _brief_stat_value[i];
}
} else {
oss << "stat disabled";
}
return oss.str();
}
std::string pegasus_counter_updater::get_perf_counters(const std::vector<std::string> &args)
{
std::vector<dsn::perf_counter_ptr> metrics;
std::vector<double> values;
int64_t timestamp = 0;
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_last_counter_lock);
metrics = _last_metrics;
values = _last_values;
timestamp = _last_timestamp;
}
perf_counter_info info;
if (timestamp != 0) {
dassert(metrics.size() == values.size(),
"unmatched size: %d vs %d",
(int)metrics.size(),
(int)values.size());
std::vector<std::regex> regs;
regs.reserve(args.size());
for (auto &arg : args) {
try {
regs.emplace_back(arg, std::regex_constants::basic);
} catch (...) {
info.result = "ERROR: invalid filter: " + arg;
break;
}
}
if (info.result.empty()) {
for (int i = 0; i < metrics.size(); ++i) {
bool matched = false;
if (regs.empty()) {
matched = true;
} else {
for (auto &reg : regs) {
if (std::regex_match(metrics[i]->full_name(), reg)) {
matched = true;
break;
}
}
}
if (matched) {
info.counters.emplace_back(
metrics[i]->full_name(), metrics[i]->type(), values[i]);
}
}
info.result = "OK";
info.timestamp = timestamp;
char buf[20];
::dsn::utils::time_ms_to_date_time(timestamp * 1000, buf, sizeof(buf));
info.timestamp_str = buf;
}
} else {
info.result = "ERROR: counter not generated";
}
std::stringstream ss;
info.encode_json_state(ss);
return ss.str();
}
void pegasus_counter_updater::update()
{
ddebug("start update by timer");
#if defined(__linux__)
double vm_usage;
double resident_set;
process_mem_usage(vm_usage, resident_set);
uint64_t memused_virt = (uint64_t)vm_usage / 1024;
uint64_t memused_res = (uint64_t)resident_set / 1024;
_pfc_memused_virt->set(memused_virt);
_pfc_memused_res->set(memused_res);
ddebug("memused_virt = %" PRIu64 " MB, memused_res = %" PRIu64 "MB", memused_virt, memused_res);
#endif
std::vector<dsn::perf_counter_ptr> metrics;
std::vector<double> values;
::dsn::perf_counters::instance().get_all_counters(&metrics);
if (metrics.empty()) {
ddebug("no need update, coz no counters added");
return;
}
uint64_t now = dsn_now_ms();
dinfo("update now_ms(%lld), last_report_time_ms(%lld)", now, _last_report_time_ms);
_last_report_time_ms = now;
int64_t timestamp = now / 1000;
values.reserve(metrics.size());
// in case the get_xxx functions for the perf counters had SIDE EFFECTS
// we'd better pre fetch these values, and then use them
for (auto &counter : metrics) {
switch (counter->type()) {
case COUNTER_TYPE_NUMBER:
case COUNTER_TYPE_VOLATILE_NUMBER:
values.push_back(counter->get_integer_value());
break;
case COUNTER_TYPE_RATE:
values.push_back(counter->get_value());
break;
case COUNTER_TYPE_NUMBER_PERCENTILES:
values.push_back(counter->get_percentile(COUNTER_PERCENTILE_99));
break;
default:
dassert(false, "invalid type(%d)", counter->type());
break;
}
}
if (_enable_stat) {
update_brief_stat(metrics, values);
}
perf_counters::instance().take_snapshot();
if (_enable_logging) {
logging_counters(metrics, values);
std::stringstream oss;
oss << "logging perf counter(name, type, value):" << std::endl;
oss << std::fixed << std::setprecision(2);
perf_counters::instance().iterate_snapshot(
[&oss, this](const dsn::perf_counter_ptr &ptr, double val) {
oss << "[" << ptr->full_name() << ", " << dsn_counter_type_to_string(ptr->type())
<< ", " << val << "]" << std::endl;
});
ddebug("%s", oss.str().c_str());
}
if (_enable_falcon) {
update_counters_to_falcon(metrics, values, timestamp);
std::stringstream oss;
oss << "[";
bool first_append = true;
_falcon_metric.timestamp = timestamp;
perf_counters::instance().iterate_snapshot(
[&oss, &first_append, this](const dsn::perf_counter_ptr &ptr, double val) {
_falcon_metric.metric = ptr->full_name();
_falcon_metric.value = val;
_falcon_metric.counterType = "GAUGE";
if (!first_append)
oss << ",";
_falcon_metric.encode_json_state(oss);
first_append = false;
});
oss << "]";
update_counters_to_falcon(oss.str(), timestamp);
}
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_last_counter_lock);
_last_metrics = std::move(metrics);
_last_values = std::move(values);
_last_timestamp = timestamp;
ddebug("update now_ms(%lld), last_report_time_ms(%lld)", now, _last_report_time_ms);
_last_report_time_ms = now;
}
void pegasus_counter_updater::http_post_request(const std::string &host,
......
......@@ -11,6 +11,7 @@
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/perf_counter/perf_counter.h>
#include <iomanip>
#include <boost/asio.hpp>
#include <event2/event.h>
#include <event2/buffer.h>
......@@ -21,13 +22,6 @@
namespace pegasus {
namespace server {
enum perf_counter_target
{
T_DEBUG = 0,
T_FALCON,
T_TARGET_COUNT
};
// Falcon field description:
// http://git.n.xiaomi.com/falcon/doc/wikis/instance_monitor
struct falcon_metric
......@@ -49,30 +43,22 @@ public:
virtual ~pegasus_counter_updater();
void start();
void stop();
std::string get_brief_stat();
std::string get_perf_counters(const std::vector<std::string> &args);
private:
void stat_intialize();
void falcon_initialize();
void update_brief_stat(const std::vector<dsn::perf_counter_ptr> &counters,
const std::vector<double> &values);
void logging_counters(const std::vector<dsn::perf_counter_ptr> &counters,
const std::vector<double> &values);
void update_counters_to_falcon(const std::vector<dsn::perf_counter_ptr> &counters,
const std::vector<double> &values,
int64_t timestamp);
void update_counters_to_falcon(const std::string &result, int64_t timestamp);
void update();
void http_post_request(const std::string &host,
int32_t port,
const std::string &path,
const std::string &content_type,
const std::string &data);
static void http_request_done(struct evhttp_request *req, void *arg);
void update();
void on_report_timer(std::shared_ptr<boost::asio::deadline_timer> timer,
const boost::system::error_code &ec);
static void http_request_done(struct evhttp_request *req, void *arg);
mutable ::dsn::utils::rw_lock_nr _lock;
std::string _local_host;
......@@ -85,7 +71,6 @@ private:
std::shared_ptr<boost::asio::deadline_timer> _report_timer;
// perf counter flags
bool _enable_stat;
bool _enable_logging;
bool _enable_falcon;
......@@ -94,22 +79,6 @@ private:
uint16_t _falcon_port;
std::string _falcon_path;
falcon_metric _falcon_metric;
int _brief_stat_count;
std::vector<double> _brief_stat_value;
::dsn::utils::ex_lock_nr _last_counter_lock; // protected the following fields
std::vector<dsn::perf_counter_ptr> _last_metrics;
std::vector<double> _last_values;
int64_t _last_timestamp;
// perf counters
::dsn::perf_counter_wrapper _pfc_memused_virt;
::dsn::perf_counter_wrapper _pfc_memused_res;
private:
static const char *perf_counter_type(dsn_perf_counter_type_t type,
perf_counter_target target_type);
};
}
} // namespace
......@@ -16,6 +16,7 @@
#include <dsn/tool/cli/cli.client.h>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h>
#include <dsn/perf_counter/perf_counter_utils.h>
#include <rrdb/rrdb.code.definition.h>
#include <pegasus/version.h>
......@@ -26,7 +27,6 @@
#include "base/pegasus_key_schema.h"
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"
#include "base/counter_utils.h"
#include "command_executor.h"
#include "command_utils.h"
......@@ -510,9 +510,9 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_dat
derror("query perf counter info from node %s failed", node_addr.to_string());
return true;
}
pegasus::perf_counter_info info;
dsn::perf_counter_info info;
dsn::blob bb(results[i].second.data(), 0, results[i].second.size());
if (!dsn::json::json_forwarder<pegasus::perf_counter_info>::decode(bb, info)) {
if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, info)) {
derror("decode perf counter info from node %s failed, result = %s",
node_addr.to_string(),
results[i].second.c_str());
......@@ -524,7 +524,7 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_dat
info.result.c_str());
return true;
}
for (pegasus::perf_counter_metric &m : info.counters) {
for (dsn::perf_counter_metric &m : info.counters) {
int32_t app_id_x, partition_index_x;
std::string counter_name;
bool parse_ret = parse_app_pegasus_perf_counter_name(
......@@ -564,9 +564,9 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_dat
derror("query perf counter info from node %s failed", node_addr.to_string());
return true;
}
pegasus::perf_counter_info info;
dsn::perf_counter_info info;
dsn::blob bb(results[i].second.data(), 0, results[i].second.size());
if (!dsn::json::json_forwarder<pegasus::perf_counter_info>::decode(bb, info)) {
if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, info)) {
derror("decode perf counter info from node %s failed, result = %s",
node_addr.to_string(),
results[i].second.c_str());
......@@ -578,7 +578,7 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vector<row_dat
info.result.c_str());
return true;
}
for (pegasus::perf_counter_metric &m : info.counters) {
for (dsn::perf_counter_metric &m : info.counters) {
int32_t app_id_x, partition_index_x;
std::string counter_name;
bool parse_ret = parse_app_pegasus_perf_counter_name(
......
......@@ -3159,9 +3159,9 @@ inline bool app_disk(command_executor *e, shell_context *sc, arguments args)
<< " failed" << std::endl;
return true;
}
pegasus::perf_counter_info info;
dsn::perf_counter_info info;
dsn::blob bb(results[i].second.data(), 0, results[i].second.size());
if (!dsn::json::json_forwarder<pegasus::perf_counter_info>::decode(bb, info)) {
if (!dsn::json::json_forwarder<dsn::perf_counter_info>::decode(bb, info)) {
std::cout << "ERROR: decode perf counter info from node "
<< nodes[i].address.to_string() << " failed, result = " << results[i].second
<< std::endl;
......@@ -3172,7 +3172,7 @@ inline bool app_disk(command_executor *e, shell_context *sc, arguments args)
<< " returns error, error = " << info.result << std::endl;
return true;
}
for (pegasus::perf_counter_metric &m : info.counters) {
for (dsn::perf_counter_metric &m : info.counters) {
int32_t app_id_x, partition_index_x;
std::string counter_name;
bool parse_ret = parse_app_pegasus_perf_counter_name(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册