提交 9acdf042 编写于 作者: B barrierye

update profiler

上级 dc6be190
...@@ -46,7 +46,7 @@ class DAGExecutor(object): ...@@ -46,7 +46,7 @@ class DAGExecutor(object):
raise Exception( raise Exception(
"profile cannot be used in multiprocess version temporarily") "profile cannot be used in multiprocess version temporarily")
self.name = "#G" self.name = "@G"
self._profiler = TimeProfiler() self._profiler = TimeProfiler()
self._profiler.enable(use_profile) self._profiler.enable(use_profile)
...@@ -149,23 +149,23 @@ class DAGExecutor(object): ...@@ -149,23 +149,23 @@ class DAGExecutor(object):
data_id=data_id), data_id data_id=data_id), data_id
def call(self, rpc_request): def call(self, rpc_request):
self._profiler.record("dag-call_0".format(self.name)) self._profiler.record("call#DAG_0")
self._profiler.record("{}-prepack_0".format(self.name)) self._profiler.record("prepack#{}_0".format(self.name))
req_channeldata, data_id = self._pack_channeldata(rpc_request) req_channeldata, data_id = self._pack_channeldata(rpc_request)
self._profiler.record("{}-prepack_1".format(self.name)) self._profiler.record("prepack#{}_1".format(self.name))
resp_channeldata = None resp_channeldata = None
for i in range(self._retry): for i in range(self._retry):
_LOGGER.debug(self._log('push data')) _LOGGER.debug(self._log('push data'))
#self._profiler.record("{}-push_0".format(self.name)) #self._profiler.record("push#{}_0".format(self.name))
self._in_channel.push(req_channeldata, self.name) self._in_channel.push(req_channeldata, self.name)
#self._profiler.record("{}-push_1".format(self.name)) #self._profiler.record("push#{}_1".format(self.name))
_LOGGER.debug(self._log('wait for infer')) _LOGGER.debug(self._log('wait for infer'))
#self._profiler.record("{}-fetch_0".format(self.name)) #self._profiler.record("fetch#{}_0".format(self.name))
resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id) resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id)
#self._profiler.record("{}-fetch_1".format(self.name)) #self._profiler.record("fetch#{}_1".format(self.name))
if resp_channeldata.ecode == ChannelDataEcode.OK.value: if resp_channeldata.ecode == ChannelDataEcode.OK.value:
break break
...@@ -173,11 +173,11 @@ class DAGExecutor(object): ...@@ -173,11 +173,11 @@ class DAGExecutor(object):
_LOGGER.warn("retry({}): {}".format( _LOGGER.warn("retry({}): {}".format(
i + 1, resp_channeldata.error_info)) i + 1, resp_channeldata.error_info))
self._profiler.record("{}-postpack_0".format(self.name)) self._profiler.record("postpack#{}_0".format(self.name))
rpc_resp = self._pack_for_rpc_resp(resp_channeldata) rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
self._profiler.record("{}-postpack_1".format(self.name)) self._profiler.record("postpack#{}_1".format(self.name))
self._profiler.record("dag-call_1".format(self.name)) self._profiler.record("call#DAG_1")
self._profiler.print_profile() self._profiler.print_profile()
return rpc_resp return rpc_resp
......
...@@ -364,9 +364,9 @@ class Op(object): ...@@ -364,9 +364,9 @@ class Op(object):
self._is_run = True self._is_run = True
while self._is_run: while self._is_run:
#self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid)) #self._profiler_record("get#{}_0".format(op_info_prefix))
channeldata_dict = input_channel.front(self.name) channeldata_dict = input_channel.front(self.name)
#self._profiler_record("{}-get#{}_1".format(op_info_prefix, tid)) #self._profiler_record("get#{}_1".format(op_info_prefix))
_LOGGER.debug(log("input_data: {}".format(channeldata_dict))) _LOGGER.debug(log("input_data: {}".format(channeldata_dict)))
data_id, error_channeldata, parsed_data = self._parse_channeldata( data_id, error_channeldata, parsed_data = self._parse_channeldata(
...@@ -378,39 +378,39 @@ class Op(object): ...@@ -378,39 +378,39 @@ class Op(object):
continue continue
# preprecess # preprecess
self._profiler_record("{}-prep#{}_0".format(op_info_prefix, tid)) self._profiler_record("prep#{}_0".format(op_info_prefix))
preped_data, error_channeldata = self._run_preprocess( preped_data, error_channeldata = self._run_preprocess(
parsed_data, data_id, private_obj, log) parsed_data, data_id, private_obj, log)
self._profiler_record("{}-prep#{}_1".format(op_info_prefix, tid)) self._profiler_record("prep#{}_1".format(op_info_prefix))
if error_channeldata is not None: if error_channeldata is not None:
self._push_to_output_channels(error_channeldata, self._push_to_output_channels(error_channeldata,
output_channels) output_channels)
continue continue
# process # process
self._profiler_record("{}-midp#{}_0".format(op_info_prefix, tid)) self._profiler_record("midp#{}_0".format(op_info_prefix))
midped_data, error_channeldata = self._run_process( midped_data, error_channeldata = self._run_process(
client_predict_handler, preped_data, data_id, private_obj, log) client_predict_handler, preped_data, data_id, private_obj, log)
self._profiler_record("{}-midp#{}_1".format(op_info_prefix, tid)) self._profiler_record("midp#{}_1".format(op_info_prefix))
if error_channeldata is not None: if error_channeldata is not None:
self._push_to_output_channels(error_channeldata, self._push_to_output_channels(error_channeldata,
output_channels) output_channels)
continue continue
# postprocess # postprocess
self._profiler_record("{}-postp#{}_0".format(op_info_prefix, tid)) self._profiler_record("postp#{}_0".format(op_info_prefix))
output_data, error_channeldata = self._run_postprocess( output_data, error_channeldata = self._run_postprocess(
parsed_data, midped_data, data_id, private_obj, log) parsed_data, midped_data, data_id, private_obj, log)
self._profiler_record("{}-postp#{}_1".format(op_info_prefix, tid)) self._profiler_record("postp#{}_1".format(op_info_prefix))
if error_channeldata is not None: if error_channeldata is not None:
self._push_to_output_channels(error_channeldata, self._push_to_output_channels(error_channeldata,
output_channels) output_channels)
continue continue
# push data to channel (if run succ) # push data to channel (if run succ)
#self._profiler_record("{}-push#{}_0".format(op_info_prefix, tid)) #self._profiler_record("push#{}_0".format(op_info_prefix))
self._push_to_output_channels(output_data, output_channels) self._push_to_output_channels(output_data, output_channels)
#self._profiler_record("{}-push#{}_1".format(op_info_prefix, tid)) #self._profiler_record("push#{}_1".format(op_info_prefix))
def _log(self, info): def _log(self, info):
return "{} {}".format(self.name, info) return "{} {}".format(self.name, info)
...@@ -420,9 +420,9 @@ class RequestOp(Op): ...@@ -420,9 +420,9 @@ class RequestOp(Op):
""" RequestOp do not run preprocess, process, postprocess. """ """ RequestOp do not run preprocess, process, postprocess. """
def __init__(self, concurrency=1): def __init__(self, concurrency=1):
# PipelineService.name = "#G" # PipelineService.name = "@G"
super(RequestOp, self).__init__( super(RequestOp, self).__init__(
name="#G", input_ops=[], concurrency=concurrency) name="@G", input_ops=[], concurrency=concurrency)
# init op # init op
try: try:
self.init_op() self.init_op()
...@@ -447,7 +447,7 @@ class ResponseOp(Op): ...@@ -447,7 +447,7 @@ class ResponseOp(Op):
def __init__(self, input_ops, concurrency=1): def __init__(self, input_ops, concurrency=1):
super(ResponseOp, self).__init__( super(ResponseOp, self).__init__(
name="#R", input_ops=input_ops, concurrency=concurrency) name="@R", input_ops=input_ops, concurrency=concurrency)
# init op # init op
try: try:
self.init_op() self.init_op()
...@@ -530,12 +530,12 @@ class VirtualOp(Op): ...@@ -530,12 +530,12 @@ class VirtualOp(Op):
self._is_run = True self._is_run = True
while self._is_run: while self._is_run:
#self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid)) #self._profiler_record("get#{}_0".format(op_info_prefix))
channeldata_dict = input_channel.front(self.name) channeldata_dict = input_channel.front(self.name)
#self._profiler_record("{}-get#{}_1".format(op_info_prefix, tid)) #self._profiler_record("get#{}_1".format(op_info_prefix))
#self._profiler_record("{}-push#{}_0".format(op_info_prefix, tid)) #self._profiler_record("push#{}_0".format(op_info_prefix))
for name, data in channeldata_dict.items(): for name, data in channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
data, channels=output_channels, name=name) data, channels=output_channels, name=name)
#self._profiler_record("{}-push#{}_1".format(op_info_prefix, tid)) #self._profiler_record("push#{}_1".format(op_info_prefix))
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册