From d60b154aff4172ac0aa3d0aa6467c214d399de8f Mon Sep 17 00:00:00 2001
From: TeslaZhao
Date: Tue, 18 May 2021 16:24:28 +0800
Subject: [PATCH] Modify PIPELINE DOCs & Pipeline Serving supports low precision inference

---
 doc/PIPELINE_SERVING.md                      | 625 +++++++++++-------
 doc/PIPELINE_SERVING_CN.md                   | 606 ++++++++++-------
 python/examples/pipeline/imagenet/config.yml |   8 +
 .../pipeline/simple_web_service/config.yml   |  10 +-
 python/paddle_serving_app/local_predict.py   |  12 +-
 python/pipeline/local_service_handler.py     |  23 +-
 python/pipeline/operator.py                  |  20 +-
 7 files changed, 786 insertions(+), 518 deletions(-)

diff --git a/doc/PIPELINE_SERVING.md b/doc/PIPELINE_SERVING.md
index 5e7df613..d928a0bd 100644
--- a/doc/PIPELINE_SERVING.md
+++ b/doc/PIPELINE_SERVING.md
@@ -2,12 +2,18 @@

([简体中文](PIPELINE_SERVING_CN.md)|English)

+- [Architecture Design](PIPELINE_SERVING.md#1.Architecture_Design)
+- [Detailed Design](PIPELINE_SERVING.md#2.Detailed_Design)
+- [Classic Examples](PIPELINE_SERVING.md#3.Classic_Examples)
+- [Advanced Usages](PIPELINE_SERVING.md#4.Advanced_Usages)
+- [Log Tracing](PIPELINE_SERVING.md#5.Log_Tracing)
+- [Performance Analysis And Optimization](PIPELINE_SERVING.md#6.Performance_analysis_and_optimization)

-Paddle Serving is usually used for the deployment of single model, but the end-to-end deep learning model can not solve all the problems at present. Usually, it is necessary to use multiple deep learning models to solve practical problems.
+In many deep learning frameworks, Serving is usually used for the deployment of a single model. In the context of industrial AI, however, an end-to-end deep learning model cannot yet solve every problem, and multiple deep learning models usually have to cooperate to solve practical problems. The design of such multi-model applications is complicated. To reduce the difficulty of development and maintenance while keeping the service available, serial or naively parallel compositions are usually adopted; in that case the throughput generally only reaches a barely usable state and GPU utilization is low.

Paddle Serving provides a user-friendly programming framework for multi-model composite services, Pipeline Serving, which aims to lower the programming threshold, improve resource utilization (especially of GPUs), and improve the overall prediction efficiency.

-## ★ Architecture Design
+## 1.Architecture Design

The Server side is built on the RPC service layer and the graph execution engine. The relationship between them is shown in the following figure.

-### 1. RPC Service
+### 1.1 RPC Service

In order to meet the needs of different users, the RPC service starts one Web server and one RPC server at the same time and can process two types of requests, RESTful API and gRPC. The gRPC gateway receives RESTful API requests and forwards them to the gRPC server through a reverse proxy server; gRPC requests are received by the gRPC server directly. Both types of requests are therefore handled by the gRPC Service in a unified manner, which ensures that the processing logic stays consistent.

-#### 1.1 Request and Response of proto
+#### 1.1.1 Request and Response of proto

The gRPC service and the gRPC gateway service are both generated from service.proto.

The `key` and `value` in the Request are paired string arrays used to carry input data. The `name` and `method` correspond to the URL of the RESTful API: //{ip}:{port}/{name}/{method}. The `logid` and `clientip` fields make it convenient for users to trace a request across services and to implement custom strategies.

In Response, `err_no` and `err_msg` express the correctness and error information of the processing result, and `key` and `value` carry the returned results.
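To make the request format concrete, below is a minimal sketch of calling the RESTful gateway with Python's `requests`. The service name `uci`, the port `18071`, and the feed key `x` are illustrative assumptions taken from the simple_web_service example, not values fixed by the framework.

```python
# Hedged sketch: one RESTful call to the gRPC gateway (assumed service values).
# "key"/"value" are the paired string arrays defined in service.proto.
import json
import requests

url = "http://127.0.0.1:18071/uci/prediction"  # //{ip}:{port}/{name}/{method}
data = {"key": ["x"], "value": ["0.0137, -0.1136, 0.25, -0.0404"]}
resp = requests.post(url=url, data=json.dumps(data))
print(resp.json())  # the response carries err_no, err_msg, key, value
```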
-### 2. Graph Execution Engine
+### 1.2 Graph Execution Engine

The graph execution engine consists of OPs and Channels; connected OPs share one Channel.

-#### 2.1 OP Design
+#### 1.2.1 OP Design

- The default function of a single OP is to access a single Paddle Serving service based on the input Channel data and put the result into the output Channel.
- OP supports user customization: the preprocess, process, and postprocess functions can all be inherited and implemented by the user.
- OP can obtain data from multiple different RPC requests for Auto-Batching.
- OP can be started by a thread or a process.

-#### 2.2 Channel Design
+#### 1.2.2 Channel Design

- Channel is the data structure for sharing data between OPs; it is responsible for sharing data or data status information.
- Outputs from multiple OPs can be stored in the same Channel, and data in one Channel can be consumed by multiple OPs.

-#### 2.3 client type design
+#### 1.2.3 client type design

- The prediction type (client_type) of an Op has 3 options: brpc, grpc and local_predictor (a usage sketch of local_predictor follows at the end of this section).
  - brpc: uses a bRPC Client to interact with remote Serving over the network; performance is better than grpc.
  - grpc: uses a gRPC Client to interact with remote Serving over the network; supports cross-platform deployment.
  - local_predictor: loads the model and predicts inside the local service, with no network interaction; supports multi-card deployment and TensorRT high-performance prediction.
- Selection:
  - Time cost (lower is better): local_predictor < brpc <= grpc
  - Microservice: splitting the brpc or grpc model into independent services simplifies development and deployment and improves resource utilization.

-#### 2.4 Extreme Case Consideration
+#### 1.2.4 Extreme Case Consideration

- `Request timeout`

    Every step of the graph execution engine may time out. The engine controls this through the `timeout` value; a request that times out at any step returns a timeout response.

- `Channel stores too much data`

    A Channel may store too much data, making copies too expensive. The graph execution engine can store OP results in external memory, such as a high-speed in-memory KV system.

- `Whether input buffers and output buffers in Channel will increase indefinitely`

    - They will not. The input of the entire graph execution engine is placed in the internal queue of a Channel, which directly acts as the traffic-control buffer queue of the whole service.
    - For the input buffer, adjust the concurrency of OP1 and OP2 according to the amount of computation, so that the number of items contributed by each input OP stays relatively balanced. (The length of the input buffer depends on how quickly each item in the internal queue becomes ready.)
    - For the output buffer, the concurrency of OP3 and OP4 can be adjusted in the same way to control the output buffer length. (The length of the output buffer depends on how quickly downstream OPs fetch data from it.)
    - The amount of data in a Channel will not exceed the `worker_num` of gRPC, i.e. it will not exceed the thread pool size.
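For the local_predictor client type described in 1.2.3, below is a hedged usage sketch. The model directory, feed/fetch names, and GPU card are illustrative assumptions, and the keyword arguments of `load_model_config` may differ slightly between Serving versions; the `precision` argument ties in with the low-precision support this patch adds.

```python
# Hedged sketch: using LocalPredictor directly (no Serving process, no network).
# Assumes ./uci_housing_model holds an exported serving model; feed name "x"
# and fetch name "price" follow the simple_web_service example.
import numpy as np
from paddle_serving_app.local_predict import LocalPredictor

predictor = LocalPredictor()
predictor.load_model_config(
    "uci_housing_model",  # directory containing serving_server_conf.prototxt
    use_gpu=True,         # run on GPU card 0
    gpu_id=0,
    precision="fp16")     # low-precision path added by this patch

feed = {"x": np.random.rand(1, 13).astype("float32")}
fetch_map = predictor.predict(feed=feed, fetch=["price"], batch=True)
print(fetch_map)
```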
-## ★ Detailed Design
+## 2.Detailed Design

-#### 1. General OP Definition

For the design and implementation of Pipeline, this section first introduces PipelineServer and OP, including how to rewrite the pre- and post-processing of an OP, and finally describes the secondary development of the special OPs (RequestOp and ResponseOp).

### 2.1 PipelineServer Definition

PipelineServer encapsulates the RPC runtime layer and the graph engine execution. Every Pipeline service must first instantiate a PipelineServer instance, then complete two core steps, setting the response OP and loading the configuration, and finally call run_server to start the service. A code example is as follows:

```python
server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server(config_yml_path)
#server.prepare_pipeline_config(config_yml_path)
server.run_server()
```

The core interfaces of PipelineServer:
- `set_response_op`: setting response_op initializes the Channels according to the topological relationship of the OPs and builds the calculation graph.
- `prepare_server`: loads the configuration and starts the remote Serving services; suitable when calling remote inference services.
- `prepare_pipeline_config`: only loads the configuration; applicable to local_predictor.
- `run_server`: starts the gRPC service and receives requests.


### 2.2 OP Definition

As the basic unit of the graph execution engine, the general OP constructor is as follows:

The meaning of each parameter is as follows:

| local_service_handler | (object) local predictor handler, assigned by Op init() input parameters or created in Op init() |

-#### 2. General OP Secondary Development Interface
+### 2.3 Rewrite preprocess and postprocess of OP

| Interface or Variable | Explain |
| :----------------------------------------------: | :----------------------------------------------------------: |

def init_op(self):

It should be **noted** that in the threaded version of OP, each OP will only call this function once, so the loaded resources must be thread safe.

-#### 3. RequestOp Definition and Secondary Development Interface
+### 2.4 RequestOp Definition and Secondary Development Interface

RequestOp is used to process the RPC data received by Pipeline Server, and the processed data is added to the graph execution engine. Its implementation is as follows:

```python
class RequestOp(Op):
    def __init__(self):
        # PipelineService.name = "@DAGExecutor"
        super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[])
        # init op
        try:
            self.init_op()
        except Exception as e:
            _LOGGER.critical("Op(Request) Failed to init: {}".format(e))
            os._exit(-1)

    def unpack_request_package(self, request):
        dict_data = {}
        log_id = None
        if request is None:
            _LOGGER.critical("request is None")
            raise ValueError("request is None")

        for idx, key in enumerate(request.key):
            dict_data[key] = request.value[idx]
        log_id = request.logid
        _LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \
            name:{}, method:{}".format(log_id, request.clientip, request.name,
                                       request.method))

        return dict_data, log_id, None, ""
```

The default implementation of **unpack_request_package** turns the key and value in the RPC request into a dictionary and hands it to the first custom OP. When the default RequestOp cannot meet the parameter-parsing requirements, you can customize the parsing by rewriting the following two interfaces. The return value must be a dictionary.

| Interface or Variable | Explain |
| :---------------------------------------: | :----------------------------------------------------------: |
| def init_op(self) | It is used to load resources (such as dictionaries), and is consistent with general OP. |
| def unpack_request_package(self, request) | Process received RPC data. |
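As an illustration of rewriting these interfaces, here is a hedged sketch of a RequestOp that JSON-decodes request values. The JSON convention and the class name are assumptions for illustration, and the import path may vary by Serving version.

```python
# Hedged sketch: a customized RequestOp that JSON-decodes request values.
# Only unpack_request_package is rewritten; the (dict, log_id, prod_errcode,
# prod_errinfo) return convention follows the default implementation above.
import json
from paddle_serving_server.pipeline.operator import RequestOp  # path may vary


class JsonRequestOp(RequestOp):
    def init_op(self):
        # load dictionaries or other resources here if needed
        pass

    def unpack_request_package(self, request):
        dict_data = {}
        for idx, key in enumerate(request.key):
            try:
                # assume clients send JSON-encoded values
                dict_data[key] = json.loads(request.value[idx])
            except ValueError:
                # fall back to the raw string
                dict_data[key] = request.value[idx]
        return dict_data, request.logid, None, ""
```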
### 2.5 ResponseOp Definition and Secondary Development Interface

ResponseOp is used to process the prediction results of the graph execution engine, and the processed data is used as the RPC return value of Pipeline Server. `input_ops` is the set of last OPs of the graph execution engine; by passing different `input_ops`, users can construct different DAGs without modifying the topology of the OPs. Its implementation is as follows (pack_response_package is abridged here):

```python
class ResponseOp(Op):
    def __init__(self, input_ops):
        super(ResponseOp, self).__init__(
            name="@DAGExecutor", input_ops=input_ops)
        # init op
        try:
            self.init_op()
        except Exception as e:
            _LOGGER.critical("Op(ResponseOp) Failed to init: {}".format(
                e, exc_info=True))
            os._exit(-1)

    def pack_response_package(self, channeldata):
        resp = pipeline_service_pb2.Response()
        error_code = channeldata.error_code
        error_info = ""
        ...

        # pack results
        if error_code is None:
            error_code = 0
        resp.err_no = error_code
        resp.err_msg = error_info

        return resp
```

The default implementation of **pack_response_package** converts the dictionary of prediction results into the key and value of the RPC response. When the default ResponseOp cannot meet the requirements of the result return format, you can customize the packaging method by rewriting the following two interfaces.

| Interface or Variable | Explain |
| :------------------------------------------: | :----------------------------------------------------------: |
| def init_op(self) | It is used to load resources (such as dictionaries), and is consistent with general OP. |
| def pack_response_package(self, channeldata) | Process the prediction results of the graph execution engine as the return of RPC. 
| -The default implementation of **pack_response_package** is to convert the dictionary of prediction results into key and value in RPC response: - -```python -def pack_response_package(self, channeldata): - resp = pipeline_service_pb2.Response() - resp.ecode = channeldata.ecode - if resp.ecode == ChannelDataEcode.OK.value: - if channeldata.datatype == ChannelDataType.CHANNEL_NPDATA.value: - feed = channeldata.parse() - np.set_printoptions(threshold=np.nan) - for name, var in feed.items(): - resp.value.append(var.__repr__()) - resp.key.append(name) - elif channeldata.datatype == ChannelDataType.DICT.value: - feed = channeldata.parse() - for name, var in feed.items(): - if not isinstance(var, str): - resp.ecode = ChannelDataEcode.TYPE_ERROR.value - resp.error_info = self._log( - "fetch var type must be str({}).".format(type(var))) - break - resp.value.append(var) - resp.key.append(name) - else: - resp.ecode = ChannelDataEcode.TYPE_ERROR.value - resp.error_info = self._log( - "Error type({}) in datatype.".format(channeldata.datatype)) - else: - resp.error_info = channeldata.error_info - return resp -``` - -#### 5. PipelineServer Definition - -The definition of PipelineServer is relatively simple, as follows: - -```python -server = PipelineServer() -server.set_response_op(response_op) -server.prepare_server(config_yml_path) -server.run_server() -``` - -Where `response_op` is the responseop mentioned above, PipelineServer will initialize Channels according to the topology relationship of each OP and build the calculation graph. `config_yml_path` is the configuration file of PipelineServer. The example file is as follows: - -```yaml -# gRPC port -rpc_port: 18080 - -# http port, do not start HTTP service when the value is less or equals 0. The default value is 0. -http_port: 18071 - -# gRPC thread pool size (the number of processes in the process version servicer). The default is 1 -worker_num: 1 - - # Whether to use process server or not. The default is false -build_dag_each_worker: false - -dag: - # Whether to use the thread version of OP. The default is true - is_thread_op: true - - # The number of times DAG executor retries after failure. The default value is 1, that is, no retrying - retry: 1 - - # Whether to print the log on the server side. The default is false - use_profile: false - - # Monitoring time interval of Tracer (in seconds). Do not start monitoring when the value is less than 1. The default value is -1 - tracer: - interval_s: 600 - -op: - bow: - # Concurrency, when is_thread_op=True, it's thread concurrency; otherwise, it's process concurrency - concurrency: 1 - - # Client types, brpc, grpc and local_predictor - client_type: brpc - - # Retry times, no retry by default - retry: 1 - - # Prediction timeout, ms - timeout: 3000 - - # Serving IPs - server_endpoints: ["127.0.0.1:9393"] - - # Client config of bow model - client_config: "imdb_bow_client_conf/serving_client_conf.prototxt" - - # Fetch list - fetch_list: ["prediction"] - - # Batch size, default 1 - batch_size: 1 - - # Batch query timeout - auto_batching_timeout: 2000 -``` - -### 6. Special usages - -#### 6.1 Business custom error type - -Users can customize the error code according to the business, inherit ProductErrCode, and return it in the return list in Op's preprocess or postprocess. The next stage of processing will skip the post OP processing based on the custom error code. - -```python -class ProductErrCode(enum.Enum): - """ - ProductErrCode is a base class for recording business error code. 
    product developers inherit this class and extend more error codes.
    """
    pass
```

#### 6.2 Skip process stage

The second element of the result list returned by preprocess is `is_skip_process`; when it is `True`, the process stage of the current OP is skipped and execution enters postprocess directly.

```python
def preprocess(self, input_dicts, data_id, log_id):
    """
    In preprocess stage, assembling data for process stage. users can
    override this function for model feed features.
    Args:
        input_dicts: input data to be preprocessed
        data_id: inner unique id
        log_id: global unique id for RTT
    Return:
        input_dict: data for process stage
        is_skip_process: skip process stage or not, False default
        prod_errcode: None default, otherwise, product errors occurred.
                      It is handled in the same way as exception.
        prod_errinfo: "" default
    """
    # multiple previous Op
    if len(input_dicts) != 1:
        _LOGGER.critical(
            self._log(
                "Failed to run preprocess: this Op has multiple previous "
                "inputs. Please override this func."))
        os._exit(-1)
    (_, input_dict), = input_dicts.items()
    return input_dict, False, None, ""
```

#### 6.3 Custom proto Request and Response

When the default proto structure does not meet business requirements, you can customize it; keep the Request and Response message structures of the proto identical in the following two files.

> pipeline/gateway/proto/gateway.proto

> pipeline/proto/pipeline_service.proto

Then recompile the Serving Server.

#### 6.4 Custom URL

The grpc gateway processes post requests. The default `method` is `prediction`, for example: 127.0.0.1:8080/ocr/prediction. Users can customize the name and method, and services with existing URLs can be switched over seamlessly.

```proto
service PipelineService {
  rpc inference(Request) returns (Response) {
    option (google.api.http) = {
      post : "/{name=*}/{method=*}"
      body : "*"
    };
  }
};
```

***

-## ★ Classic examples
+## 3.Classic Examples

-All examples of pipelines are in [examples/pipeline/](../python/examples/pipeline) directory.
+All examples of pipelines are in the [examples/pipeline/](../python/examples/pipeline) directory. There are 7 types of model examples currently:
+- [PaddleClas](../python/examples/pipeline/PaddleClas)
+- [Detection](../python/examples/pipeline/PaddleDetection)
+- [bert](../python/examples/pipeline/bert)
+- [imagenet](../python/examples/pipeline/imagenet)
+- [imdb_model_ensemble](../python/examples/pipeline/imdb_model_ensemble)
+- [ocr](../python/examples/pipeline/ocr)
+- [simple_web_service](../python/examples/pipeline/simple_web_service)

Here, we build a simple imdb model ensemble example to show how to use Pipeline Serving. The relevant code can be found in the `python/examples/pipeline/imdb_model_ensemble` folder. The Server-side structure in the example is shown in the following figure:
### 3.1 Files required for pipeline deployment

Five types of files are involved; model files, configuration files, and server code are the three files necessary for building a Pipeline service, while the test client and the test data set are prepared for testing.
- model files
- configuration files (config.yml)
  - service level: service port, number of worker threads, service timeout, retries, etc.
  - DAG level: resource type, enabling Trace, performance profile, etc.
  - OP level: model path, concurrency, client type, device type, automatic batching, etc.
- server files (web_server.py)
  - service level: define the service name, read the configuration file, start the service, etc.
  - DAG level: topological relationship between OPs.
  - OP level: rewrite the preprocess and postprocess of OPs.
- test client files
  - correctness check
  - performance testing
- test data set
  - pictures, texts, voices, etc.

-### 1. Get the model file and start the Paddle Serving Service
+### 3.2 Get model files

```shell
cd python/examples/pipeline/imdb_model_ensemble
sh get_data.sh
python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.log &
python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log &
```

PipelineServing also supports local automatic startup of PaddleServingService; please refer to the example `python/examples/pipeline/ocr`.

-### 2. Create config.yaml
+### 3.3 Create config.yaml

This example uses the brpc client connection type; grpc or local_predictor can be chosen as well.

```yaml
# rpc port; rpc_port and http_port must not both be empty. When rpc_port is
# empty and http_port is not, rpc_port is automatically set to http_port+1.
rpc_port: 18070

# http port; rpc_port and http_port must not both be empty. When rpc_port is
# available and http_port is empty, no http_port is generated automatically.
http_port: 18071

# worker_num, maximum concurrency.
# When build_dag_each_worker=True, the framework creates worker_num processes,
# each building its own grpc server and DAG.
# When build_dag_each_worker=False, the framework sets max_workers=worker_num
# for the grpc thread pool of the main thread.
worker_num: 4

# build_dag_each_worker: False builds one DAG inside the process; True makes
# the framework build an independent DAG in each process.
build_dag_each_worker: False

dag:
    # OP resource type: True for the thread model, False for the process model
    is_thread_op: True

    # retry times
    retry: 1

    # performance profiling: True generates Timeline performance data, with
    # some impact on performance; False disables it.
    use_profile: False

    # maximum channel size, 0 by default
    channel_size: 0

    # tracer: traces framework throughput and the work of every OP and
    # channel; without tracer, no data is generated.
    tracer:
        # interval between traces, in seconds
        interval_s: 10

op:
    bow:
        # concurrency: thread concurrency when is_thread_op=True,
        # otherwise process concurrency
        concurrency: 1

        # client connection type: brpc, grpc or local_predictor
        client_type: brpc

        # retry times when interacting with Serving, no retry by default
        retry: 1

        # timeout of interacting with Serving, in ms
        timeout: 3000

        # Serving IPs
        server_endpoints: ["127.0.0.1:9393"]

        # client-side config of the bow model
        client_config: "imdb_bow_client_conf/serving_client_conf.prototxt"

        # fetch list, based on the alias_name of fetch_var in client_config
        fetch_list: ["prediction"]

        # batch size of queries to Serving, 1 by default. When batch_size>1,
        # auto_batching_timeout must be set, otherwise requests block until
        # batch_size is reached.
        batch_size: 2

        # batch query timeout, used together with batch_size
        auto_batching_timeout: 2000
    cnn:
        # concurrency: thread concurrency when is_thread_op=True,
        # otherwise process concurrency
        concurrency: 1

        # client connection type: brpc
        client_type: brpc

        # retry times when interacting with Serving, no retry by default
        retry: 1

        # prediction timeout, in ms
        timeout: 3000

        # Serving IPs
        server_endpoints: ["127.0.0.1:9292"]

        # client-side config of the cnn model
        client_config: "imdb_cnn_client_conf/serving_client_conf.prototxt"

        # fetch list, based on the alias_name of fetch_var in client_config
        fetch_list: ["prediction"]

        # batch size of queries to Serving, 1 by default
        batch_size: 2

        # batch query timeout, used together with batch_size
        auto_batching_timeout: 2000
    combine:
        # concurrency: thread concurrency when is_thread_op=True,
        # otherwise process concurrency
        concurrency: 1

        # retry times when interacting with Serving, no retry by default
        retry: 1

        # prediction timeout, in ms
        timeout: 3000

        # batch size of queries to Serving, 1 by default
        batch_size: 2

        # batch query timeout, used together with batch_size
        auto_batching_timeout: 2000
```

-### 3. Start PipelineServer
+### 3.4 Start PipelineServer

Run the following code

server.prepare_server('config.yml')
server.run_server()
```

-### 4. Perform prediction through PipelineClient
+### 3.5 Perform prediction through PipelineClient

```python
from paddle_serving_client.pipeline import PipelineClient

for f in futures:

***

-## ★ Performance analysis
+## 4.Advanced Usages

### 4.1 Business custom error type

Users can customize error codes according to their business: inherit ProductErrCode and return the custom code in the result list of an Op's preprocess or postprocess. Based on the custom error code, the framework will skip the processing of the downstream OPs.

```python
class ProductErrCode(enum.Enum):
    """
    ProductErrCode is a base class for recording business error code.
    product developers inherit this class and extend more error codes.
    """
    pass
```

### 4.2 Skip process stage

The second element of the result list returned by preprocess is `is_skip_process`; when it is `True`, the process stage of the current OP is skipped and execution enters postprocess directly.

```python
def preprocess(self, input_dicts, data_id, log_id):
    """
    In preprocess stage, assembling data for process stage. users can
    override this function for model feed features.
    Args:
        input_dicts: input data to be preprocessed
        data_id: inner unique id
        log_id: global unique id for RTT
    Return:
        input_dict: data for process stage
        is_skip_process: skip process stage or not, False default
        prod_errcode: None default, otherwise, product errors occurred.
                      It is handled in the same way as exception.
        prod_errinfo: "" default
    """
    # multiple previous Op
    if len(input_dicts) != 1:
        _LOGGER.critical(
            self._log(
                "Failed to run preprocess: this Op has multiple previous "
                "inputs. Please override this func."))
        os._exit(-1)
    (_, input_dict), = input_dicts.items()
    return input_dict, False, None, ""
```

### 4.3 Custom proto Request and Response

When the default proto structure does not meet business requirements, you can customize it; keep the Request and Response message structures of the proto identical in the following two files.

> pipeline/gateway/proto/gateway.proto

> pipeline/proto/pipeline_service.proto

Then recompile the Serving Server.
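To make 4.1 and 4.2 concrete, the hedged sketch below combines both mechanisms in one preprocess. The error class `MyErrCode`, the op, and the length checks are invented for illustration and are not part of the framework.

```python
# Hedged sketch combining a business error code (4.1) with skip-process (4.2).
# MyErrCode and the checks below are illustrative assumptions.
import enum

from paddle_serving_server.pipeline.operator import Op  # path may vary by version


class MyErrCode(enum.Enum):
    # extends the ProductErrCode convention with a business-specific code
    INPUT_TOO_LONG = 1001


class MyOp(Op):
    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        words = input_dict.get("words", "")
        if len(words) > 4096:
            # product error: handled like an exception, downstream OPs skipped
            return input_dict, False, MyErrCode.INPUT_TOO_LONG.value, \
                "input longer than 4096 chars"
        if len(words) == 0:
            # nothing to infer: skip the process stage, go straight to postprocess
            return input_dict, True, None, ""
        return input_dict, False, None, ""
```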
### 4.4 Custom URL

The grpc gateway processes post requests. The default `method` is `prediction`, for example: 127.0.0.1:8080/ocr/prediction. Users can customize the name and method, and services with existing URLs can be switched over seamlessly.

```proto
service PipelineService {
  rpc inference(Request) returns (Response) {
    option (google.api.http) = {
      post : "/{name=*}/{method=*}"
      body : "*"
    };
  }
};
```

### 4.5 Batch predictor

Pipeline supports batch prediction, and GPU utilization can be improved by increasing the batch size. Pipeline Serving supports 3 batch forms; the applicable scenarios are as follows:
- case 1: an inference request contains batch data (batch)
  - The data is of fixed length, the batch is variable, and the data is converted into BCHW format
  - The data length is variable; in preprocessing, single pieces of data are padded to a fixed length
- case 2: split the batch data of one inference request into multiple small pieces (mini-batch)
  - Since padding aligns to the longest shape, one "extremely large" item in a batch makes the padding of the whole batch very large
  - Specifying a block size limits the range that such "extremely large" items affect
- case 3: merge multiple requests into one batch (auto-batching)
  - When inference takes significantly longer than preprocess and postprocess, merging the data of multiple requests and running inference once increases throughput and GPU utilization
  - The data shapes of the merged requests must be consistent

| Interfaces | Explain |
| :------------------------------------------: | :-----------------------------------------: |
| batch | the client sends batch data; set batch=True in client.predict |
| mini-batch | the return type of preprocess is a list; refer to the preprocess of RecOp in the OCR example |
| auto-batching | set batch_size and auto_batching_timeout in config.yml |

### 4.7 Single-machine and multi-card inference

Single-machine multi-card inference can be abstracted as binding M OP processes to N GPU cards, controlled by three parameters in config.yml: first select the process model, then set the `concurrency` (the number of processes), and list the GPU card IDs in `devices`. When the processes start, the GPU card IDs are traversed round-robin. For example, with 7 OP processes and devices:0,1,2 in config.yml, the 1st, 4th and 7th started processes are bound to card 0, the 2nd and 5th to card 1, and the 3rd and 6th to card 2 (a short sketch of this rule follows the reference config below):

- PROCESS ID: 0 binds GPU card 0
- PROCESS ID: 1 binds GPU card 1
- PROCESS ID: 2 binds GPU card 2
- PROCESS ID: 3 binds GPU card 0
- PROCESS ID: 4 binds GPU card 1
- PROCESS ID: 5 binds GPU card 2
- PROCESS ID: 6 binds GPU card 0

Reference config.yml:
```
# hardware IDs; when devices is "" or unset, prediction runs on CPU;
# "0" or "0,1,2" selects the GPU cards to use
devices: "0,1,2"
```
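The binding rule above is simply round-robin over the configured cards, i.e. the process start index modulo the number of devices. The snippet below is an illustrative sketch of that arithmetic, not framework code.

```python
# Illustrative sketch of the round-robin GPU binding rule (not framework code).
def bind_gpu_card(process_index, devices):
    """Return the card ID the i-th started OP process is bound to."""
    return devices[process_index % len(devices)]

devices = [0, 1, 2]   # devices: "0,1,2" in config.yml
for pid in range(7):  # concurrency: 7 OP processes
    print("PROCESS ID: {} binds GPU card {}".format(
        pid, bind_gpu_card(pid, devices)))
```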
### 4.8 Heterogeneous Devices

In addition to supporting CPU and GPU, Pipeline also supports deployment on a variety of heterogeneous hardware, configured through device_type and devices in config.yml. device_type specifies the hardware type explicitly; when it is left empty, the type is inferred from devices. The device_type values are as follows:
- CPU(Intel) : 0
- GPU : 1
- TensorRT : 2
- CPU(Arm) : 3
- XPU : 4

Reference config.yml:
```
# hardware type: when empty, decided by devices (CPU/GPU);
# 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0

# hardware IDs; device_type has priority in deciding the hardware type.
# devices "" or unset means CPU prediction; "0" or "0,1,2" selects GPU cards
devices: "" # "0,1"
```

### 4.9 Low precision inference

Pipeline Serving supports low-precision inference. The precision types supported by CPU, GPU and TensorRT are as follows:

- CPU
  - fp32(default)
  - fp16
  - bf16(mkldnn)
- GPU
  - fp32(default)
  - fp16
  - int8
- Tensor RT
  - fp32(default)
  - fp16
  - int8

Refer to the example [simple_web_service](../python/examples/pipeline/simple_web_service).

***

## 5.Log Tracing

Pipeline service logs are under the `PipelineServingLogs` directory of the current working directory. There are 3 types of logs: `pipeline.log`, `pipeline.log.wf`, and `pipeline.tracer`.

- pipeline.log : records debug & info level logs
- pipeline.log.wf : records warning & error level logs
- pipeline.tracer : records statistics on the time consumed in each stage and on channel backlogs

When an exception occurs in the service, the error message is recorded in `pipeline.log.wf`. Printing the tracer log requires adding the tracer configuration to the DAG property of `config.yml`.

### 5.1 Log unique IDs

There are two kinds of IDs in the pipeline for stringing requests together, `data_id` and `log_id`. The difference between the two is as follows:

- `data_id`: a self-incrementing ID generated by the pipeline framework, the unique identifier of a request.
- `log_id`: an identifier passed in by the upstream module, used to trace the chain across multiple services. Since users may not pass it in or may not guarantee its uniqueness, it cannot be used as a unique identifier.

Logs printed by the Pipeline framework carry both data_id and log_id. After auto-batching is turned on, the first `data_id` in a batch marks the whole batch, and the framework prints all data_ids of the batch in one log line.

## 6.Performance analysis and optimization


### 6.1 How to optimize with the timeline tool

In order to better optimize the performance, PipelineServing provides a timeline tool to monitor the time spent in each stage of the whole service.

### 6.2 Output profile information on server side

The server is controlled by the `use_profile` field in yaml:

if __name__ == "__main__":

Specific operation: open the Chrome browser, input `chrome://tracing/` in the address bar to jump to the tracing page, click the load button, and open the saved `trace` file to visualize the time information of each stage of the prediction service.

-### 3. Output profile information on client side
+### 6.3 Output profile information on client side

The profile function can be enabled by setting `profile=True` in the `predict` interface on the client side. After the function is enabled, the client prints the log information corresponding to the prediction to standard output during the prediction process; the subsequent analysis and processing are the same as on the server side.

-### 4. Analytical methods
+### 6.4 Analytical methods

According to the time consumption of each stage in the pipeline.tracer log, the following formulas can be used to work out, step by step, which stage is the main source of time consumption.
```
cost of one single OP:
    op_cost = process(pre + mid + post)

channel_acc_size = QPS(down - up) * time

Average cost of batch predictor:
    avg_batch_cost = (N * pre + mid + post) / N
```

### 6.5 Optimization ideas

Depending on the stage where the long latency occurs, different optimization methods apply.
- OP inference stage (mid-process):
  - Increase `concurrency`
  - Turn on `auto-batching` (ensure that the shapes of the merged requests are consistent)
  - Use `mini-batch` if the shape of the data is very large
  - Turn on TensorRT for GPU
  - Turn on MKL-DNN for CPU
  - Turn on low-precision inference
- OP preprocess or postprocess stage:
  - Increase `concurrency`
  - Optimize the processing logic
- In/Out stage (channel accumulation > 5):
  - Check the size and delay of the data passed through the channel
  - Optimize the transmitted data; do not transmit data, or compress it before passing it in
  - Increase `concurrency`
  - Decrease the `concurrency` of upstream OPs

diff --git a/doc/PIPELINE_SERVING_CN.md b/doc/PIPELINE_SERVING_CN.md
index e9eb74f8..a9510b84 100644
--- a/doc/PIPELINE_SERVING_CN.md
+++ b/doc/PIPELINE_SERVING_CN.md
@@ -2,12 +2,20 @@

(简体中文|[English](PIPELINE_SERVING.md))

-Paddle Serving 通常用于单模型的一键部署,但端到端的深度学习模型当前还不能解决所有问题,多个深度学习模型配合起来使用还是解决现实问题的常规手段。
+- [架构设计](PIPELINE_SERVING_CN.md#1.架构设计)
+- [详细设计](PIPELINE_SERVING_CN.md#2.详细设计)
+- [典型示例](PIPELINE_SERVING_CN.md#3.典型示例)
+- [高阶用法](PIPELINE_SERVING_CN.md#4.高阶用法)
+- [日志追踪](PIPELINE_SERVING_CN.md#5.日志追踪)
+- [性能分析与优化](PIPELINE_SERVING_CN.md#6.性能优化)

-Paddle Serving 提供了用户友好的多模型组合服务编程框架,Pipeline Serving,旨在降低编程门槛,提高资源使用率(尤其是GPU设备),提升整体的预估效率。
+在许多深度学习框架中,Serving通常用于单模型的一键部署。在AI工业大生产的背景下,端到端的深度学习模型当前还不能解决所有问题,多个深度学习模型配合起来使用还是解决现实问题的常规手段。但多模型应用设计复杂,为了降低开发和维护难度,同时保证服务的可用性,通常会采用串行或简单的并行方式,但一般这种情况下吞吐量仅达到可用状态,而且GPU利用率偏低。

+Paddle Serving提供了用户友好的多模型组合服务编程框架,Pipeline Serving,旨在降低编程门槛,提高资源使用率(尤其是GPU设备),提升整体的预估效率。

-## ★ 整体架构设计
+## 1.架构设计

Server端基于RPC服务层和图执行引擎构建,两者的关系如下图所示。

-### 1. RPC服务层
+### 1.1 RPC服务层

为满足用户不同的使用需求,RPC服务层同时启动1个Web服务器和1个RPC服务器,可同时处理RESTful API、gRPC 2种类型请求。gRPC gateway接收RESTful API请求,通过反向代理服务器将请求转发给gRPC Service;gRPC请求由gRPC service接收,所以,2种类型的请求统一由gRPC Service处理,确保处理逻辑一致。

-#### 1.1 proto的输入输出结构
+#### 1.1.1 proto的输入输出结构

gRPC服务和gRPC gateway服务统一用service.proto生成。

message Response {
  repeated string value = 4;
};
```

-Request中`key`与`value`是配对的string数组。 `name`与`method`对应RESTful API的URL://{ip}:{port}/{name}/{method}。`logid`和`clientip`便于用户串联服务级请求和自定义策略。
+Request中`key`与`value`是配对的string数组,用于接收数据。`name`与`method`对应RESTful API的URL://{ip}:{port}/{name}/{method}。`logid`和`clientip`便于用户串联服务级请求和自定义策略。

Response中`err_no`和`err_msg`表达处理结果的正确性和错误信息,`key`和`value`为返回结果。

-### 2. 
图执行引擎 +### 1.2 图执行引擎 图执行引擎由 OP 和 Channel 构成,相连接的 OP 之间会共享一个 Channel。 @@ -61,7 +69,7 @@ Response中`err_no`和`err_msg`表达处理结果的正确性和错误信息,` -#### 2.1 OP的设计 +#### 1.2.1 OP的设计 - 单个 OP 默认的功能是根据输入的 Channel 数据,访问一个 Paddle Serving 的单模型服务,并将结果存在输出的 Channel - 单个 OP 可以支持用户自定义,包括 preprocess,process,postprocess 三个函数都可以由用户继承和实现 @@ -69,7 +77,7 @@ Response中`err_no`和`err_msg`表达处理结果的正确性和错误信息,` - 单个 OP 可以获取多个不同 RPC 请求的数据,以实现 Auto-Batching - OP 可以由线程或进程启动 -#### 2.2 Channel的设计 +#### 1.2.2 Channel的设计 - Channel 是 OP 之间共享数据的数据结构,负责共享数据或者共享数据状态信息 - Channel 可以支持多个OP的输出存储在同一个 Channel,同一个 Channel 中的数据可以被多个 OP 使用 @@ -79,28 +87,29 @@ Response中`err_no`和`err_msg`表达处理结果的正确性和错误信息,` -#### 2.3 预测类型的设计 +#### 1.2.3 预测类型的设计 -- OP的预测类型(client_type)有3种类型,brpc、grpc和local_predictor - - brpc: 使用bRPC Client与远端的Serving服务网络交互,性能优于grpc - - grpc: 使用gRPC Client与远端的Serving服务网络交互,支持跨平台部署 - - local_predictor: 本地服务内加载模型并完成预测,不需要与网络交互。支持多卡部署,和TensorRT高性能预测。 +- OP的预测类型(client_type)有3种类型,brpc、grpc和local_predictor,各自特点如下: + - brpc: 使用bRPC Client与远端的Serving服务网络交互,性能优于grpc,但仅支持Linux平台 + - grpc: 使用gRPC Client与远端的Serving服务网络交互,支持跨操作系统部署,性能弱于bprc + - local_predictor: 本地服务内加载模型并完成预测,不需要网络交互,延时更低,支持Linux部署。支持本机多卡部署和TensorRT实现高性能预测。 - 选型: - - 延时(越少越好): local_predict < brpc <= grpc + - 延时(越少越好): local_predictor < brpc <= grpc + - 操作系统:grpc > local_precitor >= brpc - 微服务: brpc或grpc模型分拆成独立服务,简化开发和部署复杂度,提升资源利用率 -#### 2.4 极端情况的考虑 +#### 1.2.4 极端情况的考虑 -- 请求超时的处理 +- `请求超时的处理` 整个图执行引擎每一步都有可能发生超时,图执行引擎里面通过设置 timeout 值来控制,任何环节超时的请求都会返回超时响应。 -- Channel 存储的数据过大 +- `Channel 存储的数据过大` Channel 中可能会存储过大的数据,导致拷贝等耗时过高,图执行引擎里面可以通过将 OP 计算结果数据存储到外存,如高速的内存 KV 系统 -- Channel 设计中的 input buffer 和 output buffer 是否会无限增加 +- `Channel 设计中的 input buffer 和 output buffer 是否会无限增加` - 不会。整个图执行引擎的输入会放到一个 Channel 的 internal queue 里面,直接作为整个服务的流量控制缓冲队列 - 对于 input buffer,根据计算量的情况调整 OP1 和 OP2 的并发数,使得 input buffer 来自各个输入 OP 的数量相对平衡(input buffer 的长度取决于 internal queue 中每个 item 完全 ready 的速度) @@ -109,9 +118,30 @@ Response中`err_no`和`err_msg`表达处理结果的正确性和错误信息,` *** -## ★ 详细设计 -### 1. 普通 OP 定义 +## 2.详细设计 + +对于Pipeline的设计实现,首先介绍PipelineServer、OP、重写OP前后处理,最后介绍特定OP(RequestOp和ResponseOp)二次开发的方法。 + +### 2.1 PipelineServer定义 + +PipelineServer包装了RPC运行层和图引擎执行,所有Pipeline服务首先要实例化PipelineServer示例,再设置2个核心接口 set_response_op、加载配置信息,最后调用run_server启动服务。代码示例如下: + +```python +server = PipelineServer() +server.set_response_op(response_op) +server.prepare_server(config_yml_path) +#server.prepare_pipeline_config(config_yml_path) +server.run_server() +``` +PipelineServer的核心接口: +- `set_response_op`,设置response_op 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。 +- `prepare_server`: 加载配置信息,并启动远端Serving服务,适用于调用远端远端推理服务 +- `prepare_pipeline_config`,仅加载配置信息,适用于local_prdict +- `run_server`,启动gRPC服务,接收请求 + + +### 2.2 OP 定义 普通 OP 作为图执行引擎中的基本单元,其构造函数如下: @@ -149,9 +179,7 @@ def __init__(name=None, - - -### 2. 普通 OP二次开发接口 +### 2.3 重写OP前后处理 OP 二次开发的目的是满足业务开发人员控制OP处理策略。 | 变量或接口 | 说明 | @@ -206,242 +234,120 @@ def init_op(self): 需要**注意**的是,在线程版 OP 中,每个 OP 只会调用一次该函数,故加载的资源必须要求是线程安全的。 -### 3. 
RequestOp 定义 与 二次开发接口 - -RequestOp 用于处理 Pipeline Server 接收到的 RPC 数据,处理后的数据将会被加入到图执行引擎中。其构造函数如下: - -```python -def __init__(self) -``` - -当默认的RequestOp无法满足参数解析需求时,可通过重写下面2个接口自定义请求参数解析方法。 - -| 变量或接口 | 说明 | -| :---------------------------------------: | :----------------------------------------: | -| def init_op(self) | 用于加载资源(如字典等),与普通 OP 一致。 | -| def unpack_request_package(self, request) | 处理接收到的 RPC 数据。 | - -**unpack_request_package** 的默认实现是将 RPC request 中的 key 和 value 做成字典: - -```python -def unpack_request_package(self, request): - dictdata = {} - for idx, key in enumerate(request.key): - data = request.value[idx] - try: - data = eval(data) - except Exception as e: - pass - dictdata[key] = data - return dictdata -``` - -要求返回值是一个字典类型。 - -#### 4. ResponseOp 定义 与 二次开发接口 - -ResponseOp 用于处理图执行引擎的预测结果,处理后的数据将会作为 Pipeline Server 的RPC 返回值,其构造函数如下: +### 2.4 RequestOp 定义 与 二次开发接口 -```python -def __init__(self, input_ops) -``` - -其中,`input_ops` 是图执行引擎的最后一个 OP,用户可以通过设置不同的 `input_ops` 以在不修改 OP 的拓扑关系下构造不同的 DAG。 - -当默认的 ResponseOp 无法满足结果返回格式要求时,可通过重写下面2个接口自定义返回包打包方法。 - -| 变量或接口 | 说明 | -| :------------------------------------------: | :-----------------------------------------: | -| def init_op(self) | 用于加载资源(如字典等),与普通 OP 一致。 | -| def pack_response_package(self, channeldata) | 处理图执行引擎的预测结果,作为 RPC 的返回。 | - -**pack_response_package** 的默认实现是将预测结果的字典转化为 RPC response 中的 key 和 value: +RequestOp 用于处理 Pipeline Server 接收到的 RPC 数据,处理后的数据将会被加入到图执行引擎中。其功能实现如下: ```python -def pack_response_package(self, channeldata): - resp = pipeline_service_pb2.Response() - resp.ecode = channeldata.ecode - if resp.ecode == ChannelDataEcode.OK.value: - if channeldata.datatype == ChannelDataType.CHANNEL_NPDATA.value: - feed = channeldata.parse() - np.set_printoptions(threshold=np.nan) - for name, var in feed.items(): - resp.value.append(var.__repr__()) - resp.key.append(name) - elif channeldata.datatype == ChannelDataType.DICT.value: - feed = channeldata.parse() - for name, var in feed.items(): - if not isinstance(var, str): - resp.ecode = ChannelDataEcode.TYPE_ERROR.value - resp.error_info = self._log( - "fetch var type must be str({}).".format(type(var))) - break - resp.value.append(var) - resp.key.append(name) - else: - resp.ecode = ChannelDataEcode.TYPE_ERROR.value - resp.error_info = self._log( - "Error type({}) in datatype.".format(channeldata.datatype)) - else: - resp.error_info = channeldata.error_info - return resp -``` +class RequestOp(Op): + def __init__(self): + # PipelineService.name = "@DAGExecutor" + super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[]) + # init op + try: + self.init_op() + except Exception as e: + _LOGGER.critical("Op(Request) Failed to init: {}".format(e)) + os._exit(-1) -#### 5. PipelineServer定义 + def unpack_request_package(self, request): + dict_data = {} + log_id = None + if request is None: + _LOGGER.critical("request is None") + raise ValueError("request is None") -PipelineServer 的定义比较简单,如下所示: + for idx, key in enumerate(request.key): + dict_data[key] = request.value[idx] + log_id = request.logid + _LOGGER.info("RequestOp unpack one request. 
log_id:{}, clientip:{} \ + name:{}, method:{}".format(log_id, request.clientip, request.name, + request.method)) -```python -server = PipelineServer() -server.set_response_op(response_op) -server.prepare_server(config_yml_path) -server.run_server() + return dict_data, log_id, None, "" ``` -其中,`response_op` 为上面提到的 ResponseOp,PipelineServer 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。`config_yml_path` 为 PipelineServer 的配置文件,示例文件如下: - -```yaml -# gRPC端口号 -rpc_port: 18080 - -# http端口号,若该值小于或等于 0 则不开启 HTTP 服务,默认为 0 -http_port: 18071 - -# #worker_num, 最大并发数。当build_dag_each_worker=True时, 框架会创建worker_num个进程,每个进程内构建grpcSever和DAG -worker_num: 1 - -# 是否使用进程版 Servicer,默认为 false -build_dag_each_worker: false - -dag: - # op资源类型, True, 为线程模型;False,为进程模型,默认为 True - is_thread_op: true +**unpack_request_package** 的默认实现是将 RPC request 中的 key 和 value 做成字典交给第一个自定义OP。当默认的RequestOp无法满足参数解析需求时,可通过重写下面2个接口自定义请求参数解析方法。 - # DAG Executor 在失败后重试次数,默认为 1,即不重试 - retry: 1 - - # 是否在 Server 端打印日志,默认为 false - use_profile: false - - # 跟踪框架吞吐,每个OP和channel的工作情况。无tracer时不生成数据 - tracer: - interval_s: 600 # 监控的时间间隔,单位为秒。当该值小于 1 时不启动监控,默认为 -1 - -op: - bow: - # 并发数,is_thread_op=True时,为线程并发;否则为进程并发 - concurrency: 1 - - # client连接类型,brpc - client_type: brpc - - # Serving交互重试次数,默认不重试 - retry: 1 - - # Serving交互超时时间, 单位ms - timeout: 3000 - - # Serving IPs - server_endpoints: ["127.0.0.1:9393"] - - # bow模型client端配置 - client_config: "imdb_bow_client_conf/serving_client_conf.prototxt" - - # Fetch结果列表,以client_config中fetch_var的alias_name为准 - fetch_list: ["prediction"] +| 接口 | 说明 | +| :---------------------------------------: | :----------------------------------------: | +| init_op(self) | OP初始化,设置默认名称@DAGExecutor | +| unpack_request_package(self, request) | 处理接收的RPC数据 | - # 批量查询Serving的数量, 默认1。batch_size>1要设置 auto_batching_timeout,否则不足batch_size时会阻塞 - batch_size: 1 - # 批量查询超时,与batch_size配合使用 - auto_batching_timeout: 2000 -``` +### 2.5 ResponseOp 定义 与 二次开发接口 -### 6. 特殊用法 +ResponseOp 用于处理图执行引擎的预测结果,处理后的数据将会作为 Pipeline Server 的RPC 返回值,其函数实现如下,在pack_response_package中做了精简 -#### 6.1 业务自定义错误类型 -用户可根据业务场景自定义错误码,继承ProductErrCode,在Op的preprocess或postprocess中返回列表中返回,下一阶段处理会根据自定义错误码跳过后置OP处理。 ```python -class ProductErrCode(enum.Enum): - """ - ProductErrCode is a base class for recording business error code. - product developers inherit this class and extend more error codes. - """ - pass -``` - -#### 6.2 跳过OP process阶段 -preprocess返回结果列表的第二个结果是`is_skip_process=True`表示是否跳过当前OP的process阶段,直接进入postprocess处理 - -```python -def preprocess(self, input_dicts, data_id, log_id): - """ - In preprocess stage, assembling data for process stage. users can - override this function for model feed features. - Args: - input_dicts: input data to be preprocessed - data_id: inner unique id - log_id: global unique id for RTT - Return: - input_dict: data for process stage - is_skip_process: skip process stage or not, False default - prod_errcode: None default, otherwise, product errores occured. - It is handled in the same way as exception. - prod_errinfo: "" default - """ - # multiple previous Op - if len(input_dicts) != 1: - _LOGGER.critical( - self._log( - "Failed to run preprocess: this Op has multiple previous " - "inputs. 
Please override this func.")) +class ResponseOp(Op): + def __init__(self, input_ops): + super(ResponseOp, self).__init__( + name="@DAGExecutor", input_ops=input_ops) + # init op + try: + self.init_op() + except Exception as e: + _LOGGER.critical("Op(ResponseOp) Failed to init: {}".format( + e, exc_info=True)) os._exit(-1) - (_, input_dict), = input_dicts.items() - return input_dict, False, None, "" - + + def pack_response_package(self, channeldata): + resp = pipeline_service_pb2.Response() + error_code = channeldata.error_code + error_info = "" + ... + + # pack results + if error_code is None: + error_code = 0 + resp.err_no = error_code + resp.err_msg = error_info + + return resp ``` +**pack_response_package** 的默认实现是将预测结果的字典转化为 RPC response 中的 key 和 value。当默认的 ResponseOp 无法满足结果返回格式要求时,可通过重写下面2个接口自定义返回包打包方法。 -#### 6.3 自定义proto Request 和 Response结构 - -当默认proto结构不满足业务需求时,同时下面2个文件的proto的Request和Response message结构,保持一致。 - -> pipeline/gateway/proto/gateway.proto - -> pipeline/proto/pipeline_service.proto - -再重新编译Serving Server。 - - -#### 6.4 自定义URL -grpc gateway处理post请求,默认`method`是`prediction`,例如:127.0.0.1:8080/ocr/prediction。用户可自定义name和method,对于已有url的服务可无缝切换 - -```proto -service PipelineService { - rpc inference(Request) returns (Response) { - option (google.api.http) = { - post : "/{name=*}/{method=*}" - body : "*" - }; - } -}; -``` +| 接口 | 说明 | +| :------------------------------------------: | :-----------------------------------------: | +| init_op(self) | OP初始化,设置默认名称@DAGExecutor | +| pack_response_package(self, channeldata) | 处理接收的RPC数据 | *** -## ★ 典型示例 - -所有Pipeline示例在[examples/pipeline/](../python/examples/pipeline) 目录下。 - -这里通过搭建简单的 imdb model ensemble 例子来展示如何使用 Pipeline Serving,相关代码在 `python/examples/pipeline/imdb_model_ensemble` 文件夹下可以找到,例子中的 Server 端结构如下图所示: - - +## 3.典型示例 +所有Pipeline示例在[examples/pipeline/](../python/examples/pipeline) 目录下,目前有7种类型模型示例: +- [Detection](../python/examples/pipeline/PaddleDetection) +- [bert](../python/examples/pipeline/bert) +- [imagenet](../python/examples/pipeline/imagenet) +- [imdb_model_ensemble](../python/examples/pipeline/imdb_model_ensemble) +- [ocr](../python/examples/pipeline/ocr) +- [simple_web_service](../python/examples/pipeline/simple_web_service) +以 imdb_model_ensemble 为例来展示如何使用 Pipeline Serving,相关代码在 `python/examples/pipeline/imdb_model_ensemble` 文件夹下可以找到,例子中的 Server 端结构如下图所示:
- -### 1. 获取模型文件并启动 Paddle Serving Service +### 3.1 Pipeline部署需要的文件 +需要五类文件,其中模型文件、配置文件、服务端代码是构建Pipeline服务必备的三个文件。测试客户端和测试数据集为测试准备 +- 模型文件 +- 配置文件(config.yml) + - 服务级别:服务端口、gRPC线程数、服务超时、重试次数等 + - DAG级别:资源类型、开启Trace、性能profile + - OP级别:模型路径、并发度、推理方式、计算硬件、推理超时、自动批量等 +- 服务端(web_server.py) + - 服务级别:定义服务名称、读取配置文件、启动服务 + - DAG级别:指定多OP之间的拓扑关系 + - OP级别:重写OP前后处理 +- 测试客户端 + - 正确性校验 + - 压力测试 +- 测试数据集 + - 图片、文本、语音等 + + +### 3.2 获取模型文件 ```shell cd python/examples/pipeline/imdb_model_ensemble @@ -452,15 +358,45 @@ python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow. PipelineServing 也支持本地自动启动 PaddleServingService,请参考 `python/examples/pipeline/ocr` 下的例子。 -### 2. 创建config.yaml -由于config.yaml配置信息量很多,这里仅展示OP部分配置,全量信息参考`python/examples/pipeline/imdb_model_ensemble/config.yaml` +### 3.3 创建config.yaml +本示例采用了brpc的client连接类型,还可以选择grpc或local_predictor。 ```yaml +#rpc端口, rpc_port和http_port不允许同时为空。当rpc_port为空且http_port不为空时,会自动将rpc_port设置为http_port+1 +rpc_port: 18070 + +#http端口, rpc_port和http_port不允许同时为空。当rpc_port可用且http_port为空时,不自动生成http_port +http_port: 18071 + +#worker_num, 最大并发数。当build_dag_each_worker=True时, 框架会创建worker_num个进程,每个进程内构建grpcSever和DAG +#当build_dag_each_worker=False时,框架会设置主线程grpc线程池的max_workers=worker_num +worker_num: 4 + +#build_dag_each_worker, False,框架在进程内创建一条DAG;True,框架会每个进程内创建多个独立的DAG +build_dag_each_worker: False + +dag: + #op资源类型, True, 为线程模型;False,为进程模型 + is_thread_op: True + + #重试次数 + retry: 1 + + #使用性能分析, True,生成Timeline性能数据,对性能有一定影响;False为不使用 + use_profile: False + + #channel的最大长度,默认为0 + channel_size: 0 + + #tracer, 跟踪框架吞吐,每个OP和channel的工作情况。无tracer时不生成数据 + tracer: + #每次trace的时间间隔,单位秒/s + interval_s: 10 op: bow: # 并发数,is_thread_op=True时,为线程并发;否则为进程并发 concurrency: 1 - # client连接类型,brpc + # client连接类型,brpc, grpc和local_predictor client_type: brpc # Serving交互重试次数,默认不重试 @@ -479,7 +415,7 @@ op: fetch_list: ["prediction"] # 批量查询Serving的数量, 默认1。batch_size>1要设置auto_batching_timeout,否则不足batch_size时会阻塞 - batch_size: 1 + batch_size: 2 # 批量查询超时,与batch_size配合使用 auto_batching_timeout: 2000 @@ -506,7 +442,7 @@ op: fetch_list: ["prediction"] # 批量查询Serving的数量, 默认1。 - batch_size: 1 + batch_size: 2 # 批量查询超时,与batch_size配合使用 auto_batching_timeout: 2000 @@ -521,13 +457,13 @@ op: timeout: 3000 # 批量查询Serving的数量, 默认1。 - batch_size: 1 + batch_size: 2 # 批量查询超时,与batch_size配合使用 auto_batching_timeout: 2000 ``` -### 3. 启动 PipelineServer +### 3.4 实现Server并启动服务 代码示例中,重点留意3个自定义Op的proprocess、postprocess处理,以及Combin Op初始化列表input_ops=[bow_op, cnn_op],设置Combin Op的前置OP列表。 @@ -597,7 +533,7 @@ server.prepare_server('config.yml') server.run_server() ``` -### 4. 通过 PipelineClient 执行预测 +### 3.5 推理测试 ```python from paddle_serving_client.pipeline import PipelineClient @@ -625,14 +561,170 @@ for f in futures: *** -## ★ 性能分析 +## 4.高阶用法 +### 4.1 业务自定义错误类型 +用户可根据业务场景自定义错误码,继承ProductErrCode,在Op的preprocess或postprocess中返回列表中返回,下一阶段处理会根据自定义错误码跳过后置OP处理。 +```python +class ProductErrCode(enum.Enum): + """ + ProductErrCode is a base class for recording business error code. + product developers inherit this class and extend more error codes. + """ + pass +``` -### 1. 如何通过 Timeline 工具进行优化 +### 4.2 跳过OP process阶段 +preprocess返回结果列表的第二个结果是`is_skip_process=True`表示是否跳过当前OP的process阶段,直接进入postprocess处理 + +```python +def preprocess(self, input_dicts, data_id, log_id): + """ + In preprocess stage, assembling data for process stage. users can + override this function for model feed features. 
+ Args: + input_dicts: input data to be preprocessed + data_id: inner unique id + log_id: global unique id for RTT + Return: + input_dict: data for process stage + is_skip_process: skip process stage or not, False default + prod_errcode: None default, otherwise, product errores occured. + It is handled in the same way as exception. + prod_errinfo: "" default + """ + # multiple previous Op + if len(input_dicts) != 1: + _LOGGER.critical( + self._log( + "Failed to run preprocess: this Op has multiple previous " + "inputs. Please override this func.")) + os._exit(-1) + (_, input_dict), = input_dicts.items() + return input_dict, False, None, "" + +``` + +### 4.3 自定义proto Request 和 Response结构 + +当默认proto结构不满足业务需求时,同时下面2个文件的proto的Request和Response message结构,保持一致。 + +> pipeline/gateway/proto/gateway.proto + +> pipeline/proto/pipeline_service.proto + +再重新编译Serving Server。 + + +### 4.4 自定义URL +grpc gateway处理post请求,默认`method`是`prediction`,例如:127.0.0.1:8080/ocr/prediction。用户可自定义name和method,对于已有url的服务可无缝切换 + +```proto +service PipelineService { + rpc inference(Request) returns (Response) { + option (google.api.http) = { + post : "/{name=*}/{method=*}" + body : "*" + }; + } +}; +``` + +### 4.5 批量推理 +Pipeline支持批量推理,通过增大batch size可以提高GPU利用率。Pipeline Pipeline Serving支持3种batch形式以及适用的场景如下: +- 场景1:一个推理请求包含批量数据(batch) + - 单条数据定长,批量变长,数据转成BCHW格式 + - 单条数据变长,前处理中将单条数据做padding转成定长 +- 场景2:一个推理请求的批量数据拆分成多个小块推理(mini-batch) + - 由于padding会按最长对齐,当一批数据中有个"极大"尺寸数据时会导致推理变慢 + - 指定一个块大小,从而缩小"极大"尺寸数据的作用范围 +- 场景3:合并多个请求数据批量推理(auto-batching) + - 推理耗时明显长于前后处理,合并多个请求数据推理一次会提高吞吐和GPU利用率 + - 要求多个request的数据的shape一致 + +| 接口 | 说明 | +| :------------------------------------------: | :-----------------------------------------: | +| batch | client发送批量数据,client.predict的batch=True | +| mini-batch | preprocess按list类型返回,参考OCR示例 RecOp的preprocess| +| auto-batching | config.yml中OP级别设置batch_size和auto_batching_timeout | + + +### 4.7 单机多卡 +单机多卡推理,M个OP进程与N个GPU卡绑定,在config.yml中配置3个参数有关系,首先选择进程模式、并发数即进程数,devices是GPU卡ID。绑定方法是进程启动时遍历GPU卡ID,例如启动7个OP进程,config.yml设置devices:0,1,2,那么第1,4,7个启动的进程与0卡绑定,第2,4个启动的进程与1卡绑定,3,6进程与卡2绑定。 +- 进程ID: 0 绑定 GPU 卡0 +- 进程ID: 1 绑定 GPU 卡1 +- 进程ID: 2 绑定 GPU 卡2 +- 进程ID: 3 绑定 GPU 卡0 +- 进程ID: 4 绑定 GPU 卡1 +- 进程ID: 5 绑定 GPU 卡2 +- 进程ID: 6 绑定 GPU 卡0 + +config.yml中硬件配置: +``` +#计算硬件ID,当devices为""或不写时为CPU预测;当devices为"0", "0,1,2"时为GPU预测,表示使用的GPU卡 +devices: "0,1,2" +``` + +### 4.8 异构硬件 +Pipeline除了支持CPU、GPU之外,还支持在多种异构硬件部署。在config.yml中由device_type和devices。优先使用device_type指定类型,当空缺时根据devices判断。device_type描述如下: +- CPU(Intel) : 0 +- GPU : 1 +- TensorRT : 2 +- CPU(Arm) : 3 +- XPU : 4 + +config.yml中硬件配置: +``` +#计算硬件类型: 空缺时由devices决定(CPU/GPU),0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu +device_type: 0 + +#计算硬件ID,优先由device_type决定硬件类型。devices为""或空缺时为CPU预测;当为"0", "0,1,2"时为GPU预测,表示使用的GPU卡 +devices: "" # "0,1" +``` + +### 4.9 低精度推理 +Pipeline Serving支持低精度推理,CPU、GPU和TensoRT支持的精度类型如下图所示: + +- CPU + - fp32(default) + - fp16 + - bf16(mkldnn) +- GPU + - fp32(default) + - fp16 + - int8 +- Tensor RT + - fp32(default) + - fp16 + - int8 + +参考[simple_web_service](../python/examples/pipeline/simple_web_service)示例 +*** + +## 5.日志追踪 +Pipeline服务日志在当前目录的PipelineServingLogs目录下,有3种类型日志,分别是pipeline.log日志、pipeline.log.wf日志、pipeline.tracer日志。 +- pipeline.log日志 : 记录 debug & info日志信息 +- pipeline.log.wf日志 : 记录 warning & error日志 +- pipeline.tracer日志 : 统计各个阶段耗时、channel堆积信息 + +在服务发生异常时,错误信息会记录在pipeline.log.wf日志中。打印tracer日志要求在config.yml的DAG属性中添加tracer配置。 + +### 5.1 log唯一标识 +Pipeline中有2种id用以串联请求,分别时data_id和log_id,二者区别如下: +- data_id : Pipeline框架生成的自增ID,标记请求唯一性标识 +- log_id : 
上游模块传入的标识,跟踪多个服务间串联关系,由于用户可不传入或不保证唯一性,因此不能作为唯一性标识 + +通常,Pipeline框架打印的日志会同时带上data_id和log_id。开启auto-batching后,会使用批量中的第一个data_id标记batch整体,同时框架会在一条日志中打印批量中所有data_id。 + + +## 6.性能分析与优化 + + +### 6.1 如何通过 Timeline 工具进行优化 为了更好地对性能进行优化,PipelineServing 提供了 Timeline 工具,对整个服务的各个阶段时间进行打点。 -### 2. 在 Server 端输出 Profile 信息 +### 6.2 在 Server 端输出 Profile 信息 Server 端用 yaml 中的 `use_profile` 字段进行控制: @@ -659,13 +751,14 @@ if __name__ == "__main__": 具体操作:打开 chrome 浏览器,在地址栏输入 `chrome://tracing/` ,跳转至 tracing 页面,点击 load 按钮,打开保存的 `trace` 文件,即可将预测服务的各阶段时间信息可视化。 -### 3. 在 Client 端输出 Profile 信息 +### 6.3 在 Client 端输出 Profile 信息 Client 端在 `predict` 接口设置 `profile=True`,即可开启 Profile 功能。 开启该功能后,Client 端在预测的过程中会将该次预测对应的日志信息打印到标准输出,后续分析处理同 Server。 -### 4. 分析方法 +### 6.4 分析方法 +根据pipeline.tracer日志中的各个阶段耗时,按以下公式逐步分析出主要耗时在哪个阶段。 ``` 单OP耗时: op_cost = process(pre + mid + post) @@ -685,3 +778,20 @@ channel_acc_size = QPS(down - up) * time 批量预测平均耗时: avg_batch_cost = (N * pre + mid + post) / N ``` + +### 6.5 优化思路 +根据长耗时在不同阶段,采用不同的优化方法. +- OP推理阶段(mid-process): + - 增加OP并发度 + - 开启auto-batching(前提是多个请求的shape一致) + - 若批量数据中某条数据的shape很大,padding很大导致推理很慢,可使用mini-batch + - 开启TensorRT/MKL-DNN优化 + - 开启低精度推理 +- OP前处理阶段(pre-process): + - 增加OP并发度 + - 优化前处理逻辑 +- in/out耗时长(channel堆积>5) + - 检查channel传递的数据大小和延迟 + - 优化传入数据,不传递数据或压缩后再传入 + - 增加OP并发度 + - 减少上游OP并发度 diff --git a/python/examples/pipeline/imagenet/config.yml b/python/examples/pipeline/imagenet/config.yml index 6e48018f..598db3d0 100644 --- a/python/examples/pipeline/imagenet/config.yml +++ b/python/examples/pipeline/imagenet/config.yml @@ -31,3 +31,11 @@ op: #Fetch结果列表,以client_config中fetch_var的alias_name为准 fetch_list: ["score"] + + #precsion, 预测精度,降低预测精度可提升推理速度 + #GPU 支持: "fp32"(default), "fp16", "int8"; + #CPU 支持: "fp32"(default), "fp16", "bf16"(mkldnn); 不支持: "int8" + precision: "fp16" + + #ir_optim开关 + ir_optim: False diff --git a/python/examples/pipeline/simple_web_service/config.yml b/python/examples/pipeline/simple_web_service/config.yml index 52e67409..837687db 100644 --- a/python/examples/pipeline/simple_web_service/config.yml +++ b/python/examples/pipeline/simple_web_service/config.yml @@ -30,4 +30,12 @@ op: client_type: local_predictor #Fetch结果列表,以client_config中fetch_var的alias_name为准 - fetch_list: ["price"] + fetch_list: ["price"] + + #precsion, 预测精度,降低预测精度可提升预测速度 + #GPU 支持: "fp32"(default), "fp16", "int8"; + #CPU 支持: "fp32"(default), "fp16", "bf16"(mkldnn); 不支持: "int8" + precision: "FP16" + + #ir_optim开关 + ir_optim: False diff --git a/python/paddle_serving_app/local_predict.py b/python/paddle_serving_app/local_predict.py index c31c818e..945e891a 100644 --- a/python/paddle_serving_app/local_predict.py +++ b/python/paddle_serving_app/local_predict.py @@ -119,8 +119,11 @@ class LocalPredictor(object): self.fetch_names_to_type_[var.alias_name] = var.fetch_type precision_type = paddle_infer.PrecisionType.Float32 - if precision.lower() in precision_map: + if precision is not None and precision.lower() in precision_map: precision_type = precision_map[precision.lower()] + else: + logger.warning("precision error!!! Please check precision:{}". + format(precision)) if use_profile: config.enable_profile() if mem_optim: @@ -156,8 +159,11 @@ class LocalPredictor(object): if not use_gpu and not use_lite: if precision_type == paddle_infer.PrecisionType.Int8: - config.enable_quantizer() - if precision.lower() == "bf16": + logger.warning( + "PRECISION INT8 is not supported in CPU right now! Please use fp16 or bf16." 
+***
+
+## 5. Log Tracing
+Pipeline service logs live in the PipelineServingLogs directory under the current directory. There are 3 kinds of logs: pipeline.log, pipeline.log.wf and pipeline.tracer.
+- pipeline.log : records debug & info messages
+- pipeline.log.wf : records warning & error messages
+- pipeline.tracer : records per-stage timing statistics and channel backlog information
+
+When a service exception occurs, the error is recorded in pipeline.log.wf. Printing the tracer log requires adding a tracer configuration to the DAG attribute in config.yml.
+
+### 5.1 Unique Log IDs
+Pipeline uses 2 kinds of ids to chain a request through the logs, data_id and log_id:
+- data_id : an auto-incrementing ID generated by the Pipeline framework, marking the unique identity of a request
+- log_id : an identifier passed in by the upstream module to trace chained calls across services; since users may omit it or may not guarantee its uniqueness, it cannot serve as a unique identifier
+
+Normally, logs printed by the Pipeline framework carry both data_id and log_id. With auto-batching enabled, the first data_id in a batch marks the whole batch, and the framework also prints all data_ids of the batch in one log line.
+
+
+## 6. Performance Analysis and Optimization
+
+
+### 6.1 Optimizing with the Timeline Tool
 
 To better optimize performance, Pipeline Serving provides a Timeline tool that timestamps every stage of the whole service.
 
-### 2. Output Profile Information on the Server Side
+### 6.2 Output Profile Information on the Server Side
 
 The Server side is controlled by the `use_profile` field in the yaml file:
 
@@ -659,13 +751,14 @@ if __name__ == "__main__":
 
 Concretely: open a Chrome browser, enter `chrome://tracing/` in the address bar to reach the tracing page, click the load button and open the saved `trace` file to visualize the timing of every stage of the prediction service.
 
-### 3. Output Profile Information on the Client Side
+### 6.3 Output Profile Information on the Client Side
 
 On the Client side, set `profile=True` in the `predict` interface to enable profiling.
 
 With this feature enabled, the Client prints the log of the corresponding prediction to standard output during prediction; the subsequent analysis is the same as on the Server.
 
-### 4. Analysis Method
+### 6.4 Analysis Method
+Using the per-stage costs in the pipeline.tracer log, the formulas below locate, step by step, the stage where most time is spent. A small worked example follows the formula block.
 ```
 Cost of a single OP:
 op_cost = process(pre + mid + post)
 
@@ -685,3 +778,20 @@ channel_acc_size = QPS(down - up) * time
 
 Average cost of batch prediction:
 avg_batch_cost = (N * pre + mid + post) / N
 ```
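+
+As a small worked example of the formulas above (all numbers invented for illustration; the channel line reads `down - up` as enqueue rate minus dequeue rate):
+
+```python
+# Per-stage cost of one OP, in seconds
+pre, mid, post = 0.002, 0.020, 0.003
+
+# Single-OP cost: op_cost = process(pre + mid + post)
+op_cost = pre + mid + post                    # 0.025 s per request
+
+# Channel backlog: channel_acc_size = QPS(down - up) * time
+enqueue_qps, dequeue_qps = 50.0, 40.0         # rates into and out of one channel
+window = 60.0                                 # observation window, seconds
+channel_acc_size = (enqueue_qps - dequeue_qps) * window  # 600 requests backlogged
+
+# Auto-batching: avg_batch_cost = (N * pre + mid + post) / N
+N = 8                                         # requests merged into one batch
+avg_batch_cost = (N * pre + mid + post) / N   # ~0.0049 s, mid and post amortized
+print(op_cost, channel_acc_size, avg_batch_cost)
+```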
+
+### 6.5 Optimization Ideas
+Depending on the stage where the long latency occurs, adopt different optimization methods:
+- OP inference stage (mid-process):
+  - increase the OP concurrency
+  - enable auto-batching (provided the shapes of the merged requests are consistent)
+  - if one sample in a batch has a very large shape and heavy padding slows inference down, use mini-batch
+  - enable TensorRT/MKL-DNN optimization
+  - enable low-precision inference
+- OP preprocessing stage (pre-process):
+  - increase the OP concurrency
+  - optimize the preprocessing logic
+- long in/out cost (channel backlog > 5):
+  - check the size and latency of the data passed through the channel
+  - optimize the input data: avoid passing data through the channel, or compress it before passing it in
+  - increase the OP concurrency
+  - decrease the concurrency of the upstream OP
diff --git a/python/examples/pipeline/imagenet/config.yml b/python/examples/pipeline/imagenet/config.yml
index 6e48018f..598db3d0 100644
--- a/python/examples/pipeline/imagenet/config.yml
+++ b/python/examples/pipeline/imagenet/config.yml
@@ -31,3 +31,11 @@ op:
 
             #List of fetch results, keyed by the alias_name of fetch_var in client_config
             fetch_list: ["score"]
+
+            #precision: inference precision; lowering it can speed up inference
+            #GPU supports: "fp32"(default), "fp16", "int8";
+            #CPU supports: "fp32"(default), "fp16", "bf16"(mkldnn); "int8" is not supported
+            precision: "fp16"
+
+            #ir_optim switch
+            ir_optim: False
diff --git a/python/examples/pipeline/simple_web_service/config.yml b/python/examples/pipeline/simple_web_service/config.yml
index 52e67409..837687db 100644
--- a/python/examples/pipeline/simple_web_service/config.yml
+++ b/python/examples/pipeline/simple_web_service/config.yml
@@ -30,4 +30,12 @@ op:
             client_type: local_predictor
 
             #List of fetch results, keyed by the alias_name of fetch_var in client_config
-            fetch_list: ["price"]
+            fetch_list: ["price"]
+
+            #precision: inference precision; lowering it can speed up prediction
+            #GPU supports: "fp32"(default), "fp16", "int8";
+            #CPU supports: "fp32"(default), "fp16", "bf16"(mkldnn); "int8" is not supported
+            precision: "fp16"
+
+            #ir_optim switch
+            ir_optim: False
diff --git a/python/paddle_serving_app/local_predict.py b/python/paddle_serving_app/local_predict.py
index c31c818e..945e891a 100644
--- a/python/paddle_serving_app/local_predict.py
+++ b/python/paddle_serving_app/local_predict.py
@@ -119,8 +119,11 @@ class LocalPredictor(object):
             self.fetch_names_to_type_[var.alias_name] = var.fetch_type
 
         precision_type = paddle_infer.PrecisionType.Float32
-        if precision.lower() in precision_map:
+        if precision is not None and precision.lower() in precision_map:
             precision_type = precision_map[precision.lower()]
+        else:
+            logger.warning("Invalid precision: {}, falling back to fp32.".
+                           format(precision))
         if use_profile:
             config.enable_profile()
         if mem_optim:
@@ -156,8 +159,11 @@ class LocalPredictor(object):
 
         if not use_gpu and not use_lite:
             if precision_type == paddle_infer.PrecisionType.Int8:
-                config.enable_quantizer()
-            if precision.lower() == "bf16":
+                logger.warning(
+                    "Int8 precision is not supported on CPU for now; "
+                    "please use fp16 or bf16.")
+                # config.enable_quantizer()  # re-enable once CPU int8 is supported
+            if precision is not None and precision.lower() == "bf16":
                 config.enable_mkldnn_bfloat16()
         self.predictor = paddle_infer.create_predictor(config)
"fp32", "fp16", "int8" Returns: None @@ -1104,7 +1109,8 @@ class Op(object): device_type=device_type, devices=devices, mem_optim=mem_optim, - ir_optim=ir_optim) + ir_optim=ir_optim, + precision=precision) _LOGGER.info("Init cuda env in process {}".format( concurrency_idx)) -- GitLab