提交 02d581bb 编写于 作者: T TeslaZhao

Update doc

上级 2b8393e7
...@@ -4,8 +4,9 @@ ...@@ -4,8 +4,9 @@
Paddle Serving 实现了一套通用的多模型组合服务编程框架 Python Pipeline,不仅解决上述痛点,同时还能大幅提高 GPU 利用率,并易于开发和维护。 Paddle Serving 实现了一套通用的多模型组合服务编程框架 Python Pipeline,不仅解决上述痛点,同时还能大幅提高 GPU 利用率,并易于开发和维护。
通过阅读以下内容掌握 Python Pipeline 框架基础功能、设计方案、使用指南等。 Python Pipeline 使用案例请阅读[Python Pipeline 快速部署案例](./3-2_QuickStart_Pipeline_OCR_CN.md)
- [Python Pipeline 基础功能]()
- [Python Pipeline 使用案例]() 通过阅读以下内容掌握 Python Pipeline 设计方案、高阶用法和优化指南等。
- [Python Pipeline 高阶用法]() - [Python Pipeline 框架设计](7-1_Python_Pipeline_Design_CN.md)
- [Python Pipeline 优化指南]() - [Python Pipeline 高阶用法](7-2_Python_Pipeline_Senior_CN.md)
- [Python Pipeline 优化指南](7-3_Python_Pipeline_Optimize_CN.md)
# Python Pipeline 基础功能 # Python Pipeline 核心功能
设计一个通用端到端多模型组合框架所面临的挑战有如下4点: 为了解决多个深度学习模型组合的复杂问题,Paddle Serving 团队设计了一个通用端到端多模型组合框架,其核心特点包括:
1. 通用性:框架既要满足通用模型的输入类型,又要满足模型组合的复杂拓扑关系。 1. 通用性:框架既要满足通用模型的输入类型,又要满足模型组合的复杂拓扑关系。
2. 高性能:与常见互联网后端服务不同,深度学习模型的推理程序属于计算密集型程序,同时 GPU 等计算芯片价格昂贵,因此在平均响应时间不苛刻的场景下,计算资源占用和吞吐量指标格外重要。 2. 高性能:与常见互联网后端服务不同,深度学习模型的推理程序属于计算密集型程序,同时 GPU 等计算芯片价格昂贵,因此在平均响应时间不苛刻的场景下,计算资源占用和吞吐量指标格外重要。
3. 高可用性:高可用的架构依赖每个服务的健壮性,服务状态可查询、异常可监控和管理是必备条件。 3. 高可用性:高可用的架构依赖每个服务的健壮性,服务状态可查询、异常可监控和管理是必备条件。
4. 易于开发与调试:使用 Python 语言开发可大幅提升研发效率,运行的错误信息准确帮助开发者快速定位问题。 4. 易于开发与调试:使用 Python 语言开发可大幅提升研发效率,运行的错误信息准确帮助开发者快速定位问题。
## 总体设计
## 框架设计
Python Pipeline 框架分为网络服务层和图执行引擎2部分,网络服务层处理多种网络协议请求和通用输入参数问题,图执行引擎层解决复杂拓扑关系。如下图所示 Python Pipeline 框架分为网络服务层和图执行引擎2部分,网络服务层处理多种网络协议请求和通用输入参数问题,图执行引擎层解决复杂拓扑关系。如下图所示
<div align=center> <div align=center>
...@@ -41,6 +42,22 @@ Request 是输入结构,`key` 与 `value` 是配对的 string 数组。 `name` ...@@ -41,6 +42,22 @@ Request 是输入结构,`key` 与 `value` 是配对的 string 数组。 `name`
Response 是输出结构,`err_no``err_msg` 表达处理结果的正确性和错误信息,`key``value` 为结果。 Response 是输出结构,`err_no``err_msg` 表达处理结果的正确性和错误信息,`key``value` 为结果。
Pipeline 服务包装了继承于 WebService 类,以 OCR 示例为例,派生出 OcrService 类,get_pipeline_response 函数内实现 DAG 拓扑关系,默认服务入口为 read_op,函数返回的 Op 为最后一个处理,此处要求最后返回的 Op 必须唯一。
所有服务和模型的所有配置信息在 `config.yml` 中记录,URL 的 name 字段由 OcrService 初始化定义;run_service 函数启动服务。
```python
class OcrService(WebService):
def get_pipeline_response(self, read_op):
det_op = DetOp(name="det", input_ops=[read_op])
rec_op = RecOp(name="rec", input_ops=[det_op])
return rec_op
ocr_service = OcrService(name="ocr")
ocr_service.prepare_pipeline_config("config.yml")
ocr_service.run_service()
```
**二.图执行引擎层** **二.图执行引擎层**
...@@ -50,44 +67,17 @@ Response 是输出结构,`err_no` 和 `err_msg` 表达处理结果的正确性 ...@@ -50,44 +67,17 @@ Response 是输出结构,`err_no` 和 `err_msg` 表达处理结果的正确性
<img src='../images/pipeline_serving-image2.png' height = "300" align="middle"/> <img src='../images/pipeline_serving-image2.png' height = "300" align="middle"/>
</div> </div>
图执行引擎抽象归纳出2种数据结构 OP 节点和 Channel 有向边,构建一条异步流水线工作流。核心概念和设计思路如下: 图执行引擎抽象归纳出2种数据结构 Op 节点和 Channel 有向边,构建一条异步流水线工作流。核心概念和设计思路如下:
- OP 节点: 可理解成1个推理模型、一个处理方法,甚至是训练前向代码,可独立运行,独立设置并发度。每个 OP 节点的计算结果放入其绑定的 Channel 中。 - Op 节点: 可理解成1个推理模型、一个处理方法,甚至是训练前向代码,可独立运行,独立设置并发度。每个 Op 节点的计算结果放入其绑定的 Channel 中。
- Channel 数据管道: 可理解为一个单向缓冲队列。每个 Channel 只接收上游 OP 节点的计算输出,作为下游 OP 节点的输入。 - Channel 数据管道: 可理解为一个单向缓冲队列。每个 Channel 只接收上游 Op 节点的计算输出,作为下游 Op 节点的输入。
- 工作流:根据用户定义的节点依赖关系,图执行引擎自动生成有向无环图。每条用户请求到达图执行引擎时会生成一个唯一自增 ID,通过这种唯一性绑定关系标记流水线中的不同请求。 - 工作流:根据用户定义的节点依赖关系,图执行引擎自动生成有向无环图。每条用户请求到达图执行引擎时会生成一个唯一自增 ID,通过这种唯一性绑定关系标记流水线中的不同请求。
- 对于 OP 之间需要传输过大数据的情况,可以考虑 RAM DB 外存进行全局存储,通过在 Channel 中传递索引的 Key 来进行数据传输 Op 的设计原则:
- 单个 Op 默认的功能是根据输入的 Channel 数据,访问一个 Paddle Serving 的单模型服务,并将结果存在输出的 Channel
## 基础功能 - 单个 Op 可以支持用户自定义,包括 preprocess,process,postprocess 三个函数都可以由用户继承和实现
- 单个 Op 可以控制并发数,从而增加处理并发数
展开网络服务层和图执行引擎 Pipeline 服务、OP、重写OP前后处理,最后介绍特定OP(RequestOp和ResponseOp)二次开发的方法。 - 单个 Op 可以获取多个不同 RPC 请求的数据,以实现 Auto-Batching
- Op 可以由线程或进程启动
**一.Pipeline 服务**
PipelineServer包装了RPC运行层和图引擎执行,所有Pipeline服务首先要实例化PipelineServer示例,再设置2个核心接口 set_response_op、加载配置信息,最后调用run_server启动服务。代码示例如下:
```python
server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server(config_yml_path)
#server.prepare_pipeline_config(config_yml_path)
server.run_server()
```
PipelineServer的核心接口:
- `set_response_op`,设置response_op 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。
- `prepare_server`: 加载配置信息,并启动远端Serving服务,适用于调用远端远端推理服务
- `prepare_pipeline_config`,仅加载配置信息,适用于local_prdict
- `run_server`,启动gRPC服务,接收请求
**二.OP 设计与实现**
OP 的设计原则:
- 单个 OP 默认的功能是根据输入的 Channel 数据,访问一个 Paddle Serving 的单模型服务,并将结果存在输出的 Channel
- 单个 OP 可以支持用户自定义,包括 preprocess,process,postprocess 三个函数都可以由用户继承和实现
- 单个 OP 可以控制并发数,从而增加处理并发数
- 单个 OP 可以获取多个不同 RPC 请求的数据,以实现 Auto-Batching
- OP 可以由线程或进程启动
其构造函数如下: 其构造函数如下:
...@@ -110,48 +100,161 @@ def __init__(name=None, ...@@ -110,48 +100,161 @@ def __init__(name=None,
| 参数名 | 类型 | 含义 | | 参数名 | 类型 | 含义 |
| :-------------------: | :---------: |:------------------------------------------------: | | :-------------------: | :---------: |:------------------------------------------------: |
| name | (str) | 用于标识 OP 类型的字符串,该字段必须全局唯一。 | | name | (str) | 用于标识 Op 类型的字符串,该字段必须全局唯一。 |
| input_ops | (list) | 当前 OP 的所有前继 OP 的列表。 | | input_ops | (list) | 当前 Op 的所有前继 Op 的列表。 |
| server_endpoints | (list) |远程 Paddle Serving Service 的 endpoints 列表。如果不设置该参数,认为是local_precditor模式,从local_service_conf中读取配置。 | | server_endpoints | (list) |远程 Paddle Serving Service 的 endpoints 列表。如果不设置该参数,认为是local_precditor模式,从local_service_conf中读取配置。 |
| fetch_list | (list) |远程 Paddle Serving Service 的 fetch 列表。 | | fetch_list | (list) |远程 Paddle Serving Service 的 fetch 列表。 |
| client_config | (str) |Paddle Serving Service 对应的 Client 端配置文件路径。 | | client_config | (str) |Paddle Serving Service 对应的 Client 端配置文件路径。 |
| client_type | (str) |可选择brpc、grpc或local_predictor。local_predictor不启动Serving服务,进程内预测。 | | client_type | (str) |可选择brpc、grpc或local_predictor。local_predictor不启动Serving服务,进程内预测。 |
| concurrency | (int) | OP 的并发数。 | | concurrency | (int) | Op 的并发数。 |
| timeout | (int) |process 操作的超时时间,单位为毫秒。若该值小于零,则视作不超时。 | | timeout | (int) |process 操作的超时时间,单位为毫秒。若该值小于零,则视作不超时。 |
| retry | (int) |超时重试次数。当该值为 1 时,不进行重试。 | | retry | (int) |超时重试次数。当该值为 1 时,不进行重试。 |
| batch_size | (int) |进行 Auto-Batching 的期望 batch_size 大小,由于构建 batch 可能超时,实际 batch_size 可能小于设定值,默认为 1。 | | batch_size | (int) |进行 Auto-Batching 的期望 batch_size 大小,由于构建 batch 可能超时,实际 batch_size 可能小于设定值,默认为 1。 |
| auto_batching_timeout | (float) |进行 Auto-Batching 构建 batch 的超时时间,单位为毫秒。batch_size > 1时,要设置auto_batching_timeout,否则请求数量不足batch_size时会阻塞等待。 | | auto_batching_timeout | (float) |进行 Auto-Batching 构建 batch 的超时时间,单位为毫秒。batch_size > 1时,要设置auto_batching_timeout,否则请求数量不足batch_size时会阻塞等待。 |
| local_service_handler | (object) |local predictor handler,Op init()入参赋值 或 在Op init()中创建| | local_service_handler | (object) |local predictor handler,Op init() 入参赋值或在 Op init() 中创建|
对于 Op 之间需要传输过大数据的情况,可以考虑 RAM DB 外存进行全局存储,通过在 Channel 中传递索引的 Key 来进行数据传输
**三.Channel 设计与实现**
Channel的设计原则: Channel的设计原则:
- Channel 是 OP 之间共享数据的数据结构,负责共享数据或者共享数据状态信息 - Channel 是 Op 之间共享数据的数据结构,负责共享数据或者共享数据状态信息
- Channel 可以支持多个OP的输出存储在同一个 Channel,同一个 Channel 中的数据可以被多个 OP 使用 - Channel 可以支持多个OP的输出存储在同一个 Channel,同一个 Channel 中的数据可以被多个 Op 使用
下图为图执行引擎中 Channel 的设计,采用 input buffer 和 output buffer 进行多 OP 输入或多 OP 输出的数据对齐,中间采用一个 Queue 进行缓冲 下图为图执行引擎中 Channel 的设计,采用 input buffer 和 output buffer 进行多 Op 输入或多 Op 输出的数据对齐,中间采用一个 Queue 进行缓冲
<div align=center> <div align=center>
<img src='../images/pipeline_serving-image3.png' height = "500" align="middle"/> <img src='../images/pipeline_serving-image3.png' height = "500" align="middle"/>
</div> </div>
**四.二次开发**
提供给开发者的二次开发接口有三种,分别是推理 OP 二次开发接口、RequestOp 二次开发接口和 ResponseOp 二次开发接口。
1. 推理 OP 二次开发 **三. 服务日志**
Pipeline 服务日志在当前目录的 `PipelineServingLogs` 目录下,有3种类型日志,分别是 `pipeline.log``pipeline.log.wf``pipeline.tracer`
- `pipeline.log` : 记录 debug & info日志信息
- `pipeline.log.wf` : 记录 warning & error日志
- `pipeline.tracer` : 统计各个阶段耗时、channel 堆积信息
在服务发生异常时,错误信息会记录在 pipeline.log.wf 日志中。打印 tracer 日志要求在 config.yml 的 DAG 属性中添加 tracer 配置。
1. 日志与请求的唯一标识
Pipeline 中有2种 id 用以串联请求,分别是 data_id 和 log_id,二者区别如下:
- data_id : Pipeline 框架生成的自增 ID,标记请求唯一性标识
- log_id : 上游模块传入的标识,跟踪多个服务间串联关系,由于用户可不传入或不保证唯一性,因此不能作为唯一性标识
通常,Pipeline 框架打印的日志会同时带上 data_id 和 log_id。开启 auto-batching 后,会使用批量中的第一个 data_id 标记 batch 整体,同时框架会在一条日志中打印批量中所有 data_id。
2. 日志滚动
Pipeline 的日志模块在 `logger.py` 中定义,使用了 `logging.handlers.RotatingFileHandler` 支持磁盘日志文件的轮换。根据不同文件级别和日质量分别设置了 `maxBytes``backupCount`,当即将超出预定大小时,将关闭旧文件并打开一个新文件用于输出。
```python
"handlers": {
"f_pipeline.log": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log"),
"maxBytes": 512000000,
"backupCount": 20,
},
"f_pipeline.log.wf": {
"class": "logging.handlers.RotatingFileHandler",
"level": "WARNING",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log.wf"),
"maxBytes": 512000000,
"backupCount": 10,
},
"f_tracer.log": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "tracer_fmt",
"filename": os.path.join(log_dir, "pipeline.tracer"),
"maxBytes": 512000000,
"backupCount": 5,
},
}
```
## 自定义信息
提供给开发者提供以下自定义信息,包括自定义 Web 服务、自定义服务输入和输出结构、自定义服务并发和模型配置和自定义推理过程
- 自定义 Web 服务 URL
- 自定义服务输入和输出结构
- 自定义服务并发和模型配置
- 自定义推理过程
- 自定义业务错误类型
1. 自定义 Web 服务 URL
在 Web 服务中自定义服务名称是常见操作,尤其是将已有服务迁移到新框架。URL 中核心字段包括 `ip``port``name``method`,根据最新部署的环境信息设置前2个字段,重点介绍如何设置 `name``method`,框架提供默认的 `methon``prediciton`,如 `http://127.0.0.1:9999/ocr/prediction`
框架有2处代码与此相关,分别是 gRPC Gateway 的配置文件 `python/pipeline/gateway/proto/gateway.proto` 和 服务启动文件 `web_server.py`
业务场景中通过设置 `name` 和 验证 `method` 解决问题。以 [OCR 示例]()为例,服务启动文件 `web_server.py` 通过类 `OcrService` 构造函数的 `name` 字段设置 URL 中 `name` 字段;
```
ocr_service = OcrService(name="ocr")
ocr_service.prepare_pipeline_config("config.yml")
ocr_service.run_service()
```
框架提供默认的 `methon``prediciton`,通过重载 `RequestOp::unpack_request_package` 来验证 `method`
```
def unpack_request_package(self, request):
dict_data = {}
log_id = None
if request is None:
_LOGGER.critical("request is None")
raise ValueError("request is None")
if request.method is not "prediction":
_LOGGER.critical("request method error")
raise ValueError("request method error")
...
```
`python/pipeline/gateway/proto/gateway.proto` 文件可以对 `name``method` 做严格限制,一般不需要修改,如需要特殊指定修改后,需要重新编译 Paddle Serving,[编译方法]()
```proto
service PipelineService {
rpc inference(Request) returns (Response) {
option (google.api.http) = {
post : "/{name=*}/{method=*}"
body : "*"
};
}
};
```
2. 自定义服务输入和输出结构
输入和输出结构包括 proto 中 Request 和 Response 结构,以及 Op 前后处理返回。
当默认 proto 结构不满足业务需求时,同时下面2个文件的 proto 的 Request 和 Response message 结构,保持一致。
- pipeline/gateway/proto/gateway.proto
- pipeline/proto/pipeline_service.proto
推理 OP 为开发者提供3个外部函数接口: 修改后,需要[重新编译]()
3. 自定义服务并发和模型配置
完整的配置信息可参考[配置信息]()
4. 自定义推理过程
推理 Op 为开发者提供3个外部函数接口:
| 变量或接口 | 说明 | | 变量或接口 | 说明 |
| :----------------------------------------------: | :----------------------------------------------------------: | | :----------------------------------------------: | :----------------------------------------------------------: |
| def preprocess(self, input_dicts) | 对从 Channel 中获取的数据进行处理,处理完的数据将作为 **process** 函数的输入。(该函数对一个 **sample** 进行处理) | | def preprocess(self, input_dicts) | 对从 Channel 中获取的数据进行处理,处理完的数据将作为 **process** 函数的输入。(该函数对一个 **sample** 进行处理) |
| def process(self, feed_dict_list, typical_logid) | 基于 Paddle Serving Client 进行 RPC 预测,处理完的数据将作为 **postprocess** 函数的输入。(该函数对一个 **batch** 进行处理) | | def process(self, feed_dict_list, typical_logid) | 基于 Paddle Serving Client 进行 RPC 预测,处理完的数据将作为 **postprocess** 函数的输入。(该函数对一个 **batch** 进行处理) |
| def postprocess(self, input_dicts, fetch_dict) | 处理预测结果,处理完的数据将被放入后继 Channel 中,以被后继 OP 获取。(该函数对一个 **sample** 进行处理) | | def postprocess(self, input_dicts, fetch_dict) | 处理预测结果,处理完的数据将被放入后继 Channel 中,以被后继 Op 获取。(该函数对一个 **sample** 进行处理) |
| def init_op(self) | 用于加载资源(如字典等)。 | | def init_op(self) | 用于加载资源(如字典等)。 |
| self.concurrency_idx | 当前进程(非线程)的并发数索引(不同种类的 OP 单独计算)。 | | self.concurrency_idx | 当前进程(非线程)的并发数索引(不同种类的 Op 单独计算)。 |
OP 在一个运行周期中会依次执行 preprocess,process,postprocess 三个操作(当不设置 `server_endpoints` 参数时,不执行 process 操作),用户可以对这三个函数进行重写,默认实现如下: Op 在一个运行周期中会依次执行 preprocess,process,postprocess 三个操作(当不设置 `server_endpoints` 参数时,不执行 process 操作),用户可以对这三个函数进行重写,默认实现如下:
```python ```python
def preprocess(self, input_dicts): def preprocess(self, input_dicts):
...@@ -180,7 +283,7 @@ def postprocess(self, input_dicts, fetch_dict): ...@@ -180,7 +283,7 @@ def postprocess(self, input_dicts, fetch_dict):
return fetch_dict return fetch_dict
``` ```
**preprocess** 的参数是前继 Channel 中的数据 `input_dicts`,该变量(作为一个 **sample**)是一个以前继 OP 的 name 为 Key,对应 OP 的输出为 Value 的字典。 **preprocess** 的参数是前继 Channel 中的数据 `input_dicts`,该变量(作为一个 **sample**)是一个以前继 Op 的 name 为 Key,对应 Op 的输出为 Value 的字典。
**process** 的参数是 Paddle Serving Client 预测接口的输入变量 `fetch_dict_list`(preprocess 函数的返回值的列表),该变量(作为一个 **batch**)是一个列表,列表中的元素为以 feed_name 为 Key,对应 ndarray 格式的数据为 Value 的字典。`typical_logid` 作为向 PaddleServingService 穿透的 logid。 **process** 的参数是 Paddle Serving Client 预测接口的输入变量 `fetch_dict_list`(preprocess 函数的返回值的列表),该变量(作为一个 **batch**)是一个列表,列表中的元素为以 feed_name 为 Key,对应 ndarray 格式的数据为 Value 的字典。`typical_logid` 作为向 PaddleServingService 穿透的 logid。
...@@ -193,11 +296,14 @@ def init_op(self): ...@@ -193,11 +296,14 @@ def init_op(self):
pass pass
``` ```
需要**注意**的是,在线程版 OP 中,每个 OP 只会调用一次该函数,故加载的资源必须要求是线程安全的 RequestOp 和 ResponseOp 是 Python Pipeline 的中2个特殊 Op,分别是用分解 RPC 数据加入到图执行引擎中,和拿到图执行引擎的预测结果并打包 RPC 数据到客户端
2. RequestOp 二次开发 RequestOp 类的设计如下所示,核心是在 unpack_request_package 函数中解析请求数据,因此,当修改 Request 结构后重写此函数实现全新的解包处理。
RequestOp 用于处理 Pipeline Server 接收到的 RPC 数据,处理后的数据将会被加入到图执行引擎中。其功能实现如下: | 接口 | 说明 |
| :---------------------------------------: | :----------------------------------------: |
| init_op(self) | OP初始化,设置默认名称@DAGExecutor |
| unpack_request_package(self, request) | 解析请求数据 |
```python ```python
class RequestOp(Op): class RequestOp(Op):
...@@ -228,16 +334,13 @@ class RequestOp(Op): ...@@ -228,16 +334,13 @@ class RequestOp(Op):
return dict_data, log_id, None, "" return dict_data, log_id, None, ""
``` ```
**unpack_request_package** 的默认实现是将 RPC request 中的 key 和 value 做成字典交给第一个自定义OP。当默认的RequestOp无法满足参数解析需求时,可通过重写下面2个接口自定义请求参数解析方法。
| 接口 | 说明 |
| :---------------------------------------: | :----------------------------------------: |
| init_op(self) | OP初始化,设置默认名称@DAGExecutor |
| unpack_request_package(self, request) | 处理接收的RPC数据 |
3. ResponseOp 二次开发 ResponseOp 类的设计如下所示,核心是在 pack_response_package 中打包返回结构,因此修改 Response 结构后重写此函数实现全新的打包格式。
ResponseOp 用于处理图执行引擎的预测结果,处理后的数据将会作为 Pipeline Server 的RPC 返回值,其函数实现如下,在pack_response_package中做了精简 | 接口 | 说明 |
| :------------------------------------------: | :-----------------------------------------: |
| init_op(self) | Op 初始化,设置默认名称 @DAGExecutor |
| pack_response_package(self, channeldata) | 处理接收的 RPC 数据 |
```python ```python
class ResponseOp(Op): class ResponseOp(Op):
...@@ -266,14 +369,9 @@ class ResponseOp(Op): ...@@ -266,14 +369,9 @@ class ResponseOp(Op):
return resp return resp
``` ```
**pack_response_package** 的默认实现是将预测结果的字典转化为 RPC response 中的 key 和 value。当默认的 ResponseOp 无法满足结果返回格式要求时,可通过重写下面2个接口自定义返回包打包方法。
| 接口 | 说明 | 5. 自定义业务错误类型
| :------------------------------------------: | :-----------------------------------------: |
| init_op(self) | OP 初始化,设置默认名称 @DAGExecutor |
| pack_response_package(self, channeldata) | 处理接收的 RPC 数据 |
**五.自定义业务错误类型**
用户可根据业务场景自定义错误码,继承 ProductErrCode,在 Op 的 preprocess 或 postprocess 中返回列表中返回,下一阶段处理会根据自定义错误码跳过后置OP处理。 用户可根据业务场景自定义错误码,继承 ProductErrCode,在 Op 的 preprocess 或 postprocess 中返回列表中返回,下一阶段处理会根据自定义错误码跳过后置OP处理。
```python ```python
class ProductErrCode(enum.Enum): class ProductErrCode(enum.Enum):
...@@ -283,92 +381,3 @@ class ProductErrCode(enum.Enum): ...@@ -283,92 +381,3 @@ class ProductErrCode(enum.Enum):
""" """
pass pass
``` ```
**六.日志追踪**
Pipeline 服务日志在当前目录的 `PipelineServingLogs` 目录下,有3种类型日志,分别是 `pipeline.log``pipeline.log.wf``pipeline.tracer`
- `pipeline.log` : 记录 debug & info日志信息
- `pipeline.log.wf` : 记录 warning & error日志
- `pipeline.tracer` : 统计各个阶段耗时、channel 堆积信息
在服务发生异常时,错误信息会记录在 pipeline.log.wf 日志中。打印 tracer 日志要求在 config.yml 的 DAG 属性中添加 tracer 配置。
1. 日志与请求的唯一标识
Pipeline 中有2种 id 用以串联请求,分别是 data_id 和 log_id,二者区别如下:
- data_id : Pipeline 框架生成的自增 ID,标记请求唯一性标识
- log_id : 上游模块传入的标识,跟踪多个服务间串联关系,由于用户可不传入或不保证唯一性,因此不能作为唯一性标识
通常,Pipeline 框架打印的日志会同时带上 data_id 和 log_id。开启 auto-batching 后,会使用批量中的第一个 data_id 标记 batch 整体,同时框架会在一条日志中打印批量中所有 data_id。
2. 日志滚动
Pipeline 的日志模块在 `logger.py` 中定义,使用了 `logging.handlers.RotatingFileHandler` 支持磁盘日志文件的轮换。根据不同文件级别和日质量分别设置了 `maxBytes``backupCount`,当即将超出预定大小时,将关闭旧文件并打开一个新文件用于输出。
```python
"handlers": {
"f_pipeline.log": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log"),
"maxBytes": 512000000,
"backupCount": 20,
},
"f_pipeline.log.wf": {
"class": "logging.handlers.RotatingFileHandler",
"level": "WARNING",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log.wf"),
"maxBytes": 512000000,
"backupCount": 10,
},
"f_tracer.log": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "tracer_fmt",
"filename": os.path.join(log_dir, "pipeline.tracer"),
"maxBytes": 512000000,
"backupCount": 5,
},
}
```
**七.异构硬件**
Pipeline 除了支持 CPU、GPU 芯片推理之外,还支持在多种异构硬件推理部署。在 `config.yml` 中由 `device_type``devices`。优先使用 `device_type` 指定类型,当空缺时根据 `devices` 判断。`device_type` 描述如下:
- CPU(Intel) : 0
- GPU(Jetson/海光DCU) : 1
- TensorRT : 2
- CPU(Arm) : 3
- XPU : 4
- Ascend310 : 5
- ascend910 : 6
config.yml中硬件配置:
```
#计算硬件类型: 空缺时由devices决定(CPU/GPU),0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0
#计算硬件ID,优先由device_type决定硬件类型。devices为""或空缺时为CPU预测;当为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "" # "0,1"
```
**八.低精度推理**
Pipeline Serving支持低精度推理,CPU、GPU和TensoRT支持的精度类型如下图所示:
- CPU
- fp32(default)
- fp16
- bf16(mkldnn)
- GPU
- fp32(default)
- fp16
- int8
- Tensor RT
- fp32(default)
- fp16
- int8
使用int8时,要开启use_calib: True
参考[simple_web_service](../../examples/Pipeline/simple_web_service)示例
# Python Pipeline 高阶用法 # Python Pipeline 高阶用法
高阶用法在复杂场景中使用,实现更多自定义能力,包括 DAG 跳过某个OP运行、自定义数据传输结构以及多卡推理等。 在复杂业务场景中使用常规功能无法满足需求,本文介绍一些高阶用法。
- DAG 结构跳过某个 Op 运行
- 批量推理
- 单机多卡推理
- 多种计算芯片上推理
- 低精度推理
- TensorRT 推理加速
- MKLDNN 推理加速
## DAG 跳过某个OP运行
为 DAG 图中跳过某个 OP 运行,实际做法是在跳过此 OP 的 process 阶段,只要在 preprocess 做好判断,跳过 process 阶段,在和 postprocess 后直接返回即可。 **一. DAG 结构跳过某个 Op 运行 **
preprocess 返回结果列表的第二个结果是 `is_skip_process=True` 表示是否跳过当前 OP 的 process 阶段,直接进入 postprocess 处理。
此应用场景一般在 Op 前后处理中有 if 条件判断时,不满足条件时,跳过后面处理。实际做法是在跳过此 Op 的 process 阶段,只要在 preprocess 做好判断,跳过 process 阶段,在和 postprocess 后直接返回即可。
preprocess 返回结果列表的第二个结果是 `is_skip_process=True` 表示是否跳过当前 Op 的 process 阶段,直接进入 postprocess 处理。
```python ```python
def preprocess(self, input_dicts, data_id, log_id): def preprocess(self, input_dicts, data_id, log_id):
...@@ -35,32 +43,8 @@ def preprocess(self, input_dicts, data_id, log_id): ...@@ -35,32 +43,8 @@ def preprocess(self, input_dicts, data_id, log_id):
``` ```
## 自定义 proto 中 Request 和 Response 结构 ** 二. 批量推理 **
当默认 proto 结构不满足业务需求时,同时下面2个文件的 proto 的 Request 和 Response message 结构,保持一致。
> pipeline/gateway/proto/gateway.proto
> pipeline/proto/pipeline_service.proto
再重新编译 Serving Server。
## 自定义 URL
grpc gateway 处理 post 请求,默认 `method``prediction`,例如:127.0.0.1:8080/ocr/prediction。用户可自定义 name 和 method,对于已有 url 的服务可无缝切换。
```proto
service PipelineService {
rpc inference(Request) returns (Response) {
option (google.api.http) = {
post : "/{name=*}/{method=*}"
body : "*"
};
}
};
```
## 批量推理
Pipeline 支持批量推理,通过增大 batch size 可以提高 GPU 利用率。Python Pipeline 支持3种 batch 形式以及适用的场景如下: Pipeline 支持批量推理,通过增大 batch size 可以提高 GPU 利用率。Python Pipeline 支持3种 batch 形式以及适用的场景如下:
- 场景1:一个推理请求包含批量数据(batch) - 场景1:一个推理请求包含批量数据(batch)
- 单条数据定长,批量变长,数据转成BCHW格式 - 单条数据定长,批量变长,数据转成BCHW格式
...@@ -76,11 +60,12 @@ Pipeline 支持批量推理,通过增大 batch size 可以提高 GPU 利用率 ...@@ -76,11 +60,12 @@ Pipeline 支持批量推理,通过增大 batch size 可以提高 GPU 利用率
| :------------------------------------------: | :-----------------------------------------: | | :------------------------------------------: | :-----------------------------------------: |
| batch | client 发送批量数据,client.predict 的 batch=True | | batch | client 发送批量数据,client.predict 的 batch=True |
| mini-batch | preprocess 按 list 类型返回,参考 OCR 示例 RecOp的preprocess| | mini-batch | preprocess 按 list 类型返回,参考 OCR 示例 RecOp的preprocess|
| auto-batching | config.yml 中 OP 级别设置 batch_size 和 auto_batching_timeout | | auto-batching | config.yml 中 Op 级别设置 batch_size 和 auto_batching_timeout |
### 4.6 单机多卡 ** 三. 单机多卡推理 **
单机多卡推理,M 个 OP 进程与 N 个 GPU 卡绑定,在 `config.yml` 中配置3个参数有关系,首先选择进程模式、并发数即进程数,devices 是 GPU 卡 ID。绑定方法是进程启动时遍历 GPU 卡 ID,例如启动7个 OP 进程 `config.yml` 设置 devices:0,1,2,那么第1,4,7个启动的进程与0卡绑定,第2,4个启动的进程与1卡绑定,3,6进程与卡2绑定。
单机多卡推理,M 个 Op 进程与 N 个 GPU 卡绑定,在 `config.yml` 中配置3个参数有关系,首先选择进程模式、并发数即进程数,devices 是 GPU 卡 ID。绑定方法是进程启动时遍历 GPU 卡 ID,例如启动7个 Op 进程 `config.yml` 设置 devices:0,1,2,那么第1,4,7个启动的进程与0卡绑定,第2,4个启动的进程与1卡绑定,3,6进程与卡2绑定。
- 进程ID: 0 绑定 GPU 卡0 - 进程ID: 0 绑定 GPU 卡0
- 进程ID: 1 绑定 GPU 卡1 - 进程ID: 1 绑定 GPU 卡1
- 进程ID: 2 绑定 GPU 卡2 - 进程ID: 2 绑定 GPU 卡2
...@@ -94,3 +79,44 @@ Pipeline 支持批量推理,通过增大 batch size 可以提高 GPU 利用率 ...@@ -94,3 +79,44 @@ Pipeline 支持批量推理,通过增大 batch size 可以提高 GPU 利用率
#计算硬件 ID,当 devices 为""或不写时为 CPU 预测;当 devices 为"0", "0,1,2"时为 GPU 预测,表示使用的 GPU 卡 #计算硬件 ID,当 devices 为""或不写时为 CPU 预测;当 devices 为"0", "0,1,2"时为 GPU 预测,表示使用的 GPU 卡
devices: "0,1,2" devices: "0,1,2"
``` ```
** 四. 多种计算芯片上推理 **
Pipeline 除了支持 CPU、GPU 芯片推理之外,还支持在多种计算硬件推理部署。在 `config.yml` 中由 `device_type``devices`。优先使用 `device_type` 指定类型,当空缺时根据 `devices` 判断。`device_type` 描述如下:
- CPU(Intel) : 0
- GPU(Jetson/海光DCU) : 1
- TensorRT : 2
- CPU(Arm) : 3
- XPU : 4
- Ascend310 : 5
- ascend910 : 6
config.yml中硬件配置:
```
#计算硬件类型: 空缺时由devices决定(CPU/GPU),0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0
#计算硬件ID,优先由device_type决定硬件类型。devices为""或空缺时为CPU预测;当为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "" # "0,1"
```
** 五. 低精度推理 **
Pipeline Serving支持低精度推理,CPU、GPU和TensoRT支持的精度类型如下图所示:
- CPU
- fp32(default)
- fp16
- bf16(mkldnn)
- GPU
- fp32(default)
- fp16
- int8
- Tensor RT
- fp32(default)
- fp16
- int8
使用int8时,要开启use_calib: True
参考[simple_web_service](../../examples/Pipeline/simple_web_service)示例
# Python Pipeline 使用案例
Python Pipeline 使用案例部署步骤可分为下载模型、配置、编写代码、推理测试4个步骤。
所有Pipeline示例在[examples/Pipeline/](../../examples/Pipeline) 目录下,目前有7种类型模型示例:
- [PaddleClas](../../examples/Pipeline/PaddleClas)
- [Detection](../../examples/Pipeline/PaddleDetection)
- [bert](../../examples/Pipeline/PaddleNLP/bert)
- [imagenet](../../examples/Pipeline/PaddleClas/imagenet)
- [imdb_model_ensemble](../../examples/Pipeline/imdb_model_ensemble)
- [ocr](../../examples/Pipeline/PaddleOCR/ocr)
- [simple_web_service](../../examples/Pipeline/simple_web_service)
以 imdb_model_ensemble 为例来展示如何使用 Pipeline Serving,相关代码在 `Serving/examples/Pipeline/imdb_model_ensemble` 文件夹下可以找到,例子中的 Server 端结构如下图所示:
<div align=center>
<img src='../images/pipeline_serving-image4.png' height = "200" align="middle"/>
</div>
** 部署需要的文件 **
需要五类文件,其中模型文件、配置文件、服务端代码是构建Pipeline服务必备的三个文件。测试客户端和测试数据集为测试准备
- 模型文件
- 配置文件(config.yml)
- 服务级别:服务端口、gRPC线程数、服务超时、重试次数等
- DAG级别:资源类型、开启Trace、性能profile
- OP级别:模型路径、并发度、推理方式、计算硬件、推理超时、自动批量等
- 服务端(web_server.py)
- 服务级别:定义服务名称、读取配置文件、启动服务
- DAG级别:指定多OP之间的拓扑关系
- OP级别:重写OP前后处理
- 测试客户端
- 正确性校验
- 压力测试
- 测试数据集
- 图片、文本、语音等
## 获取模型
示例中通过`get_data.sh`获取模型文件,示例中的模型文件已保存Feed/Fetch Var参数,如没有保存请跳转到[保存Serving部署参数]()步骤。
```shell
cd Serving/examples/Pipeline/imdb_model_ensemble
sh get_data.sh
```
## 创建config.yaml
本示例采用了brpc的client连接类型,还可以选择grpc或local_predictor。
```yaml
#rpc端口, rpc_port和http_port不允许同时为空。当rpc_port为空且http_port不为空时,会自动将rpc_port设置为http_port+1
rpc_port: 18070
#http端口, rpc_port和http_port不允许同时为空。当rpc_port可用且http_port为空时,不自动生成http_port
http_port: 18071
#worker_num, 最大并发数。当build_dag_each_worker=True时, 框架会创建worker_num个进程,每个进程内构建grpcSever和DAG
#当build_dag_each_worker=False时,框架会设置主线程grpc线程池的max_workers=worker_num
worker_num: 4
#build_dag_each_worker, False,框架在进程内创建一条DAG;True,框架会每个进程内创建多个独立的DAG
build_dag_each_worker: False
dag:
#op资源类型, True, 为线程模型;False,为进程模型
is_thread_op: True
#重试次数
retry: 1
#使用性能分析, True,生成Timeline性能数据,对性能有一定影响;False为不使用
use_profile: False
#channel的最大长度,默认为0
channel_size: 0
#tracer, 跟踪框架吞吐,每个OP和channel的工作情况。无tracer时不生成数据
tracer:
#每次trace的时间间隔,单位秒/s
interval_s: 10
op:
bow:
# 并发数,is_thread_op=True时,为线程并发;否则为进程并发
concurrency: 1
# client连接类型,brpc, grpc和local_predictor
client_type: brpc
# Serving交互重试次数,默认不重试
retry: 1
# Serving交互超时时间, 单位ms
timeout: 3000
# Serving IPs
server_endpoints: ["127.0.0.1:9393"]
# bow模型client端配置
client_config: "imdb_bow_client_conf/serving_client_conf.prototxt"
# Fetch结果列表,以client_config中fetch_var的alias_name为准
fetch_list: ["prediction"]
# 批量查询Serving的数量, 默认1。batch_size>1要设置auto_batching_timeout,否则不足batch_size时会阻塞
batch_size: 2
# 批量查询超时,与batch_size配合使用
auto_batching_timeout: 2000
cnn:
# 并发数,is_thread_op=True时,为线程并发;否则为进程并发
concurrency: 1
# client连接类型,brpc
client_type: brpc
# Serving交互重试次数,默认不重试
retry: 1
# 预测超时时间, 单位ms
timeout: 3000
# Serving IPs
server_endpoints: ["127.0.0.1:9292"]
# cnn模型client端配置
client_config: "imdb_cnn_client_conf/serving_client_conf.prototxt"
# Fetch结果列表,以client_config中fetch_var的alias_name为准
fetch_list: ["prediction"]
# 批量查询Serving的数量, 默认1。
batch_size: 2
# 批量查询超时,与batch_size配合使用
auto_batching_timeout: 2000
combine:
# 并发数,is_thread_op=True时,为线程并发;否则为进程并发
concurrency: 1
# Serving交互重试次数,默认不重试
retry: 1
# 预测超时时间, 单位ms
timeout: 3000
# 批量查询Serving的数量, 默认1。
batch_size: 2
# 批量查询超时,与batch_size配合使用
auto_batching_timeout: 2000
```
## 编写 Server 代码
代码示例中,重点留意3个自定义Op的preprocess、postprocess处理,以及Combin Op初始化列表input_ops=[bow_op, cnn_op],设置Combin Op的前置OP列表。
```python
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
from paddle_serving_app.reader import IMDBDataset
class ImdbRequestOp(RequestOp):
def init_op(self):
self.imdb_dataset = IMDBDataset()
self.imdb_dataset.load_resource('imdb.vocab')
def unpack_request_package(self, request):
dictdata = {}
for idx, key in enumerate(request.key):
if key != "words":
continue
words = request.value[idx]
word_ids, _ = self.imdb_dataset.get_words_and_label(words)
dictdata[key] = np.array(word_ids)
return dictdata
class CombineOp(Op):
def preprocess(self, input_data):
combined_prediction = 0
for op_name, data in input_data.items():
combined_prediction += data["prediction"]
data = {"prediction": combined_prediction / 2}
return data
read_op = ImdbRequestOp()
bow_op = Op(name="bow",
input_ops=[read_op],
server_endpoints=["127.0.0.1:9393"],
fetch_list=["prediction"],
client_config="imdb_bow_client_conf/serving_client_conf.prototxt",
concurrency=1,
timeout=-1,
retry=1)
cnn_op = Op(name="cnn",
input_ops=[read_op],
server_endpoints=["127.0.0.1:9292"],
fetch_list=["prediction"],
client_config="imdb_cnn_client_conf/serving_client_conf.prototxt",
concurrency=1,
timeout=-1,
retry=1)
combine_op = CombineOp(
name="combine",
input_ops=[bow_op, cnn_op],
concurrency=5,
timeout=-1,
retry=1)
# use default ResponseOp implementation
response_op = ResponseOp(input_ops=[combine_op])
server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
```
## 启动服务验证
```python
from paddle_serving_client.pipeline import PipelineClient
import numpy as np
client = PipelineClient()
client.connect(['127.0.0.1:18080'])
words = 'i am very sad | 0'
futures = []
for i in range(3):
futures.append(
client.predict(
feed_dict={"words": words},
fetch=["prediction"],
asyn=True))
for f in futures:
res = f.result()
if res["ecode"] != 0:
print(res)
exit(1)
```
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册