From 77287612e8c44ba563ed40cf0fc34707fc25aaa9 Mon Sep 17 00:00:00 2001 From: ShiningZhang Date: Wed, 9 Feb 2022 14:11:56 +0800 Subject: [PATCH] fix python prometheus client --- .../predictor/framework/prometheus_metric.cpp | 10 +++--- python/pipeline/dag.py | 36 +++++++++++++------ python/pipeline/profiler.py | 11 +++++- python/pipeline/prometheus_metrics.py | 12 +++---- python/requirements.txt | 1 + 5 files changed, 48 insertions(+), 22 deletions(-) diff --git a/core/predictor/framework/prometheus_metric.cpp b/core/predictor/framework/prometheus_metric.cpp index 5ddd0d1d..db4f2f45 100644 --- a/core/predictor/framework/prometheus_metric.cpp +++ b/core/predictor/framework/prometheus_metric.cpp @@ -30,26 +30,26 @@ PrometheusMetric::PrometheusMetric() serializer_(new prometheus::TextSerializer()), query_success_family_( prometheus::BuildCounter() - .Name("pd_query_request_success") + .Name("pd_query_request_success_total") .Help("Number of successful query requests") .Register(*registry_)), query_failure_family_( prometheus::BuildCounter() - .Name("pd_query_request_failure") + .Name("pd_query_request_failure_total") .Help("Number of failed query requests") .Register(*registry_)), inf_count_family_(prometheus::BuildCounter() - .Name("pd_inference_count") + .Name("pd_inference_count_total") .Help("Number of inferences performed") .Register(*registry_)), query_duration_us_family_( prometheus::BuildCounter() - .Name("pd_query_request_duration_us") + .Name("pd_query_request_duration_us_total") .Help("Cummulative query request duration in microseconds") .Register(*registry_)), inf_duration_us_family_( prometheus::BuildCounter() - .Name("pd_inference_duration_us") + .Name("pd_inference_duration_us_total") .Help("Cummulative inference duration in microseconds") .Register(*registry_)), metrics_enabled_(false) diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 430d8d90..d7020ae7 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -62,7 +62,10 @@ class DAGExecutor(object): self._retry = dag_conf["retry"] self._server_use_profile = dag_conf["use_profile"] - if "prometheus_port" in dag_conf: + self._enable_prometheus = False + if "enable_prometheus" in dag_conf: + self._enable_prometheus = dag_conf["enable_prometheus"] + if "prometheus_port" in dag_conf and self._enable_prometheus: self._prometheus_port = dag_conf["prometheus_port"] else: self._prometheus_port = None @@ -81,6 +84,8 @@ class DAGExecutor(object): if tracer_interval_s >= 1: self._tracer = PerformanceTracer( self._is_thread_op, tracer_interval_s, server_worker_num) + if self._enable_prometheus: + self._tracer.set_enable_dict(True) self._dag = DAG(self.name, response_op, self._server_use_profile, self._prometheus_port, self._is_thread_op, channel_size, build_dag_each_worker, @@ -844,9 +849,9 @@ class DAG(object): from .prometheus_metrics import registry from .prometheus_metrics import metric_query_success, metric_query_failure, metric_inf_count, metric_query_duration_us, metric_inf_duration_us app = Flask(__name__) - requests_total = Counter('c1','A counter') + # requests_total = Counter('c1','A counter') - @app.route("/metrics/") + @app.route("/metrics") def requests_count(): item = self._tracer.profile_dict _LOGGER.info("metrics: {}".format(item)) @@ -855,14 +860,24 @@ class DAG(object): total = item["DAG"]["query_count"] succ = total * item["DAG"]["succ"] fail = total * (1 - item["DAG"]["succ"]) - inf_cnt = total query_duration = total *item["DAG"]["avg"] - metric_query_success._value.set(succ) - metric_query_failure._value.set(fail) - metric_inf_count._value.set(total) - metric_query_duration_us._value.set(query_duration) + metric_query_success.inc(succ) + metric_query_failure._value.inc(fail) + metric_query_duration_us._value.inc(query_duration) + + inf_cnt = 0 + infer_duration = 0.0 + for name in item: + if name != "DAG": + if "count" in item[name]: + inf_cnt += item[name]["count"] + if "midp" in item[name]: + infer_duration += item[name]["count"]*item[name]["midp"] + metric_inf_count._value.inc(inf_cnt) + metric_inf_duration_us._value.inc(infer_duration) #return str(item) + self._tracer.profile_dict = {} return Response(prometheus_client.generate_latest(registry),mimetype="text/plain") def prom_run(): @@ -895,8 +910,9 @@ class DAG(object): else: self._threads_or_proces.extend(op.start_with_process()) _LOGGER.info("[DAG] start") - _LOGGER.info("Prometheus Start 1") - self.start_prom(self._prometheus_port) + if self._use_prometheus: + _LOGGER.info("Prometheus Start 1") + self.start_prom(self._prometheus_port) # not join yet return self._threads_or_proces diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py index 94d5a7a0..2318e4e8 100644 --- a/python/pipeline/profiler.py +++ b/python/pipeline/profiler.py @@ -50,6 +50,7 @@ class PerformanceTracer(object): # The size of data in Channel will not exceed server_worker_num self._server_worker_num = server_worker_num self.profile_dict = {} + self._enable_dict = False def data_buffer(self): return self._data_buffer @@ -75,6 +76,9 @@ class PerformanceTracer(object): def set_channels(self, channels): self._channels = channels + def set_enable_dict(self, enable): + self._enable_dict = enable + def _trace_func(self, channels): all_actions = ["in", "prep", "midp", "postp", "out"] calcu_actions = ["prep", "midp", "postp"] @@ -111,9 +115,14 @@ class PerformanceTracer(object): if len(op_cost) != 0: for name in op_cost: tot_cost, calcu_cost = 0.0, 0.0 + count = 0 for action, costs in op_cost[name].items(): op_cost[name][action] = sum(costs) / (1e3 * len(costs)) tot_cost += op_cost[name][action] + if action == "midp": + count = len(costs) + if "midp" in op_cost[name].keys(): + op_cost[name]['count'] = count if name != "DAG": _LOGGER.info("Op({}):".format(name)) @@ -146,7 +155,7 @@ class PerformanceTracer(object): for latency in latencys: _LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int( tot * latency / 100.0)])) - if _is_profile: + if _is_profile or self._enable_dict: self.profile_dict["DAG"]["query_count"] = tot self.profile_dict["DAG"]["qps"] = qps self.profile_dict["DAG"]["succ"] = 1 - 1.0 * err_count / tot diff --git a/python/pipeline/prometheus_metrics.py b/python/pipeline/prometheus_metrics.py index 6cf79336..72815374 100644 --- a/python/pipeline/prometheus_metrics.py +++ b/python/pipeline/prometheus_metrics.py @@ -1,8 +1,8 @@ -from prometheus_client import Counter, generate_latest, CollectorRegistry +from prometheus_client import Counter, generate_latest, CollectorRegistry, Gauge registry = CollectorRegistry() -metric_query_success = Counter("QuerySucc", "metric_query_success", registry=registry) -metric_query_failure = Counter("QueryFail", "metric_query_failure", registry=registry) -metric_inf_count = Counter("InferCnt", "metric_inf_count", registry=registry) -metric_query_duration_us = Counter("QueryDuratioin", "metric_query_duration_us", registry=registry) -metric_inf_duration_us = Counter("InferDuration", "metric_inf_duration_us", registry=registry) +metric_query_success = Counter("pd_query_request_success_total", "metric_query_success", registry=registry) +metric_query_failure = Counter("pd_query_request_failure_total", "metric_query_failure", registry=registry) +metric_inf_count = Counter("pd_inference_count_total", "metric_inf_count", registry=registry) +metric_query_duration_us = Counter("pd_query_request_duration_us_total", "metric_query_duration_us", registry=registry) +metric_inf_duration_us = Counter("pd_inference_duration_us_total", "metric_inf_duration_us", registry=registry) diff --git a/python/requirements.txt b/python/requirements.txt index 9f17ce5c..094d2f98 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -21,3 +21,4 @@ sentencepiece; platform_machine == "aarch64" opencv-python==4.3.0.38; platform_machine != "aarch64" opencv-python; platform_machine == "aarch64" pytest +prometheus-client==0.12.0 \ No newline at end of file -- GitLab