提交 99ea8b54 编写于 作者: B barrierye

update profiler for process

上级 480f5afe
...@@ -40,18 +40,26 @@ class DAGExecutor(object): ...@@ -40,18 +40,26 @@ class DAGExecutor(object):
use_multithread = yml_config.get('use_multithread', True) use_multithread = yml_config.get('use_multithread', True)
use_profile = yml_config.get('profile', False) use_profile = yml_config.get('profile', False)
channel_size = yml_config.get('channel_size', 0) channel_size = yml_config.get('channel_size', 0)
self._asyn_profile = yml_config.get('asgn_profile', False)
if not use_multithread:
if use_profile: if use_profile:
raise Exception( _LOGGER.info("====> profiler <====")
"profile cannot be used in multiprocess version temporarily") if use_multithread:
_LOGGER.info("op: thread")
else:
_LOGGER.info("op: process")
if self._asyn_profile:
_LOGGER.info("profile mode: asyn")
else:
_LOGGER.info("profile mode: sync")
_LOGGER.info("====================")
self.name = "@G" self.name = "@G"
self._profiler = TimeProfiler() self._profiler = TimeProfiler()
self._profiler.enable(use_profile) self._profiler.enable(use_profile)
self._dag = DAG(response_op, self._profiler, use_multithread, self._dag = DAG(response_op, self._profiler, use_profile,
client_type, channel_size) use_multithread, client_type, channel_size)
(in_channel, out_channel, pack_rpc_func, (in_channel, out_channel, pack_rpc_func,
unpack_rpc_func) = self._dag.build() unpack_rpc_func) = self._dag.build()
self._dag.start() self._dag.start()
...@@ -131,9 +139,8 @@ class DAGExecutor(object): ...@@ -131,9 +139,8 @@ class DAGExecutor(object):
self._cv.notify_all() self._cv.notify_all()
return resp return resp
def _pack_channeldata(self, rpc_request): def _pack_channeldata(self, rpc_request, data_id):
_LOGGER.debug(self._log('start inferce')) _LOGGER.debug(self._log('start inferce'))
data_id = self._get_next_data_id()
dictdata = None dictdata = None
try: try:
dictdata = self._unpack_rpc_func(rpc_request) dictdata = self._unpack_rpc_func(rpc_request)
...@@ -141,31 +148,35 @@ class DAGExecutor(object): ...@@ -141,31 +148,35 @@ class DAGExecutor(object):
return ChannelData( return ChannelData(
ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value, ecode=ChannelDataEcode.RPC_PACKAGE_ERROR.value,
error_info="rpc package error: {}".format(e), error_info="rpc package error: {}".format(e),
data_id=data_id), data_id data_id=data_id)
else: else:
return ChannelData( return ChannelData(
datatype=ChannelDataType.DICT.value, datatype=ChannelDataType.DICT.value,
dictdata=dictdata, dictdata=dictdata,
data_id=data_id), data_id data_id=data_id)
def call(self, rpc_request): def call(self, rpc_request):
self._profiler.record("call#DAG_0") data_id = self._get_next_data_id()
if self._asyn_profile:
self._profiler.record("call_{}#DAG-{}_0".format(data_id, data_id))
else:
self._profiler.record("call_{}#DAG_0".format(data_id))
self._profiler.record("prepack#{}_0".format(self.name)) self._profiler.record("prepack_{}#{}_0".format(data_id, self.name))
req_channeldata, data_id = self._pack_channeldata(rpc_request) req_channeldata = self._pack_channeldata(rpc_request, data_id)
self._profiler.record("prepack#{}_1".format(self.name)) self._profiler.record("prepack_{}#{}_1".format(data_id, self.name))
resp_channeldata = None resp_channeldata = None
for i in range(self._retry): for i in range(self._retry):
_LOGGER.debug(self._log('push data')) _LOGGER.debug(self._log('push data'))
#self._profiler.record("push#{}_0".format(self.name)) #self._profiler.record("push_{}#{}_0".format(data_id, self.name))
self._in_channel.push(req_channeldata, self.name) self._in_channel.push(req_channeldata, self.name)
#self._profiler.record("push#{}_1".format(self.name)) #self._profiler.record("push_{}#{}_1".format(data_id, self.name))
_LOGGER.debug(self._log('wait for infer')) _LOGGER.debug(self._log('wait for infer'))
#self._profiler.record("fetch#{}_0".format(self.name)) #self._profiler.record("fetch_{}#{}_0".format(data_id, self.name))
resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id) resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id)
#self._profiler.record("fetch#{}_1".format(self.name)) #self._profiler.record("fetch_{}#{}_1".format(data_id, self.name))
if resp_channeldata.ecode == ChannelDataEcode.OK.value: if resp_channeldata.ecode == ChannelDataEcode.OK.value:
break break
...@@ -173,11 +184,14 @@ class DAGExecutor(object): ...@@ -173,11 +184,14 @@ class DAGExecutor(object):
_LOGGER.warn("retry({}): {}".format( _LOGGER.warn("retry({}): {}".format(
i + 1, resp_channeldata.error_info)) i + 1, resp_channeldata.error_info))
self._profiler.record("postpack#{}_0".format(self.name)) self._profiler.record("postpack_{}#{}_0".format(data_id, self.name))
rpc_resp = self._pack_for_rpc_resp(resp_channeldata) rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
self._profiler.record("postpack#{}_1".format(self.name)) self._profiler.record("postpack_{}#{}_1".format(data_id, self.name))
self._profiler.record("call#DAG_1") if self._asyn_profile:
self._profiler.record("call_{}#DAG-{}_1".format(data_id, data_id))
else:
self._profiler.record("call_{}#DAG_1".format(data_id))
self._profiler.print_profile() self._profiler.print_profile()
return rpc_resp return rpc_resp
...@@ -190,9 +204,10 @@ class DAGExecutor(object): ...@@ -190,9 +204,10 @@ class DAGExecutor(object):
class DAG(object): class DAG(object):
def __init__(self, response_op, profiler, use_multithread, client_type, def __init__(self, response_op, profiler, use_profile, use_multithread,
channel_size): client_type, channel_size):
self._response_op = response_op self._response_op = response_op
self._use_profile = use_profile
self._use_multithread = use_multithread self._use_multithread = use_multithread
self._channel_size = channel_size self._channel_size = channel_size
self._client_type = client_type self._client_type = client_type
...@@ -398,7 +413,7 @@ class DAG(object): ...@@ -398,7 +413,7 @@ class DAG(object):
def start(self): def start(self):
self._threads_or_proces = [] self._threads_or_proces = []
for op in self._actual_ops: for op in self._actual_ops:
op.init_profiler(self._profiler) op.init_profiler(self._profiler, self._use_profile)
if self._use_multithread: if self._use_multithread:
self._threads_or_proces.extend( self._threads_or_proces.extend(
op.start_with_thread(self._client_type)) op.start_with_thread(self._client_type))
......
...@@ -65,8 +65,9 @@ class Op(object): ...@@ -65,8 +65,9 @@ class Op(object):
self._for_init_op_lock = threading.Lock() self._for_init_op_lock = threading.Lock()
self._succ_init_op = False self._succ_init_op = False
def init_profiler(self, profiler): def init_profiler(self, profiler, use_profile):
self._profiler = profiler self._profiler = profiler
self._use_profile = use_profile
def _profiler_record(self, string): def _profiler_record(self, string):
if self._profiler is None: if self._profiler is None:
...@@ -349,6 +350,11 @@ class Op(object): ...@@ -349,6 +350,11 @@ class Op(object):
_LOGGER.error(log(e)) _LOGGER.error(log(e))
os._exit(-1) os._exit(-1)
# init profiler
if not use_multithread:
self._profiler = TimeProfiler()
self._profiler.enable(self._use_profile)
self._is_run = True self._is_run = True
while self._is_run: while self._is_run:
#self._profiler_record("get#{}_0".format(op_info_prefix)) #self._profiler_record("get#{}_0".format(op_info_prefix))
...@@ -398,6 +404,7 @@ class Op(object): ...@@ -398,6 +404,7 @@ class Op(object):
#self._profiler_record("push#{}_0".format(op_info_prefix)) #self._profiler_record("push#{}_0".format(op_info_prefix))
self._push_to_output_channels(output_data, output_channels) self._push_to_output_channels(output_data, output_channels)
#self._profiler_record("push#{}_1".format(op_info_prefix)) #self._profiler_record("push#{}_1".format(op_info_prefix))
self._profiler.print_profile()
def _log(self, info): def _log(self, info):
return "{} {}".format(self.name, info) return "{} {}".format(self.name, info)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册