Commit 49f1d810 authored by bjjwwang

add prometheus python cli

Parent bc9a65c0
@@ -62,6 +62,10 @@ class DAGExecutor(object):
self._retry = dag_conf["retry"]
self._server_use_profile = dag_conf["use_profile"]
if "prometheus_port" in dag_conf:
self._prometheus_port = dag_conf["prometheus_port"]
else:
self._prometheus_port = None
channel_size = dag_conf["channel_size"]
channel_recv_frist_arrive = dag_conf["channel_recv_frist_arrive"]
self._is_thread_op = dag_conf["is_thread_op"]
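Note that the new config key is optional: DAGExecutor falls back to None when prometheus_port is absent, which later keeps the metrics server disabled. Below is a minimal sketch of that lookup with an illustrative dag_conf dict; only the key names are taken from the surrounding code, the values are made up.

# Illustrative dag config values; only the key names come from the code above.
dag_conf = {
    "retry": 1,
    "use_profile": False,
    "is_thread_op": True,
    "channel_size": 0,
    "channel_recv_frist_arrive": False,
    "prometheus_port": 8581,  # omit this key to keep the Prometheus exporter off
}
# Equivalent to the if/else above: dict.get returns None for a missing key.
prometheus_port = dag_conf.get("prometheus_port")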
@@ -78,7 +82,7 @@ class DAGExecutor(object):
self._tracer = PerformanceTracer(
self._is_thread_op, tracer_interval_s, server_worker_num)
self._dag = DAG(self.name, response_op, self._server_use_profile,
self._dag = DAG(self.name, response_op, self._server_use_profile, self._prometheus_port,
self._is_thread_op, channel_size, build_dag_each_worker,
self._tracer, channel_recv_frist_arrive)
(in_channel, out_channel, pack_rpc_func,
@@ -480,10 +484,10 @@ class DAG(object):
"""
Directed Acyclic Graph(DAG) engine, builds one DAG topology.
"""
def __init__(self, request_name, response_op, use_profile, is_thread_op,
def __init__(self, request_name, response_op, use_profile, prometheus_port, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive):
_LOGGER.info("{}, {}, {}, {}, {} ,{} ,{} ,{}".format(request_name, response_op, use_profile, is_thread_op,
_LOGGER.info("{}, {}, {}, {}, {}, {} ,{} ,{} ,{}".format(request_name, response_op, use_profile, prometheus_port, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive))
@ErrorCatch
@@ -491,6 +495,7 @@ class DAG(object):
def init_helper(self, request_name: str,
response_op,
use_profile: [bool, None],
prometheus_port: [int, None],
is_thread_op: bool,
channel_size,
build_dag_each_worker: [bool, None],
@@ -499,6 +504,8 @@ class DAG(object):
self._request_name = request_name
self._response_op = response_op
self._use_profile = use_profile
self._prometheus_port = prometheus_port
self._use_prometheus = (self._prometheus_port is not None)
self._is_thread_op = is_thread_op
self._channel_size = channel_size
self._build_dag_each_worker = build_dag_each_worker
@@ -506,7 +513,7 @@ class DAG(object):
self._channel_recv_frist_arrive = channel_recv_frist_arrive
if not self._is_thread_op:
self._manager = PipelineProcSyncManager()
init_helper(self, request_name, response_op, use_profile, is_thread_op,
init_helper(self, request_name, response_op, use_profile, prometheus_port, is_thread_op,
channel_size, build_dag_each_worker, tracer,
channel_recv_frist_arrive)
print("[DAG] Succ init")
@@ -828,27 +835,40 @@ class DAG(object):
return self._input_channel, self._output_channel, self._pack_func, self._unpack_func
def start_prom(self):
def start_prom(self, prometheus_port):
import prometheus_client
from prometheus_client import Counter
from prometheus_client.core import CollectorRegistry
from flask import Response, Flask
from .prometheus_metrics import registry
from .prometheus_metrics import metric_query_success, metric_query_failure, metric_inf_count, metric_query_duration_us, metric_inf_duration_us
app = Flask(__name__)
requests_total = Counter('c1','A counter')
@app.route("/metrics/")
def requests_count():
requests_total.inc(1)
# requests_total.inc(2)
item = self._tracer.profile_dict
_LOGGER.info("metrics: {}".format(item))
# {'uci': {'in': 727.443, 'prep': 0.5525833333333333, 'midp': 2.21375, 'postp': 1.32375, 'out': 0.9396666666666667}, 'DAG': {'call_0': 29.479, 'call_1': 8.176, 'call_2': 8.045, 'call_3': 7.988, 'call_4': 7.609, 'call_5': 7.629, 'call_6': 7.625, 'call_7': 8.32, 'call_8': 8.57, 'call_9': 8.055, 'call_10': 7.915, 'call_11': 7.873, 'query_count': 12, 'qps': 1.2, 'succ': 1.0, 'avg': 9.773666666666667, '50': 8.045, '60': 8.055, '70': 8.176, '80': 8.32, '90': 8.57, '95': 29.479, '99': 29.479}}
if "DAG" in item:
total = item["DAG"]["query_count"]
succ = total * item["DAG"]["succ"]
fail = total * (1 - item["DAG"]["succ"])
inf_cnt = total
query_duration = total *item["DAG"]["avg"]
metric_query_success._value.set(succ)
metric_query_failure._value.set(fail)
metric_inf_count._value.set(total)
metric_query_duration_us._value.set(query_duration)
#return str(item)
return Response(prometheus_client.generate_latest(registry),mimetype="text/plain")
def prom_run():
app.run(host="0.0.0.0",port=8581)
p = multiprocessing.Process(
app.run(host="0.0.0.0",port=prometheus_port)
p = threading.Thread(
target=prom_run,
args=())
_LOGGER.info("Prometheus Start 2")
@@ -869,13 +889,14 @@ class DAG(object):
for op in self._actual_ops:
op.use_profiler(self._use_profile)
op.set_tracer(self._tracer)
op.set_use_prometheus(self._use_prometheus)
if self._is_thread_op:
self._threads_or_proces.extend(op.start_with_thread())
else:
self._threads_or_proces.extend(op.start_with_process())
_LOGGER.info("[DAG] start")
_LOGGER.info("Prometheus Start 1")
self.start_prom()
self.start_prom(self._prometheus_port)
# not join yet
return self._threads_or_proces
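Once start() has launched the ops and called start_prom, the endpoint can be sanity-checked with a plain HTTP request. A hypothetical check, assuming prometheus_port was set to 8581:

# Fetch the Prometheus text exposition output from the running pipeline server.
import urllib.request

body = urllib.request.urlopen("http://127.0.0.1:8581/metrics/").read().decode("utf-8")
print(body)  # one sample line per metric, preceded by # HELP / # TYPE comments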
@@ -349,6 +349,9 @@ class Op(object):
def set_tracer(self, tracer):
self._tracer = tracer
def set_use_prometheus(self, use_prometheus):
self._use_prometheus = use_prometheus
def init_client(self, client_config, server_endpoints):
"""
Initialize the client object. There are three types of clients, brpc,
@@ -1403,6 +1406,7 @@ class Op(object):
midped_data_dict, err_channeldata_dict \
= self._run_process(preped_data_dict, op_info_prefix, skip_process_dict, logid_dict)
end = profiler.record("midp#{}_1".format(op_info_prefix))
_LOGGER.info("prometheus inf count +1")
midp_time = end - start
_LOGGER.debug("op:{} process_end:{}, cost:{}".format(
op_info_prefix, time.time(), midp_time))
@@ -49,13 +49,17 @@ class PerformanceTracer(object):
self._channels = []
# The size of data in Channel will not exceed server_worker_num
self._server_worker_num = server_worker_num
if _is_profile:
self.profile_dict = {}
self.profile_dict = {}
def data_buffer(self):
return self._data_buffer
def start(self):
self._thrd = threading.Thread(
target=self._trace_func, args=(self._channels, ))
self._thrd.daemon = True
self._thrd.start()
"""
if self._is_thread_mode:
self._thrd = threading.Thread(
target=self._trace_func, args=(self._channels, ))
@@ -66,6 +70,7 @@ class PerformanceTracer(object):
target=self._trace_func, args=(self._channels, ))
self._proc.daemon = True
self._proc.start()
"""
def set_channels(self, channels):
self._channels = channels
@@ -121,8 +126,7 @@ class PerformanceTracer(object):
calcu_cost += op_cost[name][action]
_LOGGER.info("\tidle[{}]".format(1 - 1.0 * calcu_cost /
tot_cost))
if _is_profile:
self.profile_dict = copy.deepcopy(op_cost)
self.profile_dict = copy.deepcopy(op_cost)
if "DAG" in op_cost:
calls = list(op_cost["DAG"].values())