提交 dc512ce7 编写于 作者: T TeslaZhao

Update Add new docs & Update docker builder shell

上级 afc21a3e
# 部署案例概述
# 快速开始案例部署
您可以通过以下 Paddle Serving 快速开始案例,分别了解到 C++ Serving 与 Python Pipeline 2种框架的部署方法。
- [使用 C++ Serving 部署 Resnet50 模型案例]()
......
# Python Pipeline 框架
在许多深度学习框架中,模型服务化部署通常用于单模型的一键部署。但在 AI 工业大生产的背景下,端到端的单一深度学习模型不能解决复杂问题,多个深度学习模型组合使用是解决现实复杂问题的常规手段,如文字识别 OCR 服务至少需要检测和识别2种模型;视频理解服务一般需要视频抽帧、切词、音频处理、分类等多种模型组合实现。当前,通用多模型组合服务的设计和实现是非常复杂的,既要能实现复杂的模型拓扑关系,又要保证服务的高并发、高可用和易于开发和维护等。
Paddle Serving 实现了一套通用的多模型组合服务编程框架 Python Pipeline,不仅解决上述痛点,同时还能大幅提高 GPU 利用率,并易于开发和维护。
通过阅读以下内容掌握 Python Pipeline 框架基础功能、设计方案、使用指南等。
- [Python Pipeline 基础功能]()
- [Python Pipeline 使用案例]()
- [Python Pipeline 高阶用法]()
- [Python Pipeline 优化指南]()
# Python Pipeline 基础功能
设计一个通用端到端多模型组合框架所面临的挑战有如下4点:
1. 通用性:框架既要满足通用模型的输入类型,又要满足模型组合的复杂拓扑关系。
2. 高性能:与常见互联网后端服务不同,深度学习模型的推理程序属于计算密集型程序,同时 GPU 等计算芯片价格昂贵,因此在平均响应时间不苛刻的场景下,计算资源占用和吞吐量指标格外重要。
3. 高可用性:高可用的架构依赖每个服务的健壮性,服务状态可查询、异常可监控和管理是必备条件。
4. 易于开发与调试:使用 Python 语言开发可大幅提升研发效率,运行的错误信息准确帮助开发者快速定位问题。
## 总体设计
Python Pipeline 框架分为网络服务层和图执行引擎2部分,网络服务层处理多种网络协议请求和通用输入参数问题,图执行引擎层解决复杂拓扑关系。如下图所示
<div align=center>
<img src='../images/pipeline_serving-image1.png' height = "250" align="middle"/>
</div>
**一.网络服务层**
网络服务层包括了 gRPC-gateway 和 gRPC Server。gPRC gateway 接收 HTTP 请求,打包成 proto 格式后转发给 gRPC Server,一套处理程序可同时处理 HTTP、gRPC 2种类型请求。
另外,在支持多种模型的输入输出数据类型上,使用统一的 service.proto 结构,具有更好的通用性。
```proto
message Request {
repeated string key = 1;
repeated string value = 2;
optional string name = 3;
optional string method = 4;
optional int64 logid = 5;
optional string clientip = 6;
};
message Response {
optional int32 err_no = 1;
optional string err_msg = 2;
repeated string key = 3;
repeated string value = 4;
};
```
Request 是输入结构,`key``value` 是配对的 string 数组。 `name``method` 对应 URL://{ip}:{port}/{name}/{method}。`logid``clientip` 便于用户串联服务级请求和自定义策略。
Response 是输出结构,`err_no``err_msg` 表达处理结果的正确性和错误信息,`key``value` 为结果。
**二.图执行引擎层**
图执行引擎的设计思路是基于有向无环图实现多模型组合的复杂拓扑关系,有向无环图由单节点或多节点串联、并联结构构成。
<div align=center>
<img src='../images/pipeline_serving-image2.png' height = "300" align="middle"/>
</div>
图执行引擎抽象归纳出2种数据结构 OP 节点和 Channel 有向边,构建一条异步流水线工作流。核心概念和设计思路如下:
- OP 节点: 可理解成1个推理模型、一个处理方法,甚至是训练前向代码,可独立运行,独立设置并发度。每个 OP 节点的计算结果放入其绑定的 Channel 中。
- Channel 数据管道: 可理解为一个单向缓冲队列。每个 Channel 只接收上游 OP 节点的计算输出,作为下游 OP 节点的输入。
- 工作流:根据用户定义的节点依赖关系,图执行引擎自动生成有向无环图。每条用户请求到达图执行引擎时会生成一个唯一自增 ID,通过这种唯一性绑定关系标记流水线中的不同请求。
- 对于 OP 之间需要传输过大数据的情况,可以考虑 RAM DB 外存进行全局存储,通过在 Channel 中传递索引的 Key 来进行数据传输
## 基础功能
展开网络服务层和图执行引擎 Pipeline 服务、OP、重写OP前后处理,最后介绍特定OP(RequestOp和ResponseOp)二次开发的方法。
**一.Pipeline 服务**
PipelineServer包装了RPC运行层和图引擎执行,所有Pipeline服务首先要实例化PipelineServer示例,再设置2个核心接口 set_response_op、加载配置信息,最后调用run_server启动服务。代码示例如下:
```python
server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server(config_yml_path)
#server.prepare_pipeline_config(config_yml_path)
server.run_server()
```
PipelineServer的核心接口:
- `set_response_op`,设置response_op 将会根据各个 OP 的拓扑关系初始化 Channel 并构建计算图。
- `prepare_server`: 加载配置信息,并启动远端Serving服务,适用于调用远端远端推理服务
- `prepare_pipeline_config`,仅加载配置信息,适用于local_prdict
- `run_server`,启动gRPC服务,接收请求
**二.OP 设计与实现**
OP 的设计原则:
- 单个 OP 默认的功能是根据输入的 Channel 数据,访问一个 Paddle Serving 的单模型服务,并将结果存在输出的 Channel
- 单个 OP 可以支持用户自定义,包括 preprocess,process,postprocess 三个函数都可以由用户继承和实现
- 单个 OP 可以控制并发数,从而增加处理并发数
- 单个 OP 可以获取多个不同 RPC 请求的数据,以实现 Auto-Batching
- OP 可以由线程或进程启动
其构造函数如下:
```python
def __init__(name=None,
input_ops=[],
server_endpoints=[],
fetch_list=[],
client_config=None,
client_type=None,
concurrency=1,
timeout=-1,
retry=1,
batch_size=1,
auto_batching_timeout=None,
local_service_handler=None)
```
各参数含义如下:
| 参数名 | 类型 | 含义 |
| :-------------------: | :---------: |:------------------------------------------------: |
| name | (str) | 用于标识 OP 类型的字符串,该字段必须全局唯一。 |
| input_ops | (list) | 当前 OP 的所有前继 OP 的列表。 |
| server_endpoints | (list) |远程 Paddle Serving Service 的 endpoints 列表。如果不设置该参数,认为是local_precditor模式,从local_service_conf中读取配置。 |
| fetch_list | (list) |远程 Paddle Serving Service 的 fetch 列表。 |
| client_config | (str) |Paddle Serving Service 对应的 Client 端配置文件路径。 |
| client_type | (str) |可选择brpc、grpc或local_predictor。local_predictor不启动Serving服务,进程内预测。 |
| concurrency | (int) | OP 的并发数。 |
| timeout | (int) |process 操作的超时时间,单位为毫秒。若该值小于零,则视作不超时。 |
| retry | (int) |超时重试次数。当该值为 1 时,不进行重试。 |
| batch_size | (int) |进行 Auto-Batching 的期望 batch_size 大小,由于构建 batch 可能超时,实际 batch_size 可能小于设定值,默认为 1。 |
| auto_batching_timeout | (float) |进行 Auto-Batching 构建 batch 的超时时间,单位为毫秒。batch_size > 1时,要设置auto_batching_timeout,否则请求数量不足batch_size时会阻塞等待。 |
| local_service_handler | (object) |local predictor handler,Op init()入参赋值 或 在Op init()中创建|
**三.Channel 设计与实现**
Channel的设计原则:
- Channel 是 OP 之间共享数据的数据结构,负责共享数据或者共享数据状态信息
- Channel 可以支持多个OP的输出存储在同一个 Channel,同一个 Channel 中的数据可以被多个 OP 使用
下图为图执行引擎中 Channel 的设计,采用 input buffer 和 output buffer 进行多 OP 输入或多 OP 输出的数据对齐,中间采用一个 Queue 进行缓冲
<div align=center>
<img src='../images/pipeline_serving-image3.png' height = "500" align="middle"/>
</div>
**四.二次开发**
提供给开发者的二次开发接口有三种,分别是推理 OP 二次开发接口、RequestOp 二次开发接口和 ResponseOp 二次开发接口。
1. 推理 OP 二次开发
推理 OP 为开发者提供3个外部函数接口:
| 变量或接口 | 说明 |
| :----------------------------------------------: | :----------------------------------------------------------: |
| def preprocess(self, input_dicts) | 对从 Channel 中获取的数据进行处理,处理完的数据将作为 **process** 函数的输入。(该函数对一个 **sample** 进行处理) |
| def process(self, feed_dict_list, typical_logid) | 基于 Paddle Serving Client 进行 RPC 预测,处理完的数据将作为 **postprocess** 函数的输入。(该函数对一个 **batch** 进行处理) |
| def postprocess(self, input_dicts, fetch_dict) | 处理预测结果,处理完的数据将被放入后继 Channel 中,以被后继 OP 获取。(该函数对一个 **sample** 进行处理) |
| def init_op(self) | 用于加载资源(如字典等)。 |
| self.concurrency_idx | 当前进程(非线程)的并发数索引(不同种类的 OP 单独计算)。 |
OP 在一个运行周期中会依次执行 preprocess,process,postprocess 三个操作(当不设置 `server_endpoints` 参数时,不执行 process 操作),用户可以对这三个函数进行重写,默认实现如下:
```python
def preprocess(self, input_dicts):
# multiple previous Op
if len(input_dicts) != 1:
raise NotImplementedError(
'this Op has multiple previous inputs. Please override this func.'
(_, input_dict), = input_dicts.items()
return input_dict
def process(self, feed_dict_list, typical_logid):
err, err_info = ChannelData.check_batch_npdata(feed_dict_list)
if err != 0:
raise NotImplementedError(
"{} Please override preprocess func.".format(err_info))
call_result = self.client.predict(
feed=feed_dict_list, fetch=self._fetch_names, log_id=typical_logid)
if isinstance(self.client, MultiLangClient):
if call_result is None or call_result["serving_status_code"] != 0:
return None
call_result.pop("serving_status_code")
return call_result
def postprocess(self, input_dicts, fetch_dict):
return fetch_dict
```
**preprocess** 的参数是前继 Channel 中的数据 `input_dicts`,该变量(作为一个 **sample**)是一个以前继 OP 的 name 为 Key,对应 OP 的输出为 Value 的字典。
**process** 的参数是 Paddle Serving Client 预测接口的输入变量 `fetch_dict_list`(preprocess 函数的返回值的列表),该变量(作为一个 **batch**)是一个列表,列表中的元素为以 feed_name 为 Key,对应 ndarray 格式的数据为 Value 的字典。`typical_logid` 作为向 PaddleServingService 穿透的 logid。
**postprocess** 的参数是 `input_dicts``fetch_dict``input_dicts` 与 preprocess 的参数一致,`fetch_dict` (作为一个 **sample**)是 process 函数的返回 batch 中的一个 sample(如果没有执行 process ,则该值为 preprocess 的返回值)。
用户还可以对 **init_op** 函数进行重写,已加载自定义的一些资源(比如字典等),默认实现如下:
```python
def init_op(self):
pass
```
需要**注意**的是,在线程版 OP 中,每个 OP 只会调用一次该函数,故加载的资源必须要求是线程安全的。
2. RequestOp 二次开发
RequestOp 用于处理 Pipeline Server 接收到的 RPC 数据,处理后的数据将会被加入到图执行引擎中。其功能实现如下:
```python
class RequestOp(Op):
def __init__(self):
# PipelineService.name = "@DAGExecutor"
super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[])
# init op
try:
self.init_op()
except Exception as e:
_LOGGER.critical("Op(Request) Failed to init: {}".format(e))
os._exit(-1)
def unpack_request_package(self, request):
dict_data = {}
log_id = None
if request is None:
_LOGGER.critical("request is None")
raise ValueError("request is None")
for idx, key in enumerate(request.key):
dict_data[key] = request.value[idx]
log_id = request.logid
_LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \
name:{}, method:{}".format(log_id, request.clientip, request.name,
request.method))
return dict_data, log_id, None, ""
```
**unpack_request_package** 的默认实现是将 RPC request 中的 key 和 value 做成字典交给第一个自定义OP。当默认的RequestOp无法满足参数解析需求时,可通过重写下面2个接口自定义请求参数解析方法。
| 接口 | 说明 |
| :---------------------------------------: | :----------------------------------------: |
| init_op(self) | OP初始化,设置默认名称@DAGExecutor |
| unpack_request_package(self, request) | 处理接收的RPC数据 |
3. ResponseOp 二次开发
ResponseOp 用于处理图执行引擎的预测结果,处理后的数据将会作为 Pipeline Server 的RPC 返回值,其函数实现如下,在pack_response_package中做了精简
```python
class ResponseOp(Op):
def __init__(self, input_ops):
super(ResponseOp, self).__init__(
name="@DAGExecutor", input_ops=input_ops)
# init op
try:
self.init_op()
except Exception as e:
_LOGGER.critical("Op(ResponseOp) Failed to init: {}".format(
e, exc_info=True))
os._exit(-1)
def pack_response_package(self, channeldata):
resp = pipeline_service_pb2.Response()
error_code = channeldata.error_code
error_info = ""
...
# pack results
if error_code is None:
error_code = 0
resp.err_no = error_code
resp.err_msg = error_info
return resp
```
**pack_response_package** 的默认实现是将预测结果的字典转化为 RPC response 中的 key 和 value。当默认的 ResponseOp 无法满足结果返回格式要求时,可通过重写下面2个接口自定义返回包打包方法。
| 接口 | 说明 |
| :------------------------------------------: | :-----------------------------------------: |
| init_op(self) | OP 初始化,设置默认名称 @DAGExecutor |
| pack_response_package(self, channeldata) | 处理接收的 RPC 数据 |
**五.自定义业务错误类型**
用户可根据业务场景自定义错误码,继承 ProductErrCode,在 Op 的 preprocess 或 postprocess 中返回列表中返回,下一阶段处理会根据自定义错误码跳过后置OP处理。
```python
class ProductErrCode(enum.Enum):
"""
ProductErrCode is a base class for recording business error code.
product developers inherit this class and extend more error codes.
"""
pass
```
**六.日志追踪**
Pipeline 服务日志在当前目录的 `PipelineServingLogs` 目录下,有3种类型日志,分别是 `pipeline.log``pipeline.log.wf``pipeline.tracer`
- `pipeline.log` : 记录 debug & info日志信息
- `pipeline.log.wf` : 记录 warning & error日志
- `pipeline.tracer` : 统计各个阶段耗时、channel 堆积信息
在服务发生异常时,错误信息会记录在 pipeline.log.wf 日志中。打印 tracer 日志要求在 config.yml 的 DAG 属性中添加 tracer 配置。
1. 日志与请求的唯一标识
Pipeline 中有2种 id 用以串联请求,分别是 data_id 和 log_id,二者区别如下:
- data_id : Pipeline 框架生成的自增 ID,标记请求唯一性标识
- log_id : 上游模块传入的标识,跟踪多个服务间串联关系,由于用户可不传入或不保证唯一性,因此不能作为唯一性标识
通常,Pipeline 框架打印的日志会同时带上 data_id 和 log_id。开启 auto-batching 后,会使用批量中的第一个 data_id 标记 batch 整体,同时框架会在一条日志中打印批量中所有 data_id。
2. 日志滚动
Pipeline 的日志模块在 `logger.py` 中定义,使用了 `logging.handlers.RotatingFileHandler` 支持磁盘日志文件的轮换。根据不同文件级别和日质量分别设置了 `maxBytes``backupCount`,当即将超出预定大小时,将关闭旧文件并打开一个新文件用于输出。
```python
"handlers": {
"f_pipeline.log": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log"),
"maxBytes": 512000000,
"backupCount": 20,
},
"f_pipeline.log.wf": {
"class": "logging.handlers.RotatingFileHandler",
"level": "WARNING",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log.wf"),
"maxBytes": 512000000,
"backupCount": 10,
},
"f_tracer.log": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "tracer_fmt",
"filename": os.path.join(log_dir, "pipeline.tracer"),
"maxBytes": 512000000,
"backupCount": 5,
},
}
```
**七.异构硬件**
Pipeline 除了支持 CPU、GPU 芯片推理之外,还支持在多种异构硬件推理部署。在 `config.yml` 中由 `device_type``devices`。优先使用 `device_type` 指定类型,当空缺时根据 `devices` 判断。`device_type` 描述如下:
- CPU(Intel) : 0
- GPU(Jetson/海光DCU) : 1
- TensorRT : 2
- CPU(Arm) : 3
- XPU : 4
- Ascend310 : 5
- ascend910 : 6
config.yml中硬件配置:
```
#计算硬件类型: 空缺时由devices决定(CPU/GPU),0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0
#计算硬件ID,优先由device_type决定硬件类型。devices为""或空缺时为CPU预测;当为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "" # "0,1"
```
**八.低精度推理**
Pipeline Serving支持低精度推理,CPU、GPU和TensoRT支持的精度类型如下图所示:
- CPU
- fp32(default)
- fp16
- bf16(mkldnn)
- GPU
- fp32(default)
- fp16
- int8
- Tensor RT
- fp32(default)
- fp16
- int8
使用int8时,要开启use_calib: True
参考[simple_web_service](../../examples/Pipeline/simple_web_service)示例
# Python Pipeline 使用案例
Python Pipeline 使用案例部署步骤可分为下载模型、配置、编写代码、推理测试4个步骤。
所有Pipeline示例在[examples/Pipeline/](../../examples/Pipeline) 目录下,目前有7种类型模型示例:
- [PaddleClas](../../examples/Pipeline/PaddleClas)
- [Detection](../../examples/Pipeline/PaddleDetection)
- [bert](../../examples/Pipeline/PaddleNLP/bert)
- [imagenet](../../examples/Pipeline/PaddleClas/imagenet)
- [imdb_model_ensemble](../../examples/Pipeline/imdb_model_ensemble)
- [ocr](../../examples/Pipeline/PaddleOCR/ocr)
- [simple_web_service](../../examples/Pipeline/simple_web_service)
以 imdb_model_ensemble 为例来展示如何使用 Pipeline Serving,相关代码在 `Serving/examples/Pipeline/imdb_model_ensemble` 文件夹下可以找到,例子中的 Server 端结构如下图所示:
<div align=center>
<img src='../images/pipeline_serving-image4.png' height = "200" align="middle"/>
</div>
** 部署需要的文件 **
需要五类文件,其中模型文件、配置文件、服务端代码是构建Pipeline服务必备的三个文件。测试客户端和测试数据集为测试准备
- 模型文件
- 配置文件(config.yml)
- 服务级别:服务端口、gRPC线程数、服务超时、重试次数等
- DAG级别:资源类型、开启Trace、性能profile
- OP级别:模型路径、并发度、推理方式、计算硬件、推理超时、自动批量等
- 服务端(web_server.py)
- 服务级别:定义服务名称、读取配置文件、启动服务
- DAG级别:指定多OP之间的拓扑关系
- OP级别:重写OP前后处理
- 测试客户端
- 正确性校验
- 压力测试
- 测试数据集
- 图片、文本、语音等
## 获取模型
示例中通过`get_data.sh`获取模型文件,示例中的模型文件已保存Feed/Fetch Var参数,如没有保存请跳转到[保存Serving部署参数]()步骤。
```shell
cd Serving/examples/Pipeline/imdb_model_ensemble
sh get_data.sh
```
## 创建config.yaml
本示例采用了brpc的client连接类型,还可以选择grpc或local_predictor。
```yaml
#rpc端口, rpc_port和http_port不允许同时为空。当rpc_port为空且http_port不为空时,会自动将rpc_port设置为http_port+1
rpc_port: 18070
#http端口, rpc_port和http_port不允许同时为空。当rpc_port可用且http_port为空时,不自动生成http_port
http_port: 18071
#worker_num, 最大并发数。当build_dag_each_worker=True时, 框架会创建worker_num个进程,每个进程内构建grpcSever和DAG
#当build_dag_each_worker=False时,框架会设置主线程grpc线程池的max_workers=worker_num
worker_num: 4
#build_dag_each_worker, False,框架在进程内创建一条DAG;True,框架会每个进程内创建多个独立的DAG
build_dag_each_worker: False
dag:
#op资源类型, True, 为线程模型;False,为进程模型
is_thread_op: True
#重试次数
retry: 1
#使用性能分析, True,生成Timeline性能数据,对性能有一定影响;False为不使用
use_profile: False
#channel的最大长度,默认为0
channel_size: 0
#tracer, 跟踪框架吞吐,每个OP和channel的工作情况。无tracer时不生成数据
tracer:
#每次trace的时间间隔,单位秒/s
interval_s: 10
op:
bow:
# 并发数,is_thread_op=True时,为线程并发;否则为进程并发
concurrency: 1
# client连接类型,brpc, grpc和local_predictor
client_type: brpc
# Serving交互重试次数,默认不重试
retry: 1
# Serving交互超时时间, 单位ms
timeout: 3000
# Serving IPs
server_endpoints: ["127.0.0.1:9393"]
# bow模型client端配置
client_config: "imdb_bow_client_conf/serving_client_conf.prototxt"
# Fetch结果列表,以client_config中fetch_var的alias_name为准
fetch_list: ["prediction"]
# 批量查询Serving的数量, 默认1。batch_size>1要设置auto_batching_timeout,否则不足batch_size时会阻塞
batch_size: 2
# 批量查询超时,与batch_size配合使用
auto_batching_timeout: 2000
cnn:
# 并发数,is_thread_op=True时,为线程并发;否则为进程并发
concurrency: 1
# client连接类型,brpc
client_type: brpc
# Serving交互重试次数,默认不重试
retry: 1
# 预测超时时间, 单位ms
timeout: 3000
# Serving IPs
server_endpoints: ["127.0.0.1:9292"]
# cnn模型client端配置
client_config: "imdb_cnn_client_conf/serving_client_conf.prototxt"
# Fetch结果列表,以client_config中fetch_var的alias_name为准
fetch_list: ["prediction"]
# 批量查询Serving的数量, 默认1。
batch_size: 2
# 批量查询超时,与batch_size配合使用
auto_batching_timeout: 2000
combine:
# 并发数,is_thread_op=True时,为线程并发;否则为进程并发
concurrency: 1
# Serving交互重试次数,默认不重试
retry: 1
# 预测超时时间, 单位ms
timeout: 3000
# 批量查询Serving的数量, 默认1。
batch_size: 2
# 批量查询超时,与batch_size配合使用
auto_batching_timeout: 2000
```
## 编写 Server 代码
代码示例中,重点留意3个自定义Op的preprocess、postprocess处理,以及Combin Op初始化列表input_ops=[bow_op, cnn_op],设置Combin Op的前置OP列表。
```python
from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
from paddle_serving_server.pipeline import PipelineServer
from paddle_serving_server.pipeline.proto import pipeline_service_pb2
from paddle_serving_server.pipeline.channel import ChannelDataEcode
import numpy as np
from paddle_serving_app.reader import IMDBDataset
class ImdbRequestOp(RequestOp):
def init_op(self):
self.imdb_dataset = IMDBDataset()
self.imdb_dataset.load_resource('imdb.vocab')
def unpack_request_package(self, request):
dictdata = {}
for idx, key in enumerate(request.key):
if key != "words":
continue
words = request.value[idx]
word_ids, _ = self.imdb_dataset.get_words_and_label(words)
dictdata[key] = np.array(word_ids)
return dictdata
class CombineOp(Op):
def preprocess(self, input_data):
combined_prediction = 0
for op_name, data in input_data.items():
combined_prediction += data["prediction"]
data = {"prediction": combined_prediction / 2}
return data
read_op = ImdbRequestOp()
bow_op = Op(name="bow",
input_ops=[read_op],
server_endpoints=["127.0.0.1:9393"],
fetch_list=["prediction"],
client_config="imdb_bow_client_conf/serving_client_conf.prototxt",
concurrency=1,
timeout=-1,
retry=1)
cnn_op = Op(name="cnn",
input_ops=[read_op],
server_endpoints=["127.0.0.1:9292"],
fetch_list=["prediction"],
client_config="imdb_cnn_client_conf/serving_client_conf.prototxt",
concurrency=1,
timeout=-1,
retry=1)
combine_op = CombineOp(
name="combine",
input_ops=[bow_op, cnn_op],
concurrency=5,
timeout=-1,
retry=1)
# use default ResponseOp implementation
response_op = ResponseOp(input_ops=[combine_op])
server = PipelineServer()
server.set_response_op(response_op)
server.prepare_server('config.yml')
server.run_server()
```
## 启动服务验证
```python
from paddle_serving_client.pipeline import PipelineClient
import numpy as np
client = PipelineClient()
client.connect(['127.0.0.1:18080'])
words = 'i am very sad | 0'
futures = []
for i in range(3):
futures.append(
client.predict(
feed_dict={"words": words},
fetch=["prediction"],
asyn=True))
for f in futures:
res = f.result()
if res["ecode"] != 0:
print(res)
exit(1)
```
# Python Pipeline 高阶用法
高阶用法在复杂场景中使用,实现更多自定义能力,包括 DAG 跳过某个OP运行、自定义数据传输结构以及多卡推理等。
## DAG 跳过某个OP运行
为 DAG 图中跳过某个 OP 运行,实际做法是在跳过此 OP 的 process 阶段,只要在 preprocess 做好判断,跳过 process 阶段,在和 postprocess 后直接返回即可。
preprocess 返回结果列表的第二个结果是 `is_skip_process=True` 表示是否跳过当前 OP 的 process 阶段,直接进入 postprocess 处理。
```python
def preprocess(self, input_dicts, data_id, log_id):
"""
In preprocess stage, assembling data for process stage. users can
override this function for model feed features.
Args:
input_dicts: input data to be preprocessed
data_id: inner unique id
log_id: global unique id for RTT
Return:
input_dict: data for process stage
is_skip_process: skip process stage or not, False default
prod_errcode: None default, otherwise, product errores occured.
It is handled in the same way as exception.
prod_errinfo: "" default
"""
# multiple previous Op
if len(input_dicts) != 1:
_LOGGER.critical(
self._log(
"Failed to run preprocess: this Op has multiple previous "
"inputs. Please override this func."))
os._exit(-1)
(_, input_dict), = input_dicts.items()
return input_dict, False, None, ""
```
## 自定义 proto 中 Request 和 Response 结构
当默认 proto 结构不满足业务需求时,同时下面2个文件的 proto 的 Request 和 Response message 结构,保持一致。
> pipeline/gateway/proto/gateway.proto
> pipeline/proto/pipeline_service.proto
再重新编译 Serving Server。
## 自定义 URL
grpc gateway 处理 post 请求,默认 `method``prediction`,例如:127.0.0.1:8080/ocr/prediction。用户可自定义 name 和 method,对于已有 url 的服务可无缝切换。
```proto
service PipelineService {
rpc inference(Request) returns (Response) {
option (google.api.http) = {
post : "/{name=*}/{method=*}"
body : "*"
};
}
};
```
## 批量推理
Pipeline 支持批量推理,通过增大 batch size 可以提高 GPU 利用率。Python Pipeline 支持3种 batch 形式以及适用的场景如下:
- 场景1:一个推理请求包含批量数据(batch)
- 单条数据定长,批量变长,数据转成BCHW格式
- 单条数据变长,前处理中将单条数据做 padding 转成定长
- 场景2:一个推理请求的批量数据拆分成多个小块推理(mini-batch)
- 由于 padding 会按最长对齐,当一批数据中有个"极大"尺寸数据时会导致推理变慢
- 指定一个块大小,从而缩小"极大"尺寸数据的作用范围
- 场景3:合并多个请求数据批量推理(auto-batching)
- 推理耗时明显长于前后处理,合并多个请求数据推理一次会提高吞吐和GPU利用率
- 要求多个请求数据的 shape 一致
| 接口 | 说明 |
| :------------------------------------------: | :-----------------------------------------: |
| batch | client 发送批量数据,client.predict 的 batch=True |
| mini-batch | preprocess 按 list 类型返回,参考 OCR 示例 RecOp的preprocess|
| auto-batching | config.yml 中 OP 级别设置 batch_size 和 auto_batching_timeout |
### 4.6 单机多卡
单机多卡推理,M 个 OP 进程与 N 个 GPU 卡绑定,在 `config.yml` 中配置3个参数有关系,首先选择进程模式、并发数即进程数,devices 是 GPU 卡 ID。绑定方法是进程启动时遍历 GPU 卡 ID,例如启动7个 OP 进程 `config.yml` 设置 devices:0,1,2,那么第1,4,7个启动的进程与0卡绑定,第2,4个启动的进程与1卡绑定,3,6进程与卡2绑定。
- 进程ID: 0 绑定 GPU 卡0
- 进程ID: 1 绑定 GPU 卡1
- 进程ID: 2 绑定 GPU 卡2
- 进程ID: 3 绑定 GPU 卡0
- 进程ID: 4 绑定 GPU 卡1
- 进程ID: 5 绑定 GPU 卡2
- 进程ID: 6 绑定 GPU 卡0
`config.yml` 中硬件配置:
```
#计算硬件 ID,当 devices 为""或不写时为 CPU 预测;当 devices 为"0", "0,1,2"时为 GPU 预测,表示使用的 GPU 卡
devices: "0,1,2"
```
# Python Pipeline 优化指南
## 如何通过 Timeline 工具进行优化
为了更好地对性能进行优化,Python Pipeline 提供了 Timeline 工具,对整个服务的各个阶段时间进行打点。
## 在 Server 端输出 Profile 信息
Server 端用 yaml 中的 `use_profile` 字段进行控制:
```yaml
dag:
use_profile: true
```
开启该功能后,Server 端在预测的过程中会将对应的日志信息打印到标准输出,为了更直观地展现各阶段的耗时,提供 Analyst 模块对日志文件做进一步的分析处理。
使用时先将 Server 的输出保存到文件,以 `profile.txt` 为例,脚本将日志中的时间打点信息转换成 json 格式保存到 `trace` 文件,`trace` 文件可以通过 chrome 浏览器的 tracing 功能进行可视化。
```python
from paddle_serving_server.pipeline import Analyst
import json
import sys
if __name__ == "__main__":
log_filename = "profile.txt"
trace_filename = "trace"
analyst = Analyst(log_filename)
analyst.save_trace(trace_filename)
```
具体操作:打开 chrome 浏览器,在地址栏输入 `chrome://tracing/` ,跳转至 tracing 页面,点击 load 按钮,打开保存的 `trace` 文件,即可将预测服务的各阶段时间信息可视化。
## 在 Client 端输出 Profile 信息
Client 端在 `predict` 接口设置 `profile=True`,即可开启 Profile 功能。
开启该功能后,Client 端在预测的过程中会将该次预测对应的日志信息打印到标准输出,后续分析处理同 Server。
## 分析方法
根据 `pipeline.tracer` 日志中的各个阶段耗时,按以下公式逐步分析出主要耗时在哪个阶段。
```
单 OP 耗时:
op_cost = process(pre + mid + post)
OP 期望并发数:
op_concurrency = 单OP耗时(s) * 期望QPS
服务吞吐量:
service_throughput = 1 / 最慢OP的耗时 * 并发数
服务平响:
service_avg_cost = ∑op_concurrency 【关键路径】
Channel 堆积:
channel_acc_size = QPS(down - up) * time
批量预测平均耗时:
avg_batch_cost = (N * pre + mid + post) / N
```
## 优化思路
根据长耗时在不同阶段,采用不同的优化方法.
- OP 推理阶段(mid-process):
- 增加 OP 并发度
- 开启 auto-batching (前提是多个请求的 shape 一致)
- 若批量数据中某条数据的 shape 很大,padding 很大导致推理很慢,可使用 mini-batch
- 开启 TensorRT/MKL-DNN 优化
- 开启低精度推理
- OP 前处理阶段(pre-process):
- 增加 OP 并发度
- 优化前处理逻辑
- in/out 耗时长(channel 堆积>5)
- 检查 channel 传递的数据大小和延迟
- 优化传入数据,不传递数据或压缩后再传入
- 增加 OP 并发度
- 减少上游 OP 并发度
# Imagenet Pipeline WebService
# Bert Pipeline WebService
This document will takes Imagenet service as an example to introduce how to use Pipeline WebService.
This document will takes Bert service as an example to introduce how to use Pipeline WebService.
## Get model
```
......
# Imagenet Pipeline WebService
# Bert Pipeline WebService
这里以 Imagenet 服务为例来介绍 Pipeline WebService 的使用。
这里以 Bert 服务为例来介绍 Pipeline WebService 的使用。
## 获取模型
```
......
......@@ -84,7 +84,7 @@ RUN ln -sf /usr/local/bin/python3.6 /usr/local/bin/python3 && ln -sf /usr/local/
RUN rm -r /root/python_build
# Install Go and glide
RUN wget -qO- https://dl.google.com/go/go1.14.linux-amd64.tar.gz | \
RUN wget -qO- https://paddle-ci.cdn.bcebos.com/go1.17.2.linux-amd64.tar.gz | \
tar -xz -C /usr/local && \
mkdir /root/go && \
mkdir /root/go/bin && \
......
# A image for building paddle binaries
# # Use cuda devel base image for both cpu and gpu environment
# # When you modify it, please be aware of cudnn-runtime version
FROM hub.baidubce.com/paddlepaddle/serving:latest-cuda10.2-cudnn8-devel
FROM registry.baidubce.com/paddlepaddle/serving:0.8.0-cuda10.2-cudnn8-devel
MAINTAINER PaddlePaddle Authors <paddle-dev@baidu.com>
......
......@@ -40,7 +40,7 @@ WORKDIR /home
RUN bash /build_scripts/install_trt.sh <<run_env>> && rm -rf /build_scripts
# install go
RUN wget -qO- https://dl.google.com/go/go1.14.linux-amd64.tar.gz | \
RUN wget -qO- https://paddle-ci.cdn.bcebos.com/go1.17.2.linux-amd64.tar.gz | \
tar -xz -C /usr/local && \
mkdir /root/go && \
mkdir /root/go/bin && \
......
......@@ -12,6 +12,7 @@ function usage
echo " --workdir : workdir in image";
echo " --command : command to launch serving"
echo " --port : serving port"
echo " --pod_num : number of pod replicas"
echo " -h | --help : helper";
}
......@@ -20,6 +21,9 @@ function parse_args
# positional args
args=()
# default
pod_num=1
# named args
while [ "$1" != "" ]; do
case "$1" in
......@@ -28,6 +32,7 @@ function parse_args
--workdir ) workdir="$2"; shift;;
--command ) start_command="$2"; shift;;
--port ) port="$2"; shift;;
--pod_num ) pod_num="$2"; shift;;
-h | --help ) usage; exit;; # quit and show usage
* ) args+=("$1") # if no match, add it to the positional args
esac
......@@ -41,7 +46,7 @@ function parse_args
positional_2="${args[1]}"
# validate required args
if [[ -z "${app_name}" || -z "${image_name}" || -z "${workdir}" || -z "${start_command}" || -z "${port}" ]]; then
if [[ -z "${app_name}" || -z "${image_name}" || -z "${workdir}" || -z "${start_command}" || -z "${port}" || -z "${pod_num}"]]; then
echo "Invalid arguments. check your params again."
usage
exit;
......@@ -59,6 +64,7 @@ function run
echo "named arg: workdir: $workdir"
echo "named arg: command: $start_command"
echo "named arg: port: $port"
echo "named arg: pod_num: $pod_num"
sed -e "s/<< APP_NAME >>/$app_name/g" -e "s/<< IMAGE_NAME >>/$(echo $image_name | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< WORKDIR >>/$(echo $workdir | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< COMMAND >>/\"$(echo $start_command | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')\"/g" -e "s/<< PORT >>/$port/g" tools/k8s_serving.yaml_template > k8s_serving.yaml
sed -e "s/<< APP_NAME >>/$app_name/g" -e "s/<< IMAGE_NAME >>/$(echo $image_name | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< WORKDIR >>/$(echo $workdir | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< COMMAND >>/\"$(echo $start_command | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')\"/g" -e "s/<< PORT >>/$port/g" tools/k8s_ingress.yaml_template > k8s_ingress.yaml
......
......@@ -78,7 +78,8 @@ function run
echo "named arg: image_name: $image_name"
sed -e "s/<<base_image>>/$base_image/g" -e "s/<<python_version>>/$python/g" -e "s/<<run_env>>/$env/g" -e "s/<<serving_version>>/$serving/g" -e "s/<<paddle_version>>/$paddle/g" tools/Dockerfile.runtime_template > Dockerfile.tmp
docker build --network=host --build-arg ftp_proxy=http://172.19.57.45:3128 --build-arg https_proxy=http://172.19.57.45:3128 --build-arg http_proxy=http://172.19.57.45:3128 --build-arg HTTP_PROXY=http://172.19.57.45:3128 --build-arg HTTPS_PROXY=http://172.19.57.45:3128 -t $image_name -f Dockerfile.tmp .
#docker build --network=host --build-arg ftp_proxy=http://172.19.57.45:3128 --build-arg https_proxy=http://172.19.57.45:3128 --build-arg http_proxy=http://172.19.57.45:3128 --build-arg HTTP_PROXY=http://172.19.57.45:3128 --build-arg HTTPS_PROXY=http://172.19.57.45:3128 -t $image_name -f Dockerfile.tmp .
docker build --network=host -t $image_name -f Dockerfile.tmp .
}
run "$@";
......@@ -20,7 +20,7 @@ metadata:
app: << APP_NAME >>
name: << APP_NAME >>
spec:
replicas: 1
replicas: << POD_NUM >>
selector:
matchLabels:
app: << APP_NAME >>
......
......@@ -21,7 +21,7 @@ function env_install()
{
apt install -y libcurl4-openssl-dev libbz2-dev
wget https://paddle-serving.bj.bcebos.com/others/centos_ssl.tar && tar xf centos_ssl.tar && rm -rf centos_ssl.tar && mv libcrypto.so.1.0.2k /usr/lib/libcrypto.so.1.0.2k && mv libssl.so.1.0.2k /usr/lib/libssl.so.1.0.2k && ln -sf /usr/lib/libcrypto.so.1.0.2k /usr/lib/libcrypto.so.10 && ln -sf /usr/lib/libssl.so.1.0.2k /usr/lib/libssl.so.10 && ln -sf /usr/lib/libcrypto.so.10 /usr/lib/libcrypto.so && ln -sf /usr/lib/libssl.so.10 /usr/lib/libssl.so
rm -rf /usr/local/go && wget -qO- https://paddle-ci.gz.bcebos.com/go1.15.12.linux-amd64.tar.gz | \
rm -rf /usr/local/go && wget -qO- https://paddle-ci.cdn.bcebos.com/go1.17.2.linux-amd64.tar.gz | \
tar -xz -C /usr/local && \
mkdir /root/go && \
mkdir /root/go/bin && \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册