提交 be0806cf 编写于 作者: B barrierye

merge code

......@@ -16,10 +16,16 @@ def prase(pid_str, time_str, counter):
if len(name_list) == 2:
name = name_list[0]
else:
name = name_list[0] + "_" + name_list[1]
name = "_".join(name_list[:-1])
name_list = name.split("#")
if len(name_list) > 1:
tid = name_list[-1]
name = "#".join(name_list[:-1])
else:
tid = 0
event_dict = {}
event_dict["name"] = name
event_dict["tid"] = 0
event_dict["tid"] = tid
event_dict["pid"] = pid
event_dict["ts"] = ts
event_dict["ph"] = ph
......
......@@ -813,10 +813,11 @@ class Op(object):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix)
self._is_run = True
tid = threading.current_thread().ident
while self._is_run:
_profiler.record("{}-get_0".format(op_info_prefix))
_profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name)
_profiler.record("{}-get_1".format(op_info_prefix))
_profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
logging.debug(log("input_data: {}".format(channeldata)))
data_id, error_channeldata = self._parse_channeldata(channeldata)
......@@ -829,9 +830,9 @@ class Op(object):
# preprecess
try:
_profiler.record("{}-prep_0".format(op_info_prefix))
_profiler.record("{}-prep#{}_0".format(op_info_prefix, tid))
preped_data = self.preprocess(channeldata)
_profiler.record("{}-prep_1".format(op_info_prefix))
_profiler.record("{}-prep#{}_1".format(op_info_prefix, tid))
except NotImplementedError as e:
# preprocess function not implemented
error_info = log(e)
......@@ -869,7 +870,7 @@ class Op(object):
midped_data = None
if self.with_serving:
ecode = ChannelDataEcode.OK.value
_profiler.record("{}-midp_0".format(op_info_prefix))
_profiler.record("{}-midp#{}_0".format(op_info_prefix, tid))
if self._timeout <= 0:
try:
midped_data = self.midprocess(preped_data, use_future)
......@@ -906,13 +907,13 @@ class Op(object):
data_id=data_id),
output_channels)
continue
_profiler.record("{}-midp_1".format(op_info_prefix))
_profiler.record("{}-midp#{}_1".format(op_info_prefix, tid))
else:
midped_data = preped_data
# postprocess
output_data = None
_profiler.record("{}-postp_0".format(op_info_prefix))
_profiler.record("{}-postp#{}_0".format(op_info_prefix, tid))
if self.with_serving and client_type == 'grpc' and use_future:
# use call_future
output_data = ChannelData(
......@@ -949,12 +950,12 @@ class Op(object):
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id)
_profiler.record("{}-postp_1".format(op_info_prefix))
_profiler.record("{}-postp#{}_1".format(op_info_prefix, tid))
# push data to channel (if run succ)
_profiler.record("{}-push_0".format(op_info_prefix))
_profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
self._push_to_output_channels(output_data, output_channels)
_profiler.record("{}-push_1".format(op_info_prefix))
_profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
def _log(self, info):
return "{} {}".format(self.name, info)
......@@ -994,12 +995,13 @@ class VirtualOp(Op):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix)
self._is_run = True
tid = threading.current_thread().ident
while self._is_run:
_profiler.record("{}-get_0".format(op_info_prefix))
_profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name)
_profiler.record("{}-get_1".format(op_info_prefix))
_profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
_profiler.record("{}-push_0".format(op_info_prefix))
_profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
if isinstance(channeldata, dict):
for name, data in channeldata.items():
self._push_to_output_channels(
......@@ -1009,7 +1011,7 @@ class VirtualOp(Op):
channeldata,
channels=output_channels,
name=self._virtual_pred_ops[0].name)
_profiler.record("{}-push_1".format(op_info_prefix))
_profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
class GeneralPythonService(
......@@ -1032,6 +1034,7 @@ class GeneralPythonService(
self._recive_func = threading.Thread(
target=GeneralPythonService._recive_out_channel_func, args=(self, ))
self._recive_func.start()
self._tid = threading.current_thread().ident
def _log(self, info_str):
return "[{}] {}".format(self.name, info_str)
......@@ -1133,21 +1136,21 @@ class GeneralPythonService(
return resp
def inference(self, request, context):
_profiler.record("{}-prepack_0".format(self.name))
_profiler.record("{}-prepack#{}_0".format(self.name, self._tid))
data, data_id = self._pack_data_for_infer(request)
_profiler.record("{}-prepack_1".format(self.name))
_profiler.record("{}-prepack#{}_1".format(self.name, self._tid))
resp_channeldata = None
for i in range(self._retry):
logging.debug(self._log('push data'))
_profiler.record("{}-push_0".format(self.name))
_profiler.record("{}-push#{}_0".format(self.name, self._tid))
self._in_channel.push(data, self.name)
_profiler.record("{}-push_1".format(self.name))
_profiler.record("{}-push#{}_1".format(self.name, self._tid))
logging.debug(self._log('wait for infer'))
_profiler.record("{}-fetch_0".format(self.name))
_profiler.record("{}-fetch#{}_0".format(self.name, self._tid))
resp_channeldata = self._get_data_in_globel_resp_dict(data_id)
_profiler.record("{}-fetch_1".format(self.name))
_profiler.record("{}-fetch#{}_1".format(self.name, self._tid))
if resp_channeldata.ecode == ChannelDataEcode.OK.value:
break
......@@ -1155,9 +1158,9 @@ class GeneralPythonService(
logging.warn("retry({}): {}".format(
i + 1, resp_channeldata.error_info))
_profiler.record("{}-postpack_0".format(self.name))
_profiler.record("{}-postpack#{}_0".format(self.name, self._tid))
resp = self._pack_data_for_resp(resp_channeldata)
_profiler.record("{}-postpack_1".format(self.name))
_profiler.record("{}-postpack#{}_1".format(self.name, self._tid))
_profiler.print_profile()
return resp
......
......@@ -203,10 +203,11 @@ class Op(object):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix)
self._is_run = True
tid = threading.current_thread().ident
while self._is_run:
self._profiler_record("{}-get_0".format(op_info_prefix))
self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name)
self._profiler_record("{}-get_1".format(op_info_prefix))
self._profiler_record("{}-get#{}_1".format(op_info_prefix, tid))
logging.debug(log("input_data: {}".format(channeldata)))
data_id, error_channeldata = self._parse_channeldata(channeldata)
......@@ -219,9 +220,11 @@ class Op(object):
# preprecess
try:
self._profiler_record("{}-prep_0".format(op_info_prefix))
self._profiler_record("{}-prep#{}_0".format(op_info_prefix,
tid))
preped_data = self.preprocess(channeldata)
self._profiler_record("{}-prep_1".format(op_info_prefix))
self._profiler_record("{}-prep#{}_1".format(op_info_prefix,
tid))
except NotImplementedError as e:
# preprocess function not implemented
error_info = log(e)
......@@ -259,7 +262,8 @@ class Op(object):
midped_data = None
if self.with_serving:
ecode = ChannelDataEcode.OK.value
self._profiler_record("{}-midp_0".format(op_info_prefix))
self._profiler_record("{}-midp#{}_0".format(op_info_prefix,
tid))
if self._timeout <= 0:
try:
midped_data = self.midprocess(preped_data, use_future)
......@@ -296,13 +300,14 @@ class Op(object):
data_id=data_id),
output_channels)
continue
self._profiler_record("{}-midp_1".format(op_info_prefix))
self._profiler_record("{}-midp#{}_1".format(op_info_prefix,
tid))
else:
midped_data = preped_data
# postprocess
output_data = None
self._profiler_record("{}-postp_0".format(op_info_prefix))
self._profiler_record("{}-postp#{}_0".format(op_info_prefix, tid))
if self.with_serving and client_type == 'grpc' and use_future:
# use call_future
output_data = ChannelData(
......@@ -339,12 +344,12 @@ class Op(object):
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id)
self._profiler_record("{}-postp_1".format(op_info_prefix))
self._profiler_record("{}-postp#{}_1".format(op_info_prefix, tid))
# push data to channel (if run succ)
self._profiler_record("{}-push_0".format(op_info_prefix))
self._profiler_record("{}-push#{}_0".format(op_info_prefix, tid))
self._push_to_output_channels(output_data, output_channels)
self._profiler_record("{}-push_1".format(op_info_prefix))
self._profiler_record("{}-push#{}_1".format(op_info_prefix, tid))
def _log(self, info):
return "{} {}".format(self.name, info)
......@@ -385,11 +390,11 @@ class VirtualOp(Op):
log = self._get_log_func(op_info_prefix)
self._is_run = True
while self._is_run:
self._profiler_record("{}-get_0".format(op_info_prefix))
self._profiler_record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name)
self._profiler_record("{}-get_1".format(op_info_prefix))
self._profiler_record("{}-get#{}_1".format(op_info_prefix, tid))
self._profiler_record("{}-push_0".format(op_info_prefix))
self._profiler_record("{}-push#{}_0".format(op_info_prefix, tid))
if isinstance(channeldata, dict):
for name, data in channeldata.items():
self._push_to_output_channels(
......@@ -399,4 +404,4 @@ class VirtualOp(Op):
channeldata,
channels=output_channels,
name=self._virtual_pred_ops[0].name)
self._profiler_record("{}-push_1".format(op_info_prefix))
self._profiler_record("{}-push#{}_1".format(op_info_prefix, tid))
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册