提交 7abd7a5c 编写于 作者: B barriery

fix profile in client side

上级 a75197a8
...@@ -29,7 +29,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode, ...@@ -29,7 +29,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
ChannelData, ChannelDataType, ChannelStopError, ChannelData, ChannelDataType, ChannelStopError,
ChannelTimeoutError) ChannelTimeoutError)
from .util import NameGenerator from .util import NameGenerator
from .profiler import TimeProfiler from .profiler import UnsafeTimeProfiler as TimeProfiler
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
_op_name_gen = NameGenerator("Op") _op_name_gen = NameGenerator("Op")
...@@ -92,11 +92,6 @@ class Op(object): ...@@ -92,11 +92,6 @@ class Op(object):
def use_profiler(self, use_profile): def use_profiler(self, use_profile):
self._server_use_profile = use_profile self._server_use_profile = use_profile
def _profiler_record(self, string):
if self._profiler is None:
return
self._profiler.record(string)
def init_client(self, client_type, client_config, server_endpoints, def init_client(self, client_type, client_config, server_endpoints,
fetch_names): fetch_names):
if self.with_serving == False: if self.with_serving == False:
...@@ -205,25 +200,21 @@ class Op(object): ...@@ -205,25 +200,21 @@ class Op(object):
data, data,
channels, channels,
name=None, name=None,
profile_str=None,
client_need_profile=False, client_need_profile=False,
profile_set=None): profile_set=None):
if name is None: if name is None:
name = self.name name = self.name
self._add_profile_into_channeldata(data, client_need_profile,
profile_set)
for channel in channels:
channel.push(data, name)
def _add_profile_into_channeldata(self, data, client_need_profile,
profile_set):
profile_str = self._profiler.gen_profile_str()
if self._server_use_profile:
sys.stderr.write(profile_str)
# add profile into channeldata
if client_need_profile and profile_set is not None: if client_need_profile and profile_set is not None:
profile_set.add(profile_str) if profile_str is not None:
profile_set.add(profile_str)
data.add_profile(profile_set) data.add_profile(profile_set)
for channel in channels:
channel.push(data, name)
def start_with_process(self, client_type): def start_with_process(self, client_type):
proces = [] proces = []
for concurrency_idx in range(self.concurrency): for concurrency_idx in range(self.concurrency):
...@@ -465,8 +456,10 @@ class Op(object): ...@@ -465,8 +456,10 @@ class Op(object):
tid = threading.current_thread().ident tid = threading.current_thread().ident
# init op # init op
profiler = None
try: try:
self._initialize(is_thread_op, client_type, concurrency_idx) profiler = self._initialize(is_thread_op, client_type,
concurrency_idx)
except Exception as e: except Exception as e:
_LOGGER.error(log("init op failed: {}".format(e))) _LOGGER.error(log("init op failed: {}".format(e)))
os._exit(-1) os._exit(-1)
...@@ -501,15 +494,15 @@ class Op(object): ...@@ -501,15 +494,15 @@ class Op(object):
continue continue
# preprecess # preprecess
self._profiler_record("prep#{}_0".format(op_info_prefix)) profiler.record("prep#{}_0".format(op_info_prefix))
preped_data_dict, err_channeldata_dict \ preped_data_dict, err_channeldata_dict \
= self._run_preprocess(parsed_data_dict, log) = self._run_preprocess(parsed_data_dict, log)
self._profiler_record("prep#{}_1".format(op_info_prefix)) profiler.record("prep#{}_1".format(op_info_prefix))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
err_channeldata, data=err_channeldata,
output_channels, channels=output_channels,
client_need_profile=need_profile_dict[data_id], client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id]) profile_set=profile_dict[data_id])
except ChannelStopError: except ChannelStopError:
...@@ -520,15 +513,15 @@ class Op(object): ...@@ -520,15 +513,15 @@ class Op(object):
continue continue
# process # process
self._profiler_record("midp#{}_0".format(op_info_prefix)) profiler.record("midp#{}_0".format(op_info_prefix))
midped_data_dict, err_channeldata_dict \ midped_data_dict, err_channeldata_dict \
= self._run_process(preped_data_dict, log) = self._run_process(preped_data_dict, log)
self._profiler_record("midp#{}_1".format(op_info_prefix)) profiler.record("midp#{}_1".format(op_info_prefix))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
err_channeldata, data=err_channeldata,
output_channels, channels=output_channels,
client_need_profile=need_profile_dict[data_id], client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id]) profile_set=profile_dict[data_id])
except ChannelStopError: except ChannelStopError:
...@@ -539,16 +532,16 @@ class Op(object): ...@@ -539,16 +532,16 @@ class Op(object):
continue continue
# postprocess # postprocess
self._profiler_record("postp#{}_0".format(op_info_prefix)) profiler.record("postp#{}_0".format(op_info_prefix))
postped_data_dict, err_channeldata_dict \ postped_data_dict, err_channeldata_dict \
= self._run_postprocess( = self._run_postprocess(
parsed_data_dict, midped_data_dict, log) parsed_data_dict, midped_data_dict, log)
self._profiler_record("postp#{}_1".format(op_info_prefix)) profiler.record("postp#{}_1".format(op_info_prefix))
try: try:
for data_id, err_channeldata in err_channeldata_dict.items(): for data_id, err_channeldata in err_channeldata_dict.items():
self._push_to_output_channels( self._push_to_output_channels(
error_channeldata, data=error_channeldata,
output_channels, channels=output_channels,
client_need_profile=need_profile_dict[data_id], client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id]) profile_set=profile_dict[data_id])
except ChannelStopError: except ChannelStopError:
...@@ -560,10 +553,14 @@ class Op(object): ...@@ -560,10 +553,14 @@ class Op(object):
# push data to channel (if run succ) # push data to channel (if run succ)
try: try:
profile_str = profiler.gen_profile_str()
for data_id, postped_data in postped_data_dict.items(): for data_id, postped_data in postped_data_dict.items():
if self._server_use_profile:
sys.stderr.write(profile_str)
self._push_to_output_channels( self._push_to_output_channels(
postped_data, data=postped_data,
output_channels, channels=output_channels,
profile_str=profile_str,
client_need_profile=need_profile_dict[data_id], client_need_profile=need_profile_dict[data_id],
profile_set=profile_dict[data_id]) profile_set=profile_dict[data_id])
except ChannelStopError: except ChannelStopError:
...@@ -577,9 +574,6 @@ class Op(object): ...@@ -577,9 +574,6 @@ class Op(object):
if not self._succ_init_op: if not self._succ_init_op:
# for the threaded version of Op, each thread cannot get its concurrency_idx # for the threaded version of Op, each thread cannot get its concurrency_idx
self.concurrency_idx = None self.concurrency_idx = None
# init profiler
self._profiler = TimeProfiler()
self._profiler.enable(True)
# init client # init client
self.client = self.init_client( self.client = self.init_client(
client_type, self._client_config, client_type, self._client_config,
...@@ -590,9 +584,6 @@ class Op(object): ...@@ -590,9 +584,6 @@ class Op(object):
self._succ_close_op = False self._succ_close_op = False
else: else:
self.concurrency_idx = concurrency_idx self.concurrency_idx = concurrency_idx
# init profiler
self._profiler = TimeProfiler()
self._profiler.enable(True)
# init client # init client
self.client = self.init_client(client_type, self._client_config, self.client = self.init_client(client_type, self._client_config,
self._server_endpoints, self._server_endpoints,
...@@ -600,6 +591,11 @@ class Op(object): ...@@ -600,6 +591,11 @@ class Op(object):
# user defined # user defined
self.init_op() self.init_op()
# use a separate TimeProfiler per thread or process
profiler = TimeProfiler()
profiler.enable(True)
return profiler
def _finalize(self, is_thread_op): def _finalize(self, is_thread_op):
if is_thread_op: if is_thread_op:
with self._for_close_op_lock: with self._for_close_op_lock:
......
...@@ -28,6 +28,36 @@ import threading ...@@ -28,6 +28,36 @@ import threading
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
class UnsafeTimeProfiler(object):
def __init__(self):
self.pid = os.getpid()
self.print_head = 'PROFILE\tpid:{}\t'.format(self.pid)
self.time_record = [self.print_head]
self._enable = False
def enable(self, enable):
self._enable = enable
def record(self, name):
if self._enable is False:
return
self.time_record.append('{}:{} '.format(name,
int(round(_time() * 1000000))))
def print_profile(self):
if self._enable is False:
return
sys.stderr.write(self.gen_profile_str())
def gen_profile_str(self):
if self._enable is False:
return
self.time_record.append('\n')
profile_str = ''.join(self.time_record)
self.time_record = [self.print_head]
return profile_str
class TimeProfiler(object): class TimeProfiler(object):
def __init__(self): def __init__(self):
self._pid = os.getpid() self._pid = os.getpid()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册