提交 d0cfcf32 编写于 作者: W WeijieSun 提交者: QinZuoyan

reporter: link counter reporter as a static library (#132)

* monitor: link monitor to a static library

* monitor: remove unrelated code

* reporter: rename monitor to reporter

* report: rename library name
上级 9c31e8d4
...@@ -20,6 +20,7 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) ...@@ -20,6 +20,7 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../rocksdb/include) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../rocksdb/include)
add_subdirectory(base) add_subdirectory(base)
add_subdirectory(reporter)
add_subdirectory(base/test) add_subdirectory(base/test)
add_subdirectory(client_lib) add_subdirectory(client_lib)
add_subdirectory(server) add_subdirectory(server)
......
...@@ -14,6 +14,8 @@ set(MY_PROJ_INC_PATH "../../include" "../../base" "../proxy_lib") ...@@ -14,6 +14,8 @@ set(MY_PROJ_INC_PATH "../../include" "../../base" "../proxy_lib")
set(MY_PROJ_LIBS pegasus.rproxylib set(MY_PROJ_LIBS pegasus.rproxylib
pegasus.base pegasus.base
pegasus_geo_lib pegasus_geo_lib
pegasus_reporter
event
s2 s2
pegasus_client_static pegasus_client_static
fmt) fmt)
......
...@@ -78,18 +78,6 @@ fast_execution_in_network_thread = false ...@@ -78,18 +78,6 @@ fast_execution_in_network_thread = false
rpc_call_header_format_name = dsn rpc_call_header_format_name = dsn
rpc_timeout_milliseconds = 5000 rpc_timeout_milliseconds = 5000
disk_write_fail_ratio = 0.0
disk_read_fail_ratio = 0.0
perf_test_rounds = 1000
perf_test_payload_bytes = 1024
perf_test_timeouts_ms = 10000
; perf_test_concurrent_count is used only when perf_test_concurrent is true:
; - if perf_test_concurrent_count == 0, means concurrency grow exponentially.
; - if perf_test_concurrent_count > 0, means concurrency maintained to a fixed number.
perf_test_concurrent = true
perf_test_concurrent_count = 20
[task.LPC_AIO_IMMEDIATE_CALLBACK] [task.LPC_AIO_IMMEDIATE_CALLBACK]
is_trace = false is_trace = false
allow_inline = false allow_inline = false
...@@ -98,6 +86,49 @@ allow_inline = false ...@@ -98,6 +86,49 @@ allow_inline = false
is_trace = false is_trace = false
allow_inline = false allow_inline = false
[task.RPC_RRDB_RRDB_PUT_ACK]
is_profile = true
[task.RPC_RRDB_RRDB_MULTI_PUT_ACK]
is_profile = true
[task.RPC_RRDB_RRDB_REMOVE_ACK]
is_profile = true
[task.RPC_RRDB_RRDB_MULTI_REMOVE]
is_profile = true
[task.RPC_RRDB_RRDB_GET_ACK]
is_profile = true
[task.RPC_RRDB_RRDB_MULTI_GET_ACK]
is_profile = true
[task.RPC_RRDB_RRDB_SORTKEY_COUNT_ACK]
is_profile = true
[task.RPC_RRDB_RRDB_TTL_ACK]
is_profile = true
[task.RPC_RRDB_RRDB_GET_SCANNER_ACK]
is_profile = true
[task.RPC_RRDB_RRDB_SCAN_ACK]
is_profile = true
[task.RPC_RRDB_RRDB_CLEAR_SCANNER_ACK]
is_profile = true
[pegasus.server]
perf_counter_cluster_name = onebox
perf_counter_update_interval_seconds = 10
perf_counter_enable_logging = true
perf_counter_enable_falcon = false
falcon_host = 127.0.0.1
falcon_port = 1988
falcon_path = /v1/push
[uri-resolver.dsn://redis_cluster] [uri-resolver.dsn://redis_cluster]
factory = partition_resolver_simple factory = partition_resolver_simple
arguments = localhost:34601 arguments = localhost:34601
......
...@@ -8,6 +8,8 @@ ...@@ -8,6 +8,8 @@
#include <signal.h> #include <signal.h>
#include <pegasus/version.h> #include <pegasus/version.h>
#include "reporter/pegasus_counter_reporter.h"
#include "redis_parser.h" #include "redis_parser.h"
namespace pegasus { namespace pegasus {
...@@ -28,7 +30,10 @@ public: ...@@ -28,7 +30,10 @@ public:
return std::make_shared<redis_parser>(p, m); return std::make_shared<redis_parser>(p, m);
}; };
_proxy = dsn::make_unique<proxy_stub>( _proxy = dsn::make_unique<proxy_stub>(
f, args[1].c_str(), args[2].c_str(), args.size() > 3 ? args[3].c_str() : nullptr); f, args[1].c_str(), args[2].c_str(), args.size() > 3 ? args[3].c_str() : "");
pegasus::server::pegasus_counter_reporter::instance().start();
return ::dsn::ERR_OK; return ::dsn::ERR_OK;
} }
......
set(MY_PROJ_NAME "pegasus_reporter")
project(${MY_PROJ_NAME} C CXX)
# Source files under CURRENT project directory will be automatically included.
# You can manually set MY_PROJ_SRC to include source files under other directories.
set(MY_PROJ_SRC "")
# Search mode for source files under CURRENT project directory?
# "GLOB_RECURSE" for recursive search
# "GLOB" for non-recursive search
set(MY_SRC_SEARCH_MODE "GLOB")
set(MY_PROJ_LIBS "")
set(MY_BOOST_PACKAGES system)
dsn_add_static_library()
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// This source code is licensed under the Apache License Version 2.0, which // This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree. // can be found in the LICENSE file in the root directory of this source tree.
#include "pegasus_counter_updater.h" #include "pegasus_counter_reporter.h"
#include <regex> #include <regex>
#include <ios> #include <ios>
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
#include <unistd.h> #include <unistd.h>
#include <dsn/utility/smart_pointers.h> #include <dsn/utility/smart_pointers.h>
#include <dsn/tool-api/command_manager.h>
#include <dsn/cpp/service_app.h> #include <dsn/cpp/service_app.h>
#include "base/pegasus_utils.h" #include "base/pegasus_utils.h"
...@@ -38,7 +37,7 @@ static void libevent_log(int severity, const char *msg) ...@@ -38,7 +37,7 @@ static void libevent_log(int severity, const char *msg)
dlog(level, msg); dlog(level, msg);
} }
pegasus_counter_updater::pegasus_counter_updater() pegasus_counter_reporter::pegasus_counter_reporter()
: _local_port(0), : _local_port(0),
_update_interval_seconds(0), _update_interval_seconds(0),
_last_report_time_ms(0), _last_report_time_ms(0),
...@@ -48,9 +47,9 @@ pegasus_counter_updater::pegasus_counter_updater() ...@@ -48,9 +47,9 @@ pegasus_counter_updater::pegasus_counter_updater()
{ {
} }
pegasus_counter_updater::~pegasus_counter_updater() { stop(); } pegasus_counter_reporter::~pegasus_counter_reporter() { stop(); }
void pegasus_counter_updater::falcon_initialize() void pegasus_counter_reporter::falcon_initialize()
{ {
_falcon_host = dsn_config_get_value_string( _falcon_host = dsn_config_get_value_string(
"pegasus.server", "falcon_host", "127.0.0.1", "falcon agent host"); "pegasus.server", "falcon_host", "127.0.0.1", "falcon agent host");
...@@ -82,7 +81,7 @@ void pegasus_counter_updater::falcon_initialize() ...@@ -82,7 +81,7 @@ void pegasus_counter_updater::falcon_initialize()
_falcon_metric.tags.c_str()); _falcon_metric.tags.c_str());
} }
void pegasus_counter_updater::start() void pegasus_counter_reporter::start()
{ {
::dsn::utils::auto_write_lock l(_lock); ::dsn::utils::auto_write_lock l(_lock);
if (_report_timer != nullptr) if (_report_timer != nullptr)
...@@ -124,10 +123,10 @@ void pegasus_counter_updater::start() ...@@ -124,10 +123,10 @@ void pegasus_counter_updater::start()
_report_timer->expires_from_now( _report_timer->expires_from_now(
boost::posix_time::seconds(rand() % _update_interval_seconds + 1)); boost::posix_time::seconds(rand() % _update_interval_seconds + 1));
_report_timer->async_wait(std::bind( _report_timer->async_wait(std::bind(
&pegasus_counter_updater::on_report_timer, this, _report_timer, std::placeholders::_1)); &pegasus_counter_reporter::on_report_timer, this, _report_timer, std::placeholders::_1));
} }
void pegasus_counter_updater::stop() void pegasus_counter_reporter::stop()
{ {
::dsn::utils::auto_write_lock l(_lock); ::dsn::utils::auto_write_lock l(_lock);
if (_report_timer != nullptr) { if (_report_timer != nullptr) {
...@@ -135,7 +134,7 @@ void pegasus_counter_updater::stop() ...@@ -135,7 +134,7 @@ void pegasus_counter_updater::stop()
} }
} }
void pegasus_counter_updater::update_counters_to_falcon(const std::string &result, void pegasus_counter_reporter::update_counters_to_falcon(const std::string &result,
int64_t timestamp) int64_t timestamp)
{ {
ddebug("update counters to falcon with timestamp = %" PRId64, timestamp); ddebug("update counters to falcon with timestamp = %" PRId64, timestamp);
...@@ -143,7 +142,7 @@ void pegasus_counter_updater::update_counters_to_falcon(const std::string &resul ...@@ -143,7 +142,7 @@ void pegasus_counter_updater::update_counters_to_falcon(const std::string &resul
_falcon_host, _falcon_port, _falcon_path, "application/x-www-form-urlencoded", result); _falcon_host, _falcon_port, _falcon_path, "application/x-www-form-urlencoded", result);
} }
void pegasus_counter_updater::update() void pegasus_counter_reporter::update()
{ {
uint64_t now = dsn_now_ms(); uint64_t now = dsn_now_ms();
int64_t timestamp = now / 1000; int64_t timestamp = now / 1000;
...@@ -187,7 +186,7 @@ void pegasus_counter_updater::update() ...@@ -187,7 +186,7 @@ void pegasus_counter_updater::update()
_last_report_time_ms = now; _last_report_time_ms = now;
} }
void pegasus_counter_updater::http_post_request(const std::string &host, void pegasus_counter_reporter::http_post_request(const std::string &host,
int32_t port, int32_t port,
const std::string &path, const std::string &path,
const std::string &contentType, const std::string &contentType,
...@@ -197,7 +196,7 @@ void pegasus_counter_updater::http_post_request(const std::string &host, ...@@ -197,7 +196,7 @@ void pegasus_counter_updater::http_post_request(const std::string &host,
struct event_base *base = event_base_new(); struct event_base *base = event_base_new();
struct evhttp_connection *conn = evhttp_connection_base_new(base, nullptr, host.c_str(), port); struct evhttp_connection *conn = evhttp_connection_base_new(base, nullptr, host.c_str(), port);
struct evhttp_request *req = struct evhttp_request *req =
evhttp_request_new(pegasus_counter_updater::http_request_done, base); evhttp_request_new(pegasus_counter_reporter::http_request_done, base);
evhttp_add_header(req->output_headers, "Host", host.c_str()); evhttp_add_header(req->output_headers, "Host", host.c_str());
evhttp_add_header(req->output_headers, "Content-Type", contentType.c_str()); evhttp_add_header(req->output_headers, "Content-Type", contentType.c_str());
...@@ -212,7 +211,7 @@ void pegasus_counter_updater::http_post_request(const std::string &host, ...@@ -212,7 +211,7 @@ void pegasus_counter_updater::http_post_request(const std::string &host,
event_base_free(base); event_base_free(base);
} }
void pegasus_counter_updater::http_request_done(struct evhttp_request *req, void *arg) void pegasus_counter_reporter::http_request_done(struct evhttp_request *req, void *arg)
{ {
struct event_base *event = (struct event_base *)arg; struct event_base *event = (struct event_base *)arg;
if (req == nullptr) { if (req == nullptr) {
...@@ -242,7 +241,7 @@ void pegasus_counter_updater::http_request_done(struct evhttp_request *req, void ...@@ -242,7 +241,7 @@ void pegasus_counter_updater::http_request_done(struct evhttp_request *req, void
} }
} }
void pegasus_counter_updater::on_report_timer(std::shared_ptr<boost::asio::deadline_timer> timer, void pegasus_counter_reporter::on_report_timer(std::shared_ptr<boost::asio::deadline_timer> timer,
const boost::system::error_code &ec) const boost::system::error_code &ec)
{ {
// NOTICE: the following code is running out of rDSN's tls_context // NOTICE: the following code is running out of rDSN's tls_context
...@@ -250,7 +249,7 @@ void pegasus_counter_updater::on_report_timer(std::shared_ptr<boost::asio::deadl ...@@ -250,7 +249,7 @@ void pegasus_counter_updater::on_report_timer(std::shared_ptr<boost::asio::deadl
update(); update();
timer->expires_from_now(boost::posix_time::seconds(_update_interval_seconds)); timer->expires_from_now(boost::posix_time::seconds(_update_interval_seconds));
timer->async_wait(std::bind( timer->async_wait(std::bind(
&pegasus_counter_updater::on_report_timer, this, timer, std::placeholders::_1)); &pegasus_counter_reporter::on_report_timer, this, timer, std::placeholders::_1));
} else if (boost::system::errc::operation_canceled != ec) { } else if (boost::system::errc::operation_canceled != ec) {
dassert(false, "pegasus report timer error!!!"); dassert(false, "pegasus report timer error!!!");
} }
......
...@@ -6,12 +6,8 @@ ...@@ -6,12 +6,8 @@
#include <dsn/utility/singleton.h> #include <dsn/utility/singleton.h>
#include <dsn/utility/synchronize.h> #include <dsn/utility/synchronize.h>
#include <dsn/c/api_utilities.h>
#include <dsn/cpp/json_helper.h> #include <dsn/cpp/json_helper.h>
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <dsn/perf_counter/perf_counter.h>
#include <iomanip>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <event2/event.h> #include <event2/event.h>
#include <event2/buffer.h> #include <event2/buffer.h>
...@@ -36,11 +32,11 @@ struct falcon_metric ...@@ -36,11 +32,11 @@ struct falcon_metric
DEFINE_JSON_SERIALIZATION(endpoint, metric, timestamp, step, value, counterType, tags) DEFINE_JSON_SERIALIZATION(endpoint, metric, timestamp, step, value, counterType, tags)
}; };
class pegasus_counter_updater : public ::dsn::utils::singleton<pegasus_counter_updater> class pegasus_counter_reporter : public ::dsn::utils::singleton<pegasus_counter_reporter>
{ {
public: public:
pegasus_counter_updater(); pegasus_counter_reporter();
virtual ~pegasus_counter_updater(); virtual ~pegasus_counter_reporter();
void start(); void start();
void stop(); void stop();
......
...@@ -8,7 +8,6 @@ ...@@ -8,7 +8,6 @@
#include <thread> #include <thread>
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <dsn/tool_api.h>
#include <dsn/utility/singleton.h> #include <dsn/utility/singleton.h>
namespace pegasus { namespace pegasus {
......
...@@ -22,6 +22,7 @@ set(MY_PROJ_LIBS ...@@ -22,6 +22,7 @@ set(MY_PROJ_LIBS
dsn.failure_detector dsn.failure_detector
dsn.failure_detector.multimaster dsn.failure_detector.multimaster
dsn.replication.zookeeper_provider dsn.replication.zookeeper_provider
pegasus_reporter
pegasus_client_static pegasus_client_static
zookeeper_mt zookeeper_mt
event event
......
...@@ -155,15 +155,6 @@ rpc_timeout_milliseconds = 5000 ...@@ -155,15 +155,6 @@ rpc_timeout_milliseconds = 5000
disk_write_fail_ratio = 0.0 disk_write_fail_ratio = 0.0
disk_read_fail_ratio = 0.0 disk_read_fail_ratio = 0.0
perf_test_rounds = 1000
perf_test_payload_bytes = 1024
perf_test_timeouts_ms = 10000
; perf_test_concurrent_count is used only when perf_test_concurrent is true:
; - if perf_test_concurrent_count == 0, means concurrency grow exponentially.
; - if perf_test_concurrent_count > 0, means concurrency maintained to a fixed number.
perf_test_concurrent = true
perf_test_concurrent_count = 20
[meta_server] [meta_server]
server_list = @LOCAL_IP@:34601,@LOCAL_IP@:34602,@LOCAL_IP@:34603 server_list = @LOCAL_IP@:34601,@LOCAL_IP@:34602,@LOCAL_IP@:34603
cluster_root = /pegasus/onebox/@LOCAL_IP@ cluster_root = /pegasus/onebox/@LOCAL_IP@
...@@ -269,7 +260,6 @@ manual_compact_min_interval_seconds = 3600 ...@@ -269,7 +260,6 @@ manual_compact_min_interval_seconds = 3600
perf_counter_cluster_name = onebox perf_counter_cluster_name = onebox
perf_counter_update_interval_seconds = 10 perf_counter_update_interval_seconds = 10
perf_counter_enable_stat = true
perf_counter_enable_logging = false perf_counter_enable_logging = false
perf_counter_enable_falcon = false perf_counter_enable_falcon = false
......
...@@ -276,7 +276,6 @@ ...@@ -276,7 +276,6 @@
perf_counter_cluster_name = %{cluster.name} perf_counter_cluster_name = %{cluster.name}
perf_counter_update_interval_seconds = 10 perf_counter_update_interval_seconds = 10
perf_counter_enable_stat = true
perf_counter_enable_logging = false perf_counter_enable_logging = false
perf_counter_enable_falcon = false perf_counter_enable_falcon = false
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
// can be found in the LICENSE file in the root directory of this source tree. // can be found in the LICENSE file in the root directory of this source tree.
#include "info_collector_app.h" #include "info_collector_app.h"
#include "pegasus_counter_updater.h" #include "reporter/pegasus_counter_reporter.h"
#include <dsn/dist/replication.h> #include <dsn/dist/replication.h>
#include <dsn/dist/replication/replication_other_types.h> #include <dsn/dist/replication/replication_other_types.h>
...@@ -24,7 +24,7 @@ info_collector_app::~info_collector_app() {} ...@@ -24,7 +24,7 @@ info_collector_app::~info_collector_app() {}
::dsn::error_code info_collector_app::start(const std::vector<std::string> &args) ::dsn::error_code info_collector_app::start(const std::vector<std::string> &args)
{ {
pegasus_counter_updater::instance().start(); pegasus_counter_reporter::instance().start();
_updater_started = true; _updater_started = true;
_collector.start(); _collector.start();
...@@ -35,7 +35,7 @@ info_collector_app::~info_collector_app() {} ...@@ -35,7 +35,7 @@ info_collector_app::~info_collector_app() {}
::dsn::error_code info_collector_app::stop(bool cleanup) ::dsn::error_code info_collector_app::stop(bool cleanup)
{ {
if (_updater_started) { if (_updater_started) {
pegasus_counter_updater::instance().stop(); pegasus_counter_reporter::instance().stop();
} }
_collector.stop(); _collector.stop();
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
#include <dsn/dist/replication/meta_service_app.h> #include <dsn/dist/replication/meta_service_app.h>
#include <dsn/dist/replication/replication_service_app.h> #include <dsn/dist/replication/replication_service_app.h>
#include "pegasus_counter_updater.h" #include "reporter/pegasus_counter_reporter.h"
namespace pegasus { namespace pegasus {
namespace server { namespace server {
...@@ -24,7 +24,7 @@ public: ...@@ -24,7 +24,7 @@ public:
{ {
::dsn::error_code ret = ::dsn::replication::replication_service_app::start(args); ::dsn::error_code ret = ::dsn::replication::replication_service_app::start(args);
if (ret == ::dsn::ERR_OK) { if (ret == ::dsn::ERR_OK) {
pegasus_counter_updater::instance().start(); pegasus_counter_reporter::instance().start();
_updater_started = true; _updater_started = true;
} }
return ret; return ret;
...@@ -34,7 +34,7 @@ public: ...@@ -34,7 +34,7 @@ public:
{ {
::dsn::error_code ret = ::dsn::replication::replication_service_app::stop(); ::dsn::error_code ret = ::dsn::replication::replication_service_app::stop();
if (_updater_started) { if (_updater_started) {
pegasus_counter_updater::instance().stop(); pegasus_counter_reporter::instance().stop();
} }
return ret; return ret;
} }
...@@ -55,7 +55,7 @@ public: ...@@ -55,7 +55,7 @@ public:
{ {
::dsn::error_code ret = ::dsn::service::meta_service_app::start(args); ::dsn::error_code ret = ::dsn::service::meta_service_app::start(args);
if (ret == ::dsn::ERR_OK) { if (ret == ::dsn::ERR_OK) {
pegasus_counter_updater::instance().start(); pegasus_counter_reporter::instance().start();
_updater_started = true; _updater_started = true;
} }
return ret; return ret;
...@@ -65,7 +65,7 @@ public: ...@@ -65,7 +65,7 @@ public:
{ {
::dsn::error_code ret = ::dsn::service::meta_service_app::stop(); ::dsn::error_code ret = ::dsn::service::meta_service_app::stop();
if (_updater_started) { if (_updater_started) {
pegasus_counter_updater::instance().stop(); pegasus_counter_reporter::instance().stop();
} }
return ret; return ret;
} }
......
set(MY_PROJ_NAME pegasus_unit_test) set(MY_PROJ_NAME pegasus_unit_test)
set(MY_PROJ_SRC "../pegasus_server_impl.cpp" set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_counter_updater.cpp"
"../pegasus_manual_compact_service.cpp" "../pegasus_manual_compact_service.cpp"
"../pegasus_event_listener.cpp" "../pegasus_event_listener.cpp"
"../pegasus_write_service.cpp" "../pegasus_write_service.cpp"
...@@ -22,6 +21,7 @@ set(MY_PROJ_LIBS ...@@ -22,6 +21,7 @@ set(MY_PROJ_LIBS
dsn.failure_detector dsn.failure_detector
dsn.failure_detector.multimaster dsn.failure_detector.multimaster
dsn.replication.zookeeper_provider dsn.replication.zookeeper_provider
pegasus_reporter
pegasus_client_static pegasus_client_static
zookeeper_mt zookeeper_mt
event event
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册