From 45747ba8c407ae9635f9be39cf7ab83653c4cd1c Mon Sep 17 00:00:00 2001
From: barrierye
Date: Mon, 10 Aug 2020 19:11:41 +0800
Subject: [PATCH] update tracer

---
 python/pipeline/dag.py      | 15 +++++++++------
 python/pipeline/operator.py | 38 +++++++++++++++++++++++++++++++++++++++++++++---------
 python/pipeline/profiler.py | 38 ++++++++++++++++++++++++--------------
 3 files changed, 62 insertions(+), 29 deletions(-)

diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py
index 4352cfed..bde87cfe 100644
--- a/python/pipeline/dag.py
+++ b/python/pipeline/dag.py
@@ -24,6 +24,7 @@ else:
     raise Exception("Error Python version")
 import os
 import logging
+import collections
 
 from .operator import Op, RequestOp, ResponseOp, VirtualOp
 from .channel import (ThreadChannel, ProcessChannel, ChannelData,
@@ -284,12 +285,14 @@ class DAGExecutor(object):
         end_call = self._profiler.record("call_{}#DAG_1".format(data_id))
 
         if self._tracer is not None:
-            if resp_channeldata.ecode == ChannelDataEcode.OK.value:
-                trace_buffer.put(("DAG", "call_{}".format(data_id), True,
-                                  end_call - start_call))
-            else:
-                trace_buffer.put(("DAG", "call_{}".format(data_id), False,
-                                  end_call - start_call))
+            trace_buffer.put_nowait({
+                "name": "DAG",
+                "id": data_id,
+                "succ": resp_channeldata.ecode == ChannelDataEcode.OK.value,
+                "actions": {
+                    "call_{}".format(data_id): end_call - start_call,
+                },
+            })
 
         profile_str = self._profiler.gen_profile_str()
         if self._server_use_profile:
diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py
index 9d4398fb..cc2b7637 100644
--- a/python/pipeline/operator.py
+++ b/python/pipeline/operator.py
@@ -25,6 +25,12 @@ import sys
 import collections
 import numpy as np
 from numpy import *
+if sys.version_info.major == 2:
+    import Queue
+elif sys.version_info.major == 3:
+    import queue as Queue
+else:
+    raise Exception("Error Python version")
 
 from .proto import pipeline_service_pb2
 from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
@@ -532,6 +538,7 @@ class Op(object):
             op_info_prefix=op_info_prefix)
 
         start, end = None, None
+        trace_que = collections.deque()
         while True:
             start = int(round(_time() * 1000000))
             try:
@@ -541,8 +548,7 @@ class Op(object):
                 self._finalize(is_thread_op)
                 break
             end = int(round(_time() * 1000000))
-            if trace_buffer is not None:
-                trace_buffer.put((self.name, "in", True, end - start))
+            in_time = end - start
 
             # parse channeldata batch
             try:
@@ -562,8 +568,7 @@ class Op(object):
             preped_data_dict, err_channeldata_dict \
                     = self._run_preprocess(parsed_data_dict, op_info_prefix)
             end = profiler.record("prep#{}_1".format(op_info_prefix))
-            if trace_buffer is not None:
-                trace_buffer.put((self.name, "prep", True, end - start))
+            prep_time = end - start
             try:
                 for data_id, err_channeldata in err_channeldata_dict.items():
                     self._push_to_output_channels(
@@ -583,8 +588,7 @@ class Op(object):
             midped_data_dict, err_channeldata_dict \
                     = self._run_process(preped_data_dict, op_info_prefix)
             end = profiler.record("midp#{}_1".format(op_info_prefix))
-            if trace_buffer is not None:
-                trace_buffer.put((self.name, "midp", True, end - start))
+            midp_time = end - start
             try:
                 for data_id, err_channeldata in err_channeldata_dict.items():
                     self._push_to_output_channels(
@@ -605,8 +609,7 @@ class Op(object):
                     = self._run_postprocess(
                             parsed_data_dict, midped_data_dict, op_info_prefix)
             end = profiler.record("postp#{}_1".format(op_info_prefix))
-            if trace_buffer is not None:
-                trace_buffer.put((self.name, "postp", True, end - start))
+            postp_time = end - start
             try:
                 for data_id, err_channeldata in err_channeldata_dict.items():
                     self._push_to_output_channels(
@@ -639,8 +642,25 @@ class Op(object):
                 self._finalize(is_thread_op)
                 break
             end = int(round(_time() * 1000000))
+            out_time = end - start
             if trace_buffer is not None:
-                trace_buffer.put((self.name, "out", True, end - start))
+                trace_que.append({
+                    "name": self.name,
+                    "actions": {
+                        "in": in_time,
+                        "prep": prep_time,
+                        "midp": midp_time,
+                        "postp": postp_time,
+                        "out": out_time,
+                    }
+                })
+                while trace_que:
+                    info = trace_que[0]
+                    try:
+                        trace_buffer.put_nowait(info)
+                        trace_que.popleft()
+                    except Queue.Full:
+                        break
 
     def _initialize(self, is_thread_op, client_type, concurrency_idx):
         if is_thread_op:
diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py
index 3cca6527..e5066163 100644
--- a/python/pipeline/profiler.py
+++ b/python/pipeline/profiler.py
@@ -68,26 +68,35 @@ class PerformanceTracer(object):
         self._channels = channels
 
     def _trace_func(self, channels):
-        actions = ["in", "prep", "midp", "postp", "out"]
+        all_actions = ["in", "prep", "midp", "postp", "out"]
        calcu_actions = ["prep", "midp", "postp"]
         while True:
             op_cost = {}
+            err_request = []
             err_count = 0
 
             _LOGGER.info("==================== TRACER ======================")
             # op
             while True:
                 try:
-                    name, action, stage, cost = self._data_buffer.get_nowait()
-                    if stage == False:
-                        # only for name == DAG
-                        assert name == "DAG"
-                        err_count += 1
+                    item = self._data_buffer.get_nowait()
+                    name = item["name"]
+                    actions = item["actions"]
+
+                    if name == "DAG":
+                        succ = item["succ"]
+                        req_id = item["id"]
+                        if not succ:
+                            err_count += 1
+                            err_request.append(req_id)
+
                     if name not in op_cost:
                         op_cost[name] = {}
-                    if action not in op_cost[name]:
-                        op_cost[name][action] = []
-                    op_cost[name][action].append(cost)
+
+                    for action, cost in actions.items():
+                        if action not in op_cost[name]:
+                            op_cost[name][action] = []
+                        op_cost[name][action].append(cost)
                 except Queue.Empty:
                     break
 
@@ -100,7 +109,7 @@ class PerformanceTracer(object):
 
                 if name != "DAG":
                     _LOGGER.info("Op({}):".format(name))
-                    for action in actions:
+                    for action in all_actions:
                         if action in op_cost[name]:
                             _LOGGER.info("\t{}[{} ms]".format(
                                 action, op_cost[name][action]))
@@ -118,10 +127,11 @@ class PerformanceTracer(object):
                 ave_cost = sum(calls) / tot
                 latencys = [50, 60, 70, 80, 90, 95, 99]
                 _LOGGER.info("DAGExecutor:")
-                _LOGGER.info("\tquery count[{}]".format(tot))
-                _LOGGER.info("\tqps[{} q/s]".format(qps))
-                _LOGGER.info("\tsucc[{}]".format(1 - 1.0 * err_count / tot))
-                _LOGGER.info("\tlatency:")
+                _LOGGER.info("\tQuery count[{}]".format(tot))
+                _LOGGER.info("\tQPS[{} q/s]".format(qps))
+                _LOGGER.info("\tSucc[{}]".format(1 - 1.0 * err_count / tot))
+                _LOGGER.info("\tError req[{}]".format([", ".join([str(x) for x in err_request])]))
+                _LOGGER.info("\tLatency:")
                 _LOGGER.info("\t\tave[{} ms]".format(ave_cost))
                 for latency in latencys:
                     _LOGGER.info("\t\t.{}[{} ms]".format(latency, calls[int(
-- 
GitLab
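
For reference, here is the non-blocking tracing pattern this patch adopts, shown
in isolation: the worker side buffers trace dicts in a local collections.deque
and flushes them with put_nowait(), so a full trace buffer can no longer block
request handling (unlike the old blocking trace_buffer.put(...)); the tracer
side drains the queue with get_nowait() and aggregates costs per op and action.
This is a minimal standalone sketch, not PaddleServing code: the report()/drain()
helpers, the maxsize=3 buffer, and the "demo_op" payloads are invented for the
demo.

import collections
import sys
if sys.version_info.major == 2:
    import Queue
elif sys.version_info.major == 3:
    import queue as Queue
else:
    raise Exception("Error Python version")

trace_buffer = Queue.Queue(maxsize=3)  # deliberately tiny to exercise Queue.Full
trace_que = collections.deque()        # per-worker backlog of unsent trace items


def report(item):
    # Worker side: buffer first, then flush as much as fits; never block.
    trace_que.append(item)
    while trace_que:
        try:
            trace_buffer.put_nowait(trace_que[0])
            trace_que.popleft()        # discard only after a successful put
        except Queue.Full:
            break                      # tracer is behind; retry on the next report


def drain():
    # Tracer side: pull everything currently queued, group costs by op/action.
    op_cost = {}
    while True:
        try:
            item = trace_buffer.get_nowait()
        except Queue.Empty:
            break
        costs = op_cost.setdefault(item["name"], {})
        for action, cost in item["actions"].items():
            costs.setdefault(action, []).append(cost)
    return op_cost


for i in range(5):                     # queue holds only 3; 2 stay in the deque
    report({"name": "demo_op", "actions": {"midp": 100 + i}})
print(drain())   # {'demo_op': {'midp': [100, 101, 102]}}
report({"name": "demo_op", "actions": {"midp": 200}})  # backlog flushes here
print(drain())   # {'demo_op': {'midp': [103, 104, 200]}}

The design point, visible in both the patch and this sketch, is that an item is
popped from the backlog only after put_nowait() succeeds, so Queue.Full defers
delivery to a later flush instead of dropping data or stalling the worker loop.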