提交 823b88f4 编写于 作者: W wangjiawei04

update profile

上级 5365e74c
...@@ -16,10 +16,16 @@ def prase(pid_str, time_str, counter): ...@@ -16,10 +16,16 @@ def prase(pid_str, time_str, counter):
if len(name_list) == 2: if len(name_list) == 2:
name = name_list[0] name = name_list[0]
else: else:
name = name_list[0] + "_" + name_list[1] name = "_".join(name_list[:-1])
name_list = name.split("#")
if len(name_list) > 1:
tid = name_list[-1]
name = "#".join(name_list[:-1])
else:
tid = 0
event_dict = {} event_dict = {}
event_dict["name"] = name event_dict["name"] = name
event_dict["tid"] = 0 event_dict["tid"] = tid
event_dict["pid"] = pid event_dict["pid"] = pid
event_dict["ts"] = ts event_dict["ts"] = ts
event_dict["ph"] = ph event_dict["ph"] = ph
......
...@@ -37,6 +37,7 @@ import time ...@@ -37,6 +37,7 @@ import time
import func_timeout import func_timeout
import enum import enum
import collections import collections
import copy
class _TimeProfiler(object): class _TimeProfiler(object):
...@@ -630,7 +631,7 @@ class ThreadChannel(Queue.Queue): ...@@ -630,7 +631,7 @@ class ThreadChannel(Queue.Queue):
self._cv.notify_all() self._cv.notify_all()
logging.debug(self._log("multi | {} get data succ!".format(op_name))) logging.debug(self._log("multi | {} get data succ!".format(op_name)))
return resp # reference, read only return copy.deepcopy(resp) # reference, read only
def stop(self): def stop(self):
#TODO #TODO
...@@ -811,10 +812,11 @@ class Op(object): ...@@ -811,10 +812,11 @@ class Op(object):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix) log = self._get_log_func(op_info_prefix)
self._is_run = True self._is_run = True
tid = threading.current_thread().ident
while self._is_run: while self._is_run:
_profiler.record("{}-get_0".format(op_info_prefix)) _profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name) channeldata = input_channel.front(self.name)
_profiler.record("{}-get_1".format(op_info_prefix)) _profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
logging.debug(log("input_data: {}".format(channeldata))) logging.debug(log("input_data: {}".format(channeldata)))
data_id, error_channeldata = self._parse_channeldata(channeldata) data_id, error_channeldata = self._parse_channeldata(channeldata)
...@@ -827,9 +829,9 @@ class Op(object): ...@@ -827,9 +829,9 @@ class Op(object):
# preprecess # preprecess
try: try:
_profiler.record("{}-prep_0".format(op_info_prefix)) _profiler.record("{}-prep#{}_0".format(op_info_prefix, tid))
preped_data = self.preprocess(channeldata) preped_data = self.preprocess(channeldata)
_profiler.record("{}-prep_1".format(op_info_prefix)) _profiler.record("{}-prep#{}_1".format(op_info_prefix, tid))
except NotImplementedError as e: except NotImplementedError as e:
# preprocess function not implemented # preprocess function not implemented
error_info = log(e) error_info = log(e)
...@@ -867,7 +869,7 @@ class Op(object): ...@@ -867,7 +869,7 @@ class Op(object):
midped_data = None midped_data = None
if self.with_serving: if self.with_serving:
ecode = ChannelDataEcode.OK.value ecode = ChannelDataEcode.OK.value
_profiler.record("{}-midp_0".format(op_info_prefix)) _profiler.record("{}-midp#{}_0".format(op_info_prefix, tid))
if self._timeout <= 0: if self._timeout <= 0:
try: try:
midped_data = self.midprocess(preped_data, use_future) midped_data = self.midprocess(preped_data, use_future)
...@@ -904,13 +906,13 @@ class Op(object): ...@@ -904,13 +906,13 @@ class Op(object):
data_id=data_id), data_id=data_id),
output_channels) output_channels)
continue continue
_profiler.record("{}-midp_1".format(op_info_prefix)) _profiler.record("{}-midp#{}_1".format(op_info_prefix, tid))
else: else:
midped_data = preped_data midped_data = preped_data
# postprocess # postprocess
output_data = None output_data = None
_profiler.record("{}-postp_0".format(op_info_prefix)) _profiler.record("{}-postp#{}_0".format(op_info_prefix, tid))
if self.with_serving and client_type == 'grpc' and use_future: if self.with_serving and client_type == 'grpc' and use_future:
# use call_future # use call_future
output_data = ChannelData( output_data = ChannelData(
...@@ -947,12 +949,12 @@ class Op(object): ...@@ -947,12 +949,12 @@ class Op(object):
ChannelDataType.CHANNEL_NPDATA.value, ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data, npdata=postped_data,
data_id=data_id) data_id=data_id)
_profiler.record("{}-postp_1".format(op_info_prefix)) _profiler.record("{}-postp#{}_1".format(op_info_prefix, tid))
# push data to channel (if run succ) # push data to channel (if run succ)
_profiler.record("{}-push_0".format(op_info_prefix)) _profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
self._push_to_output_channels(output_data, output_channels) self._push_to_output_channels(output_data, output_channels)
_profiler.record("{}-push_1".format(op_info_prefix)) _profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
def _log(self, info): def _log(self, info):
return "{} {}".format(self.name, info) return "{} {}".format(self.name, info)
...@@ -991,12 +993,13 @@ class VirtualOp(Op): ...@@ -991,12 +993,13 @@ class VirtualOp(Op):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix) log = self._get_log_func(op_info_prefix)
self._is_run = True self._is_run = True
tid = threading.current_thread().ident
while self._is_run: while self._is_run:
_profiler.record("{}-get_0".format(op_info_prefix)) _profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name) channeldata = input_channel.front(self.name)
_profiler.record("{}-get_1".format(op_info_prefix)) _profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
_profiler.record("{}-push_0".format(op_info_prefix)) _profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
if isinstance(channeldata, dict): if isinstance(channeldata, dict):
for name, data in channeldata.items(): for name, data in channeldata.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -1006,7 +1009,7 @@ class VirtualOp(Op): ...@@ -1006,7 +1009,7 @@ class VirtualOp(Op):
channeldata, channeldata,
channels=output_channels, channels=output_channels,
name=self._virtual_pred_ops[0].name) name=self._virtual_pred_ops[0].name)
_profiler.record("{}-push_1".format(op_info_prefix)) _profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
class GeneralPythonService( class GeneralPythonService(
...@@ -1029,6 +1032,7 @@ class GeneralPythonService( ...@@ -1029,6 +1032,7 @@ class GeneralPythonService(
self._recive_func = threading.Thread( self._recive_func = threading.Thread(
target=GeneralPythonService._recive_out_channel_func, args=(self, )) target=GeneralPythonService._recive_out_channel_func, args=(self, ))
self._recive_func.start() self._recive_func.start()
self._tid = threading.current_thread().ident
def _log(self, info_str): def _log(self, info_str):
return "[{}] {}".format(self.name, info_str) return "[{}] {}".format(self.name, info_str)
...@@ -1130,21 +1134,21 @@ class GeneralPythonService( ...@@ -1130,21 +1134,21 @@ class GeneralPythonService(
return resp return resp
def inference(self, request, context): def inference(self, request, context):
_profiler.record("{}-prepack_0".format(self.name)) _profiler.record("{}-prepack#{}_0".format(self.name, self._tid))
data, data_id = self._pack_data_for_infer(request) data, data_id = self._pack_data_for_infer(request)
_profiler.record("{}-prepack_1".format(self.name)) _profiler.record("{}-prepack#{}_1".format(self.name, self._tid))
resp_channeldata = None resp_channeldata = None
for i in range(self._retry): for i in range(self._retry):
logging.debug(self._log('push data')) logging.debug(self._log('push data'))
_profiler.record("{}-push_0".format(self.name)) _profiler.record("{}-push#{}_0".format(self.name, self._tid))
self._in_channel.push(data, self.name) self._in_channel.push(data, self.name)
_profiler.record("{}-push_1".format(self.name)) _profiler.record("{}-push#{}_1".format(self.name, self._tid))
logging.debug(self._log('wait for infer')) logging.debug(self._log('wait for infer'))
_profiler.record("{}-fetch_0".format(self.name)) _profiler.record("{}-fetch#{}_0".format(self.name, self._tid))
resp_channeldata = self._get_data_in_globel_resp_dict(data_id) resp_channeldata = self._get_data_in_globel_resp_dict(data_id)
_profiler.record("{}-fetch_1".format(self.name)) _profiler.record("{}-fetch#{}_1".format(self.name, self._tid))
if resp_channeldata.ecode == ChannelDataEcode.OK.value: if resp_channeldata.ecode == ChannelDataEcode.OK.value:
break break
...@@ -1152,9 +1156,9 @@ class GeneralPythonService( ...@@ -1152,9 +1156,9 @@ class GeneralPythonService(
logging.warn("retry({}): {}".format( logging.warn("retry({}): {}".format(
i + 1, resp_channeldata.error_info)) i + 1, resp_channeldata.error_info))
_profiler.record("{}-postpack_0".format(self.name)) _profiler.record("{}-postpack#{}_0".format(self.name, self._tid))
resp = self._pack_data_for_resp(resp_channeldata) resp = self._pack_data_for_resp(resp_channeldata)
_profiler.record("{}-postpack_1".format(self.name)) _profiler.record("{}-postpack#{}_1".format(self.name, self._tid))
_profiler.print_profile() _profiler.print_profile()
return resp return resp
......
...@@ -37,6 +37,7 @@ import time ...@@ -37,6 +37,7 @@ import time
import func_timeout import func_timeout
import enum import enum
import collections import collections
import copy
class _TimeProfiler(object): class _TimeProfiler(object):
...@@ -630,7 +631,7 @@ class ThreadChannel(Queue.Queue): ...@@ -630,7 +631,7 @@ class ThreadChannel(Queue.Queue):
self._cv.notify_all() self._cv.notify_all()
logging.debug(self._log("multi | {} get data succ!".format(op_name))) logging.debug(self._log("multi | {} get data succ!".format(op_name)))
return resp # reference, read only return copy.deepcopy(resp) # reference, read only
def stop(self): def stop(self):
#TODO #TODO
...@@ -811,10 +812,11 @@ class Op(object): ...@@ -811,10 +812,11 @@ class Op(object):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix) log = self._get_log_func(op_info_prefix)
self._is_run = True self._is_run = True
tid = threading.current_thread().ident
while self._is_run: while self._is_run:
_profiler.record("{}-get_0".format(op_info_prefix)) _profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name) channeldata = input_channel.front(self.name)
_profiler.record("{}-get_1".format(op_info_prefix)) _profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
logging.debug(log("input_data: {}".format(channeldata))) logging.debug(log("input_data: {}".format(channeldata)))
data_id, error_channeldata = self._parse_channeldata(channeldata) data_id, error_channeldata = self._parse_channeldata(channeldata)
...@@ -827,9 +829,9 @@ class Op(object): ...@@ -827,9 +829,9 @@ class Op(object):
# preprecess # preprecess
try: try:
_profiler.record("{}-prep_0".format(op_info_prefix)) _profiler.record("{}-prep#{}_0".format(op_info_prefix, tid))
preped_data = self.preprocess(channeldata) preped_data = self.preprocess(channeldata)
_profiler.record("{}-prep_1".format(op_info_prefix)) _profiler.record("{}-prep#{}_1".format(op_info_prefix, tid))
except NotImplementedError as e: except NotImplementedError as e:
# preprocess function not implemented # preprocess function not implemented
error_info = log(e) error_info = log(e)
...@@ -867,7 +869,7 @@ class Op(object): ...@@ -867,7 +869,7 @@ class Op(object):
midped_data = None midped_data = None
if self.with_serving: if self.with_serving:
ecode = ChannelDataEcode.OK.value ecode = ChannelDataEcode.OK.value
_profiler.record("{}-midp_0".format(op_info_prefix)) _profiler.record("{}-midp#{}_0".format(op_info_prefix, tid))
if self._timeout <= 0: if self._timeout <= 0:
try: try:
midped_data = self.midprocess(preped_data, use_future) midped_data = self.midprocess(preped_data, use_future)
...@@ -904,13 +906,13 @@ class Op(object): ...@@ -904,13 +906,13 @@ class Op(object):
data_id=data_id), data_id=data_id),
output_channels) output_channels)
continue continue
_profiler.record("{}-midp_1".format(op_info_prefix)) _profiler.record("{}-midp#{}_1".format(op_info_prefix, tid))
else: else:
midped_data = preped_data midped_data = preped_data
# postprocess # postprocess
output_data = None output_data = None
_profiler.record("{}-postp_0".format(op_info_prefix)) _profiler.record("{}-postp#{}_0".format(op_info_prefix, tid))
if self.with_serving and client_type == 'grpc' and use_future: if self.with_serving and client_type == 'grpc' and use_future:
# use call_future # use call_future
output_data = ChannelData( output_data = ChannelData(
...@@ -947,12 +949,12 @@ class Op(object): ...@@ -947,12 +949,12 @@ class Op(object):
ChannelDataType.CHANNEL_NPDATA.value, ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data, npdata=postped_data,
data_id=data_id) data_id=data_id)
_profiler.record("{}-postp_1".format(op_info_prefix)) _profiler.record("{}-postp#{}_1".format(op_info_prefix, tid))
# push data to channel (if run succ) # push data to channel (if run succ)
_profiler.record("{}-push_0".format(op_info_prefix)) _profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
self._push_to_output_channels(output_data, output_channels) self._push_to_output_channels(output_data, output_channels)
_profiler.record("{}-push_1".format(op_info_prefix)) _profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
def _log(self, info): def _log(self, info):
return "{} {}".format(self.name, info) return "{} {}".format(self.name, info)
...@@ -991,12 +993,13 @@ class VirtualOp(Op): ...@@ -991,12 +993,13 @@ class VirtualOp(Op):
op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx) op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
log = self._get_log_func(op_info_prefix) log = self._get_log_func(op_info_prefix)
self._is_run = True self._is_run = True
tid = threading.current_thread().ident
while self._is_run: while self._is_run:
_profiler.record("{}-get_0".format(op_info_prefix)) _profiler.record("{}-get#{}_0".format(op_info_prefix, tid))
channeldata = input_channel.front(self.name) channeldata = input_channel.front(self.name)
_profiler.record("{}-get_1".format(op_info_prefix)) _profiler.record("{}-get#{}_1".format(op_info_prefix, tid))
_profiler.record("{}-push_0".format(op_info_prefix)) _profiler.record("{}-push#{}_0".format(op_info_prefix, tid))
if isinstance(channeldata, dict): if isinstance(channeldata, dict):
for name, data in channeldata.items(): for name, data in channeldata.items():
self._push_to_output_channels( self._push_to_output_channels(
...@@ -1006,7 +1009,7 @@ class VirtualOp(Op): ...@@ -1006,7 +1009,7 @@ class VirtualOp(Op):
channeldata, channeldata,
channels=output_channels, channels=output_channels,
name=self._virtual_pred_ops[0].name) name=self._virtual_pred_ops[0].name)
_profiler.record("{}-push_1".format(op_info_prefix)) _profiler.record("{}-push#{}_1".format(op_info_prefix, tid))
class GeneralPythonService( class GeneralPythonService(
...@@ -1029,6 +1032,7 @@ class GeneralPythonService( ...@@ -1029,6 +1032,7 @@ class GeneralPythonService(
self._recive_func = threading.Thread( self._recive_func = threading.Thread(
target=GeneralPythonService._recive_out_channel_func, args=(self, )) target=GeneralPythonService._recive_out_channel_func, args=(self, ))
self._recive_func.start() self._recive_func.start()
self._tid = threading.current_thread().ident
def _log(self, info_str): def _log(self, info_str):
return "[{}] {}".format(self.name, info_str) return "[{}] {}".format(self.name, info_str)
...@@ -1130,21 +1134,21 @@ class GeneralPythonService( ...@@ -1130,21 +1134,21 @@ class GeneralPythonService(
return resp return resp
def inference(self, request, context): def inference(self, request, context):
_profiler.record("{}-prepack_0".format(self.name)) _profiler.record("{}-prepack#{}_0".format(self.name, self._tid))
data, data_id = self._pack_data_for_infer(request) data, data_id = self._pack_data_for_infer(request)
_profiler.record("{}-prepack_1".format(self.name)) _profiler.record("{}-prepack#{}_1".format(self.name, self._tid))
resp_channeldata = None resp_channeldata = None
for i in range(self._retry): for i in range(self._retry):
logging.debug(self._log('push data')) logging.debug(self._log('push data'))
_profiler.record("{}-push_0".format(self.name)) _profiler.record("{}-push#{}_0".format(self.name, self._tid))
self._in_channel.push(data, self.name) self._in_channel.push(data, self.name)
_profiler.record("{}-push_1".format(self.name)) _profiler.record("{}-push#{}_1".format(self.name, self._tid))
logging.debug(self._log('wait for infer')) logging.debug(self._log('wait for infer'))
_profiler.record("{}-fetch_0".format(self.name)) _profiler.record("{}-fetch#{}_0".format(self.name, self._tid))
resp_channeldata = self._get_data_in_globel_resp_dict(data_id) resp_channeldata = self._get_data_in_globel_resp_dict(data_id)
_profiler.record("{}-fetch_1".format(self.name)) _profiler.record("{}-fetch#{}_1".format(self.name, self._tid))
if resp_channeldata.ecode == ChannelDataEcode.OK.value: if resp_channeldata.ecode == ChannelDataEcode.OK.value:
break break
...@@ -1152,9 +1156,9 @@ class GeneralPythonService( ...@@ -1152,9 +1156,9 @@ class GeneralPythonService(
logging.warn("retry({}): {}".format( logging.warn("retry({}): {}".format(
i + 1, resp_channeldata.error_info)) i + 1, resp_channeldata.error_info))
_profiler.record("{}-postpack_0".format(self.name)) _profiler.record("{}-postpack#{}_0".format(self.name, self._tid))
resp = self._pack_data_for_resp(resp_channeldata) resp = self._pack_data_for_resp(resp_channeldata)
_profiler.record("{}-postpack_1".format(self.name)) _profiler.record("{}-postpack#{}_1".format(self.name, self._tid))
_profiler.print_profile() _profiler.print_profile()
return resp return resp
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册