# Pipeline Serving

([简体中文](PIPELINE_SERVING_CN.md)|English)

- [Architecture Design](PIPELINE_SERVING.md#1.Architecture_Design)
- [Detailed Design](PIPELINE_SERVING.md#2.Detailed_Design)
- [Classic Examples](PIPELINE_SERVING.md#3.Classic_Examples)
- [Advanced Usages](PIPELINE_SERVING.md#4.Advanced_Usages)
- [Log Tracing](PIPELINE_SERVING.md#5.Log_Tracing)
- [Performance Analysis And Optimization](PIPELINE_SERVING.md#6.Performance_analysis_and_optimization)

In many deep learning frameworks, Serving is usually used to deploy a single model. In industrial AI applications, however, a single end-to-end deep learning model cannot yet solve every problem, and multiple models usually have to be combined to solve a practical task. Designing such multi-model applications is complicated. To reduce development and maintenance effort and to keep services available, models are usually chained serially or with simple parallelism. As a result, throughput is typically only barely adequate and GPU utilization is low.

Paddle Serving provides a user-friendly programming framework for multi-model composite services, Pipeline Serving, which aims to reduce the threshold of programming, improve resource utilization (especially GPU), and improve the prediction efficiency.

## 1.Architecture Design

The Server side is built based on <b>RPC Service</b> and <b>graph execution engine</b>. The relationship between them is shown in the following figure.

<center>
<img src='pipeline_serving-image1.png' height = "250" align="middle"/>
</center>

### 1.1 RPC Service

In order to meet the needs of different users, the RPC service starts one Web server and one RPC server at the same time, and can process 2 types of requests: RESTful API and gRPC. The gRPC gateway receives RESTful API requests and forwards them to the gRPC server through the reverse proxy server; gRPC requests are received by the gRPC server directly. Both types of requests are therefore processed by the gRPC Service in a unified manner, which ensures that the processing logic is consistent.

#### <b>1.1.1 Request and Response of proto</b>

gRPC service and gRPC gateway service are generated with service.proto.

```proto
message Request {
  repeated string key = 1;  
  repeated string value = 2;
  optional string name = 3;
  optional string method = 4;
  optional int64 logid = 5;
  optional string clientip = 6;
};

message Response {
  optional int32 err_no = 1;
  optional string err_msg = 2;
  repeated string key = 3;
  repeated string value = 4;
};
```

The `key` and `value` in the Request are paired string arrays. The `name` and `method` correspond to the URL of the RESTful API: `//{ip}:{port}/{name}/{method}`. The `logid` and `clientip` let users correlate service-level requests and customize strategies.

In Response, `err_no` and `err_msg` express the correctness and error information of the processing result, and `key` and `value` are the returned results.
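
The pairing of `key` and `value`, and the mapping of `name` and `method` onto the URL path, can be illustrated with a small stand-in. The sketch below uses a plain dataclass instead of the generated protobuf class (an assumption made so the example runs on its own); `to_dict` and `restful_path` are hypothetical helper names, not part of Pipeline Serving.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Request:
    # Stand-in with the same fields as the proto Request message.
    key: List[str] = field(default_factory=list)
    value: List[str] = field(default_factory=list)
    name: str = ""
    method: str = ""
    logid: int = 0
    clientip: str = ""


def to_dict(request: Request) -> Dict[str, str]:
    """Pair key[i] with value[i]; both arrays must have the same length."""
    if len(request.key) != len(request.value):
        raise ValueError("key and value must have the same length")
    return dict(zip(request.key, request.value))


def restful_path(request: Request) -> str:
    """Build the /{name}/{method} part of the RESTful URL."""
    return "/{}/{}".format(request.name, request.method)


req = Request(key=["words"], value=["i am very sad | 0"],
              name="imdb", method="prediction", logid=42)
feed = to_dict(req)
path = restful_path(req)
```

Keeping `key` and `value` as parallel arrays means the message layout stays fixed while the set of fields carried in a request can vary per service.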

### 1.2 Graph Execution Engine

The graph execution engine consists of OPs and Channels, and the connected OPs share one Channel.

- A Channel can be understood as a buffer queue. Each OP accepts input from only one Channel and can write to multiple Channels (each output is the same); a Channel can contain outputs from multiple OPs, and data from the same Channel can be used as input for multiple OPs.
- Users only need to define the relationships between OPs. The graph engine analyzes the dependencies of the entire graph and declares the Channels at compile time.
- After Request data enters the graph execution engine service, the graph engine generates a Request ID, and the Response is returned through the corresponding Request ID.
- When large data needs to be transferred between OPs, consider using an external in-memory database (RAM DB) for global storage and passing only index keys through the Channel.

<center>
<img src='pipeline_serving-image2.png' height = "300" align="middle"/>
</center>
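
To make the dependency analysis concrete, the sketch below derives an execution order from OPs that only declare their upstream OPs. It uses Kahn's topological sort as an illustration; this is an assumption about one reasonable approach, not a description of the engine's actual code. The `Op` class here is a hypothetical stand-in with just a name and `input_ops`.

```python
from collections import deque


class Op:
    # Hypothetical minimal OP: only a name and its upstream OPs.
    def __init__(self, name, input_ops=()):
        self.name = name
        self.input_ops = list(input_ops)


def topo_order(last_op):
    """Return OP names in an order where every OP follows its inputs."""
    # Collect all OPs reachable from the last one by walking input_ops.
    ops, stack = {}, [last_op]
    while stack:
        op = stack.pop()
        if op.name not in ops:
            ops[op.name] = op
            stack.extend(op.input_ops)
    # Kahn's algorithm: repeatedly emit OPs whose inputs were all emitted.
    indegree = {name: len(op.input_ops) for name, op in ops.items()}
    successors = {name: [] for name in ops}
    for name, op in ops.items():
        for pred in op.input_ops:
            successors[pred.name].append(name)
    ready = deque(sorted(n for n, d in indegree.items() if d == 0))
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for succ in sorted(successors[name]):
            indegree[succ] -= 1
            if indegree[succ] == 0:
                ready.append(succ)
    if len(order) != len(ops):
        raise ValueError("the OP graph contains a cycle")
    return order


read = Op("read")
bow = Op("bow", [read])
cnn = Op("cnn", [read])
combine = Op("combine", [bow, cnn])
order = topo_order(combine)
```

In this example `bow` and `cnn` share the Channel written by `read`, and `combine` reads from a Channel that both of them write to.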


#### <b>1.2.1 OP Design</b>

- The default function of a single OP is to access a single Paddle Serving Service based on the input Channel data and put the result into the output Channel.
- OP supports user customization, including preprocess, process, postprocess functions that can be inherited and implemented by the user.
- OP concurrency is configurable; raising it lets more requests be processed in parallel.
- OP can obtain data from multiple different RPC requests for Auto-Batching.
- OP can be started by a thread or process.

#### <b>1.2.2 Channel Design</b>

- Channel is the data structure for sharing data between OPs, responsible for sharing data or sharing data status information.
- Outputs from multiple OPs can be stored in the same Channel, and data from the same Channel can be used by multiple OPs.
- The following illustration shows the design of Channel in the graph execution engine, using input buffer and output buffer to align data between multiple OP inputs and multiple OP outputs, with a queue in the middle to buffer.

<center>
<img src='pipeline_serving-image3.png' height = "500" align="middle"/>
</center>
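
The alignment role of the input buffer can be sketched as follows: data for one request is held back until every upstream OP has delivered its part, and only then moves to the internal queue. This is a simplified, single-threaded illustration; the class and method names are hypothetical and the real Channel also handles locking and the output buffer.

```python
import queue


class Channel:
    """Toy Channel: waits until all producers delivered data for a request id."""

    def __init__(self, producers):
        self.producers = set(producers)
        self.input_buffer = {}      # request_id -> {producer: data}
        self.ready = queue.Queue()  # aligned items waiting for consumers

    def push(self, request_id, producer, data):
        if producer not in self.producers:
            raise KeyError("unknown producer: {}".format(producer))
        slot = self.input_buffer.setdefault(request_id, {})
        slot[producer] = data
        # Once every producer has contributed, move the item to the queue.
        if set(slot) == self.producers:
            self.ready.put((request_id, self.input_buffer.pop(request_id)))

    def front(self, timeout=None):
        return self.ready.get(timeout=timeout)


channel = Channel(producers=["bow", "cnn"])
channel.push(1, "bow", {"prediction": 0.2})
channel.push(2, "cnn", {"prediction": 0.9})  # request 2 finishes cnn first
channel.push(1, "cnn", {"prediction": 0.4})  # request 1 is now complete
request_id, inputs = channel.front(timeout=1)
```

Request 2 stays in the input buffer until `bow` also delivers its result, so a downstream OP never sees a partially filled input.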


#### <b>1.2.3 client type design</b>

- The prediction type (client_type) of an OP is one of brpc, grpc and local_predictor.
  - brpc: Uses the bRPC Client to interact with remote Serving over the network; performance is better than grpc.
  - grpc: Uses the gRPC Client to interact with remote Serving over the network; supports cross-platform deployment.
  - local_predictor: Loads the model and predicts in the local service without network interaction. Supports multi-card deployment and TensorRT prediction.
  - Selection:
    - Time cost (lower is better): local_predictor < brpc <= grpc
    - Microservice: Split brpc or grpc models into independent services to simplify development and deployment and to improve resource utilization.

#### <b>1.2.4 Extreme Case Consideration</b>

- `Request timeout`

  The entire graph execution engine may time out at every step. The graph execution engine controls the time out by setting `timeout` value. Requests that time out at any step will return a timeout response.

- `Channel stores too much data`

  Channels may store too much data, causing copy time to be too high. Graph execution engines can store OP calculation results in external memory, such as high-speed memory KV systems.

- `Whether input buffers and output buffers in Channel will increase indefinitely`

  - It will not increase indefinitely. The input to the entire graph execution engine is placed inside a Channel's internal queue, directly acting as a traffic control buffer queue for the entire service.
  - For input buffer, adjust the number of concurrencies of OP1 and OP2 according to the amount of computation, so that the number of input buffers from each input OP is relatively balanced. (The length of the input buffer depends on the speed at which each item in the internal queue is ready)
  - For output buffer, you can use a similar process as input buffer, which adjusts the concurrency of OP3 and OP4 to control the buffer length of output buffer. (The length of the output buffer depends on the speed at which downstream OPs obtain data from the output buffer)
  - The amount of data in the Channel will not exceed `worker_num` of gRPC, that is, it will not exceed the thread pool size.

## 2.Detailed Design

This section covers the design and implementation of Pipeline: first PipelineServer and OP, then how to rewrite the preprocess and postprocess functions of an OP, and finally the secondary development interfaces of the two special OPs (RequestOp and ResponseOp).

### 2.1 PipelineServer Definition

PipelineServer encapsulates the RPC runtime layer and graph engine execution. Every Pipeline service must first create a PipelineServer instance, then perform the two core steps of setting the response op and loading the configuration, and finally call run_server to start the service. The code example is as follows:

```python
server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server(config_yml_path)
#server.prepare_pipeline_config(config_yml_path)
server.run_server()
```

The core interface of PipelineServer:
- `set_response_op`: setting response_op initializes the Channels according to the topological relationship of the OPs and builds the computation graph.
- `prepare_server`: loads the configuration and starts the remote Serving service; suitable for calling a remote inference service.
- `prepare_pipeline_config`: only loads the configuration; applicable to local_predictor.
- `run_server`: starts the gRPC service and receives requests.


### 2.2 OP Definition

As the basic unit of graph execution engine, the general OP constructor is as follows:

```python
def __init__(self,
             name=None,
             input_ops=[],
             server_endpoints=[],
             fetch_list=[],
             client_config=None,
             client_type=None,
             concurrency=1,
             timeout=-1,
             retry=1,
             batch_size=1,
             auto_batching_timeout=None,
             local_service_handler=None)
```

The meaning of each parameter is as follows:

|       Parameter       |                           Meaning                            |
| :-------------------: | :----------------------------------------------------------: |
|         name          | (str) String used to identify the OP type, which must be globally unique. |
|       input_ops       |     (list) A list of all previous OPs of the current Op.     |
|   server_endpoints    | (list) List of endpoints for remote Paddle Serving Service. If this parameter is not set, the OP runs in local_predictor mode and the configuration is read from local_service_conf. |
|      fetch_list       | (list) List of fetch variable names for remote Paddle Serving Service. |
|     client_config     | (str) The path of the client configuration file corresponding to the Paddle Serving Service. |
|     client_type       | (str) brpc, grpc or local_predictor. local_predictor does not start the Serving service and predicts in-process. |
|      concurrency      |             (int) The number of concurrent OPs.              |
|        timeout        | (int) The timeout of the process operation, in ms. If the value is less than zero, no timeout is applied. |
|         retry         | (int) Number of retries on timeout. When the value is 1, no retries are made. |
|      batch_size       | (int) The expected batch_size of Auto-Batching. Since building a batch may time out, the actual batch_size may be less than the set value. |
| auto_batching_timeout | (float) Timeout for building batches of Auto-Batching, in ms. When batch_size > 1, auto_batching_timeout should be set; otherwise the OP blocks while fewer than batch_size requests are available. |
| local_service_handler | (object) Local predictor handler, either passed in through the Op constructor or created inside it. |
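
The interaction between `batch_size` and `auto_batching_timeout` can be sketched with a plain queue. The function below is a simplified illustration under the assumption that requests arrive on a `queue.Queue`; `collect_batch` is a hypothetical name and not the framework's implementation.

```python
import queue
import time


def collect_batch(source, batch_size, auto_batching_timeout_ms=None):
    """Take up to batch_size items; stop early once the timeout elapses."""
    batch = [source.get()]  # block until at least one item is available
    deadline = None
    if auto_batching_timeout_ms is not None:
        deadline = time.monotonic() + auto_batching_timeout_ms / 1000.0
    while len(batch) < batch_size:
        if deadline is None:
            batch.append(source.get())  # may block indefinitely
            continue
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break
    return batch


requests = queue.Queue()
for item in ("a", "b", "c"):
    requests.put(item)
first = collect_batch(requests, batch_size=2, auto_batching_timeout_ms=50)
second = collect_batch(requests, batch_size=2, auto_batching_timeout_ms=50)
```

The second call returns a batch of one item after the timeout, which is why the actual batch size can be smaller than the configured value. Without a timeout, the same call would wait until another request arrives.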


### 2.3 Rewrite preprocess and postprocess of OP

|              Interface or Variable               |                           Explain                            |
| :----------------------------------------------: | :----------------------------------------------------------: |
|        def preprocess(self, input_dicts)         | Process the data obtained from the channel, and the processed data will be used as the input of the **process** function. (This function handles a **sample**) |
| def process(self, feed_dict_list, typical_logid) | The RPC prediction process is based on the Paddle Serving Client, and the processed data will be used as the input of the **postprocess** function. (This function handles a **batch**) |
|  def postprocess(self, input_dicts, fetch_dict)  | After processing the prediction results, the processed data will be put into the subsequent Channel to be obtained by the subsequent OP. (This function handles a **sample**) |
|                def init_op(self)                 |      Used to load resources (such as word dictionary).       |
|               self.concurrency_idx               | Concurrency index of current process(not thread) (different kinds of OP are calculated separately). |

In a running cycle, OP will execute three operations: preprocess, process, and postprocess (when the `server_endpoints` parameter is not set, the process operation is not executed). Users can rewrite these three functions. The default implementation is as follows:

```python
def preprocess(self, input_dicts):
  # multiple previous Op
  if len(input_dicts) != 1:
    raise NotImplementedError(
      'this Op has multiple previous inputs. Please override this func.'
    )
  (_, input_dict), = input_dicts.items()
  return input_dict

def process(self, feed_dict_list, typical_logid):
  err, err_info = ChannelData.check_batch_npdata(feed_dict_list)
  if err != 0:
    raise NotImplementedError(
      "{} Please override preprocess func.".format(err_info))
  call_result = self.client.predict(
    feed=feed_dict_list, fetch=self._fetch_names, log_id=typical_logid)
  if isinstance(self.client, MultiLangClient):
    if call_result is None or call_result["serving_status_code"] != 0:
      return None
    call_result.pop("serving_status_code")
  return call_result

def postprocess(self, input_dicts, fetch_dict):
  return fetch_dict
```

The parameter of **preprocess** is the data `input_dicts` in the previous Channel. This variable (as a **sample**) is a dictionary with the name of the previous OP as key and the output of the corresponding OP as value.

The parameter of **process** is `feed_dict_list`, the input of the Paddle Serving Client prediction interface (a list of the return values of the preprocess function). This variable (as a **batch**) is a list of dictionaries with feed_name as the key and the data in ndarray format as the value. `typical_logid` is the logid that is passed through to PaddleServingService.

The parameters of **postprocess** are `input_dicts` and `fetch_dict`. `input_dicts` is consistent with the parameter of preprocess, and `fetch_dict` (as a **sample**) is a sample of the return batch of the process function (if process is not executed, this value is the return value of preprocess).
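
The sample/batch distinction between the three functions can be shown with a toy running cycle. `ToyOp` and `run_cycle` are hypothetical names used only for this sketch; `process` omits `typical_logid` and doubles its input instead of calling a Serving client.

```python
class ToyOp:
    """Stand-in OP; the real Op class in Pipeline Serving has more logic."""

    def preprocess(self, input_dicts):
        # One sample: {previous_op_name: output_of_that_op}
        (_, input_dict), = input_dicts.items()
        return input_dict

    def process(self, feed_dict_list):
        # One batch: a list of feed dictionaries. A real OP would call the
        # Serving client here; this toy version doubles each input.
        return [{"y": feed["x"] * 2} for feed in feed_dict_list]

    def postprocess(self, input_dicts, fetch_dict):
        # One sample again: a single entry of the batch returned by process.
        return fetch_dict

    def run_cycle(self, batch_of_input_dicts):
        feeds = [self.preprocess(d) for d in batch_of_input_dicts]
        fetches = self.process(feeds)
        return [self.postprocess(d, f)
                for d, f in zip(batch_of_input_dicts, fetches)]


op = ToyOp()
outputs = op.run_cycle([{"read": {"x": 1}}, {"read": {"x": 3}}])
```

Here `preprocess` and `postprocess` run once per sample, while `process` runs once for the whole batch.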

Users can also rewrite the **init_op** function to load some custom resources (such as word dictionary). The default implementation is as follows:

```python
def init_op(self):
  pass
```

It should be **noted** that in the threaded version of OP, each OP will only call this function once, so the loaded resources must be thread safe.

### 2.4 RequestOp Definition and Secondary Development Interface

RequestOp is used to process RPC data received by Pipeline Server, and the processed data will be added to the graph execution engine. Its constructor is as follows:

```python
class RequestOp(Op):
    def __init__(self):
       # PipelineService.name = "@DAGExecutor"
        super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[])
        # init op
        try:
            self.init_op()
        except Exception as e:
            _LOGGER.critical("Op(Request) Failed to init: {}".format(e))
            os._exit(-1)
    def unpack_request_package(self, request):
        dict_data = {}
        log_id = None
        if request is None:
            _LOGGER.critical("request is None")
            raise ValueError("request is None")

        for idx, key in enumerate(request.key):
            dict_data[key] = request.value[idx]
        log_id = request.logid
        _LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \
            name:{}, method:{}".format(log_id, request.clientip, request.name,
                                       request.method))

        return dict_data, log_id, None, ""
```

The default implementation of **unpack_request_package** is to make the key and value in the RPC request into a dictionary. When the default RequestOp cannot meet the parameter parsing requirements, you can customize the request parsing by rewriting the following two interfaces. The returned request data is required to be a dictionary.

|           Interface or Variable           |                           Explain                            |
| :---------------------------------------: | :----------------------------------------------------------: |
|             def init_op(self)             | It is used to load resources (such as dictionaries), and is consistent with general OP. |
| def unpack_request_package(self, request) |                  Process received RPC data.                  |



### 2.5 ResponseOp Definition and Secondary Development Interface

ResponseOp is used to process the prediction results of the graph execution engine. The processed data will be used as the RPC return value of Pipeline Server. Its constructor is as follows:

```python
class RequestOp(Op):
    def __init__(self):
        # PipelineService.name = "@DAGExecutor"
        super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[])
        # init op
        try:
            self.init_op()
        except Exception as e:
            _LOGGER.critical("Op(Request) Failed to init: {}".format(e))
            os._exit(-1)
    def unpack_request_package(self, request):
        dict_data = {}
        log_id = None
        if request is None:
            _LOGGER.critical("request is None")
            raise ValueError("request is None")

        for idx, key in enumerate(request.key):
            dict_data[key] = request.value[idx]
        log_id = request.logid
        _LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \
            name:{}, method:{}".format(log_id, request.clientip, request.name,
                                       request.method))

        return dict_data, log_id, None, ""
```

The default implementation of **pack_response_package** is to convert the dictionary of prediction results into the key and value of the RPC response. When the default ResponseOp cannot meet the requirements of the result return format, you can customize how the response is packed by rewriting the following two interfaces.

|            Interface or Variable             |                           Explain                            |
| :------------------------------------------: | :----------------------------------------------------------: |
|              def init_op(self)               | It is used to load resources (such as dictionaries), and is consistent with general OP. |
| def pack_response_package(self, channeldata) | Process the prediction results of the graph execution engine as the return of RPC. |
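
The conversion from a result dictionary to the `key` and `value` arrays of the response can be sketched as below. This is an illustration only: `Response` is a dataclass stand-in for the proto message, `pack_response` is a hypothetical function rather than the library's `pack_response_package`, and the error code `-1` is an arbitrary value chosen for the example.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Response:
    # Stand-in with the same fields as the proto Response message.
    err_no: int = 0
    err_msg: str = ""
    key: List[str] = field(default_factory=list)
    value: List[str] = field(default_factory=list)


def pack_response(result: Optional[Dict[str, object]], error: str = "") -> Response:
    """Convert a result dictionary into parallel key/value string arrays."""
    if result is None:
        # The error code -1 is arbitrary and chosen only for this sketch.
        return Response(err_no=-1, err_msg=error or "no result")
    resp = Response()
    for name, data in result.items():
        resp.key.append(name)
        resp.value.append(str(data))  # proto values are strings
    return resp


ok = pack_response({"prediction": 0.3})
failed = pack_response(None, error="timeout")
```

A custom ResponseOp would typically change how each value is serialized, for example to return JSON instead of a plain string representation.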


***

## 3.Classic Examples

All pipeline examples are in the [examples/pipeline/](../python/examples/pipeline) directory. There are currently 7 types of model examples:
- [PaddleClas](../python/examples/pipeline/PaddleClas)
- [Detection](../python/examples/pipeline/PaddleDetection)  
- [bert](../python/examples/pipeline/bert)
- [imagenet](../python/examples/pipeline/imagenet)
- [imdb_model_ensemble](../python/examples/pipeline/imdb_model_ensemble)
- [ocr](../python/examples/pipeline/ocr)
- [simple_web_service](../python/examples/pipeline/simple_web_service)

Here, we build a simple imdb model ensemble example to show how to use Pipeline Serving. The relevant code can be found in the `python/examples/pipeline/imdb_model_ensemble` folder. The Server-side structure in the example is shown in the following figure:

<center>
<img src='pipeline_serving-image4.png' height = "200" align="middle"/>
</center>

### 3.1 Files required for pipeline deployment

Five types of files are needed, of which model files, configuration files, and server code are the three necessary files for building a Pipeline service. Test client and test data set are prepared for testing.
- model files
- configure files(config.yml)
  - service level: Service port, thread number, service timeout, retry, etc.
  - DAG level: Resource type, enable Trace, performance profile, etc.
  - OP level: Model path, concurrency, client type, device type, automatic batching, etc.
- Server files(web_server.py)
  - service level: Define service name, read configuration file, start service, etc.
  - DAG level: Topological relationship between OPs.
  - OP level: Rewrite preprocess and postprocess of OP.
- Test client files
  - Correctness check
  - Performance testing
- Test data set
  - pictures, texts, voices, etc.


### 3.2 Get model files

```shell
cd python/examples/pipeline/imdb_model_ensemble
sh get_data.sh
python -m paddle_serving_server.serve --model imdb_cnn_model --port 9292 &> cnn.log &
python -m paddle_serving_server.serve --model imdb_bow_model --port 9393 &> bow.log &
```

PipelineServing also supports local automatic startup of PaddleServingService. Please refer to the example `python/examples/pipeline/ocr`.


### 3.3 Create config.yml

This example uses the client connection type of brpc, and you can also choose grpc or local_predictor.

```yaml
# RPC port. rpc_port and http_port cannot both be empty. When rpc_port is empty and http_port is not, rpc_port is automatically set to http_port+1
rpc_port: 18070

# HTTP port. rpc_port and http_port cannot both be empty. When rpc_port is available and http_port is empty, http_port is not generated automatically
http_port: 18071

# worker_num: maximum concurrency. When build_dag_each_worker=True, the framework creates worker_num processes, each building its own gRPC server and DAG
# When build_dag_each_worker=False, the framework sets max_workers=worker_num for the gRPC thread pool of the main thread
worker_num: 4

# build_dag_each_worker. False: the framework creates one DAG inside the process; True: the framework creates independent DAGs in each process
build_dag_each_worker: False

dag:
    # OP resource type. True: thread model; False: process model
    is_thread_op: True

    # Number of retries
    retry: 1

    # Profiling. True: generate Timeline performance data, which has some performance impact; False: disabled
    use_profile: False

    # Maximum length of a channel, 0 by default
    channel_size: 0

    # tracer: tracks framework throughput and the status of each OP and channel. No data is generated without tracer
    tracer:
        # Interval between traces, in seconds
        interval_s: 10
op:
    bow:
        # Concurrency. Thread concurrency when is_thread_op=True, otherwise process concurrency
        concurrency: 1

        # Client connection type: brpc, grpc or local_predictor
        client_type: brpc

        # Number of retries when interacting with Serving; no retry by default
        retry: 1

        # Timeout for interacting with Serving, in ms
        timeout: 3000

        # Serving IPs
        server_endpoints: ["127.0.0.1:9393"]

        # Client-side configuration of the bow model
        client_config: "imdb_bow_client_conf/serving_client_conf.prototxt"

        # Fetch list; names follow the alias_name of fetch_var in client_config
        fetch_list: ["prediction"]

        # Number of requests batched per Serving query, 1 by default. When batch_size>1, set auto_batching_timeout, otherwise the OP blocks while fewer than batch_size requests are available
        batch_size: 2

        # Batching timeout, used together with batch_size
        auto_batching_timeout: 2000
    cnn:
        # Concurrency. Thread concurrency when is_thread_op=True, otherwise process concurrency
        concurrency: 1

        # Client connection type: brpc
        client_type: brpc

        # Number of retries when interacting with Serving; no retry by default
        retry: 1

        # Prediction timeout, in ms
        timeout: 3000

        # Serving IPs
        server_endpoints: ["127.0.0.1:9292"]

        # Client-side configuration of the cnn model
        client_config: "imdb_cnn_client_conf/serving_client_conf.prototxt"

        # Fetch list; names follow the alias_name of fetch_var in client_config
        fetch_list: ["prediction"]

        # Number of requests batched per Serving query, 1 by default
        batch_size: 2

        # Batching timeout, used together with batch_size
        auto_batching_timeout: 2000
    combine:
        # Concurrency. Thread concurrency when is_thread_op=True, otherwise process concurrency
        concurrency: 1

        # Number of retries when interacting with Serving; no retry by default
        retry: 1

        # Prediction timeout, in ms
        timeout: 3000

        # Number of requests batched per Serving query, 1 by default
        batch_size: 2

        # Batching timeout, used together with batch_size
        auto_batching_timeout: 2000
```
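
Two rules stated in the configuration comments, the port fallback and the batching timeout requirement, can be expressed as a small check. The sketch below works on an already-parsed dictionary; `resolve_ports` and `check_op` are hypothetical helpers written for illustration and are not functions of Pipeline Serving.

```python
def resolve_ports(config):
    """Apply the rpc_port/http_port rules from the configuration comments."""
    rpc_port = config.get("rpc_port")
    http_port = config.get("http_port")
    if rpc_port is None and http_port is None:
        raise ValueError("rpc_port and http_port cannot both be empty")
    if rpc_port is None:
        rpc_port = http_port + 1  # derived from http_port
    return rpc_port, http_port    # http_port stays None if it was not set


def check_op(name, op_config):
    """Report batching without a timeout, which can block the OP."""
    batch_size = op_config.get("batch_size", 1)
    if batch_size > 1 and op_config.get("auto_batching_timeout") is None:
        return "{}: batch_size > 1 without auto_batching_timeout".format(name)
    return None


ports = resolve_ports({"http_port": 18071})
rpc_only = resolve_ports({"rpc_port": 18070})
problem = check_op("bow", {"batch_size": 2})
fine = check_op("cnn", {"batch_size": 2, "auto_batching_timeout": 2000})
```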

### 3.4 Start PipelineServer

Run the following code

```python
import logging

from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
from paddle_serving_app.reader import IMDBDataset

# Logger used by CombineOp below.
_LOGGER = logging.getLogger(__name__)

class ImdbRequestOp(RequestOp):
    def init_op(self):
        self.imdb_dataset = IMDBDataset()
        self.imdb_dataset.load_resource('imdb.vocab')

    def unpack_request_package(self, request):
        dictdata = {}
        for idx, key in enumerate(request.key):
            if key != "words":
                continue
            words = request.value[idx]
            word_ids, _ = self.imdb_dataset.get_words_and_label(words)
            dictdata[key] = np.array(word_ids)
        return dictdata


class CombineOp(Op):
    def preprocess(self, input_data):
        combined_prediction = 0
        for op_name, data in input_data.items():
            _LOGGER.info("{}: {}".format(op_name, data["prediction"]))
            combined_prediction += data["prediction"]
        data = {"prediction": combined_prediction / 2}
        return data


read_op = ImdbRequestOp()
bow_op = Op(name="bow",
            input_ops=[read_op],
            server_endpoints=["127.0.0.1:9393"],
            fetch_list=["prediction"],
            client_config="imdb_bow_client_conf/serving_client_conf.prototxt",
            concurrency=1,
            timeout=-1,
            retry=1)
cnn_op = Op(name="cnn",
            input_ops=[read_op],
            server_endpoints=["127.0.0.1:9292"],
            fetch_list=["prediction"],
            client_config="imdb_cnn_client_conf/serving_client_conf.prototxt",
            concurrency=1,
            timeout=-1,
            retry=1)
combine_op = CombineOp(
    name="combine",
    input_ops=[bow_op, cnn_op],
    concurrency=5,
    timeout=-1,
    retry=1)

# use default ResponseOp implementation
response_op = ResponseOp(input_ops=[combine_op])

server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
```

### 3.5 Perform prediction through PipelineClient

```python
from paddle_serving_client.pipeline import PipelineClient
import numpy as np

client = PipelineClient()
# Connect to the rpc_port set in config.yml (18070 in the configuration above).
client.connect(['127.0.0.1:18070'])

words = 'i am very sad | 0'

futures = []
for i in range(3):
    futures.append(
        client.predict(
            feed_dict={"words": words},
            fetch=["prediction"],
            asyn=True))

for f in futures:
    res = f.result()
    if res["ecode"] != 0:
        print(res)
        exit(1)
```

***

## 4.Advanced Usages

### 4.1 Business custom error type

Users can define business-specific error codes by inheriting from ProductErrCode and returning them in the return list of an Op's preprocess or postprocess. Based on the custom error code, the following stages skip the processing of subsequent OPs.

```python
import enum

class ProductErrCode(enum.Enum):
    """
    ProductErrCode is a base class for recording business error codes.
    Product developers inherit this class and extend it with more error codes.
    """
    pass
```
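
As an illustration of extending the base class, the sketch below defines a subclass with two business codes and a check that returns them in the `(prod_errcode, prod_errinfo)` form. `OcrErrCode`, its members and `check_image` are hypothetical and exist only for this example; the base class is repeated so the snippet runs on its own.

```python
import enum


class ProductErrCode(enum.Enum):
    """Base class for business error codes (no members of its own)."""
    pass


class OcrErrCode(ProductErrCode):
    # Hypothetical business codes; names and values are illustrative only.
    EMPTY_IMAGE = 1001
    UNSUPPORTED_FORMAT = 1002


def check_image(image_bytes):
    """Return (prod_errcode, prod_errinfo) in the style used by preprocess."""
    if not image_bytes:
        return OcrErrCode.EMPTY_IMAGE, "image is empty"
    return None, ""


code, info = check_image(b"")
```

Subclassing works because Python allows an `Enum` class to be extended as long as it defines no members itself.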

### 4.2 Skip process stage

The second element of the result list returned by preprocess is `is_skip_process`. When it is `True`, the process stage of the current OP is skipped and execution goes directly to postprocess.

```python
def preprocess(self, input_dicts, data_id, log_id):
        """
        In preprocess stage, assembling data for process stage. users can 
        override this function for model feed features.
        Args:
            input_dicts: input data to be preprocessed
            data_id: inner unique id
            log_id: global unique id for RTT
        Return:
            input_dict: data for process stage
            is_skip_process: skip process stage or not, False default
            prod_errcode: None default, otherwise, product errors occurred.
                          It is handled in the same way as exception. 
            prod_errinfo: "" default
        """
        # multiple previous Op
        if len(input_dicts) != 1:
            _LOGGER.critical(
                self._log(
                    "Failed to run preprocess: this Op has multiple previous "
                    "inputs. Please override this func."))
            os._exit(-1)
        (_, input_dict), = input_dicts.items()
        return input_dict, False, None, ""

```
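
One situation where skipping the process stage is useful is a cache hit, where the prediction is already known. The sketch below is a stand-alone illustration: `CachedOp` and its in-memory `cache` are hypothetical, and the class does not inherit from the real `Op`.

```python
class CachedOp:
    """Hypothetical OP-like class that skips process on a cache hit."""

    def __init__(self):
        self.cache = {"hello": {"label": "greeting"}}

    def preprocess(self, input_dicts, data_id, log_id):
        (_, input_dict), = input_dicts.items()
        cached = self.cache.get(input_dict["text"])
        if cached is not None:
            # Second element True: skip process, go straight to postprocess.
            return cached, True, None, ""
        return input_dict, False, None, ""


op = CachedOp()
hit = op.preprocess({"read": {"text": "hello"}}, data_id=0, log_id=0)
miss = op.preprocess({"read": {"text": "bye"}}, data_id=1, log_id=0)
```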

### 4.3 Custom proto Request and Response

When the default proto structure does not meet the business requirements, at the same time, the Request and Response message structures of the proto in the following two files remain the same.

> pipeline/gateway/proto/gateway.proto 

> pipeline/proto/pipeline_service.proto

Then recompile Serving Server.

### 4.4 Custom URL

The gRPC gateway handles POST requests. The default `method` is `prediction`, for example: 127.0.0.1:8080/ocr/prediction. Users can customize `name` and `method`, so a service can be switched over seamlessly while keeping an existing URL.

```proto
service PipelineService {
  rpc inference(Request) returns (Response) {
    option (google.api.http) = {
      post : "/{name=*}/{method=*}"
      body : "*"
    };
  }
};
```
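The URL pattern above can be sketched from the client side as follows. This is a minimal illustration, not part of the framework: `build_url` and `build_body` are hypothetical helpers, and the `key`/`value` body layout is an assumption about the default Request message that should be checked against your proto.

```python
import json


def build_url(host, port, name, method="prediction"):
    """Compose the gateway URL following the /{name}/{method} pattern."""
    return f"http://{host}:{port}/{name}/{method}"


def build_body(fields):
    """Serialize fields as parallel key/value lists (assumed Request layout)."""
    return json.dumps({"key": list(fields.keys()), "value": list(fields.values())})
```

For example, `build_url("127.0.0.1", 8080, "ocr")` gives the default URL shown above, and passing a different `method` changes only the last path segment.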

### 4.5 Batch predictor

Pipeline supports batch prediction, and GPU utilization can be improved by increasing the batch size. Pipeline Serving supports 3 batch forms; the applicable scenarios are as follows:
- case 1: An inference request contains batch data (batch)
  - The data is of fixed length and the batch size is variable; the data is converted into BCHW format
  - The data length is variable; in preprocessing, each piece of data is padded to a fixed length
- case 2: Split the batch data of an inference request into multiple small pieces of data (mini-batch)
  - Since padding aligns every sample to the longest shape, a single "extremely large" sample in a batch makes the padding very large
  - Specify a block size to limit the scope affected by the "extremely large" sample
- case 3: Merge multiple requests into one batch (auto-batching)
  - When inference takes significantly longer than preprocess and postprocess, merging the data of multiple requests and running inference once increases throughput and GPU utilization
  - The data shapes of the merged requests must be consistent

| Interfaces | Explanation |
| :------------------------------------------: | :-----------------------------------------: |
| batch | The client sends batch data; set `batch=True` in `client.predict` |
| mini-batch | The return type of preprocess is a list; refer to the preprocess of RecOp in the OCR example |
| auto-batching | Set `batch_size` and `auto_batching_timeout` in config.yml |
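To make the padding argument for mini-batch concrete, here is a small sketch that compares the padded size of one large batch with the padded size of the same data split into blocks. The helper names and the sample lengths are invented for illustration; samples are represented only by their lengths.

```python
def split_into_mini_batches(lengths, block_size):
    """Split a list of sample lengths into consecutive blocks of block_size."""
    return [lengths[i:i + block_size] for i in range(0, len(lengths), block_size)]


def padded_size(lengths):
    """Total elements after padding every sample to the longest one in the batch."""
    return len(lengths) * max(lengths)


lengths = [3, 4, 5, 100]  # one "extremely large" sample
whole_batch = padded_size(lengths)
mini_batches = sum(padded_size(b) for b in split_into_mini_batches(lengths, 2))
```

With these numbers the single batch pads to 400 elements, while two blocks of size 2 pad to 208, because the large sample inflates only its own block.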

### 4.7 Single-machine and multi-card inference

Single-machine multi-card inference can be abstracted as M OP processes bound to N GPU cards. It is controlled by three settings in config.yml: select the process mode, set `concurrency` to the number of processes, and set `devices` to the GPU card IDs. Binding traverses the GPU card IDs in order as processes start. For example, if 7 OP processes are started with `devices: "0,1,2"` in config.yml, the 1st, 4th, and 7th processes are bound to card 0, the 2nd and 5th to card 1, and the 3rd and 6th to card 2.

- PROCESS ID: 0  binds GPU card 0
- PROCESS ID: 1  binds GPU card 1
- PROCESS ID: 2  binds GPU card 2
- PROCESS ID: 3  binds GPU card 0
- PROCESS ID: 4  binds GPU card 1
- PROCESS ID: 5  binds GPU card 2
- PROCESS ID: 6  binds GPU card 0
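The binding listed above is a round-robin assignment, which can be sketched as follows. `bind_processes_to_gpus` is a hypothetical helper written for illustration and is not the framework's own code.

```python
def bind_processes_to_gpus(num_processes, devices):
    """Map each process ID to a GPU card ID by cycling through `devices`."""
    card_ids = [d.strip() for d in devices.split(",") if d.strip()]
    if not card_ids:
        raise ValueError("devices must list at least one GPU card ID")
    # Process i gets the card at position i modulo the number of cards
    return {pid: card_ids[pid % len(card_ids)] for pid in range(num_processes)}


binding = bind_processes_to_gpus(7, "0,1,2")
```

Here `binding` maps process IDs 0, 3, and 6 to card `"0"`, IDs 1 and 4 to card `"1"`, and IDs 2 and 5 to card `"2"`.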

Reference config.yml:
```
# Compute device IDs. When devices is "" or omitted, CPU inference is used; when devices is "0" or "0,1,2", GPU inference is used on the listed GPU cards
devices: "0,1,2"
```

### 4.8 Heterogeneous Devices
In addition to CPU and GPU, Pipeline also supports deployment on a variety of heterogeneous hardware. The hardware is selected by `device_type` and `devices` in config.yml: `device_type` takes priority, and when it is omitted the type is decided by `devices`. The `device_type` values are as follows:
- CPU(Intel) : 0
- GPU : 1
- TensorRT : 2
- CPU(Arm) : 3
- XPU : 4

Reference config.yml:
```
# Compute hardware type: when omitted, decided by devices (CPU/GPU). 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0

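# Compute device IDs; the hardware type is decided by device_type first. When devices is "" or omitted, CPU inference is used; when it is "0" or "0,1,2", GPU inference is used on the listed GPU cards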
devices: "" # "0,1"
```

### 4.9 Low precision inference
Pipeline Serving supports low-precision inference. The precision types supported by CPU, GPU, and TensorRT are listed below:

- CPU
  - fp32(default)
  - fp16
  - bf16(mkldnn)
- GPU
  - fp32(default)
  - fp16
  - int8
- TensorRT
  - fp32(default)
  - fp16
  - int8 

For a reference, see the example [simple_web_service](../python/examples/pipeline/simple_web_service).

***
 
## 5.Log Tracing
Pipeline service logs are written to the `PipelineServingLogs` directory under the current directory. There are 3 types of logs: `pipeline.log`, `pipeline.log.wf`, and `pipeline.tracer`.

- pipeline.log : Records debug and info level logs
- pipeline.log.wf : Records warning and error level logs
- pipeline.tracer : Records the time consumed and the channel accumulation in each stage

When an exception occurs in the service, the error message is recorded in `pipeline.log.wf`. To print the tracer log, add the tracer configuration to the DAG property of `config.yml`.

### 5.1 Unique log IDs
The pipeline uses two kinds of IDs to link requests together, `data_id` and `log_id`. They differ as follows:

- `data_id`: A self-incrementing ID generated by the pipeline framework that uniquely identifies the request.
- `log_id`: An identifier passed in by the upstream module to track the call chain across multiple services. Since users may omit it or fail to guarantee its uniqueness, it cannot be used as a unique identifier.

Logs printed by the Pipeline framework carry both `data_id` and `log_id`. When auto-batching is turned on, the first `data_id` in the batch marks the whole batch, and the framework prints all `data_id`s of the batch in one log entry.

## 6.Performance analysis and optimization


### 6.1 How to optimize with the timeline tool

To help optimize performance, Pipeline Serving provides a timeline tool that monitors the time spent in each stage of the whole service.

### 6.2 Output profile information on server side

On the server side, profiling is controlled by the `use_profile` field in the yaml configuration:

```yaml
dag:
    use_profile: true
```

After profiling is enabled, the server prints the corresponding log information to standard output during prediction. To show the time consumed by each stage more intuitively, the Analyst module is provided for further analysis and processing of the log files.

The output of the server is first saved to a file. Taking `profile.txt` as an example, the script converts the time monitoring information in the log into JSON format and saves it to the `trace` file. The `trace` file can be visualized through the tracing function of Chrome browser.

```python
from paddle_serving_server.pipeline import Analyst

if __name__ == "__main__":
    # Server output previously saved to a file
    log_filename = "profile.txt"
    # Output file in JSON format, loadable from chrome://tracing
    trace_filename = "trace"
    analyst = Analyst(log_filename)
    analyst.save_trace(trace_filename)
```

To view the result, open the Chrome browser, enter `chrome://tracing/` in the address bar to go to the tracing page, click the load button, and open the saved `trace` file. The time information of each stage of the prediction service is then visualized.

### 6.3 Output profile information on client side

The profile function can be enabled by setting `profile=True` in the `predict` interface on the client side.

After profiling is enabled, the client prints the log information for each prediction to standard output, and the subsequent analysis and processing are the same as on the server side.

### 6.4 Analytical methods
Based on the time consumed by each stage in the `pipeline.tracer` log, use the following formulas to work out step by step which stage dominates the time cost.

```
cost of one single OP:
op_cost = process(pre + mid + post) 

OP Concurrency: 
op_concurrency = op_cost(s) * qps_expected

Service throughput:
service_throughput = 1 / slowest_op_cost * op_concurrency

Service average cost:
service_avg_cost = ∑op_cost in critical path

Channel accumulations:
channel_acc_size = QPS(down - up) * time

Average cost of batch predictor:
avg_batch_cost = (N * pre + mid + post) / N 
```
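A short worked example of three of these formulas is sketched below. The function names and the sample numbers are invented for illustration, costs are expressed in milliseconds, and rounding the concurrency up to a whole number of processes is an added assumption.

```python
import math


def required_concurrency(op_cost_ms, qps_expected):
    """op_concurrency = op_cost(s) * qps_expected, rounded up to whole processes."""
    return math.ceil(op_cost_ms * qps_expected / 1000)


def service_throughput(slowest_op_cost_ms, op_concurrency):
    """service_throughput = 1 / slowest_op_cost * op_concurrency, in requests/s."""
    return 1000 / slowest_op_cost_ms * op_concurrency


def avg_batch_cost(n, pre, mid, post):
    """avg_batch_cost = (N * pre + mid + post) / N."""
    return (n * pre + mid + post) / n


concurrency = required_concurrency(50, 100)   # 50 ms per request, 100 QPS target
throughput = service_throughput(50, concurrency)
per_item = avg_batch_cost(4, 2, 20, 2)        # batch of 4: pre=2 ms, mid=20 ms, post=2 ms
```

With these inputs the OP needs 5 concurrent processes to sustain 100 QPS, and batching 4 items brings the average cost per item to 7.5 ms.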

### 6.5 Optimization ideas
Choose the optimization method according to which of the following stages consumes the most time.
- OP inference stage (mid-process):
  - Increase `concurrency`
  - Turn on `auto-batching` (ensure that the shapes of multiple requests are consistent)
  - Use `mini-batch` if the shape of the data is very large
  - Turn on TensorRT for GPU
  - Turn on MKLDNN for CPU
  - Turn on low-precision inference
- OP preprocess or postprocess stage:
  - Increase `concurrency`
  - Optimize processing logic
- In/Out stage (channel accumulation > 5):
  - Check the size and delay of the data passed through the channel
  - Reduce the data transmitted through the channel: avoid passing unnecessary data, or compress it before passing it in
  - Increase `concurrency`
  - Decrease the `concurrency` of upstream OPs