Commit 81b2b3c2 authored by wangjiawei04

update profile

Parent dfbe9891
......@@ -16,10 +16,16 @@ def prase(pid_str, time_str, counter):
if len(name_list) == 2:
name = name_list[0]
else:
-name = name_list[0] + "_" + name_list[1]
+name = "_".join(name_list[:-1])
+name_list = name.split("#")
+if len(name_list) > 1:
+    tid = name_list[-1]
+    name = "#".join(name_list[:-1])
+else:
+    tid = 0
event_dict = {}
event_dict["name"] = name
event_dict["tid"] = 0
event_dict["tid"] = tid
event_dict["pid"] = pid
event_dict["ts"] = ts
event_dict["ph"] = ph
......
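Note on the hunk above: profiler record keys can now carry a worker-thread id as a "#<tid>" suffix, and the trace script splits it back out into the event's tid field. A standalone sketch of that split (the sample keys are made up for illustration; the real keys come from the _profiler.record calls further down):

```python
def split_event_key(event_key):
    # e.g. "[uci|0]-prep#139854_0": the trailing _0/_1 marks begin/end,
    # the optional "#<tid>" carries the worker thread ident.
    name_list = event_key.split("_")
    ph = "B" if name_list[-1] == "0" else "E"  # begin / end phase
    if len(name_list) == 2:
        name = name_list[0]
    else:
        name = "_".join(name_list[:-1])
    name_list = name.split("#")
    if len(name_list) > 1:
        tid = name_list[-1]
        name = "#".join(name_list[:-1])
    else:
        tid = 0  # old-style key without a thread id
    return name, tid, ph

print(split_event_key("[uci|0]-prep#139854_0"))  # ('[uci|0]-prep', '139854', 'B')
print(split_event_key("op-midp_1"))              # ('op-midp', 0, 'E')
```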
......@@ -37,6 +37,7 @@ import time
import func_timeout
import enum
import collections
+import copy
class _TimeProfiler(object):
......@@ -630,7 +631,7 @@ class ThreadChannel(Queue.Queue):
self._cv.notify_all()
logging.debug(self._log("multi | {} get data succ!".format(op_name)))
-return resp # reference, read only
+return copy.deepcopy(resp) # reference, read only
def stop(self):
#TODO
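Note on the ThreadChannel change: returning copy.deepcopy(resp) gives every consumer Op its own copy of the channel item, so a consumer that mutates the returned data can no longer affect what other consumers of the same channel see. A minimal illustration of the hazard, with plain dicts standing in for ChannelData (names are invented):

```python
import copy

shared = {"fetch_map": {"score": [0.1, 0.9]}}  # stands in for one channel item

# old behaviour: hand out the same object to every consumer
ref_a = shared
ref_a["fetch_map"]["score"] = "mutated"
print(shared["fetch_map"]["score"])  # 'mutated' -> other consumers are affected

# new behaviour: each front()/get() returns an isolated deep copy
shared = {"fetch_map": {"score": [0.1, 0.9]}}
copy_a = copy.deepcopy(shared)
copy_a["fetch_map"]["score"] = "mutated"
print(shared["fetch_map"]["score"])  # [0.1, 0.9] -> original stays intact
```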
......@@ -811,10 +812,11 @@ class Op(object):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix)
self._is_run = True
+tid = threading.current_thread().ident
while self._is_run:
-_profiler.record("{}-get_0".format(op_info_prefix))
+_profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name)
-_profiler.record("{}-get_1".format(op_info_prefix))
+_profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
logging.debug(log("input_data: {}".format(channeldata)))
data_id, error_channeldata = self._parse_channeldata(channeldata)
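Note on the tid tagging introduced here: each Op worker thread stamps its records with threading.current_thread().ident, and that ident ends up in the trace event's tid field (first hunk above), so concurrent workers get separate rows in the timeline. A rough sketch of the key scheme; record() below is a stand-in, not the real _TimeProfiler API:

```python
import threading
import time

def record(key):
    # stand-in for _profiler.record(): print the key with a microsecond timestamp
    print(key, int(time.time() * 1000000))

def worker(op_info_prefix):
    tid = threading.current_thread().ident
    record("{}-get#{}_0".format(op_info_prefix, tid))  # begin of the "get" span
    # ... input_channel.front(...) would run here ...
    record("{}-get#{}_1".format(op_info_prefix, tid))  # end of the "get" span

# two workers sharing one prefix still produce distinguishable keys via the tid suffix
threads = [threading.Thread(target=worker, args=("[uci|0]",)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```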
......@@ -827,9 +829,9 @@ class Op(object):
# preprecess
try:
_profiler.record("{}-prep_0".format(op_info_prefix))
_profiler.record("{}-prep#{}_0".format(op_info_prefix, tid))
preped_data = self.preprocess(channeldata)
_profiler.record("{}-prep_1".format(op_info_prefix))
_profiler.record("{}-prep#{}_1".format(op_info_prefix, tid))
except NotImplementedError as e:
# preprocess function not implemented
error_info = log(e)
......@@ -867,7 +869,7 @@ class Op(object):
midped_data = None
if self.with_serving:
ecode = ChannelDataEcode.OK.value
_profiler.record("{}-midp_0".format(op_info_prefix))
_profiler.record("{}-midp#{}_0".format(op_info_prefix, tid))
if self._timeout <= 0:
try:
midped_data = self.midprocess(preped_data, use_future)
......@@ -904,13 +906,13 @@ class Op(object):
data_id=data_id),
output_channels)
continue
_profiler.record("{}-midp_1".format(op_info_prefix))
_profiler.record("{}-midp#{}_1".format(op_info_prefix, tid))
else:
midped_data = preped_data
# postprocess
output_data = None
_profiler.record("{}-postp_0".format(op_info_prefix))
_profiler.record("{}-postp#{}_0".format(op_info_prefix, tid))
if self.with_serving and client_type == 'grpc' and use_future:
# use call_future
output_data = ChannelData(
......@@ -947,12 +949,12 @@ class Op(object):
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id)
_profiler.record("{}-postp_1".format(op_info_prefix))
_profiler.record("{}-postp#{}_1".format(op_info_prefix, tid))
# push data to channel (if run succ)
_profiler.record("{}-push_0".format(op_info_prefix))
_profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
self._push_to_output_channels(output_data, output_channels)
_profiler.record("{}-push_1".format(op_info_prefix))
_profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
def _log(self, info):
return "{} {}".format(self.name, info)
......@@ -991,12 +993,13 @@ class VirtualOp(Op):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix)
self._is_run = True
+tid = threading.current_thread().ident
while self._is_run:
-_profiler.record("{}-get_0".format(op_info_prefix))
+_profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name)
-_profiler.record("{}-get_1".format(op_info_prefix))
+_profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
-_profiler.record("{}-push_0".format(op_info_prefix))
+_profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
if isinstance(channeldata, dict):
for name, data in channeldata.items():
self._push_to_output_channels(
......@@ -1006,7 +1009,7 @@ class VirtualOp(Op):
channeldata,
channels=output_channels,
name=self._virtual_pred_ops[0].name)
_profiler.record("{}-push_1".format(op_info_prefix))
_profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
class GeneralPythonService(
......@@ -1029,6 +1032,7 @@ class GeneralPythonService(
self._recive_func = threading.Thread(
target=GeneralPythonService._recive_out_channel_func, args=(self, ))
self._recive_func.start()
+self._tid = threading.current_thread().ident
def _log(self, info_str):
return "[{}] {}".format(self.name, info_str)
......@@ -1130,21 +1134,21 @@ class GeneralPythonService(
return resp
def inference(self, request, context):
_profiler.record("{}-prepack_0".format(self.name))
_profiler.record("{}-prepack#{}_0".format(self.name, self._tid))
data, data_id = self._pack_data_for_infer(request)
_profiler.record("{}-prepack_1".format(self.name))
_profiler.record("{}-prepack#{}_1".format(self.name, self._tid))
resp_channeldata = None
for i in range(self._retry):
logging.debug(self._log('push data'))
_profiler.record("{}-push_0".format(self.name))
_profiler.record("{}-push#{}_0".format(self.name, self._tid))
self._in_channel.push(data, self.name)
_profiler.record("{}-push_1".format(self.name))
_profiler.record("{}-push#{}_1".format(self.name, self._tid))
logging.debug(self._log('wait for infer'))
_profiler.record("{}-fetch_0".format(self.name))
_profiler.record("{}-fetch#{}_0".format(self.name, self._tid))
resp_channeldata = self._get_data_in_globel_resp_dict(data_id)
_profiler.record("{}-fetch_1".format(self.name))
_profiler.record("{}-fetch#{}_1".format(self.name, self._tid))
if resp_channeldata.ecode == ChannelDataEcode.OK.value:
break
......@@ -1152,9 +1156,9 @@ class GeneralPythonService(
logging.warn("retry({}): {}".format(
i + 1, resp_channeldata.error_info))
_profiler.record("{}-postpack_0".format(self.name))
_profiler.record("{}-postpack#{}_0".format(self.name, self._tid))
resp = self._pack_data_for_resp(resp_channeldata)
_profiler.record("{}-postpack_1".format(self.name))
_profiler.record("{}-postpack#{}_1".format(self.name, self._tid))
_profiler.print_profile()
return resp
......
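For reference, the event dicts assembled in the first hunk (name / tid / pid / ts / ph) match the Chrome trace event format, so the collected list can be dumped as a JSON array and opened in chrome://tracing or Perfetto. A minimal sketch with made-up values, assuming the surrounding script gathers the events into trace_list:

```python
import json

# illustrative events only; the real ones are produced by the parse logic above
trace_list = [
    {"name": "[uci|0]-prep", "tid": 139854, "pid": 0, "ts": 1000, "ph": "B"},
    {"name": "[uci|0]-prep", "tid": 139854, "pid": 0, "ts": 1250, "ph": "E"},
]

with open("pipeline.trace", "w") as f:
    json.dump(trace_list, f)  # ts is in microseconds; open the file in chrome://tracing
```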