From 85343b20713c78c7d3e075236006a96fbd70cacb Mon Sep 17 00:00:00 2001
From: barriery
Date: Mon, 3 Aug 2020 13:37:09 +0000
Subject: [PATCH] add Tracer

---
 python/pipeline/channel.py         |  6 +++
 python/pipeline/dag.py             | 24 +++++++---
 python/pipeline/operator.py        | 56 ++++++++++++----------
 python/pipeline/pipeline_server.py |  3 --
 python/pipeline/profiler.py        | 74 ++++++++++++++++++++++++++++++
 5 files changed, 130 insertions(+), 33 deletions(-)

diff --git a/python/pipeline/channel.py b/python/pipeline/channel.py
index 67deb530..eb7d054b 100644
--- a/python/pipeline/channel.py
+++ b/python/pipeline/channel.py
@@ -239,6 +239,9 @@ class ProcessChannel(object):
         self._base_cursor = manager.Value('i', 0)
         self._output_buf = manager.list()
 
+    def size(self):
+        return self._que.qsize()
+
     def get_producers(self):
         return self._producers
 
@@ -530,6 +533,9 @@ class ThreadChannel(Queue.Queue):
         self._base_cursor = 0
         self._output_buf = []
 
+    def size(self):
+        return self.qsize()
+
     def get_producers(self):
         return self._producers
 
diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py
index 87aa7dfe..afdffd22 100644
--- a/python/pipeline/dag.py
+++ b/python/pipeline/dag.py
@@ -28,7 +28,7 @@ import logging
 from .operator import Op, RequestOp, ResponseOp, VirtualOp
 from .channel import (ThreadChannel, ProcessChannel, ChannelData,
                       ChannelDataEcode, ChannelDataType, ChannelStopError)
-from .profiler import TimeProfiler
+from .profiler import TimeProfiler, PerformanceTracer
 from .util import NameGenerator
 from .proto import pipeline_service_pb2
 
@@ -48,13 +48,18 @@ class DAGExecutor(object):
         self._profiler = TimeProfiler()
         self._profiler.enable(True)
 
+        self._tracer = PerformanceTracer()
+
         self._dag = DAG(self.name, response_op, self._server_use_profile,
                         self._is_thread_op, client_type, channel_size,
-                        build_dag_each_worker)
+                        build_dag_each_worker, self._tracer)
         (in_channel, out_channel, pack_rpc_func,
          unpack_rpc_func) = self._dag.build()
         self._dag.start()
 
+        self._tracer.set_channels(self._dag.get_channels())
+        self._tracer.start()
+
         self._set_in_channel(in_channel)
         self._set_out_channel(out_channel)
         self._pack_rpc_func = pack_rpc_func
@@ -74,6 +79,7 @@
     def start(self):
         self._recive_func = threading.Thread(
             target=DAGExecutor._recive_out_channel_func, args=(self, ))
+        self._recive_func.daemon = True
         self._recive_func.start()
         _LOGGER.debug("[DAG Executor] Start recive thread")
 
@@ -205,6 +211,8 @@
             client_need_profile=client_need_profile)
 
     def call(self, rpc_request):
+        data_buffer = self._tracer.data_buffer()
+
         data_id, cond_v = self._get_next_data_id()
         _LOGGER.info("(logid={}) Succ generate id".format(data_id))
 
@@ -214,6 +222,7 @@
                 data_id, data_id))
         else:
             start_call = self._profiler.record("call_{}#DAG_0".format(data_id))
+            data_buffer.put(("DAG", "call_{}".format(data_id), 0, start_call))
 
         _LOGGER.debug("(logid={}) Parsing RPC request package".format(data_id))
         self._profiler.record("prepack_{}#{}_0".format(data_id, self.name))
@@ -262,9 +271,7 @@
                 data_id))
         else:
             end_call = self._profiler.record("call_{}#DAG_1".format(data_id))
-            _LOGGER.log(level=1,
-                        msg="(logid={}) call[{} ms]".format(
-                            data_id, (end_call - start_call) / 1e3))
+            data_buffer.put(("DAG", "call_{}".format(data_id), 1, end_call))
 
         profile_str = self._profiler.gen_profile_str()
         if self._server_use_profile:
@@ -297,7 +304,7 @@
 
 class DAG(object):
     def __init__(self, request_name, response_op, use_profile, is_thread_op,
-                 client_type, channel_size, build_dag_each_worker):
+                 client_type, channel_size, build_dag_each_worker, tracer):
         self._request_name = request_name
         self._response_op = response_op
         self._use_profile = use_profile
@@ -305,6 +312,7 @@
         self._channel_size = channel_size
         self._client_type = client_type
         self._build_dag_each_worker = build_dag_each_worker
+        self._tracer = tracer
         if not self._is_thread_op:
             self._manager = multiprocessing.Manager()
         _LOGGER.info("[DAG] Succ init")
@@ -515,6 +523,9 @@
         return (actual_ops, channels, input_channel, output_channel, pack_func,
                 unpack_func)
 
