From 568e0b83c5959fd5d96e7dc5adaec715658b6e93 Mon Sep 17 00:00:00 2001 From: QinZuoyan Date: Mon, 10 Dec 2018 21:53:42 +0800 Subject: [PATCH] server: support table level write throttling (#230) --- rdsn | 2 +- scripts/falcon_screen.json | 20 ++++++++++++++++++++ src/client_lib/pegasus_client_impl.cpp | 2 ++ src/include/pegasus/error_def.h | 2 ++ src/reporter/pegasus_counter_reporter.cpp | 5 +---- src/server/config.ini | 3 --- src/server/info_collector.cpp | 8 ++++++++ src/server/info_collector.h | 2 ++ src/shell/command_helper.h | 10 ++++++++-- src/shell/commands.h | 6 ++++++ 10 files changed, 50 insertions(+), 10 deletions(-) diff --git a/rdsn b/rdsn index 178165a..62ad5b7 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 178165a022e4a9fa11d02d8e2423def8aad52901 +Subproject commit 62ad5b7a25dee8c6015e702d75ff8d93768e0913 diff --git a/scripts/falcon_screen.json b/scripts/falcon_screen.json index 9f34836..309e85d 100644 --- a/scripts/falcon_screen.json +++ b/scripts/falcon_screen.json @@ -649,6 +649,26 @@ "method": "", "timespan": 86400 }, + { + "title": "Delay数据条数(统计各表最近10秒write throttling delay的数据条数)", + "endpoints": ["cluster=${cluster.name} job=collector service=pegasus"], + "counters": [ + "collector*app.pegasus*app.stat.recent_throttling_delay_count#${for.each.table}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus" + ], + "graph_type": "a", + "method": "", + "timespan": 86400 + }, + { + "title": "Reject数据条数(统计各表最近10秒write throttling reject的数据条数)", + "endpoints": ["cluster=${cluster.name} job=collector service=pegasus"], + "counters": [ + "collector*app.pegasus*app.stat.recent_throttling_reject_count#${for.each.table}/cluster=${cluster.name},job=collector,port=${collector.port},service=pegasus" + ], + "graph_type": "a", + "method": "", + "timespan": 86400 + }, { "title": "【${for.each.table}】单表QPS", "endpoints": ["cluster=${cluster.name} job=collector service=pegasus"], diff --git a/src/client_lib/pegasus_client_impl.cpp b/src/client_lib/pegasus_client_impl.cpp index 1ae6a92..be31f5a 100644 --- a/src/client_lib/pegasus_client_impl.cpp +++ b/src/client_lib/pegasus_client_impl.cpp @@ -1297,9 +1297,11 @@ const char *pegasus_client_impl::get_error_string(int error_code) const _server_error_to_client[::dsn::ERR_NETWORK_FAILURE] = PERR_NETWORK_FAILURE; _server_error_to_client[::dsn::ERR_HANDLER_NOT_FOUND] = PERR_HANDLER_NOT_FOUND; _server_error_to_client[::dsn::ERR_OPERATION_DISABLED] = PERR_OPERATION_DISABLED; + _server_error_to_client[::dsn::ERR_NOT_ENOUGH_MEMBER] = PERR_NOT_ENOUGH_MEMBER; _server_error_to_client[::dsn::ERR_APP_NOT_EXIST] = PERR_APP_NOT_EXIST; _server_error_to_client[::dsn::ERR_APP_EXIST] = PERR_APP_EXIST; + _server_error_to_client[::dsn::ERR_BUSY] = PERR_APP_BUSY; // rocksdb error; for (int i = 1001; i < 1013; i++) { diff --git a/src/include/pegasus/error_def.h b/src/include/pegasus/error_def.h index 405dcc7..837e2ff 100644 --- a/src/include/pegasus/error_def.h +++ b/src/include/pegasus/error_def.h @@ -9,6 +9,7 @@ PEGASUS_ERR_CODE(PERR_OBJECT_NOT_FOUND, -3, "object not found"); PEGASUS_ERR_CODE(PERR_NETWORK_FAILURE, -4, "network failure"); PEGASUS_ERR_CODE(PERR_HANDLER_NOT_FOUND, -5, "handler not found"); PEGASUS_ERR_CODE(PERR_OPERATION_DISABLED, -6, "operation disabled"); +PEGASUS_ERR_CODE(PERR_NOT_ENOUGH_MEMBER, -7, "no enough member"); PEGASUS_ERR_CODE(PERR_SCAN_COMPLETE, 1, "scan complete"); // SERVER ERROR @@ -16,6 +17,7 @@ PEGASUS_ERR_CODE(PERR_APP_NOT_EXIST, -101, "app not exist"); PEGASUS_ERR_CODE(PERR_APP_EXIST, -102, "app already exist"); PEGASUS_ERR_CODE(PERR_SERVER_INTERNAL_ERROR, -103, "server internal error"); PEGASUS_ERR_CODE(PERR_SERVER_CHANGED, -104, "server changed"); +PEGASUS_ERR_CODE(PERR_APP_BUSY, -105, "app busy"); // CLIENT ERROR PEGASUS_ERR_CODE(PERR_INVALID_APP_NAME, diff --git a/src/reporter/pegasus_counter_reporter.cpp b/src/reporter/pegasus_counter_reporter.cpp index cadba89..80fbb5c 100644 --- a/src/reporter/pegasus_counter_reporter.cpp +++ b/src/reporter/pegasus_counter_reporter.cpp @@ -227,15 +227,12 @@ void pegasus_counter_reporter::http_request_done(struct evhttp_request *req, voi } break; default: - derror("http post request receive ERROR: %u", req->response_code); - struct evbuffer *buf = evhttp_request_get_input_buffer(req); size_t len = evbuffer_get_length(buf); - char *tmp = (char *)malloc(len + 1); + char *tmp = (char *)alloca(len + 1); memcpy(tmp, evbuffer_pullup(buf, -1), len); tmp[len] = '\0'; derror("http post request receive ERROR: %u, %s", req->response_code, tmp); - free(tmp); event_base_loopexit(event, 0); return; } diff --git a/src/server/config.ini b/src/server/config.ini index d1fa49b..179a661 100644 --- a/src/server/config.ini +++ b/src/server/config.ini @@ -333,9 +333,6 @@ [task.LPC_PER_REPLICA_COLLECT_INFO_TIMER] ;is_profile = true -[task.LPC_MUTATION_PENDING_TIMER] - ;is_profile = true - [task.LPC_GROUP_CHECK] ;is_profile = true diff --git a/src/server/info_collector.cpp b/src/server/info_collector.cpp index 3f15758..f74bf6d 100644 --- a/src/server/info_collector.cpp +++ b/src/server/info_collector.cpp @@ -95,6 +95,8 @@ void info_collector::on_app_stat() all.recent_expire_count += row.recent_expire_count; all.recent_filter_count += row.recent_filter_count; all.recent_abnormal_count += row.recent_abnormal_count; + all.recent_write_throttling_delay_count += row.recent_write_throttling_delay_count; + all.recent_write_throttling_reject_count += row.recent_write_throttling_reject_count; all.storage_mb += row.storage_mb; all.storage_count += row.storage_count; all.rdb_block_cache_hit_count += row.rdb_block_cache_hit_count; @@ -126,6 +128,10 @@ void info_collector::on_app_stat() counters->recent_expire_count->set(row.recent_expire_count); counters->recent_filter_count->set(row.recent_filter_count); counters->recent_abnormal_count->set(row.recent_abnormal_count); + counters->recent_write_throttling_delay_count->set( + row.recent_write_throttling_delay_count); + counters->recent_write_throttling_reject_count->set( + row.recent_write_throttling_reject_count); counters->storage_mb->set(row.storage_mb); counters->storage_count->set(row.storage_count); counters->rdb_block_cache_hit_rate->set(row.rdb_block_cache_hit_count / @@ -175,6 +181,8 @@ info_collector::AppStatCounters *info_collector::get_app_counters(const std::str INIT_COUNER(recent_expire_count); INIT_COUNER(recent_filter_count); INIT_COUNER(recent_abnormal_count); + INIT_COUNER(recent_write_throttling_delay_count); + INIT_COUNER(recent_write_throttling_reject_count); INIT_COUNER(storage_mb); INIT_COUNER(storage_count); INIT_COUNER(rdb_block_cache_hit_rate); diff --git a/src/server/info_collector.h b/src/server/info_collector.h index 9a354e5..bf43cf5 100644 --- a/src/server/info_collector.h +++ b/src/server/info_collector.h @@ -41,6 +41,8 @@ public: ::dsn::perf_counter_wrapper recent_expire_count; ::dsn::perf_counter_wrapper recent_filter_count; ::dsn::perf_counter_wrapper recent_abnormal_count; + ::dsn::perf_counter_wrapper recent_write_throttling_delay_count; + ::dsn::perf_counter_wrapper recent_write_throttling_reject_count; ::dsn::perf_counter_wrapper storage_mb; ::dsn::perf_counter_wrapper storage_count; ::dsn::perf_counter_wrapper rdb_block_cache_hit_rate; diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index 5d69ccb..2a70b9f 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -394,6 +394,8 @@ struct row_data double recent_expire_count = 0; double recent_filter_count = 0; double recent_abnormal_count = 0; + double recent_write_throttling_delay_count = 0; + double recent_write_throttling_reject_count = 0; double storage_mb = 0; double storage_count = 0; double rdb_block_cache_hit_count = 0; @@ -432,6 +434,10 @@ update_app_pegasus_perf_counter(row_data &row, const std::string &counter_name, row.recent_filter_count += value; else if (counter_name == "recent.abnormal.count") row.recent_abnormal_count += value; + else if (counter_name == "recent.write.throttling.delay.count") + row.recent_write_throttling_delay_count += value; + else if (counter_name == "recent.write.throttling.reject.count") + row.recent_write_throttling_reject_count += value; else if (counter_name == "disk.storage.sst(MB)") row.storage_mb += value; else if (counter_name == "disk.storage.sst.count") @@ -484,9 +490,9 @@ get_app_stat(shell_context *sc, const std::string &app_name, std::vectorapp_id); + sprintf(tmp, ".*@%d\\..*", app_info->app_id); } command.arguments.push_back(tmp); std::vector> results; diff --git a/src/shell/commands.h b/src/shell/commands.h index d041204..8dd7986 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -3654,6 +3654,8 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args) sum.recent_expire_count += row.recent_expire_count; sum.recent_filter_count += row.recent_filter_count; sum.recent_abnormal_count += row.recent_abnormal_count; + sum.recent_write_throttling_delay_count += row.recent_write_throttling_delay_count; + sum.recent_write_throttling_reject_count += row.recent_write_throttling_reject_count; sum.storage_mb += row.storage_mb; sum.storage_count += row.storage_count; sum.rdb_block_cache_hit_count += row.rdb_block_cache_hit_count; @@ -3690,6 +3692,8 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args) tp.add_column("expired"); tp.add_column("filtered"); tp.add_column("abnormal"); + tp.add_column("delayed"); + tp.add_column("rejected"); tp.add_column("file_mb"); tp.add_column("file_num"); tp.add_column("hit_rate"); @@ -3712,6 +3716,8 @@ inline bool app_stat(command_executor *e, shell_context *sc, arguments args) tp.append_data(row.recent_expire_count); tp.append_data(row.recent_filter_count); tp.append_data(row.recent_abnormal_count); + tp.append_data(row.recent_write_throttling_delay_count); + tp.append_data(row.recent_write_throttling_reject_count); tp.append_data(row.storage_mb); tp.append_data((uint64_t)row.storage_count); double block_cache_hit_rate = -- GitLab