From 7abd7a5ca0901221fbb1b9478062b7042565bd4c Mon Sep 17 00:00:00 2001 From: barriery Date: Thu, 30 Jul 2020 12:38:45 +0000 Subject: [PATCH] fix profile in client side --- python/pipeline/operator.py | 72 ++++++++++++++++++------------------- python/pipeline/profiler.py | 30 ++++++++++++++++ 2 files changed, 64 insertions(+), 38 deletions(-) diff --git a/python/pipeline/operator.py b/python/pipeline/operator.py index e07e908c..08a56df9 100644 --- a/python/pipeline/operator.py +++ b/python/pipeline/operator.py @@ -29,7 +29,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, ChannelData, ChannelDataType, ChannelStopError, ChannelTimeoutError) from .util import NameGenerator -from .profiler import TimeProfiler +from .profiler import UnsafeTimeProfiler as TimeProfiler _LOGGER = logging.getLogger() _op_name_gen = NameGenerator("Op") @@ -92,11 +92,6 @@ class Op(object): def use_profiler(self, use_profile): self._server_use_profile = use_profile - def _profiler_record(self, string): - if self._profiler is None: - return - self._profiler.record(string) - def init_client(self, client_type, client_config, server_endpoints, fetch_names): if self.with_serving == False: @@ -205,25 +200,21 @@ class Op(object): data, channels, name=None, + profile_str=None, client_need_profile=False, profile_set=None): if name is None: name = self.name - self._add_profile_into_channeldata(data, client_need_profile, - profile_set) - for channel in channels: - channel.push(data, name) - - def _add_profile_into_channeldata(self, data, client_need_profile, - profile_set): - profile_str = self._profiler.gen_profile_str() - if self._server_use_profile: - sys.stderr.write(profile_str) + # add profile into channeldata if client_need_profile and profile_set is not None: - profile_set.add(profile_str) + if profile_str is not None: + profile_set.add(profile_str) data.add_profile(profile_set) + for channel in channels: + channel.push(data, name) + def start_with_process(self, 
client_type): proces = [] for concurrency_idx in range(self.concurrency): @@ -465,8 +456,10 @@ class Op(object): tid = threading.current_thread().ident # init op + profiler = None try: - self._initialize(is_thread_op, client_type, concurrency_idx) + profiler = self._initialize(is_thread_op, client_type, + concurrency_idx) except Exception as e: _LOGGER.error(log("init op failed: {}".format(e))) os._exit(-1) @@ -501,15 +494,15 @@ class Op(object): continue # preprecess - self._profiler_record("prep#{}_0".format(op_info_prefix)) + profiler.record("prep#{}_0".format(op_info_prefix)) preped_data_dict, err_channeldata_dict \ = self._run_preprocess(parsed_data_dict, log) - self._profiler_record("prep#{}_1".format(op_info_prefix)) + profiler.record("prep#{}_1".format(op_info_prefix)) try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( - err_channeldata, - output_channels, + data=err_channeldata, + channels=output_channels, client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: @@ -520,15 +513,15 @@ class Op(object): continue # process - self._profiler_record("midp#{}_0".format(op_info_prefix)) + profiler.record("midp#{}_0".format(op_info_prefix)) midped_data_dict, err_channeldata_dict \ = self._run_process(preped_data_dict, log) - self._profiler_record("midp#{}_1".format(op_info_prefix)) + profiler.record("midp#{}_1".format(op_info_prefix)) try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( - err_channeldata, - output_channels, + data=err_channeldata, + channels=output_channels, client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: @@ -539,16 +532,16 @@ class Op(object): continue # postprocess - self._profiler_record("postp#{}_0".format(op_info_prefix)) + profiler.record("postp#{}_0".format(op_info_prefix)) postped_data_dict, err_channeldata_dict \ = self._run_postprocess( 
parsed_data_dict, midped_data_dict, log) - self._profiler_record("postp#{}_1".format(op_info_prefix)) + profiler.record("postp#{}_1".format(op_info_prefix)) try: for data_id, err_channeldata in err_channeldata_dict.items(): self._push_to_output_channels( - error_channeldata, - output_channels, + data=error_channeldata, + channels=output_channels, client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: @@ -560,10 +553,14 @@ class Op(object): # push data to channel (if run succ) try: + profile_str = profiler.gen_profile_str() for data_id, postped_data in postped_data_dict.items(): + if self._server_use_profile: + sys.stderr.write(profile_str) self._push_to_output_channels( - postped_data, - output_channels, + data=postped_data, + channels=output_channels, + profile_str=profile_str, client_need_profile=need_profile_dict[data_id], profile_set=profile_dict[data_id]) except ChannelStopError: @@ -577,9 +574,6 @@ class Op(object): if not self._succ_init_op: # for the threaded version of Op, each thread cannot get its concurrency_idx self.concurrency_idx = None - # init profiler - self._profiler = TimeProfiler() - self._profiler.enable(True) # init client self.client = self.init_client( client_type, self._client_config, @@ -590,9 +584,6 @@ class Op(object): self._succ_close_op = False else: self.concurrency_idx = concurrency_idx - # init profiler - self._profiler = TimeProfiler() - self._profiler.enable(True) # init client self.client = self.init_client(client_type, self._client_config, self._server_endpoints, @@ -600,6 +591,11 @@ class Op(object): # user defined self.init_op() + # use a separate TimeProfiler per thread or process + profiler = TimeProfiler() + profiler.enable(True) + return profiler + def _finalize(self, is_thread_op): if is_thread_op: with self._for_close_op_lock: diff --git a/python/pipeline/profiler.py b/python/pipeline/profiler.py index 0118c5ac..93d37e23 100644 --- a/python/pipeline/profiler.py +++ 
class UnsafeTimeProfiler(object):
    """Collect named timestamps and render them as a single profile line.

    "Unsafe" because the record list is mutated without any locking, so an
    instance should be owned by exactly one thread or process.

    The attributes ``pid``, ``print_head`` and ``time_record`` stay public
    so existing users of the class keep working.
    """

    def __init__(self):
        self.pid = os.getpid()
        # Every generated profile line starts with this header.
        self.print_head = 'PROFILE\tpid:{}\t'.format(self.pid)
        self.time_record = [self.print_head]
        # Recording stays off until enable() is called with a truthy value.
        self._enable = False

    def enable(self, enable):
        """Switch recording on (truthy ``enable``) or off (falsy)."""
        self._enable = enable

    def record(self, name):
        """Append a ``name:timestamp`` entry; no-op while disabled.

        The timestamp is ``_time()`` scaled by 1e6 and rounded to an int.
        NOTE(review): ``_time`` is a module-level name defined outside this
        block; presumably it returns seconds, making this microseconds.
        """
        # Truthiness check (not ``is False``) so enable(0) / enable(None)
        # really disable the profiler.
        if not self._enable:
            return
        timestamp = int(round(_time() * 1000000))
        self.time_record.append('{}:{} '.format(name, timestamp))

    def print_profile(self):
        """Write the pending profile line to stderr; no-op while disabled."""
        if not self._enable:
            return
        sys.stderr.write(self.gen_profile_str())

    def gen_profile_str(self):
        """Return the pending records as one newline-terminated string.

        The record list is reset to just the header afterwards, so each
        call only reports entries recorded since the previous call.

        Returns:
            The profile string, or None while the profiler is disabled.
        """
        if not self._enable:
            return None
        profile_str = ''.join(self.time_record) + '\n'
        self.time_record = [self.print_head]
        return profile_str