+    def get_channels(self):
+        return self._channels
+
     def build(self):
         (actual_ops, channels, input_channel, output_channel, pack_func,
          unpack_func) = self._build_dag(self._response_op)
@@ -533,6 +544,7 @@
         self._threads_or_proces = []
         for op in self._actual_ops:
             op.use_profiler(self._use_profile)
+            op.set_tracer(self._tracer)
             if self._is_thread_op:
                 self._threads_or_proces.extend(
                     op.start_with_thread(self._client_type))
diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py
index 287893f9..56217f7d 100644
--- a/python/pipeline/operator.py
+++ b/python/pipeline/operator.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 # pylint: disable=doc-string-missing
 from time import time as _time
+import time
 import threading
 import multiprocessing
 from paddle_serving_client import MultiLangClient, Client
@@ -97,6 +98,7 @@ class Op(object):
                         self._batch_size, self._auto_batching_timeout)))
 
         self._server_use_profile = False
+        self._tracer = None
 
         # only for thread op
         self._for_init_op_lock = threading.Lock()
@@ -118,6 +120,9 @@ class Op(object):
     def use_profiler(self, use_profile):
         self._server_use_profile = use_profile
 
+    def set_tracer(self, tracer):
+        self._tracer = tracer
+
     def init_client(self, client_type, client_config, server_endpoints,
                     fetch_names):
         if self.with_serving == False:
@@ -256,7 +261,9 @@
             p = multiprocessing.Process(
                 target=self._run,
                 args=(concurrency_idx, self._get_input_channel(),
-                      self._get_output_channels(), client_type, False))
+                      self._get_output_channels(), client_type, False,
+                      self._tracer.data_buffer()))
+            p.daemon = True
             p.start()
             proces.append(p)
         return proces
@@ -267,7 +274,8 @@
             t = threading.Thread(
                 target=self._run,
                 args=(concurrency_idx, self._get_input_channel(),
-                      self._get_output_channels(), client_type, True))
+                      self._get_output_channels(), client_type, True,
+                      self._tracer.data_buffer()))
             # When a process exits, it attempts to terminate
             # all of its daemonic child processes.
             t.daemon = True
@@ -482,7 +490,7 @@
         return parsed_data_dict, need_profile_dict, profile_dict
 
     def _run(self, concurrency_idx, input_channel, output_channels, client_type,
-             is_thread_op):
+             is_thread_op, trace_buffer):
         op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
 
         tid = threading.current_thread().ident
@@ -505,16 +513,18 @@
             timeout=self._auto_batching_timeout,
             op_info_prefix=op_info_prefix)
 
-        start_prep, end_prep = None, None
-        start_midp, end_midp = None, None
-        start_postp, end_postp = None, None
+        start, end = None, None
         while True:
+            start = int(round(_time() * 1000000))
+            trace_buffer.put((self.name, "in", 0, start))
            try:
                 channeldata_dict_batch = next(batch_generator)
             except ChannelStopError:
                 _LOGGER.debug("{} Stop.".format(op_info_prefix))
                 self._finalize(is_thread_op)
                 break
+            end = int(round(_time() * 1000000))
+            trace_buffer.put((self.name, "in", 1, end))
 
             # parse channeldata batch
             try:
@@ -530,14 +540,12 @@
                 continue
 
             # preprecess
-            start_prep = profiler.record("prep#{}_0".format(op_info_prefix))
+            start = profiler.record("prep#{}_0".format(op_info_prefix))
+            trace_buffer.put((self.name, "prep", 0, start))
             preped_data_dict, err_channeldata_dict \
                     = self._run_preprocess(parsed_data_dict, op_info_prefix)
