From dc6be190da95ec34623c6bf210a484e0ebcd44eb Mon Sep 17 00:00:00 2001 From: barrierye Date: Sun, 5 Jul 2020 21:51:21 +0800 Subject: [PATCH] move profiler into dag executor --- .../test_pipeline_client.py | 2 +- python/pipeline/dag.py | 37 ++++++++++++++----- python/pipeline/pipeline_server.py | 26 +------------ 3 files changed, 30 insertions(+), 35 deletions(-) diff --git a/python/examples/pipeline/imdb_model_ensemble/test_pipeline_client.py b/python/examples/pipeline/imdb_model_ensemble/test_pipeline_client.py index ce090a58..df642894 100644 --- a/python/examples/pipeline/imdb_model_ensemble/test_pipeline_client.py +++ b/python/examples/pipeline/imdb_model_ensemble/test_pipeline_client.py @@ -23,7 +23,7 @@ lp_wrapper = lp(client.predict) words = 'i am very sad | 0' -for i in range(1): +for i in range(10): fetch_map = lp_wrapper(feed_dict={"words": words}, fetch=["prediction"]) print(fetch_map) diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 83c30393..a45ec92f 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -26,21 +26,34 @@ import logging from .operator import Op, RequestOp, ResponseOp, VirtualOp from .channel import ThreadChannel, ProcessChannel, ChannelData, ChannelDataEcode, ChannelDataType +from .profiler import TimeProfiler from .util import NameGenerator _LOGGER = logging.getLogger() class DAGExecutor(object): - def __init__(self, response_op, profiler, use_multithread, retry, - client_type, channel_size): + def __init__(self, response_op, yml_config): + self._retry = yml_config.get('retry', 1) + + client_type = yml_config.get('client_type', 'brpc') + use_multithread = yml_config.get('use_multithread', True) + use_profile = yml_config.get('profile', False) + channel_size = yml_config.get('channel_size', 0) + + if not use_multithread: + if use_profile: + raise Exception( + "profile cannot be used in multiprocess version temporarily") + self.name = "#G" - self._retry = min(retry, 1) - self._profiler = profiler - self._dag = DAG(response_op, profiler, use_multithread, client_type, - channel_size) - in_channel, out_channel, pack_rpc_func, unpack_rpc_func = self._dag.build( - ) + self._profiler = TimeProfiler() + self._profiler.enable(use_profile) + + self._dag = DAG(response_op, self._profiler, use_multithread, + client_type, channel_size) + (in_channel, out_channel, pack_rpc_func, + unpack_rpc_func) = self._dag.build() self._dag.start() self._set_in_channel(in_channel) @@ -52,10 +65,10 @@ class DAGExecutor(object): _LOGGER.debug(self._log(out_channel.debug())) self._id_lock = threading.Lock() - self._cv = threading.Condition() - self._fetch_buffer = {} self._id_counter = 0 self._reset_max_id = 1000000000000000000 + self._cv = threading.Condition() + self._fetch_buffer = {} self._is_run = False self._recive_func = None @@ -136,6 +149,8 @@ class DAGExecutor(object): data_id=data_id), data_id def call(self, rpc_request): + self._profiler.record("dag-call_0".format(self.name)) + self._profiler.record("{}-prepack_0".format(self.name)) req_channeldata, data_id = self._pack_channeldata(rpc_request) self._profiler.record("{}-prepack_1".format(self.name)) @@ -161,6 +176,8 @@ class DAGExecutor(object): self._profiler.record("{}-postpack_0".format(self.name)) rpc_resp = self._pack_for_rpc_resp(resp_channeldata) self._profiler.record("{}-postpack_1".format(self.name)) + + self._profiler.record("dag-call_1".format(self.name)) self._profiler.print_profile() return rpc_resp diff --git a/python/pipeline/pipeline_server.py b/python/pipeline/pipeline_server.py index b849a4e5..f1a07f7c 100644 --- a/python/pipeline/pipeline_server.py +++ b/python/pipeline/pipeline_server.py @@ -21,17 +21,16 @@ import yaml from .proto import pipeline_service_pb2_grpc from .operator import ResponseOp -from .profiler import TimeProfiler from .dag import DAGExecutor _LOGGER = logging.getLogger() -_profiler = TimeProfiler() class PipelineService(pipeline_service_pb2_grpc.PipelineServiceServicer): def __init__(self, dag_executor): super(PipelineService, self).__init__() self._dag_executor = dag_executor + self._dag_executor.start() def inference(self, request, context): resp = self._dag_executor.call(request) @@ -44,10 +43,6 @@ class PipelineServer(object): self._worker_num = None self._response_op = None - def gen_desc(self): - _LOGGER.info('here will generate desc for PAAS') - pass - def set_response_op(self, response_op): if not isinstance(response_op, ResponseOp): raise Exception("response_op must be ResponseOp type.") @@ -69,25 +64,8 @@ class PipelineServer(object): raise SystemExit("Prot {} is already used".format(self._port)) self._worker_num = yml_config.get('worker_num', 2) - retry = yml_config.get('retry', 1) - client_type = yml_config.get('client_type', 'brpc') - use_multithread = yml_config.get('use_multithread', True) - use_profile = yml_config.get('profile', False) - channel_size = yml_config.get('channel_size', 0) - - if not use_multithread: - if use_profile: - raise Exception( - "profile cannot be used in multiprocess version temporarily") - _profiler.enable(use_profile) - # init dag executor - self._dag_executor = DAGExecutor(self._response_op, _profiler, - use_multithread, retry, client_type, - channel_size) - self._dag_executor.start() - - self.gen_desc() + self._dag_executor = DAGExecutor(self._response_op, yml_config) def run_server(self): service = PipelineService(self._dag_executor) -- GitLab