diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index a45ec92f12d35749152ee037aa73be61cf1208b4..ac8caf372278f8225d6d1dc193f7b4c6e2dc5514 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -46,7 +46,7 @@ class DAGExecutor(object): raise Exception( "profile cannot be used in multiprocess version temporarily") - self.name = "#G" + self.name = "@G" self._profiler = TimeProfiler() self._profiler.enable(use_profile) @@ -149,23 +149,23 @@ class DAGExecutor(object): data_id=data_id), data_id def call(self, rpc_request): - self._profiler.record("dag-call_0".format(self.name)) + self._profiler.record("call#DAG_0") - self._profiler.record("{}-prepack_0".format(self.name)) + self._profiler.record("prepack#{}_0".format(self.name)) req_channeldata, data_id = self._pack_channeldata(rpc_request) - self._profiler.record("{}-prepack_1".format(self.name)) + self._profiler.record("prepack#{}_1".format(self.name)) resp_channeldata = None for i in range(self._retry): _LOGGER.debug(self._log('push data')) - #self._profiler.record("{}-push_0".format(self.name)) + #self._profiler.record("push#{}_0".format(self.name)) self._in_channel.push(req_channeldata, self.name) - #self._profiler.record("{}-push_1".format(self.name)) + #self._profiler.record("push#{}_1".format(self.name)) _LOGGER.debug(self._log('wait for infer')) - #self._profiler.record("{}-fetch_0".format(self.name)) + #self._profiler.record("fetch#{}_0".format(self.name)) resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id) - #self._profiler.record("{}-fetch_1".format(self.name)) + #self._profiler.record("fetch#{}_1".format(self.name)) if resp_channeldata.ecode == ChannelDataEcode.OK.value: break @@ -173,11 +173,11 @@ class DAGExecutor(object): _LOGGER.warn("retry({}): {}".format( i + 1, resp_channeldata.error_info)) - self._profiler.record("{}-postpack_0".format(self.name)) + self._profiler.record("postpack#{}_0".format(self.name)) rpc_resp = self._pack_for_rpc_resp(resp_channeldata) - self._profiler.record("{}-postpack_1".format(self.name)) + self._profiler.record("postpack#{}_1".format(self.name)) - self._profiler.record("dag-call_1".format(self.name)) + self._profiler.record("call#DAG_1") self._profiler.print_profile() return rpc_resp diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index c228213d8b5cbacb70c1ee1e788ca5c769d82802..d10cd6d83e0cba0c5053383beb95301ec0d8a975 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -364,9 +364,9 @@ class Op(object): self._is_run = True while self._is_run: - #self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid)) + #self._profiler_record("get#{}_0".format(op_info_prefix)) channeldata_dict = input_channel.front(self.name) - #self._profiler_record("{}-get#{}_1".format(op_info_prefix, tid)) + #self._profiler_record("get#{}_1".format(op_info_prefix)) _LOGGER.debug(log("input_data: {}".format(channeldata_dict))) data_id, error_channeldata, parsed_data = self._parse_channeldata( @@ -378,39 +378,39 @@ class Op(object): continue # preprecess - self._profiler_record("{}-prep#{}_0".format(op_info_prefix, tid)) + self._profiler_record("prep#{}_0".format(op_info_prefix)) preped_data, error_channeldata = self._run_preprocess( parsed_data, data_id, private_obj, log) - self._profiler_record("{}-prep#{}_1".format(op_info_prefix, tid)) + self._profiler_record("prep#{}_1".format(op_info_prefix)) if error_channeldata is not None: self._push_to_output_channels(error_channeldata, output_channels) continue # process - self._profiler_record("{}-midp#{}_0".format(op_info_prefix, tid)) + self._profiler_record("midp#{}_0".format(op_info_prefix)) midped_data, error_channeldata = self._run_process( client_predict_handler, preped_data, data_id, private_obj, log) - self._profiler_record("{}-midp#{}_1".format(op_info_prefix, tid)) + self._profiler_record("midp#{}_1".format(op_info_prefix)) if error_channeldata is not None: self._push_to_output_channels(error_channeldata, output_channels) continue # postprocess - self._profiler_record("{}-postp#{}_0".format(op_info_prefix, tid)) + self._profiler_record("postp#{}_0".format(op_info_prefix)) output_data, error_channeldata = self._run_postprocess( parsed_data, midped_data, data_id, private_obj, log) - self._profiler_record("{}-postp#{}_1".format(op_info_prefix, tid)) + self._profiler_record("postp#{}_1".format(op_info_prefix)) if error_channeldata is not None: self._push_to_output_channels(error_channeldata, output_channels) continue # push data to channel (if run succ) - #self._profiler_record("{}-push#{}_0".format(op_info_prefix, tid)) + #self._profiler_record("push#{}_0".format(op_info_prefix)) self._push_to_output_channels(output_data, output_channels) - #self._profiler_record("{}-push#{}_1".format(op_info_prefix, tid)) + #self._profiler_record("push#{}_1".format(op_info_prefix)) def _log(self, info): return "{} {}".format(self.name, info) @@ -420,9 +420,9 @@ class RequestOp(Op): """ RequestOp do not run preprocess, process, postprocess. """ def __init__(self, concurrency=1): - # PipelineService.name = "#G" + # PipelineService.name = "@G" super(RequestOp, self).__init__( - name="#G", input_ops=[], concurrency=concurrency) + name="@G", input_ops=[], concurrency=concurrency) # init op try: self.init_op() @@ -447,7 +447,7 @@ class ResponseOp(Op): def __init__(self, input_ops, concurrency=1): super(ResponseOp, self).__init__( - name="#R", input_ops=input_ops, concurrency=concurrency) + name="@R", input_ops=input_ops, concurrency=concurrency) # init op try: self.init_op() @@ -530,12 +530,12 @@ class VirtualOp(Op): self._is_run = True while self._is_run: - #self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid)) + #self._profiler_record("get#{}_0".format(op_info_prefix)) channeldata_dict = input_channel.front(self.name) - #self._profiler_record("{}-get#{}_1".format(op_info_prefix, tid)) + #self._profiler_record("get#{}_1".format(op_info_prefix)) - #self._profiler_record("{}-push#{}_0".format(op_info_prefix, tid)) + #self._profiler_record("push#{}_0".format(op_info_prefix)) for name, data in channeldata_dict.items(): self._push_to_output_channels( data, channels=output_channels, name=name) - #self._profiler_record("{}-push#{}_1".format(op_info_prefix, tid)) + #self._profiler_record("push#{}_1".format(op_info_prefix))