提交 8d37559d 编写于 作者: S ShiningZhang

Merge pull request #1586 from bjjwwang/develop

add prom python client
上级 e999e9ae
......@@ -30,26 +30,26 @@ PrometheusMetric::PrometheusMetric()
serializer_(new prometheus::TextSerializer()),
query_success_family_(
prometheus::BuildCounter()
.Name("pd_query_request_success")
.Name("pd_query_request_success_total")
.Help("Number of successful query requests")
.Register(*registry_)),
query_failure_family_(
prometheus::BuildCounter()
.Name("pd_query_request_failure")
.Name("pd_query_request_failure_total")
.Help("Number of failed query requests")
.Register(*registry_)),
inf_count_family_(prometheus::BuildCounter()
.Name("pd_inference_count")
.Name("pd_inference_count_total")
.Help("Number of inferences performed")
.Register(*registry_)),
query_duration_us_family_(
prometheus::BuildCounter()
.Name("pd_query_request_duration_us")
.Name("pd_query_request_duration_us_total")
.Help("Cummulative query request duration in microseconds")
.Register(*registry_)),
inf_duration_us_family_(
prometheus::BuildCounter()
.Name("pd_inference_duration_us")
.Name("pd_inference_duration_us_total")
.Help("Cummulative inference duration in microseconds")
.Register(*registry_)),
metrics_enabled_(false)
......
......@@ -62,6 +62,13 @@ class DAGExecutor(object):
self._retry = dag_conf["retry"]
self._server_use_profile = dag_conf["use_profile"]
self._enable_prometheus = False
if "enable_prometheus" in dag_conf:
self._enable_prometheus = dag_conf["enable_prometheus"]
if "prometheus_port" in dag_conf and self._enable_prometheus:
self._prometheus_port = dag_conf["prometheus_port"]
else:
self._prometheus_port = None
channel_size = dag_conf["channel_size"]
channel_recv_frist_arrive = dag_conf["channel_recv_frist_arrive"]
self._is_thread_op = dag_conf["is_thread_op"]
......@@ -77,8 +84,10 @@ class DAGExecutor(object):
if tracer_interval_s >= 1:
self._tracer = PerformanceTracer(
self._is_thread_op, tracer_interval_s, server_worker_num)
if self._enable_prometheus:
self._tracer.set_enable_dict(True)
self._dag = DAG(self.name, response_op, self._server_use_profile,
self._dag = DAG(self.name, response_op, self._server_use_profile, self._prometheus_port,
self._is_thread_op, channel_size, build_dag_each_worker,
self._tracer, channel_recv_frist_arrive)
(in_channel, out_channel, pack_rpc_func,
......@@ -480,10 +489,10 @@ class DAG(object):
"""
Directed Acyclic Graph(DAG) engine, builds one DAG topology.
"""
def __init__(self, request_name, response_op, use_profile, is_thread_op,
def __init__(self, request_name, response_op, use_profile, prometheus_port, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive):
_LOGGER.info("{}, {}, {}, {}, {} ,{} ,{} ,{}".format(request_name, response_op, use_profile, is_thread_op,
_LOGGER.info("{}, {}, {}, {}, {}, {} ,{} ,{} ,{}".format(request_name, response_op, use_profile, prometheus_port, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive))
@ErrorCatch
......@@ -491,6 +500,7 @@ class DAG(object):
def init_helper(self, request_name: str,
response_op,
use_profile: [bool, None],
prometheus_port: [int, None],
is_thread_op: bool,
channel_size,
build_dag_each_worker: [bool, None],
......@@ -499,6 +509,8 @@ class DAG(object):
self._request_name = request_name
self._response_op = response_op
self._use_profile = use_profile
self._prometheus_port = prometheus_port
self._use_prometheus = (self._prometheus_port is not None)
self._is_thread_op = is_thread_op
self._channel_size = channel_size
self._build_dag_each_worker = build_dag_each_worker
......@@ -506,7 +518,7 @@ class DAG(object):
self._channel_recv_frist_arrive = channel_recv_frist_arrive
if not self._is_thread_op:
self._manager = PipelineProcSyncManager()
init_helper(self, request_name, response_op, use_profile, is_thread_op,
init_helper(self, request_name, response_op, use_profile, prometheus_port, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive)
print("[DAG] Succ init")
......@@ -828,6 +840,56 @@ class DAG(object):
return self._input_channel, self._output_channel, self._pack_func, self._unpack_func
def start_prom(self, prometheus_port):
import prometheus_client
from prometheus_client import Counter
from prometheus_client.core import CollectorRegistry
from flask import Response, Flask
from .prometheus_metrics import registry
from .prometheus_metrics import metric_query_success, metric_query_failure, metric_inf_count, metric_query_duration_us, metric_inf_duration_us
app = Flask(__name__)
# requests_total = Counter('c1','A counter')
@app.route("/metrics")
def requests_count():
item = self._tracer.profile_dict
_LOGGER.info("metrics: {}".format(item))
# {'uci': {'in': 727.443, 'prep': 0.5525833333333333, 'midp': 2.21375, 'postp': 1.32375, 'out': 0.9396666666666667}, 'DAG': {'call_0': 29.479, 'call_1': 8.176, 'call_2': 8.045, 'call_3': 7.988, 'call_4': 7.609, 'call_5': 7.629, 'call_6': 7.625, 'call_7': 8.32, 'call_8': 8.57, 'call_9': 8.055, 'call_10': 7.915, 'call_11': 7.873, 'query_count': 12, 'qps': 1.2, 'succ': 1.0, 'avg': 9.773666666666667, '50': 8.045, '60': 8.055, '70': 8.176, '80': 8.32, '90': 8.57, '95': 29.479, '99': 29.479}}
if "DAG" in item:
total = item["DAG"]["query_count"]
succ = total * item["DAG"]["succ"]
fail = total * (1 - item["DAG"]["succ"])
query_duration = total *item["DAG"]["avg"]
metric_query_success.inc(succ)
metric_query_failure._value.inc(fail)
metric_query_duration_us._value.inc(query_duration)
inf_cnt = 0
infer_duration = 0.0
for name in item:
if name != "DAG":
if "count" in item[name]:
inf_cnt += item[name]["count"]
if "midp" in item[name]:
infer_duration += item[name]["count"]*item[name]["midp"]
metric_inf_count._value.inc(inf_cnt)
metric_inf_duration_us._value.inc(infer_duration)
#return str(item)
self._tracer.profile_dict = {}
return Response(prometheus_client.generate_latest(registry),mimetype="text/plain")
def prom_run():
app.run(host="0.0.0.0",port=prometheus_port)
p = threading.Thread(
target=prom_run,
args=())
_LOGGER.info("Prometheus Start 2")
p.daemon = True
p.start()
def start(self):
"""
Each OP starts a thread or process by _is_thread_op
......@@ -842,12 +904,16 @@ class DAG(object):
for op in self._actual_ops:
op.use_profiler(self._use_profile)
op.set_tracer(self._tracer)
op.set_use_prometheus(self._use_prometheus)
if self._is_thread_op:
self._threads_or_proces.extend(op.start_with_thread())
else:
self._threads_or_proces.extend(op.start_with_process())
_LOGGER.info("[DAG] start")
if self._use_prometheus:
_LOGGER.info("Prometheus Start 1")
self.start_prom(self._prometheus_port)
# not join yet
return self._threads_or_proces
......
......@@ -371,6 +371,9 @@ class Op(object):
def set_tracer(self, tracer):
self._tracer = tracer
def set_use_prometheus(self, use_prometheus):
self._use_prometheus = use_prometheus
def init_client(self, client_config, server_endpoints):
"""
Initialize the client object. There are three types of clients, brpc,
......@@ -1448,6 +1451,7 @@ class Op(object):
midped_data_dict, err_channeldata_dict \
= self._run_process(preped_data_dict, op_info_prefix, skip_process_dict, logid_dict)
end = profiler.record("midp#{}_1".format(op_info_prefix))
_LOGGER.info("prometheus inf count +1")
midp_time = end - start
_LOGGER.debug("op:{} process_end:{}, cost:{}".format(
op_info_prefix, time.time(), midp_time))
......
......@@ -49,13 +49,18 @@ class PerformanceTracer(object):
self._channels = []
# The size of data in Channel will not exceed server_worker_num
self._server_worker_num = server_worker_num
if _is_profile:
self.profile_dict = {}
self.profile_dict = {}
self._enable_dict = False
def data_buffer(self):
return self._data_buffer
def start(self):
self._thrd = threading.Thread(
target=self._trace_func, args=(self._channels, ))
self._thrd.daemon = True
self._thrd.start()
"""
if self._is_thread_mode:
self._thrd = threading.Thread(
target=self._trace_func, args=(self._channels, ))
......@@ -66,10 +71,14 @@ class PerformanceTracer(object):
target=self._trace_func, args=(self._channels, ))
self._proc.daemon = True
self._proc.start()
"""
def set_channels(self, channels):
self._channels = channels
def set_enable_dict(self, enable):
self._enable_dict = enable
def _trace_func(self, channels):
all_actions = ["in", "prep", "midp", "postp", "out"]
calcu_actions = ["prep", "midp", "postp"]
......@@ -106,9 +115,14 @@ class PerformanceTracer(object):
if len(op_cost) != 0:
for name in op_cost:
tot_cost, calcu_cost = 0.0, 0.0
count = 0
for action, costs in op_cost[name].items():
op_cost[name][action] = sum(costs) / (1e3 * len(costs))
tot_cost += op_cost[name][action]
if action == "midp":
count = len(costs)
if "midp" in op_cost[name].keys():
op_cost[name]['count'] = count
if name != "DAG":
_LOGGER.info("Op({}):".format(name))
......@@ -121,8 +135,7 @@ class PerformanceTracer(object):
calcu_cost += op_cost[name][action]
_LOGGER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
tot_cost))
if _is_profile:
self.profile_dict = copy.deepcopy(op_cost)
self.profile_dict = copy.deepcopy(op_cost)
if "DAG" in op_cost:
calls = list(op_cost["DAG"].values())
......@@ -142,7 +155,7 @@ class PerformanceTracer(object):
for latency in latencys:
_LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int(
tot * latency / 100.0)]))
if _is_profile:
if _is_profile or self._enable_dict:
self.profile_dict["DAG"]["query_count"] = tot
self.profile_dict["DAG"]["qps"] = qps
self.profile_dict["DAG"]["succ"] = 1 - 1.0 * err_count / tot
......
from prometheus_client import Counter, generate_latest, CollectorRegistry, Gauge
registry = CollectorRegistry()
metric_query_success = Counter("pd_query_request_success_total", "metric_query_success", registry=registry)
metric_query_failure = Counter("pd_query_request_failure_total", "metric_query_failure", registry=registry)
metric_inf_count = Counter("pd_inference_count_total", "metric_inf_count", registry=registry)
metric_query_duration_us = Counter("pd_query_request_duration_us_total", "metric_query_duration_us", registry=registry)
metric_inf_duration_us = Counter("pd_inference_duration_us_total", "metric_inf_duration_us", registry=registry)
......@@ -21,3 +21,4 @@ sentencepiece; platform_machine == "aarch64"
opencv-python==4.3.0.38; platform_machine != "aarch64"
opencv-python; platform_machine == "aarch64"
pytest
prometheus-client==0.12.0
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册