diff --git a/python/examples/util/timeline_trace.py b/python/examples/util/timeline_trace.py index 144c21cb4458cf8f73fa9e198617b735970897bd..b5cfb519c64bd5108cd3afb790659670a316eb0e 100644 --- a/python/examples/util/timeline_trace.py +++ b/python/examples/util/timeline_trace.py @@ -16,10 +16,16 @@ def prase(pid_str, time_str, counter): if len(name_list) == 2: name = name_list[0] else: - name = name_list[0] + "_" + name_list[1] + name = "_".join(name_list[:-1]) + name_list = name.split("#") + if len(name_list) > 1: + tid = name_list[-1] + name = "#".join(name_list[:-1]) + else: + tid = 0 event_dict = {} event_dict["name"] = name - event_dict["tid"] = 0 + event_dict["tid"] = tid event_dict["pid"] = pid event_dict["ts"] = ts event_dict["ph"] = ph diff --git a/python/paddle_serving_server_gpu/pyserver.py b/python/paddle_serving_server_gpu/pyserver.py index 97c39bc98a5f50806fe17321a00e03083b9289ba..1ba7d84b10101d8ce5980531159cecff26a1ff08 100644 --- a/python/paddle_serving_server_gpu/pyserver.py +++ b/python/paddle_serving_server_gpu/pyserver.py @@ -813,10 +813,11 @@ class Op(object): op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) log = self._get_log_func(op_info_prefix) self._is_run = True + tid = threading.current_thread().ident while self._is_run: - _profiler.record("{}-get_0".format(op_info_prefix)) + _profiler.record("{}-get#{}_0".format(op_info_prefix, tid)) channeldata = input_channel.front(self.name) - _profiler.record("{}-get_1".format(op_info_prefix)) + _profiler.record("{}-get#{}_1".format(op_info_prefix, tid)) logging.debug(log("input_data: {}".format(channeldata))) data_id, error_channeldata = self._parse_channeldata(channeldata) @@ -829,9 +830,9 @@ class Op(object): # preprecess try: - _profiler.record("{}-prep_0".format(op_info_prefix)) + _profiler.record("{}-prep#{}_0".format(op_info_prefix, tid)) preped_data = self.preprocess(channeldata) - _profiler.record("{}-prep_1".format(op_info_prefix)) + _profiler.record("{}-prep#{}_1".format(op_info_prefix, tid)) except NotImplementedError as e: # preprocess function not implemented error_info = log(e) @@ -869,7 +870,7 @@ class Op(object): midped_data = None if self.with_serving: ecode = ChannelDataEcode.OK.value - _profiler.record("{}-midp_0".format(op_info_prefix)) + _profiler.record("{}-midp#{}_0".format(op_info_prefix, tid)) if self._timeout <= 0: try: midped_data = self.midprocess(preped_data, use_future) @@ -906,13 +907,13 @@ class Op(object): data_id=data_id), output_channels) continue - _profiler.record("{}-midp_1".format(op_info_prefix)) + _profiler.record("{}-midp#{}_1".format(op_info_prefix, tid)) else: midped_data = preped_data # postprocess output_data = None - _profiler.record("{}-postp_0".format(op_info_prefix)) + _profiler.record("{}-postp#{}_0".format(op_info_prefix, tid)) if self.with_serving and client_type == 'grpc' and use_future: # use call_future output_data = ChannelData( @@ -949,12 +950,12 @@ class Op(object): ChannelDataType.CHANNEL_NPDATA.value, npdata=postped_data, data_id=data_id) - _profiler.record("{}-postp_1".format(op_info_prefix)) + _profiler.record("{}-postp#{}_1".format(op_info_prefix, tid)) # push data to channel (if run succ) - _profiler.record("{}-push_0".format(op_info_prefix)) + _profiler.record("{}-push#{}_0".format(op_info_prefix, tid)) self._push_to_output_channels(output_data, output_channels) - _profiler.record("{}-push_1".format(op_info_prefix)) + _profiler.record("{}-push#{}_1".format(op_info_prefix, tid)) def _log(self, info): return "{} {}".format(self.name, info) @@ -994,12 +995,13 @@ class VirtualOp(Op): op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) log = self._get_log_func(op_info_prefix) self._is_run = True + tid = threading.current_thread().ident while self._is_run: - _profiler.record("{}-get_0".format(op_info_prefix)) + _profiler.record("{}-get#{}_0".format(op_info_prefix, tid)) channeldata = input_channel.front(self.name) - _profiler.record("{}-get_1".format(op_info_prefix)) + _profiler.record("{}-get#{}_1".format(op_info_prefix, tid)) - _profiler.record("{}-push_0".format(op_info_prefix)) + _profiler.record("{}-push#{}_0".format(op_info_prefix, tid)) if isinstance(channeldata, dict): for name, data in channeldata.items(): self._push_to_output_channels( @@ -1009,7 +1011,7 @@ class VirtualOp(Op): channeldata, channels=output_channels, name=self._virtual_pred_ops[0].name) - _profiler.record("{}-push_1".format(op_info_prefix)) + _profiler.record("{}-push#{}_1".format(op_info_prefix, tid)) class GeneralPythonService( @@ -1032,6 +1034,7 @@ class GeneralPythonService( self._recive_func = threading.Thread( target=GeneralPythonService._recive_out_channel_func, args=(self, )) self._recive_func.start() + self._tid = threading.current_thread().ident def _log(self, info_str): return "[{}] {}".format(self.name, info_str) @@ -1133,21 +1136,21 @@ class GeneralPythonService( return resp def inference(self, request, context): - _profiler.record("{}-prepack_0".format(self.name)) + _profiler.record("{}-prepack#{}_0".format(self.name, self._tid)) data, data_id = self._pack_data_for_infer(request) - _profiler.record("{}-prepack_1".format(self.name)) + _profiler.record("{}-prepack#{}_1".format(self.name, self._tid)) resp_channeldata = None for i in range(self._retry): logging.debug(self._log('push data')) - _profiler.record("{}-push_0".format(self.name)) + _profiler.record("{}-push#{}_0".format(self.name, self._tid)) self._in_channel.push(data, self.name) - _profiler.record("{}-push_1".format(self.name)) + _profiler.record("{}-push#{}_1".format(self.name, self._tid)) logging.debug(self._log('wait for infer')) - _profiler.record("{}-fetch_0".format(self.name)) + _profiler.record("{}-fetch#{}_0".format(self.name, self._tid)) resp_channeldata = self._get_data_in_globel_resp_dict(data_id) - _profiler.record("{}-fetch_1".format(self.name)) + _profiler.record("{}-fetch#{}_1".format(self.name, self._tid)) if resp_channeldata.ecode == ChannelDataEcode.OK.value: break @@ -1155,9 +1158,9 @@ class GeneralPythonService( logging.warn("retry({}): {}".format( i + 1, resp_channeldata.error_info)) - _profiler.record("{}-postpack_0".format(self.name)) + _profiler.record("{}-postpack#{}_0".format(self.name, self._tid)) resp = self._pack_data_for_resp(resp_channeldata) - _profiler.record("{}-postpack_1".format(self.name)) + _profiler.record("{}-postpack#{}_1".format(self.name, self._tid)) _profiler.print_profile() return resp diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index 1c6eed189162a19982d5c458094d36abdd79bc17..5831081921e7594877054fe85370b7f15a2aa79a 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -203,10 +203,11 @@ class Op(object): op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) log = self._get_log_func(op_info_prefix) self._is_run = True + tid = threading.current_thread().ident while self._is_run: - self._profiler_record("{}-get_0".format(op_info_prefix)) + self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid)) channeldata = input_channel.front(self.name) - self._profiler_record("{}-get_1".format(op_info_prefix)) + self._profiler_record("{}-get#{}_1".format(op_info_prefix, tid)) logging.debug(log("input_data: {}".format(channeldata))) data_id, error_channeldata = self._parse_channeldata(channeldata) @@ -219,9 +220,11 @@ class Op(object): # preprecess try: - self._profiler_record("{}-prep_0".format(op_info_prefix)) + self._profiler_record("{}-prep#{}_0".format(op_info_prefix, + tid)) preped_data = self.preprocess(channeldata) - self._profiler_record("{}-prep_1".format(op_info_prefix)) + self._profiler_record("{}-prep#{}_1".format(op_info_prefix, + tid)) except NotImplementedError as e: # preprocess function not implemented error_info = log(e) @@ -259,7 +262,8 @@ class Op(object): midped_data = None if self.with_serving: ecode = ChannelDataEcode.OK.value - self._profiler_record("{}-midp_0".format(op_info_prefix)) + self._profiler_record("{}-midp#{}_0".format(op_info_prefix, + tid)) if self._timeout <= 0: try: midped_data = self.midprocess(preped_data, use_future) @@ -296,13 +300,14 @@ class Op(object): data_id=data_id), output_channels) continue - self._profiler_record("{}-midp_1".format(op_info_prefix)) + self._profiler_record("{}-midp#{}_1".format(op_info_prefix, + tid)) else: midped_data = preped_data # postprocess output_data = None - self._profiler_record("{}-postp_0".format(op_info_prefix)) + self._profiler_record("{}-postp#{}_0".format(op_info_prefix, tid)) if self.with_serving and client_type == 'grpc' and use_future: # use call_future output_data = ChannelData( @@ -339,12 +344,12 @@ class Op(object): ChannelDataType.CHANNEL_NPDATA.value, npdata=postped_data, data_id=data_id) - self._profiler_record("{}-postp_1".format(op_info_prefix)) + self._profiler_record("{}-postp#{}_1".format(op_info_prefix, tid)) # push data to channel (if run succ) - self._profiler_record("{}-push_0".format(op_info_prefix)) + self._profiler_record("{}-push#{}_0".format(op_info_prefix, tid)) self._push_to_output_channels(output_data, output_channels) - self._profiler_record("{}-push_1".format(op_info_prefix)) + self._profiler_record("{}-push#{}_1".format(op_info_prefix, tid)) def _log(self, info): return "{} {}".format(self.name, info) @@ -385,11 +390,11 @@ class VirtualOp(Op): log = self._get_log_func(op_info_prefix) self._is_run = True while self._is_run: - self._profiler_record("{}-get_0".format(op_info_prefix)) + self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid)) channeldata = input_channel.front(self.name) - self._profiler_record("{}-get_1".format(op_info_prefix)) + self._profiler_record("{}-get#{}_1".format(op_info_prefix, tid)) - self._profiler_record("{}-push_0".format(op_info_prefix)) + self._profiler_record("{}-push#{}_0".format(op_info_prefix, tid)) if isinstance(channeldata, dict): for name, data in channeldata.items(): self._push_to_output_channels( @@ -399,4 +404,4 @@ class VirtualOp(Op): channeldata, channels=output_channels, name=self._virtual_pred_ops[0].name) - self._profiler_record("{}-push_1".format(op_info_prefix)) + self._profiler_record("{}-push#{}_1".format(op_info_prefix, tid))