-            end_prep = profiler.record("prep#{}_1".format(op_info_prefix))
-            _LOGGER.log(level=1,
-                        msg="(logid={}) {} prep[{} ms]".format(
-                            parsed_data_dict.keys(), op_info_prefix,
-                            (end_prep - start_prep) / 1e3))
+            end = profiler.record("prep#{}_1".format(op_info_prefix))
+            trace_buffer.put((self.name, "prep", 1, end))
             try:
                 for data_id, err_channeldata in err_channeldata_dict.items():
                     self._push_to_output_channels(
@@ -553,14 +561,12 @@
                 continue
 
             # process
-            start_midp = profiler.record("midp#{}_0".format(op_info_prefix))
+            start = profiler.record("midp#{}_0".format(op_info_prefix))
+            trace_buffer.put((self.name, "midp", 0, start))
             midped_data_dict, err_channeldata_dict \
                     = self._run_process(preped_data_dict, op_info_prefix)
-            end_midp = profiler.record("midp#{}_1".format(op_info_prefix))
-            _LOGGER.log(level=1,
-                        msg="(logid={}) {} midp[{} ms]".format(
-                            preped_data_dict.keys(), op_info_prefix,
-                            (end_midp - start_midp) / 1e3))
+            end = profiler.record("midp#{}_1".format(op_info_prefix))
+            trace_buffer.put((self.name, "midp", 1, end))
             try:
                 for data_id, err_channeldata in err_channeldata_dict.items():
                     self._push_to_output_channels(
@@ -576,15 +582,13 @@
                 continue
 
             # postprocess
-            start_postp = profiler.record("postp#{}_0".format(op_info_prefix))
+            start = profiler.record("postp#{}_0".format(op_info_prefix))
+            trace_buffer.put((self.name, "postp", 0, start))
             postped_data_dict, err_channeldata_dict \
                     = self._run_postprocess(
                             parsed_data_dict, midped_data_dict, op_info_prefix)
-            end_postp = profiler.record("postp#{}_1".format(op_info_prefix))
-            _LOGGER.log(level=1,
-                        msg="(logid={}) {} postp[{} ms]".format(
-                            midped_data_dict.keys(), op_info_prefix,
-                            (end_midp - start_midp) / 1e3))
+            end = profiler.record("postp#{}_1".format(op_info_prefix))
+            trace_buffer.put((self.name, "postp", 1, end))
             try:
                 for data_id, err_channeldata in err_channeldata_dict.items():
                     self._push_to_output_channels(
@@ -600,6 +604,8 @@
                 continue
 
             # push data to channel (if run succ)
+            start = int(round(_time() * 1000000))
+            trace_buffer.put((self.name, "out", 0, start))
             try:
                 profile_str = profiler.gen_profile_str()
                 for data_id, postped_data in postped_data_dict.items():
@@ -615,6 +621,8 @@
                 _LOGGER.debug("{} Stop.".format(op_info_prefix))
                 self._finalize(is_thread_op)
                 break
+            end = int(round(_time() * 1000000))
+            trace_buffer.put((self.name, "out", 1, end))
 
     def _initialize(self, is_thread_op, client_type, concurrency_idx):
         if is_thread_op:
diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py
index 0d64a558..2fba8447 100644
--- a/python/pipeline/pipeline_server.py
+++ b/python/pipeline/pipeline_server.py
@@ -41,9 +41,6 @@ class PipelineServicer(pipeline_service_pb2_grpc.PipelineServiceServicer):
         resp = self._dag_executor.call(request)
         return resp
 
-    def __del__(self):
-        self._dag_executor.stop()
-
 
 @contextlib.contextmanager
 def _reserve_port(port):
diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py
index 60e4883a..d6140101 100644
--- a/python/pipeline/profiler.py
+++ b/python/pipeline/profiler.py
@@ -23,11 +23,85 @@ elif sys.version_info.major == 3:
 else:
     raise Exception("Error Python version")
 from time import time as _time
+import time
 import threading
+import multiprocessing
 
 _LOGGER = logging.getLogger()
 
 
+
+class PerformanceTracer(object):
+    def __init__(self, interval_s=1):
+        self._data_buffer = multiprocessing.Manager().Queue()
+        self._interval_s = interval_s
+        self._proc = None
+        self._channels = []
+        self._trace_filename = os.path.join("PipelineServingLogs", "INDEX.log")
+
+    def data_buffer(self):
+        return self._data_buffer
+
+    def start(self):
+        self._proc = multiprocessing.Process(
+            target=self._trace_func, args=(self._channels, ))
+        self._proc.daemon = True
+        self._proc.start()
+
+    def set_channels(self, channels):
+        self._channels = channels
+
+    def _trace_func(self, channels):
+        trace_file = open(self._trace_filename, "a")
+        actions = ["prep", "midp", "postp"]
+        tag_dict = {}
+        while True:
+            op_cost = {}
+            trace_file.write("==========================")
+
+            # op
+            while not self._data_buffer.empty():
+                name, action, stage, timestamp = self._data_buffer.get()
+                tag = "{}_{}".format(name, action)
+                if tag in tag_dict:
+                    assert stage == 1
+                    start_timestamp = tag_dict.pop(tag)
+                    cost = timestamp - start_timestamp
+                    if name not in op_cost:
+                        op_cost[name] = {}
+                    if action not in op_cost[name]:
+                        op_cost[name][action] = []
+                    op_cost[name][action].append(cost)
+                else:
+                    assert stage == 0
+                    tag_dict[tag] = timestamp
+
+            for name in op_cost:
+                tot_cost, cal_cost = 0.0, 0.0
+                for action, costs in op_cost[name].items():
+                    op_cost[name][action] = sum(costs) / (1e3 * len(costs))
+                    tot_cost += op_cost[name][action]
+
+                msg = ", ".join([
+                    "{}[{} ms]".format(action, cost)
+                    for action, cost in op_cost[name].items()
+                ])
+                for action in actions:
+                    if action in op_cost[name]:
+                        cal_cost += op_cost[name][action]
+
+                trace_file.write("Op({}) {}".format(name, msg))
+                if name != "DAG":
+                    trace_file.write("Op({}) idle[{}]".format(
+                        name, 1 - 1.0 * cal_cost / tot_cost))
+
+            # channel
+            for channel in channels:
+                trace_file.write("Channel({}) size[{}]".format(channel.name,
+                                                               channel.size()))
+            time.sleep(self._interval_s)
+
+
 class UnsafeTimeProfiler(object):
     """ thread unsafe profiler """
 
--
GitLab