提交 77287612 编写于 作者: S ShiningZhang

fix python prometheus client

上级 ba1e97b8
......@@ -30,26 +30,26 @@ PrometheusMetric::PrometheusMetric()
serializer_(new prometheus::TextSerializer()),
query_success_family_(
prometheus::BuildCounter()
.Name("pd_query_request_success")
.Name("pd_query_request_success_total")
.Help("Number of successful query requests")
.Register(*registry_)),
query_failure_family_(
prometheus::BuildCounter()
.Name("pd_query_request_failure")
.Name("pd_query_request_failure_total")
.Help("Number of failed query requests")
.Register(*registry_)),
inf_count_family_(prometheus::BuildCounter()
.Name("pd_inference_count")
.Name("pd_inference_count_total")
.Help("Number of inferences performed")
.Register(*registry_)),
query_duration_us_family_(
prometheus::BuildCounter()
.Name("pd_query_request_duration_us")
.Name("pd_query_request_duration_us_total")
.Help("Cummulative query request duration in microseconds")
.Register(*registry_)),
inf_duration_us_family_(
prometheus::BuildCounter()
.Name("pd_inference_duration_us")
.Name("pd_inference_duration_us_total")
.Help("Cummulative inference duration in microseconds")
.Register(*registry_)),
metrics_enabled_(false)
......
......@@ -62,7 +62,10 @@ class DAGExecutor(object):
self._retry = dag_conf["retry"]
self._server_use_profile = dag_conf["use_profile"]
if "prometheus_port" in dag_conf:
self._enable_prometheus = False
if "enable_prometheus" in dag_conf:
self._enable_prometheus = dag_conf["enable_prometheus"]
if "prometheus_port" in dag_conf and self._enable_prometheus:
self._prometheus_port = dag_conf["prometheus_port"]
else:
self._prometheus_port = None
......@@ -81,6 +84,8 @@ class DAGExecutor(object):
if tracer_interval_s >= 1:
self._tracer = PerformanceTracer(
self._is_thread_op, tracer_interval_s, server_worker_num)
if self._enable_prometheus:
self._tracer.set_enable_dict(True)
self._dag = DAG(self.name, response_op, self._server_use_profile, self._prometheus_port,
self._is_thread_op, channel_size, build_dag_each_worker,
......@@ -844,9 +849,9 @@ class DAG(object):
from .prometheus_metrics import registry
from .prometheus_metrics import metric_query_success, metric_query_failure, metric_inf_count, metric_query_duration_us, metric_inf_duration_us
app = Flask(__name__)
requests_total = Counter('c1','A counter')
# requests_total = Counter('c1','A counter')
@app.route("/metrics/")
@app.route("/metrics")
def requests_count():
item = self._tracer.profile_dict
_LOGGER.info("metrics: {}".format(item))
......@@ -855,14 +860,24 @@ class DAG(object):
total = item["DAG"]["query_count"]
succ = total * item["DAG"]["succ"]
fail = total * (1 - item["DAG"]["succ"])
inf_cnt = total
query_duration = total *item["DAG"]["avg"]
metric_query_success._value.set(succ)
metric_query_failure._value.set(fail)
metric_inf_count._value.set(total)
metric_query_duration_us._value.set(query_duration)
metric_query_success.inc(succ)
metric_query_failure._value.inc(fail)
metric_query_duration_us._value.inc(query_duration)
inf_cnt = 0
infer_duration = 0.0
for name in item:
if name != "DAG":
if "count" in item[name]:
inf_cnt += item[name]["count"]
if "midp" in item[name]:
infer_duration += item[name]["count"]*item[name]["midp"]
metric_inf_count._value.inc(inf_cnt)
metric_inf_duration_us._value.inc(infer_duration)
#return str(item)
self._tracer.profile_dict = {}
return Response(prometheus_client.generate_latest(registry),mimetype="text/plain")
def prom_run():
......@@ -895,8 +910,9 @@ class DAG(object):
else:
self._threads_or_proces.extend(op.start_with_process())
_LOGGER.info("[DAG] start")
_LOGGER.info("Prometheus Start 1")
self.start_prom(self._prometheus_port)
if self._use_prometheus:
_LOGGER.info("Prometheus Start 1")
self.start_prom(self._prometheus_port)
# not join yet
return self._threads_or_proces
......
......@@ -50,6 +50,7 @@ class PerformanceTracer(object):
# The size of data in Channel will not exceed server_worker_num
self._server_worker_num = server_worker_num
self.profile_dict = {}
self._enable_dict = False
def data_buffer(self):
    """Return the object stored in ``self._data_buffer``.

    The attribute is handed back as-is (no copy is made), so the caller
    receives the very same object the tracer holds.
    """
    return self._data_buffer
......@@ -75,6 +76,9 @@ class PerformanceTracer(object):
def set_channels(self, channels):
    """Store ``channels`` on the tracer as ``self._channels``.

    Args:
        channels: The value to remember; it is stored by reference,
            without copying or validation.
    """
    self._channels = channels
def set_enable_dict(self, enable):
    """Set the ``self._enable_dict`` flag to ``enable``.

    Args:
        enable: New value for the flag; it is stored unchanged (no
            coercion to ``bool``).
    """
    self._enable_dict = enable
def _trace_func(self, channels):
all_actions = ["in", "prep", "midp", "postp", "out"]
calcu_actions = ["prep", "midp", "postp"]
......@@ -111,9 +115,14 @@ class PerformanceTracer(object):
if len(op_cost) != 0:
for name in op_cost:
tot_cost, calcu_cost = 0.0, 0.0
count = 0
for action, costs in op_cost[name].items():
op_cost[name][action] = sum(costs) / (1e3 * len(costs))
tot_cost += op_cost[name][action]
if action == "midp":
count = len(costs)
if "midp" in op_cost[name].keys():
op_cost[name]['count'] = count
if name != "DAG":
_LOGGER.info("Op({}):".format(name))
......@@ -146,7 +155,7 @@ class PerformanceTracer(object):
for latency in latencys:
_LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int(
tot * latency / 100.0)]))
if _is_profile:
if _is_profile or self._enable_dict:
self.profile_dict["DAG"]["query_count"] = tot
self.profile_dict["DAG"]["qps"] = qps
self.profile_dict["DAG"]["succ"] = 1 - 1.0 * err_count / tot
......
from prometheus_client import Counter, generate_latest, CollectorRegistry
from prometheus_client import Counter, generate_latest, CollectorRegistry, Gauge
registry = CollectorRegistry()
metric_query_success = Counter("QuerySucc", "metric_query_success", registry=registry)
metric_query_failure = Counter("QueryFail", "metric_query_failure", registry=registry)
metric_inf_count = Counter("InferCnt", "metric_inf_count", registry=registry)
metric_query_duration_us = Counter("QueryDuratioin", "metric_query_duration_us", registry=registry)
metric_inf_duration_us = Counter("InferDuration", "metric_inf_duration_us", registry=registry)
metric_query_success = Counter("pd_query_request_success_total", "metric_query_success", registry=registry)
metric_query_failure = Counter("pd_query_request_failure_total", "metric_query_failure", registry=registry)
metric_inf_count = Counter("pd_inference_count_total", "metric_inf_count", registry=registry)
metric_query_duration_us = Counter("pd_query_request_duration_us_total", "metric_query_duration_us", registry=registry)
metric_inf_duration_us = Counter("pd_inference_duration_us_total", "metric_inf_duration_us", registry=registry)
......@@ -21,3 +21,4 @@ sentencepiece; platform_machine == "aarch64"
opencv-python==4.3.0.38; platform_machine != "aarch64"
opencv-python; platform_machine == "aarch64"
pytest
prometheus-client==0.12.0
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册