From 49f1d81050ca12f3d20d3152e2067966b14d6912 Mon Sep 17 00:00:00 2001
From: bjjwwang
Date: Fri, 28 Jan 2022 07:12:29 +0000
Subject: [PATCH] add prometheus python cli

---
 python/pipeline/dag.py      | 47 +++++++++++++++++++++++++++----------
 python/pipeline/operator.py |  4 ++++
 python/pipeline/profiler.py | 12 ++++++----
 3 files changed, 46 insertions(+), 17 deletions(-)

diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py
index 47e40f67..430d8d90 100644
--- a/python/pipeline/dag.py
+++ b/python/pipeline/dag.py
@@ -62,6 +62,10 @@ class DAGExecutor(object):
 
         self._retry = dag_conf["retry"]
         self._server_use_profile = dag_conf["use_profile"]
+        if "prometheus_port" in dag_conf:
+            self._prometheus_port = dag_conf["prometheus_port"]
+        else:
+            self._prometheus_port = None
         channel_size = dag_conf["channel_size"]
         channel_recv_frist_arrive = dag_conf["channel_recv_frist_arrive"]
         self._is_thread_op = dag_conf["is_thread_op"]
@@ -78,7 +82,7 @@ class DAGExecutor(object):
         self._tracer = PerformanceTracer(
             self._is_thread_op, tracer_interval_s, server_worker_num)
 
-        self._dag = DAG(self.name, response_op, self._server_use_profile,
+        self._dag = DAG(self.name, response_op, self._server_use_profile, self._prometheus_port,
                         self._is_thread_op, channel_size, build_dag_each_worker,
                         self._tracer, channel_recv_frist_arrive)
         (in_channel, out_channel, pack_rpc_func,
@@ -480,10 +484,10 @@ class DAG(object):
     """
     Directed Acyclic Graph(DAG) engine, builds one DAG topology.
     """
-    def __init__(self, request_name, response_op, use_profile, is_thread_op,
+    def __init__(self, request_name, response_op, use_profile, prometheus_port, is_thread_op,
                  channel_size, build_dag_each_worker, tracer,
                  channel_recv_frist_arrive):
-        _LOGGER.info("{}, {}, {}, {}, {} ,{} ,{} ,{}".format(request_name, response_op, use_profile, is_thread_op,
+        _LOGGER.info("{}, {}, {}, {}, {}, {} ,{} ,{} ,{}".format(request_name, response_op, use_profile, prometheus_port, is_thread_op,
                  channel_size, build_dag_each_worker, tracer,
                  channel_recv_frist_arrive))
         @ErrorCatch
@@ -491,6 +495,7 @@ class DAG(object):
         def init_helper(self, request_name: str,
                     response_op,
                     use_profile: [bool, None],
+                    prometheus_port: [int, None],
                     is_thread_op: bool,
                     channel_size,
                     build_dag_each_worker: [bool, None],
@@ -499,6 +504,8 @@ class DAG(object):
             self._request_name = request_name
             self._response_op = response_op
             self._use_profile = use_profile
+            self._prometheus_port = prometheus_port
+            self._use_prometheus = (self._prometheus_port is not None)
             self._is_thread_op = is_thread_op
             self._channel_size = channel_size
             self._build_dag_each_worker = build_dag_each_worker
@@ -506,7 +513,7 @@ class DAG(object):
             self._channel_recv_frist_arrive = channel_recv_frist_arrive
             if not self._is_thread_op:
                 self._manager = PipelineProcSyncManager()
-        init_helper(self, request_name, response_op, use_profile, is_thread_op,
+        init_helper(self, request_name, response_op, use_profile, prometheus_port, is_thread_op,
                  channel_size, build_dag_each_worker, tracer,
                  channel_recv_frist_arrive)
         print("[DAG] Succ init")
@@ -828,27 +835,40 @@ class DAG(object):
 
         return self._input_channel, self._output_channel, self._pack_func, self._unpack_func
 
-    def start_prom(self):
+    def start_prom(self, prometheus_port):
         import prometheus_client
         from prometheus_client import Counter
         from prometheus_client.core import CollectorRegistry
         from flask import Response, Flask
         from .prometheus_metrics import registry
-
+        from .prometheus_metrics import metric_query_success, metric_query_failure, metric_inf_count, metric_query_duration_us, metric_inf_duration_us
         app = Flask(__name__)
         requests_total = Counter('c1','A counter')
-
+
         @app.route("/metrics/")
         def requests_count():
-            requests_total.inc(1)
-            # requests_total.inc(2)
+            item = self._tracer.profile_dict
+            _LOGGER.info("metrics: {}".format(item))
+            # {'uci': {'in': 727.443, 'prep': 0.5525833333333333, 'midp': 2.21375, 'postp': 1.32375, 'out': 0.9396666666666667}, 'DAG': {'call_0': 29.479, 'call_1': 8.176, 'call_2': 8.045, 'call_3': 7.988, 'call_4': 7.609, 'call_5': 7.629, 'call_6': 7.625, 'call_7': 8.32, 'call_8': 8.57, 'call_9': 8.055, 'call_10': 7.915, 'call_11': 7.873, 'query_count': 12, 'qps': 1.2, 'succ': 1.0, 'avg': 9.773666666666667, '50': 8.045, '60': 8.055, '70': 8.176, '80': 8.32, '90': 8.57, '95': 29.479, '99': 29.479}}
+            if "DAG" in item:
+                total = item["DAG"]["query_count"]
+                succ = total * item["DAG"]["succ"]
+                fail = total * (1 - item["DAG"]["succ"])
+                inf_cnt = total
+                query_duration = total * item["DAG"]["avg"]
+                metric_query_success._value.set(succ)
+                metric_query_failure._value.set(fail)
+                metric_inf_count._value.set(total)
+                metric_query_duration_us._value.set(query_duration)
+
+            #return str(item)
             return Response(prometheus_client.generate_latest(registry),mimetype="text/plain")
 
         def prom_run():
-            app.run(host="0.0.0.0",port=8581)
-
-        p = multiprocessing.Process(
+            app.run(host="0.0.0.0",port=prometheus_port)
+
+        p = threading.Thread(
             target=prom_run, args=())
         _LOGGER.info("Prometheus Start 2")
         p.daemon = True
         p.start()
@@ -869,13 +889,14 @@ class DAG(object):
 
         for op in self._actual_ops:
             op.use_profiler(self._use_profile)
             op.set_tracer(self._tracer)
+            op.set_use_prometheus(self._use_prometheus)
             if self._is_thread_op:
                 self._threads_or_proces.extend(op.start_with_thread())
             else:
                 self._threads_or_proces.extend(op.start_with_process())
         _LOGGER.info("[DAG] start")
         _LOGGER.info("Prometheus Start 1")
-        self.start_prom()
+        self.start_prom(self._prometheus_port)
         # not join yet
         return self._threads_or_proces
 
diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py
index a868b3e7..bd4079ad 100644
--- a/python/pipeline/operator.py
+++ b/python/pipeline/operator.py
@@ -349,6 +349,9 @@ class Op(object):
     def set_tracer(self, tracer):
         self._tracer = tracer
 
+    def set_use_prometheus(self, use_prometheus):
+        self._use_prometheus = use_prometheus
+
     def init_client(self, client_config, server_endpoints):
         """
         Initialize the client object. There are three types of clients, brpc,
@@ -1403,6 +1406,7 @@ class Op(object):
                 midped_data_dict, err_channeldata_dict \
                     = self._run_process(preped_data_dict, op_info_prefix, skip_process_dict, logid_dict)
                 end = profiler.record("midp#{}_1".format(op_info_prefix))
+                _LOGGER.info("prometheus inf count +1")
                 midp_time = end - start
                 _LOGGER.debug("op:{} process_end:{}, cost:{}".format(
                     op_info_prefix, time.time(), midp_time))
diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py
index 6e38cb31..94d5a7a0 100644
--- a/python/pipeline/profiler.py
+++ b/python/pipeline/profiler.py
@@ -49,13 +49,17 @@ class PerformanceTracer(object):
         self._channels = []
         # The size of data in Channel will not exceed server_worker_num
         self._server_worker_num = server_worker_num
-        if _is_profile:
-            self.profile_dict = {}
+        self.profile_dict = {}
 
     def data_buffer(self):
         return self._data_buffer
 
     def start(self):
+        self._thrd = threading.Thread(
+            target=self._trace_func, args=(self._channels, ))
+        self._thrd.daemon = True
+        self._thrd.start()
+        """
         if self._is_thread_mode:
             self._thrd = threading.Thread(
                 target=self._trace_func, args=(self._channels, ))
@@ -66,6 +70,7 @@ class PerformanceTracer(object):
                 target=self._trace_func, args=(self._channels, ))
             self._proc.daemon = True
             self._proc.start()
+        """
 
     def set_channels(self, channels):
         self._channels = channels
@@ -121,8 +126,7 @@ class PerformanceTracer(object):
                     calcu_cost += op_cost[name][action]
                 _LOGGER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost / tot_cost))
 
-            if _is_profile:
-                self.profile_dict = copy.deepcopy(op_cost)
+            self.profile_dict = copy.deepcopy(op_cost)
 
             if "DAG" in op_cost:
                 calls = list(op_cost["DAG"].values())
-- 
GitLab
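
Note (not part of the patch): start_prom() imports registry and the metric objects from a prometheus_metrics module next to dag.py that this diff does not show. A minimal sketch of what such a module could look like, assuming one Gauge per metric on a dedicated CollectorRegistry; the metric names below are placeholders, not necessarily the ones used by the project:

    # prometheus_metrics.py -- hypothetical sketch; metric names are placeholders
    from prometheus_client import CollectorRegistry, Gauge

    # A dedicated registry so generate_latest(registry) in start_prom() exposes
    # only these metrics instead of everything in the global default registry.
    registry = CollectorRegistry()

    metric_query_success = Gauge(
        'pipeline_query_success', 'Queries answered successfully', registry=registry)
    metric_query_failure = Gauge(
        'pipeline_query_failure', 'Queries that failed', registry=registry)
    metric_inf_count = Gauge(
        'pipeline_inf_count', 'Inference requests executed', registry=registry)
    metric_query_duration_us = Gauge(
        'pipeline_query_duration_us', 'Accumulated query latency (us)', registry=registry)
    metric_inf_duration_us = Gauge(
        'pipeline_inf_duration_us', 'Accumulated inference latency (us)', registry=registry)

Gauge-style metrics fit the route above because it recomputes absolute values from profile_dict on every scrape and writes them with ._value.set(...), rather than incrementing counters as requests arrive.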
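
With a prometheus_port entry in the dag section of the server config (the same block that carries retry, use_profile, is_thread_op and channel_size read in DAGExecutor.__init__ above), the Flask app started by start_prom() serves the metrics at /metrics/ on that port. A quick smoke test, assuming the server runs locally and the port was set to 19393 (an arbitrary example value):

    # check_metrics.py -- hypothetical smoke test for the /metrics/ endpoint
    import requests
    from prometheus_client.parser import text_string_to_metric_families

    # 19393 is only an example; use whatever prometheus_port the dag config sets.
    resp = requests.get("http://127.0.0.1:19393/metrics/", timeout=5)
    print(resp.status_code)  # expect 200 once the pipeline server is up

    # The body is in the Prometheus text exposition format and can be parsed directly.
    for family in text_string_to_metric_families(resp.text):
        for sample in family.samples:
            print(sample.name, sample.value)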
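
The switch from multiprocessing.Process to threading.Thread, both for the Flask server in start_prom() and in PerformanceTracer.start(), matters because the /metrics/ route reads self._tracer.profile_dict directly: only a thread shares that in-memory state between the tracer and the scrape endpoint. A standalone sketch of the difference (not project code):

    # sharing.py -- illustrates why the tracer must run in a thread if an
    # in-process endpoint wants to read the values it produces.
    import threading
    import multiprocessing

    state = {"count": 0}

    def bump():
        state["count"] += 1

    if __name__ == "__main__":
        # A thread shares the interpreter's memory, so the parent sees the update.
        t = threading.Thread(target=bump)
        t.start(); t.join()
        print("after thread:", state["count"])    # -> 1

        # A child process works on its own copy; the parent's dict is untouched,
        # so a scraping endpoint in the parent would never see the new value.
        p = multiprocessing.Process(target=bump)
        p.start(); p.join()
        print("after process:", state["count"])   # -> still 1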