diff --git a/python/examples/util/timeline_trace.py b/python/examples/util/timeline_trace.py
index 144c21cb4458cf8f73fa9e198617b735970897bd..b5cfb519c64bd5108cd3afb790659670a316eb0e 100644
--- a/python/examples/util/timeline_trace.py
+++ b/python/examples/util/timeline_trace.py
@@ -16,10 +16,16 @@ def prase(pid_str, time_str, counter):
         if len(name_list) == 2:
             name = name_list[0]
         else:
-            name = name_list[0] + "_" + name_list[1]
+            name = "_".join(name_list[:-1])
+        name_list = name.split("#")
+        if len(name_list) > 1:
+            tid = name_list[-1]
+            name = "#".join(name_list[:-1])
+        else:
+            tid = 0
         event_dict = {}
         event_dict["name"] = name
-        event_dict["tid"] = 0
+        event_dict["tid"] = tid
         event_dict["pid"] = pid
         event_dict["ts"] = ts
         event_dict["ph"] = ph
diff --git a/python/paddle_serving_server/pyserver.py b/python/paddle_serving_server/pyserver.py
index 6d0bf49d5257997daacef48bede9e7b06e7c814d..2dec88cd3638d41a449f167aed468c62314ef072 100644
--- a/python/paddle_serving_server/pyserver.py
+++ b/python/paddle_serving_server/pyserver.py
@@ -37,6 +37,7 @@ import time
 import func_timeout
 import enum
 import collections
+import copy
 
 
 class _TimeProfiler(object):
@@ -630,7 +631,7 @@ class ThreadChannel(Queue.Queue):
             self._cv.notify_all()
 
         logging.debug(self._log("multi | {} get data succ!".format(op_name)))
-        return resp  # reference, read only
+        return copy.deepcopy(resp)  # reference, read only
 
     def stop(self):
         #TODO
@@ -811,10 +812,11 @@ class Op(object):
         op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
         log = self._get_log_func(op_info_prefix)
         self._is_run = True
+        tid = threading.current_thread().ident
         while self._is_run:
-            _profiler.record("{}-get_0".format(op_info_prefix))
+            _profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
             channeldata = input_channel.front(self.name)
-            _profiler.record("{}-get_1".format(op_info_prefix))
+            _profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
             logging.debug(log("input_data: {}".format(channeldata)))
 
             data_id, error_channeldata = self._parse_channeldata(channeldata)
@@ -827,9 +829,9 @@ class Op(object):
 
             # preprecess
             try:
-                _profiler.record("{}-prep_0".format(op_info_prefix))
+                _profiler.record("{}-prep#{}_0".format(op_info_prefix, tid))
                 preped_data = self.preprocess(channeldata)
-                _profiler.record("{}-prep_1".format(op_info_prefix))
+                _profiler.record("{}-prep#{}_1".format(op_info_prefix, tid))
             except NotImplementedError as e:
                 # preprocess function not implemented
                 error_info = log(e)
@@ -867,7 +869,7 @@ class Op(object):
             midped_data = None
             if self.with_serving:
                 ecode = ChannelDataEcode.OK.value
-                _profiler.record("{}-midp_0".format(op_info_prefix))
+                _profiler.record("{}-midp#{}_0".format(op_info_prefix, tid))
                 if self._timeout <= 0:
                     try:
                         midped_data = self.midprocess(preped_data, use_future)
@@ -904,13 +906,13 @@ class Op(object):
                                 data_id=data_id),
                             output_channels)
                         continue
-                _profiler.record("{}-midp_1".format(op_info_prefix))
+                _profiler.record("{}-midp#{}_1".format(op_info_prefix, tid))
             else:
                 midped_data = preped_data
 
             # postprocess
             output_data = None
-            _profiler.record("{}-postp_0".format(op_info_prefix))
+            _profiler.record("{}-postp#{}_0".format(op_info_prefix, tid))
             if self.with_serving and client_type == 'grpc' and use_future:
                 # use call_future
                 output_data = ChannelData(
@@ -947,12 +949,12 @@ class Op(object):
                     ChannelDataType.CHANNEL_NPDATA.value,
                     npdata=postped_data,
                     data_id=data_id)
-            _profiler.record("{}-postp_1".format(op_info_prefix))
+            _profiler.record("{}-postp#{}_1".format(op_info_prefix, tid))
 
             # push data to channel (if run succ)
-            _profiler.record("{}-push_0".format(op_info_prefix))
+            _profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
             self._push_to_output_channels(output_data, output_channels)
-            _profiler.record("{}-push_1".format(op_info_prefix))
+            _profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
 
     def _log(self, info):
         return "{} {}".format(self.name, info)
@@ -991,12 +993,13 @@ class VirtualOp(Op):
         op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
         log = self._get_log_func(op_info_prefix)
         self._is_run = True
+        tid = threading.current_thread().ident
         while self._is_run:
-            _profiler.record("{}-get_0".format(op_info_prefix))
+            _profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
             channeldata = input_channel.front(self.name)
-            _profiler.record("{}-get_1".format(op_info_prefix))
+            _profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
 
-            _profiler.record("{}-push_0".format(op_info_prefix))
+            _profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
             if isinstance(channeldata, dict):
                 for name, data in channeldata.items():
                     self._push_to_output_channels(
@@ -1006,7 +1009,7 @@ class VirtualOp(Op):
                     channeldata,
                     channels=output_channels,
                     name=self._virtual_pred_ops[0].name)
-            _profiler.record("{}-push_1".format(op_info_prefix))
+            _profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
 
 
 class GeneralPythonService(
@@ -1029,6 +1032,7 @@ class GeneralPythonService(
         self._recive_func = threading.Thread(
             target=GeneralPythonService._recive_out_channel_func, args=(self, ))
         self._recive_func.start()
+        self._tid = threading.current_thread().ident
 
     def _log(self, info_str):
         return "[{}] {}".format(self.name, info_str)
@@ -1130,21 +1134,21 @@ class GeneralPythonService(
         return resp
 
     def inference(self, request, context):
-        _profiler.record("{}-prepack_0".format(self.name))
+        _profiler.record("{}-prepack#{}_0".format(self.name, self._tid))
         data, data_id = self._pack_data_for_infer(request)
-        _profiler.record("{}-prepack_1".format(self.name))
+        _profiler.record("{}-prepack#{}_1".format(self.name, self._tid))
 
         resp_channeldata = None
         for i in range(self._retry):
             logging.debug(self._log('push data'))
-            _profiler.record("{}-push_0".format(self.name))
+            _profiler.record("{}-push#{}_0".format(self.name, self._tid))
             self._in_channel.push(data, self.name)
-            _profiler.record("{}-push_1".format(self.name))
+            _profiler.record("{}-push#{}_1".format(self.name, self._tid))
 
             logging.debug(self._log('wait for infer'))
-            _profiler.record("{}-fetch_0".format(self.name))
+            _profiler.record("{}-fetch#{}_0".format(self.name, self._tid))
             resp_channeldata = self._get_data_in_globel_resp_dict(data_id)
-            _profiler.record("{}-fetch_1".format(self.name))
+            _profiler.record("{}-fetch#{}_1".format(self.name, self._tid))
 
             if resp_channeldata.ecode == ChannelDataEcode.OK.value:
                 break
@@ -1152,9 +1156,9 @@ class GeneralPythonService(
             logging.warn("retry({}): {}".format(
                 i + 1, resp_channeldata.error_info))
 
-        _profiler.record("{}-postpack_0".format(self.name))
+        _profiler.record("{}-postpack#{}_0".format(self.name, self._tid))
         resp = self._pack_data_for_resp(resp_channeldata)
-        _profiler.record("{}-postpack_1".format(self.name))
+        _profiler.record("{}-postpack#{}_1".format(self.name, self._tid))
 
         _profiler.print_profile()
         return resp
diff --git a/python/paddle_serving_server_gpu/pyserver.py b/python/paddle_serving_server_gpu/pyserver.py
index 6d0bf49d5257997daacef48bede9e7b06e7c814d..2dec88cd3638d41a449f167aed468c62314ef072 100644
--- a/python/paddle_serving_server_gpu/pyserver.py
+++ b/python/paddle_serving_server_gpu/pyserver.py
@@ -37,6 +37,7 @@ import time
 import func_timeout
 import enum
 import collections
+import copy
 
 
 class _TimeProfiler(object):
@@ -630,7 +631,7 @@ class ThreadChannel(Queue.Queue):
             self._cv.notify_all()
 
         logging.debug(self._log("multi | {} get data succ!".format(op_name)))
-        return resp  # reference, read only
+        return copy.deepcopy(resp)  # reference, read only
 
     def stop(self):
         #TODO
@@ -811,10 +812,11 @@ class Op(object):
         op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
         log = self._get_log_func(op_info_prefix)
         self._is_run = True
+        tid = threading.current_thread().ident
         while self._is_run:
-            _profiler.record("{}-get_0".format(op_info_prefix))
+            _profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
             channeldata = input_channel.front(self.name)
-            _profiler.record("{}-get_1".format(op_info_prefix))
+            _profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
             logging.debug(log("input_data: {}".format(channeldata)))
 
             data_id, error_channeldata = self._parse_channeldata(channeldata)
@@ -827,9 +829,9 @@ class Op(object):
 
             # preprecess
             try:
-                _profiler.record("{}-prep_0".format(op_info_prefix))
+                _profiler.record("{}-prep#{}_0".format(op_info_prefix, tid))
                 preped_data = self.preprocess(channeldata)
-                _profiler.record("{}-prep_1".format(op_info_prefix))
+                _profiler.record("{}-prep#{}_1".format(op_info_prefix, tid))
             except NotImplementedError as e:
                 # preprocess function not implemented
                 error_info = log(e)
@@ -867,7 +869,7 @@ class Op(object):
             midped_data = None
             if self.with_serving:
                 ecode = ChannelDataEcode.OK.value
-                _profiler.record("{}-midp_0".format(op_info_prefix))
+                _profiler.record("{}-midp#{}_0".format(op_info_prefix, tid))
                 if self._timeout <= 0:
                     try:
                         midped_data = self.midprocess(preped_data, use_future)
@@ -904,13 +906,13 @@ class Op(object):
                                 data_id=data_id),
                             output_channels)
                         continue
-                _profiler.record("{}-midp_1".format(op_info_prefix))
+                _profiler.record("{}-midp#{}_1".format(op_info_prefix, tid))
             else:
                 midped_data = preped_data
 
             # postprocess
             output_data = None
-            _profiler.record("{}-postp_0".format(op_info_prefix))
+            _profiler.record("{}-postp#{}_0".format(op_info_prefix, tid))
             if self.with_serving and client_type == 'grpc' and use_future:
                 # use call_future
                 output_data = ChannelData(
@@ -947,12 +949,12 @@ class Op(object):
                     ChannelDataType.CHANNEL_NPDATA.value,
                     npdata=postped_data,
                     data_id=data_id)
-            _profiler.record("{}-postp_1".format(op_info_prefix))
+            _profiler.record("{}-postp#{}_1".format(op_info_prefix, tid))
 
             # push data to channel (if run succ)
-            _profiler.record("{}-push_0".format(op_info_prefix))
+            _profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
             self._push_to_output_channels(output_data, output_channels)
-            _profiler.record("{}-push_1".format(op_info_prefix))
+            _profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
 
     def _log(self, info):
         return "{} {}".format(self.name, info)
@@ -991,12 +993,13 @@ class VirtualOp(Op):
         op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
         log = self._get_log_func(op_info_prefix)
         self._is_run = True
+        tid = threading.current_thread().ident
         while self._is_run:
-            _profiler.record("{}-get_0".format(op_info_prefix))
+            _profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
             channeldata = input_channel.front(self.name)
-            _profiler.record("{}-get_1".format(op_info_prefix))
+            _profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
 
-            _profiler.record("{}-push_0".format(op_info_prefix))
+            _profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
             if isinstance(channeldata, dict):
                 for name, data in channeldata.items():
                     self._push_to_output_channels(
@@ -1006,7 +1009,7 @@ class VirtualOp(Op):
                     channeldata,
                     channels=output_channels,
                     name=self._virtual_pred_ops[0].name)
-            _profiler.record("{}-push_1".format(op_info_prefix))
+            _profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
 
 
 class GeneralPythonService(
@@ -1029,6 +1032,7 @@ class GeneralPythonService(
         self._recive_func = threading.Thread(
             target=GeneralPythonService._recive_out_channel_func, args=(self, ))
         self._recive_func.start()
+        self._tid = threading.current_thread().ident
 
     def _log(self, info_str):
         return "[{}] {}".format(self.name, info_str)
@@ -1130,21 +1134,21 @@ class GeneralPythonService(
         return resp
 
     def inference(self, request, context):
-        _profiler.record("{}-prepack_0".format(self.name))
+        _profiler.record("{}-prepack#{}_0".format(self.name, self._tid))
        data, data_id = self._pack_data_for_infer(request)
-        _profiler.record("{}-prepack_1".format(self.name))
+        _profiler.record("{}-prepack#{}_1".format(self.name, self._tid))
 
         resp_channeldata = None
         for i in range(self._retry):
             logging.debug(self._log('push data'))
-            _profiler.record("{}-push_0".format(self.name))
+            _profiler.record("{}-push#{}_0".format(self.name, self._tid))
             self._in_channel.push(data, self.name)
-            _profiler.record("{}-push_1".format(self.name))
+            _profiler.record("{}-push#{}_1".format(self.name, self._tid))
 
             logging.debug(self._log('wait for infer'))
-            _profiler.record("{}-fetch_0".format(self.name))
+            _profiler.record("{}-fetch#{}_0".format(self.name, self._tid))
             resp_channeldata = self._get_data_in_globel_resp_dict(data_id)
-            _profiler.record("{}-fetch_1".format(self.name))
+            _profiler.record("{}-fetch#{}_1".format(self.name, self._tid))
 
             if resp_channeldata.ecode == ChannelDataEcode.OK.value:
                 break
@@ -1152,9 +1156,9 @@ class GeneralPythonService(
             logging.warn("retry({}): {}".format(
                 i + 1, resp_channeldata.error_info))
 
-        _profiler.record("{}-postpack_0".format(self.name))
+        _profiler.record("{}-postpack#{}_0".format(self.name, self._tid))
         resp = self._pack_data_for_resp(resp_channeldata)
-        _profiler.record("{}-postpack_1".format(self.name))
+        _profiler.record("{}-postpack#{}_1".format(self.name, self._tid))
 
         _profiler.print_profile()
         return resp
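
For reference, a minimal standalone sketch (not part of the patch) of how a profiler tag in the new "<name>#<tid>_<0|1>" format introduced above can be split back into a Chrome-trace event name, thread id, and phase, mirroring the updated parsing in timeline_trace.py. The helper name split_tag and the sample tag "[uci|0]-prep#140245_0" are illustrative assumptions, not identifiers from the patch.

def split_tag(tag):
    # "[uci|0]-prep#140245_0" -> name "[uci|0]-prep", tid "140245", phase "B"
    name, stage = tag.rsplit("_", 1)     # trailing "_0"/"_1" marks begin/end
    if "#" in name:
        name, tid = name.rsplit("#", 1)  # "#<tid>" suffix added by this patch
    else:
        tid = "0"                        # old-style tag without a thread id
    ph = "B" if stage == "0" else "E"    # Chrome tracing phase: Begin / End
    return {"name": name, "tid": tid, "ph": ph}

print(split_tag("[uci|0]-prep#140245_0"))
# {'name': '[uci|0]-prep', 'tid': '140245', 'ph': 'B'}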