未验证 提交 ee8b5e2d 编写于 作者: T TeslaZhao 提交者: GitHub

Merge pull request #1735 from TeslaZhao/develop

Update doc
......@@ -40,13 +40,20 @@ The goal of Paddle Serving is to provide high-performance, flexible and easy-to-
- Support service monitoring, provide prometheus-based performance statistics and port access
<h2 align="center">Tutorial</h2>
<h2 align="center">Tutorial and Papers</h2>
- AIStudio tutorial(Chinese) : [Paddle Serving服务化部署框架](https://www.paddlepaddle.org.cn/tutorials/projectdetail/2538249)
- AIStudio OCR practice(Chinese) : [基于PaddleServing的OCR服务化部署实战](https://aistudio.baidu.com/aistudio/projectdetail/3630726)
- Video tutorial(Chinese) : [深度学习服务化部署-以互联网应用为例](https://aistudio.baidu.com/aistudio/course/introduce/19084)
- Edge AI solution(Chinese) : [基于Paddle Serving&百度智能边缘BIE的边缘AI解决方案](https://mp.weixin.qq.com/s/j0EVlQXaZ7qmoz9Fv96Yrw)
- Paper : [JiZhi: A Fast and Cost-Effective Model-As-A-Service System for
Web-Scale Online Inference at Baidu](https://arxiv.org/pdf/2106.01674.pdf)
- Paper : [ERNIE 3.0 TITAN: EXPLORING LARGER-SCALE KNOWLEDGE
ENHANCED PRE-TRAINING FOR LANGUAGE UNDERSTANDING
AND GENERATION](https://arxiv.org/pdf/2112.12731.pdf)
<p align="center">
<img src="doc/images/demo.gif" width="700">
</p>
......@@ -90,8 +97,6 @@ The first step is to call the model save interface to generate a model parameter
- [Analyze and optimize performance](doc/Python_Pipeline/Performance_Tuning_EN.md)
- [TensorRT dynamic Shape](doc/TensorRT_Dynamic_Shape_EN.md)
- [Benchmark(Chinese)](doc/Python_Pipeline/Benchmark_CN.md)
- Our Paper: [JiZhi: A Fast and Cost-Effective Model-As-A-Service System for
Web-Scale Online Inference at Baidu](https://arxiv.org/pdf/2106.01674.pdf)
- Client SDK
- [Python SDK(Chinese)](doc/C++_Serving/Introduction_CN.md#42-多语言多协议Client)
- [JAVA SDK](doc/Java_SDK_EN.md)
......
......@@ -39,11 +39,18 @@ Paddle Serving依托深度学习框架PaddlePaddle旨在帮助深度学习开发
- 支持服务监控,提供基于普罗米修斯的性能数据统计及端口访问
<h2 align="center">教程</h2>
- AIStudio教程-[Paddle Serving服务化部署框架](https://www.paddlepaddle.org.cn/tutorials/projectdetail/2538249)
- 视频教程-[深度学习服务化部署-以互联网应用为例](https://aistudio.baidu.com/aistudio/course/introduce/19084)
- 边缘AI解决方案-[基于Paddle Serving&百度智能边缘BIE的边缘AI解决方案](https://mp.weixin.qq.com/s/j0EVlQXaZ7qmoz9Fv96Yrw)
<h2 align="center">教程与论文</h2>
- AIStudio 使用教程 : [Paddle Serving服务化部署框架](https://www.paddlepaddle.org.cn/tutorials/projectdetail/2538249)
- AIStudio OCR实战 : [基于PaddleServing的OCR服务化部署实战](https://aistudio.baidu.com/aistudio/projectdetail/3630726)
- 视频教程 : [深度学习服务化部署-以互联网应用为例](https://aistudio.baidu.com/aistudio/course/introduce/19084)
- 边缘AI 解决方案 : [基于Paddle Serving&百度智能边缘BIE的边缘AI解决方案](https://mp.weixin.qq.com/s/j0EVlQXaZ7qmoz9Fv96Yrw)
- 论文 : [JiZhi: A Fast and Cost-Effective Model-As-A-Service System for
Web-Scale Online Inference at Baidu](https://arxiv.org/pdf/2106.01674.pdf)
- 论文 : [ERNIE 3.0 TITAN: EXPLORING LARGER-SCALE KNOWLEDGE
ENHANCED PRE-TRAINING FOR LANGUAGE UNDERSTANDING
AND GENERATION](https://arxiv.org/pdf/2112.12731.pdf)
<p align="center">
<img src="doc/images/demo.gif" width="700">
......
# 部署案例概述
# 快速开始案例部署
您可以通过以下 Paddle Serving 快速开始案例,分别了解到 C++ Serving 与 Python Pipeline 2种框架的部署方法。
- [使用 C++ Serving 部署 Resnet50 模型案例]()
- [使用 Python Pipeline 部署 OCR 模型案例]()
通过阅读以下内容掌握 Paddle Serving 基础功能以及2种框架特性和使用指南:
- [基础功能]()
- [进阶 C++ Serving 介绍]()
- [进阶 Python Pipeline 介绍]()
# Python Pipeline 框架
在许多深度学习框架中,模型服务化部署通常用于单模型的一键部署。但在 AI 工业大生产的背景下,端到端的单一深度学习模型不能解决复杂问题,多个深度学习模型组合使用是解决现实复杂问题的常规手段,如文字识别 OCR 服务至少需要检测和识别2种模型;视频理解服务一般需要视频抽帧、切词、音频处理、分类等多种模型组合实现。当前,通用多模型组合服务的设计和实现是非常复杂的,既要能实现复杂的模型拓扑关系,又要保证服务的高并发、高可用和易于开发和维护等。
Paddle Serving 实现了一套通用的多模型组合服务编程框架 Python Pipeline,不仅解决上述痛点,同时还能大幅提高 GPU 利用率,并易于开发和维护。
Python Pipeline 使用案例请阅读[Python Pipeline 快速部署案例](./3-2_QuickStart_Pipeline_OCR_CN.md)
通过阅读以下内容掌握 Python Pipeline 设计方案、高阶用法和优化指南等。
- [Python Pipeline 框架设计](7-1_Python_Pipeline_Design_CN.md)
- [Python Pipeline 高阶用法](7-2_Python_Pipeline_Senior_CN.md)
- [Python Pipeline 优化指南](7-3_Python_Pipeline_Optimize_CN.md)
# Python Pipeline 核心功能
为了解决多个深度学习模型组合的复杂问题,Paddle Serving 团队设计了一个通用端到端多模型组合框架,其核心特点包括:
1. 通用性:框架既要满足通用模型的输入类型,又要满足模型组合的复杂拓扑关系。
2. 高性能:与常见互联网后端服务不同,深度学习模型的推理程序属于计算密集型程序,同时 GPU 等计算芯片价格昂贵,因此在平均响应时间不苛刻的场景下,计算资源占用和吞吐量指标格外重要。
3. 高可用性:高可用的架构依赖每个服务的健壮性,服务状态可查询、异常可监控和管理是必备条件。
4. 易于开发与调试:使用 Python 语言开发可大幅提升研发效率,运行的错误信息准确帮助开发者快速定位问题。
## 框架设计
Python Pipeline 框架分为网络服务层和图执行引擎2部分,网络服务层处理多种网络协议请求和通用输入参数问题,图执行引擎层解决复杂拓扑关系。如下图所示
<div align=center>
<img src='../images/pipeline_serving-image1.png' height = "250" align="middle"/>
</div>
**一.网络服务层**
网络服务层包括了 gRPC-gateway 和 gRPC Server。gPRC gateway 接收 HTTP 请求,打包成 proto 格式后转发给 gRPC Server,一套处理程序可同时处理 HTTP、gRPC 2种类型请求。
另外,在支持多种模型的输入输出数据类型上,使用统一的 service.proto 结构,具有更好的通用性。
```proto
message Request {
repeated string key = 1;
repeated string value = 2;
optional string name = 3;
optional string method = 4;
optional int64 logid = 5;
optional string clientip = 6;
};
message Response {
optional int32 err_no = 1;
optional string err_msg = 2;
repeated string key = 3;
repeated string value = 4;
};
```
Request 是输入结构,`key``value` 是配对的 string 数组。 `name``method` 对应 URL://{ip}:{port}/{name}/{method}。`logid``clientip` 便于用户串联服务级请求和自定义策略。
Response 是输出结构,`err_no``err_msg` 表达处理结果的正确性和错误信息,`key``value` 为结果。
Pipeline 服务包装了继承于 WebService 类,以 OCR 示例为例,派生出 OcrService 类,get_pipeline_response 函数内实现 DAG 拓扑关系,默认服务入口为 read_op,函数返回的 Op 为最后一个处理,此处要求最后返回的 Op 必须唯一。
所有服务和模型的所有配置信息在 `config.yml` 中记录,URL 的 name 字段由 OcrService 初始化定义;run_service 函数启动服务。
```python
class OcrService(WebService):
def get_pipeline_response(self, read_op):
det_op = DetOp(name="det", input_ops=[read_op])
rec_op = RecOp(name="rec", input_ops=[det_op])
return rec_op
ocr_service = OcrService(name="ocr")
ocr_service.prepare_pipeline_config("config.yml")
ocr_service.run_service()
```
**二.图执行引擎层**
图执行引擎的设计思路是基于有向无环图实现多模型组合的复杂拓扑关系,有向无环图由单节点或多节点串联、并联结构构成。
<div align=center>
<img src='../images/pipeline_serving-image2.png' height = "300" align="middle"/>
</div>
图执行引擎抽象归纳出2种数据结构 Op 节点和 Channel 有向边,构建一条异步流水线工作流。核心概念和设计思路如下:
- Op 节点: 可理解成1个推理模型、一个处理方法,甚至是训练前向代码,可独立运行,独立设置并发度。每个 Op 节点的计算结果放入其绑定的 Channel 中。
- Channel 数据管道: 可理解为一个单向缓冲队列。每个 Channel 只接收上游 Op 节点的计算输出,作为下游 Op 节点的输入。
- 工作流:根据用户定义的节点依赖关系,图执行引擎自动生成有向无环图。每条用户请求到达图执行引擎时会生成一个唯一自增 ID,通过这种唯一性绑定关系标记流水线中的不同请求。
Op 的设计原则:
- 单个 Op 默认的功能是根据输入的 Channel 数据,访问一个 Paddle Serving 的单模型服务,并将结果存在输出的 Channel
- 单个 Op 可以支持用户自定义,包括 preprocess,process,postprocess 三个函数都可以由用户继承和实现
- 单个 Op 可以控制并发数,从而增加处理并发数
- 单个 Op 可以获取多个不同 RPC 请求的数据,以实现 Auto-Batching
- Op 可以由线程或进程启动
其构造函数如下:
```python
def __init__(name=None,
input_ops=[],
server_endpoints=[],
fetch_list=[],
client_config=None,
client_type=None,
concurrency=1,
timeout=-1,
retry=1,
batch_size=1,
auto_batching_timeout=None,
local_service_handler=None)
```
各参数含义如下:
| 参数名 | 类型 | 含义 |
| :-------------------: | :---------: |:------------------------------------------------: |
| name | (str) | 用于标识 Op 类型的字符串,该字段必须全局唯一。 |
| input_ops | (list) | 当前 Op 的所有前继 Op 的列表。 |
| server_endpoints | (list) |远程 Paddle Serving Service 的 endpoints 列表。如果不设置该参数,认为是local_precditor模式,从local_service_conf中读取配置。 |
| fetch_list | (list) |远程 Paddle Serving Service 的 fetch 列表。 |
| client_config | (str) |Paddle Serving Service 对应的 Client 端配置文件路径。 |
| client_type | (str) |可选择brpc、grpc或local_predictor。local_predictor不启动Serving服务,进程内预测。 |
| concurrency | (int) | Op 的并发数。 |
| timeout | (int) |process 操作的超时时间,单位为毫秒。若该值小于零,则视作不超时。 |
| retry | (int) |超时重试次数。当该值为 1 时,不进行重试。 |
| batch_size | (int) |进行 Auto-Batching 的期望 batch_size 大小,由于构建 batch 可能超时,实际 batch_size 可能小于设定值,默认为 1。 |
| auto_batching_timeout | (float) |进行 Auto-Batching 构建 batch 的超时时间,单位为毫秒。batch_size > 1时,要设置auto_batching_timeout,否则请求数量不足batch_size时会阻塞等待。 |
| local_service_handler | (object) |local predictor handler,Op init() 入参赋值或在 Op init() 中创建|
对于 Op 之间需要传输过大数据的情况,可以考虑 RAM DB 外存进行全局存储,通过在 Channel 中传递索引的 Key 来进行数据传输
Channel的设计原则:
- Channel 是 Op 之间共享数据的数据结构,负责共享数据或者共享数据状态信息
- Channel 可以支持多个OP的输出存储在同一个 Channel,同一个 Channel 中的数据可以被多个 Op 使用
下图为图执行引擎中 Channel 的设计,采用 input buffer 和 output buffer 进行多 Op 输入或多 Op 输出的数据对齐,中间采用一个 Queue 进行缓冲
<div align=center>
<img src='../images/pipeline_serving-image3.png' height = "500" align="middle"/>
</div>
**三. 服务日志**
Pipeline 服务日志在当前目录的 `PipelineServingLogs` 目录下,有3种类型日志,分别是 `pipeline.log``pipeline.log.wf``pipeline.tracer`
- `pipeline.log` : 记录 debug & info日志信息
- `pipeline.log.wf` : 记录 warning & error日志
- `pipeline.tracer` : 统计各个阶段耗时、channel 堆积信息
在服务发生异常时,错误信息会记录在 pipeline.log.wf 日志中。打印 tracer 日志要求在 config.yml 的 DAG 属性中添加 tracer 配置。
1. 日志与请求的唯一标识
Pipeline 中有2种 id 用以串联请求,分别是 data_id 和 log_id,二者区别如下:
- data_id : Pipeline 框架生成的自增 ID,标记请求唯一性标识
- log_id : 上游模块传入的标识,跟踪多个服务间串联关系,由于用户可不传入或不保证唯一性,因此不能作为唯一性标识
通常,Pipeline 框架打印的日志会同时带上 data_id 和 log_id。开启 auto-batching 后,会使用批量中的第一个 data_id 标记 batch 整体,同时框架会在一条日志中打印批量中所有 data_id。
2. 日志滚动
Pipeline 的日志模块在 `logger.py` 中定义,使用了 `logging.handlers.RotatingFileHandler` 支持磁盘日志文件的轮换。根据不同文件级别和日质量分别设置了 `maxBytes``backupCount`,当即将超出预定大小时,将关闭旧文件并打开一个新文件用于输出。
```python
"handlers": {
"f_pipeline.log": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log"),
"maxBytes": 512000000,
"backupCount": 20,
},
"f_pipeline.log.wf": {
"class": "logging.handlers.RotatingFileHandler",
"level": "WARNING",
"formatter": "normal_fmt",
"filename": os.path.join(log_dir, "pipeline.log.wf"),
"maxBytes": 512000000,
"backupCount": 10,
},
"f_tracer.log": {
"class": "logging.handlers.RotatingFileHandler",
"level": "INFO",
"formatter": "tracer_fmt",
"filename": os.path.join(log_dir, "pipeline.tracer"),
"maxBytes": 512000000,
"backupCount": 5,
},
}
```
## 自定义信息
提供给开发者提供以下自定义信息,包括自定义 Web 服务、自定义服务输入和输出结构、自定义服务并发和模型配置和自定义推理过程
- 自定义 Web 服务 URL
- 自定义服务输入和输出结构
- 自定义服务并发和模型配置
- 自定义推理过程
- 自定义业务错误类型
1. 自定义 Web 服务 URL
在 Web 服务中自定义服务名称是常见操作,尤其是将已有服务迁移到新框架。URL 中核心字段包括 `ip``port``name``method`,根据最新部署的环境信息设置前2个字段,重点介绍如何设置 `name``method`,框架提供默认的 `methon``prediciton`,如 `http://127.0.0.1:9999/ocr/prediction`
框架有2处代码与此相关,分别是 gRPC Gateway 的配置文件 `python/pipeline/gateway/proto/gateway.proto` 和 服务启动文件 `web_server.py`
业务场景中通过设置 `name` 和 验证 `method` 解决问题。以 [OCR 示例]()为例,服务启动文件 `web_server.py` 通过类 `OcrService` 构造函数的 `name` 字段设置 URL 中 `name` 字段;
```
ocr_service = OcrService(name="ocr")
ocr_service.prepare_pipeline_config("config.yml")
ocr_service.run_service()
```
框架提供默认的 `methon``prediciton`,通过重载 `RequestOp::unpack_request_package` 来验证 `method`
```
def unpack_request_package(self, request):
dict_data = {}
log_id = None
if request is None:
_LOGGER.critical("request is None")
raise ValueError("request is None")
if request.method is not "prediction":
_LOGGER.critical("request method error")
raise ValueError("request method error")
...
```
`python/pipeline/gateway/proto/gateway.proto` 文件可以对 `name``method` 做严格限制,一般不需要修改,如需要特殊指定修改后,需要重新编译 Paddle Serving,[编译方法]()
```proto
service PipelineService {
rpc inference(Request) returns (Response) {
option (google.api.http) = {
post : "/{name=*}/{method=*}"
body : "*"
};
}
};
```
2. 自定义服务输入和输出结构
输入和输出结构包括 proto 中 Request 和 Response 结构,以及 Op 前后处理返回。
当默认 proto 结构不满足业务需求时,同时下面2个文件的 proto 的 Request 和 Response message 结构,保持一致。
- pipeline/gateway/proto/gateway.proto
- pipeline/proto/pipeline_service.proto
修改后,需要[重新编译]()
3. 自定义服务并发和模型配置
完整的配置信息可参考[配置信息]()
4. 自定义推理过程
推理 Op 为开发者提供3个外部函数接口:
| 变量或接口 | 说明 |
| :----------------------------------------------: | :----------------------------------------------------------: |
| def preprocess(self, input_dicts) | 对从 Channel 中获取的数据进行处理,处理完的数据将作为 **process** 函数的输入。(该函数对一个 **sample** 进行处理) |
| def process(self, feed_dict_list, typical_logid) | 基于 Paddle Serving Client 进行 RPC 预测,处理完的数据将作为 **postprocess** 函数的输入。(该函数对一个 **batch** 进行处理) |
| def postprocess(self, input_dicts, fetch_dict) | 处理预测结果,处理完的数据将被放入后继 Channel 中,以被后继 Op 获取。(该函数对一个 **sample** 进行处理) |
| def init_op(self) | 用于加载资源(如字典等)。 |
| self.concurrency_idx | 当前进程(非线程)的并发数索引(不同种类的 Op 单独计算)。 |
Op 在一个运行周期中会依次执行 preprocess,process,postprocess 三个操作(当不设置 `server_endpoints` 参数时,不执行 process 操作),用户可以对这三个函数进行重写,默认实现如下:
```python
def preprocess(self, input_dicts):
# multiple previous Op
if len(input_dicts) != 1:
raise NotImplementedError(
'this Op has multiple previous inputs. Please override this func.'
(_, input_dict), = input_dicts.items()
return input_dict
def process(self, feed_dict_list, typical_logid):
err, err_info = ChannelData.check_batch_npdata(feed_dict_list)
if err != 0:
raise NotImplementedError(
"{} Please override preprocess func.".format(err_info))
call_result = self.client.predict(
feed=feed_dict_list, fetch=self._fetch_names, log_id=typical_logid)
if isinstance(self.client, MultiLangClient):
if call_result is None or call_result["serving_status_code"] != 0:
return None
call_result.pop("serving_status_code")
return call_result
def postprocess(self, input_dicts, fetch_dict):
return fetch_dict
```
**preprocess** 的参数是前继 Channel 中的数据 `input_dicts`,该变量(作为一个 **sample**)是一个以前继 Op 的 name 为 Key,对应 Op 的输出为 Value 的字典。
**process** 的参数是 Paddle Serving Client 预测接口的输入变量 `fetch_dict_list`(preprocess 函数的返回值的列表),该变量(作为一个 **batch**)是一个列表,列表中的元素为以 feed_name 为 Key,对应 ndarray 格式的数据为 Value 的字典。`typical_logid` 作为向 PaddleServingService 穿透的 logid。
**postprocess** 的参数是 `input_dicts``fetch_dict``input_dicts` 与 preprocess 的参数一致,`fetch_dict` (作为一个 **sample**)是 process 函数的返回 batch 中的一个 sample(如果没有执行 process ,则该值为 preprocess 的返回值)。
用户还可以对 **init_op** 函数进行重写,已加载自定义的一些资源(比如字典等),默认实现如下:
```python
def init_op(self):
pass
```
RequestOp 和 ResponseOp 是 Python Pipeline 的中2个特殊 Op,分别是用分解 RPC 数据加入到图执行引擎中,和拿到图执行引擎的预测结果并打包 RPC 数据到客户端。
RequestOp 类的设计如下所示,核心是在 unpack_request_package 函数中解析请求数据,因此,当修改 Request 结构后重写此函数实现全新的解包处理。
| 接口 | 说明 |
| :---------------------------------------: | :----------------------------------------: |
| init_op(self) | OP初始化,设置默认名称@DAGExecutor |
| unpack_request_package(self, request) | 解析请求数据 |
```python
class RequestOp(Op):
def __init__(self):
# PipelineService.name = "@DAGExecutor"
super(RequestOp, self).__init__(name="@DAGExecutor", input_ops=[])
# init op
try:
self.init_op()
except Exception as e:
_LOGGER.critical("Op(Request) Failed to init: {}".format(e))
os._exit(-1)
def unpack_request_package(self, request):
dict_data = {}
log_id = None
if request is None:
_LOGGER.critical("request is None")
raise ValueError("request is None")
for idx, key in enumerate(request.key):
dict_data[key] = request.value[idx]
log_id = request.logid
_LOGGER.info("RequestOp unpack one request. log_id:{}, clientip:{} \
name:{}, method:{}".format(log_id, request.clientip, request.name,
request.method))
return dict_data, log_id, None, ""
```
ResponseOp 类的设计如下所示,核心是在 pack_response_package 中打包返回结构,因此修改 Response 结构后重写此函数实现全新的打包格式。
| 接口 | 说明 |
| :------------------------------------------: | :-----------------------------------------: |
| init_op(self) | Op 初始化,设置默认名称 @DAGExecutor |
| pack_response_package(self, channeldata) | 处理接收的 RPC 数据 |
```python
class ResponseOp(Op):
def __init__(self, input_ops):
super(ResponseOp, self).__init__(
name="@DAGExecutor", input_ops=input_ops)
# init op
try:
self.init_op()
except Exception as e:
_LOGGER.critical("Op(ResponseOp) Failed to init: {}".format(
e, exc_info=True))
os._exit(-1)
def pack_response_package(self, channeldata):
resp = pipeline_service_pb2.Response()
error_code = channeldata.error_code
error_info = ""
...
# pack results
if error_code is None:
error_code = 0
resp.err_no = error_code
resp.err_msg = error_info
return resp
```
5. 自定义业务错误类型
用户可根据业务场景自定义错误码,继承 ProductErrCode,在 Op 的 preprocess 或 postprocess 中返回列表中返回,下一阶段处理会根据自定义错误码跳过后置OP处理。
```python
class ProductErrCode(enum.Enum):
"""
ProductErrCode is a base class for recording business error code.
product developers inherit this class and extend more error codes.
"""
pass
```
# Python Pipeline 高阶用法
在复杂业务场景中使用常规功能无法满足需求,本文介绍一些高阶用法。
- DAG 结构跳过某个 Op 运行
- 批量推理
- 单机多卡推理
- 多种计算芯片上推理
- 低精度推理
- TensorRT 推理加速
- MKLDNN 推理加速
**一. DAG 结构跳过某个 Op 运行 **
此应用场景一般在 Op 前后处理中有 if 条件判断时,不满足条件时,跳过后面处理。实际做法是在跳过此 Op 的 process 阶段,只要在 preprocess 做好判断,跳过 process 阶段,在和 postprocess 后直接返回即可。
preprocess 返回结果列表的第二个结果是 `is_skip_process=True` 表示是否跳过当前 Op 的 process 阶段,直接进入 postprocess 处理。
```python
def preprocess(self, input_dicts, data_id, log_id):
"""
In preprocess stage, assembling data for process stage. users can
override this function for model feed features.
Args:
input_dicts: input data to be preprocessed
data_id: inner unique id
log_id: global unique id for RTT
Return:
input_dict: data for process stage
is_skip_process: skip process stage or not, False default
prod_errcode: None default, otherwise, product errores occured.
It is handled in the same way as exception.
prod_errinfo: "" default
"""
# multiple previous Op
if len(input_dicts) != 1:
_LOGGER.critical(
self._log(
"Failed to run preprocess: this Op has multiple previous "
"inputs. Please override this func."))
os._exit(-1)
(_, input_dict), = input_dicts.items()
return input_dict, False, None, ""
```
** 二. 批量推理 **
Pipeline 支持批量推理,通过增大 batch size 可以提高 GPU 利用率。Python Pipeline 支持3种 batch 形式以及适用的场景如下:
- 场景1:一个推理请求包含批量数据(batch)
- 单条数据定长,批量变长,数据转成BCHW格式
- 单条数据变长,前处理中将单条数据做 padding 转成定长
- 场景2:一个推理请求的批量数据拆分成多个小块推理(mini-batch)
- 由于 padding 会按最长对齐,当一批数据中有个"极大"尺寸数据时会导致推理变慢
- 指定一个块大小,从而缩小"极大"尺寸数据的作用范围
- 场景3:合并多个请求数据批量推理(auto-batching)
- 推理耗时明显长于前后处理,合并多个请求数据推理一次会提高吞吐和GPU利用率
- 要求多个请求数据的 shape 一致
| 接口 | 说明 |
| :------------------------------------------: | :-----------------------------------------: |
| batch | client 发送批量数据,client.predict 的 batch=True |
| mini-batch | preprocess 按 list 类型返回,参考 OCR 示例 RecOp的preprocess|
| auto-batching | config.yml 中 Op 级别设置 batch_size 和 auto_batching_timeout |
** 三. 单机多卡推理 **
单机多卡推理,M 个 Op 进程与 N 个 GPU 卡绑定,在 `config.yml` 中配置3个参数有关系,首先选择进程模式、并发数即进程数,devices 是 GPU 卡 ID。绑定方法是进程启动时遍历 GPU 卡 ID,例如启动7个 Op 进程 `config.yml` 设置 devices:0,1,2,那么第1,4,7个启动的进程与0卡绑定,第2,4个启动的进程与1卡绑定,3,6进程与卡2绑定。
- 进程ID: 0 绑定 GPU 卡0
- 进程ID: 1 绑定 GPU 卡1
- 进程ID: 2 绑定 GPU 卡2
- 进程ID: 3 绑定 GPU 卡0
- 进程ID: 4 绑定 GPU 卡1
- 进程ID: 5 绑定 GPU 卡2
- 进程ID: 6 绑定 GPU 卡0
`config.yml` 中硬件配置:
```
#计算硬件 ID,当 devices 为""或不写时为 CPU 预测;当 devices 为"0", "0,1,2"时为 GPU 预测,表示使用的 GPU 卡
devices: "0,1,2"
```
** 四. 多种计算芯片上推理 **
Pipeline 除了支持 CPU、GPU 芯片推理之外,还支持在多种计算硬件推理部署。在 `config.yml` 中由 `device_type``devices`。优先使用 `device_type` 指定类型,当空缺时根据 `devices` 判断。`device_type` 描述如下:
- CPU(Intel) : 0
- GPU(Jetson/海光DCU) : 1
- TensorRT : 2
- CPU(Arm) : 3
- XPU : 4
- Ascend310 : 5
- ascend910 : 6
config.yml中硬件配置:
```
#计算硬件类型: 空缺时由devices决定(CPU/GPU),0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0
#计算硬件ID,优先由device_type决定硬件类型。devices为""或空缺时为CPU预测;当为"0", "0,1,2"时为GPU预测,表示使用的GPU卡
devices: "" # "0,1"
```
** 五. 低精度推理 **
Pipeline Serving支持低精度推理,CPU、GPU和TensoRT支持的精度类型如下图所示:
- CPU
- fp32(default)
- fp16
- bf16(mkldnn)
- GPU
- fp32(default)
- fp16
- int8
- Tensor RT
- fp32(default)
- fp16
- int8
使用int8时,要开启use_calib: True
参考[simple_web_service](../../examples/Pipeline/simple_web_service)示例
# Python Pipeline 优化指南
## 如何通过 Timeline 工具进行优化
为了更好地对性能进行优化,Python Pipeline 提供了 Timeline 工具,对整个服务的各个阶段时间进行打点。
## 在 Server 端输出 Profile 信息
Server 端用 yaml 中的 `use_profile` 字段进行控制:
```yaml
dag:
use_profile: true
```
开启该功能后,Server 端在预测的过程中会将对应的日志信息打印到标准输出,为了更直观地展现各阶段的耗时,提供 Analyst 模块对日志文件做进一步的分析处理。
使用时先将 Server 的输出保存到文件,以 `profile.txt` 为例,脚本将日志中的时间打点信息转换成 json 格式保存到 `trace` 文件,`trace` 文件可以通过 chrome 浏览器的 tracing 功能进行可视化。
```python
from paddle_serving_server.pipeline import Analyst
import json
import sys
if __name__ == "__main__":
log_filename = "profile.txt"
trace_filename = "trace"
analyst = Analyst(log_filename)
analyst.save_trace(trace_filename)
```
具体操作:打开 chrome 浏览器,在地址栏输入 `chrome://tracing/` ,跳转至 tracing 页面,点击 load 按钮,打开保存的 `trace` 文件,即可将预测服务的各阶段时间信息可视化。
## 在 Client 端输出 Profile 信息
Client 端在 `predict` 接口设置 `profile=True`,即可开启 Profile 功能。
开启该功能后,Client 端在预测的过程中会将该次预测对应的日志信息打印到标准输出,后续分析处理同 Server。
## 分析方法
根据 `pipeline.tracer` 日志中的各个阶段耗时,按以下公式逐步分析出主要耗时在哪个阶段。
```
单 OP 耗时:
op_cost = process(pre + mid + post)
OP 期望并发数:
op_concurrency = 单OP耗时(s) * 期望QPS
服务吞吐量:
service_throughput = 1 / 最慢OP的耗时 * 并发数
服务平响:
service_avg_cost = ∑op_concurrency 【关键路径】
Channel 堆积:
channel_acc_size = QPS(down - up) * time
批量预测平均耗时:
avg_batch_cost = (N * pre + mid + post) / N
```
## 优化思路
根据长耗时在不同阶段,采用不同的优化方法.
- OP 推理阶段(mid-process):
- 增加 OP 并发度
- 开启 auto-batching (前提是多个请求的 shape 一致)
- 若批量数据中某条数据的 shape 很大,padding 很大导致推理很慢,可使用 mini-batch
- 开启 TensorRT/MKL-DNN 优化
- 开启低精度推理
- OP 前处理阶段(pre-process):
- 增加 OP 并发度
- 优化前处理逻辑
- in/out 耗时长(channel 堆积>5)
- 检查 channel 传递的数据大小和延迟
- 优化传入数据,不传递数据或压缩后再传入
- 增加 OP 并发度
- 减少上游 OP 并发度
Kubernetes 集群部署
服务部署经历从物理机、虚拟机、容器化、云原生4个阶段。云原生,提供集装箱组合模式的乐高生态,Docker、Kubernetes 已称为云原生时代基础设施,推动应用程序大发展。Kubernetes 的可扩展性和分布式架构一直是人工智能和机器学习的绝佳选择,随着解决方案不断成熟,推动机器学习大规模工程落地。
本章节介绍 Kubernetes 上集群化部署 Paddle Serving 方案以及企业级安全网关部署案例。
- [Kubernetes 集群部署方案]()
- [Kubernetes 安全网关部署案例]()
......@@ -2,6 +2,12 @@
(简体中文|[English](./Save_EN.md))
## 保存用于 Serving 部署模型的意义
## 从已保存的模型文件中导出
如果已使用Paddle 的`save_inference_model`接口保存出预测要使用的模型,你可以使用Paddle Serving提供的名为`paddle_serving_client.convert`的内置模块进行转换。
```python
......
# Imagenet Pipeline WebService
# Bert Pipeline WebService
This document will takes Imagenet service as an example to introduce how to use Pipeline WebService.
This document will takes Bert service as an example to introduce how to use Pipeline WebService.
## Get model
```
......
# Imagenet Pipeline WebService
# Bert Pipeline WebService
这里以 Imagenet 服务为例来介绍 Pipeline WebService 的使用。
这里以 Bert 服务为例来介绍 Pipeline WebService 的使用。
## 获取模型
```
......
......@@ -29,9 +29,26 @@ test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data1 = file.read()
# print file name
print('{}{}{}'.format('*' * 10, img_file, '*' * 10))
image = cv2_to_base64(image_data1)
for i in range(4):
data = {"key": ["image"], "value": [image]}
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
result = r.json()
print("erro_no:{}, err_msg:{}".format(result["err_no"], result["err_msg"]))
# check success
if result["err_no"] == 0:
ocr_result = result["value"][0]
try:
for item in eval(ocr_result):
# return transcription and points
print("{}, {}".format(item[0], item[1]))
except Exception as e:
print("No results")
continue
else:
print(
"For details about error message, see PipelineServingLogs/pipeline.log.wf"
)
......@@ -34,8 +34,24 @@ test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data = file.read()
# print file name
print('{}{}{}'.format('*' * 10, img_file, '*' * 10))
image = cv2_to_base64(image_data)
for i in range(1):
ret = client.predict(feed_dict={"image": image}, fetch=["res"])
print(ret)
result = client.predict(feed_dict={"image": image}, fetch=["res"])
print("erro_no:{}, err_msg:{}".format(result.err_no, result.err_msg))
# check success
if result.err_no == 0:
ocr_result = result.value[0]
try:
for item in eval(ocr_result):
# return transcription and points
print("{}, {}".format(item[0], item[1]))
except Exception as e:
print("No results")
continue
else:
print(
"For details about error message, see PipelineServingLogs/pipeline.log.wf"
)
......@@ -14,6 +14,7 @@
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import copy
import cv2
import base64
from paddle_serving_app.reader import OCRReader
......@@ -34,17 +35,18 @@ class DetOp(Op):
self.filter_func = FilterBoxes(10, 10)
self.post_func = DBPostProcess({
"thresh": 0.3,
"box_thresh": 0.5,
"box_thresh": 0.6,
"max_candidates": 1000,
"unclip_ratio": 1.5,
"min_size": 3
})
"""
when opening tensorrt(configure in config.yml) and each time the input shape
for inferring is different, using this method for configuring tensorrt
dynamic shape to infer in each op model
"""
def set_dynamic_shape_info(self):
min_input_shape = {
"x": [1, 3, 50, 50],
......@@ -74,7 +76,7 @@ class DetOp(Op):
"min_input_shape": min_input_shape,
"max_input_shape": max_input_shape,
"opt_input_shape": opt_input_shape,
}
}
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
......@@ -107,25 +109,20 @@ class RecOp(Op):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
"""
when opening tensorrt(configure in config.yml) and each time the input shape
for inferring is different, using this method for configuring tensorrt
dynamic shape to infer in each op model
"""
def set_dynamic_shape_info(self):
min_input_shape = {
"x": [1, 3, 32, 10],
"lstm_1.tmp_0": [1, 1, 128]
}
min_input_shape = {"x": [1, 3, 32, 10], "lstm_1.tmp_0": [1, 1, 128]}
max_input_shape = {
"x": [50, 3, 32, 1000],
"lstm_1.tmp_0": [500, 50, 128]
}
opt_input_shape = {
"x": [6, 3, 32, 100],
"lstm_1.tmp_0": [25, 5, 128]
}
opt_input_shape = {"x": [6, 3, 32, 100], "lstm_1.tmp_0": [25, 5, 128]}
self.dynamic_shape_info = {
"min_input_shape": min_input_shape,
"max_input_shape": max_input_shape,
......@@ -137,8 +134,10 @@ class RecOp(Op):
raw_im = input_dict["image"]
data = np.frombuffer(raw_im, np.uint8)
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
self.dt_list = input_dict["dt_boxes"]
self.dt_list = self.sorted_boxes(self.dt_list)
# deepcopy to save origin dt_boxes
dt_boxes = copy.deepcopy(self.dt_list)
feed_list = []
img_list = []
max_wh_ratio = 0
......@@ -205,26 +204,31 @@ class RecOp(Op):
imgs[id] = norm_img
feed = {"x": imgs.copy()}
feed_list.append(feed)
#_LOGGER.info("feed_list : {}".format(feed_list))
return feed_list, False, None, ""
def postprocess(self, input_dicts, fetch_data, data_id, log_id):
res_list = []
rec_list = []
dt_num = len(self.dt_list)
if isinstance(fetch_data, dict):
if len(fetch_data) > 0:
rec_batch_res = self.ocr_reader.postprocess_ocrv2(
fetch_data, with_score=True)
for res in rec_batch_res:
res_list.append(res[0])
rec_list.append(res)
elif isinstance(fetch_data, list):
for one_batch in fetch_data:
one_batch_res = self.ocr_reader.postprocess_ocrv2(
one_batch, with_score=True)
for res in one_batch_res:
res_list.append(res[0])
res = {"res": str(res_list)}
rec_list.append(res)
result_list = []
for i in range(dt_num):
text = rec_list[i]
dt_box = self.dt_list[i]
result_list.append([text, dt_box.tolist()])
res = {"result": str(result_list)}
return res, None, ""
......
......@@ -35,12 +35,12 @@ else:
raise Exception("Error Python version")
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode, ParamChecker, ParamVerify
check_feed_dict=ParamVerify.check_feed_dict
check_fetch_list=ParamVerify.check_fetch_list
check_feed_dict = ParamVerify.check_feed_dict
check_fetch_list = ParamVerify.check_fetch_list
from .proto import pipeline_service_pb2
from .channel import (ThreadChannel, ProcessChannel,ChannelData,
from .channel import (ThreadChannel, ProcessChannel, ChannelData,
ChannelDataType, ChannelStopError, ChannelTimeoutError)
from .error_catch import ProductErrCode
from .error_catch import ProductErrCode
from .error_catch import CustomExceptionCode as ChannelDataErrcode
from .util import NameGenerator
from .profiler import UnsafeTimeProfiler as TimeProfiler
......@@ -119,9 +119,9 @@ class Op(object):
self._for_close_op_lock = threading.Lock()
self._succ_init_op = False
self._succ_close_op = False
self.dynamic_shape_info = {}
self.dynamic_shape_info = {}
self.set_dynamic_shape_info()
def set_dynamic_shape_info(self):
"""
when opening tensorrt(configure in config.yml) and each time the input shape
......@@ -141,7 +141,6 @@ class Op(object):
feed_names = client.feed_names_
fetch_names = client.fetch_names_
return feed_names, fetch_names
def init_from_dict(self, conf):
"""
......@@ -164,9 +163,10 @@ class Op(object):
if self._client_config is None:
self._client_config = conf.get("client_config")
if self._use_encryption_model is None:
print ("config use_encryption model here", conf.get("use_encryption_model"))
print("config use_encryption model here",
conf.get("use_encryption_model"))
self._use_encryption_model = conf.get("use_encryption_model")
if self._encryption_key is None or self._encryption_key=="":
if self._encryption_key is None or self._encryption_key == "":
self._encryption_key = conf.get("encryption_key")
if self._timeout is None:
self._timeout = conf["timeout"]
......@@ -401,14 +401,16 @@ class Op(object):
if self.client_type == 'brpc':
client = Client()
client.load_client_config(client_config)
self.right_feed_names, self.right_fetch_names = self.get_feed_fetch_list(client)
self.right_feed_names, self.right_fetch_names = self.get_feed_fetch_list(
client)
elif self.client_type == 'pipeline_grpc':
client = PPClient()
elif self.client_type == 'local_predictor':
if self.local_predictor is None:
raise ValueError("local predictor not yet created")
client = self.local_predictor
self.right_feed_names, self.right_fetch_names = self.get_feed_fetch_list(client)
self.right_feed_names, self.right_fetch_names = self.get_feed_fetch_list(
client)
else:
raise ValueError("Failed to init client: unknow client "
"type {}".format(self.client_type))
......@@ -417,12 +419,13 @@ class Op(object):
_LOGGER.info("Op({}) has no fetch name set. So fetch all vars")
if self.client_type != "local_predictor":
if self._use_encryption_model is None or self._use_encryption_model is False:
client.connect(server_endpoints)
client.connect(server_endpoints)
else:
print("connect to encryption rpc client")
client.use_key(self._encryption_key)
client.connect(server_endpoints, encryption=True)
_LOGGER.info("init_client, feed_list:{}, fetch_list: {}".format(self.right_feed_names, self.right_fetch_names))
print("connect to encryption rpc client")
client.use_key(self._encryption_key)
client.connect(server_endpoints, encryption=True)
_LOGGER.info("init_client, feed_list:{}, fetch_list: {}".format(
self.right_feed_names, self.right_fetch_names))
return client
def get_input_ops(self):
......@@ -599,7 +602,7 @@ class Op(object):
(_, input_dict), = input_dicts.items()
return input_dict, False, None, ""
def process(self, feed_batch, typical_logid=0):
"""
In process stage, send requests to the inference server or predict locally.
......@@ -616,19 +619,23 @@ class Op(object):
call_result = None
err_code = ChannelDataErrcode.OK.value
err_info = ""
@ErrorCatch
@ErrorCatch
@ParamChecker
def feed_fetch_list_check_helper(feed_batch : lambda feed_batch: check_feed_dict(feed_batch[0], self.right_feed_names),
fetch_list : lambda fetch_list: check_fetch_list(fetch_list, self.right_fetch_names),
log_id):
def feed_fetch_list_check_helper(
feed_batch: lambda feed_batch: check_feed_dict(feed_batch[0], self.right_feed_names),
fetch_list: lambda fetch_list: check_fetch_list(fetch_list, self.right_fetch_names),
log_id):
return None
_, resp = feed_fetch_list_check_helper(feed_batch, self._fetch_names, log_id=typical_logid)
_, resp = feed_fetch_list_check_helper(
feed_batch, self._fetch_names, log_id=typical_logid)
if resp.err_no != CustomExceptionCode.OK.value:
err_code = resp.err_no
err_info = resp.err_msg
call_result = None
return call_result, err_code, err_info
if self.client_type == "local_predictor":
err, err_info = ChannelData.check_batch_npdata(feed_batch)
if err != 0:
......@@ -804,7 +811,7 @@ class Op(object):
self.mkldnn_cache_capacity, self.mkldnn_op_list,
self.mkldnn_bf16_op_list, self.is_jump_op(),
self.get_output_channels_of_jump_ops(),
self.min_subgraph_size, self.dynamic_shape_info,
self.min_subgraph_size, self.dynamic_shape_info,
self.use_calib))
p.daemon = True
p.start()
......@@ -839,9 +846,9 @@ class Op(object):
self._get_output_channels(), True, trace_buffer,
self.model_config, self.workdir, self.thread_num,
self.device_type, self.devices, self.mem_optim,
self.ir_optim, self.precision, self.use_mkldnn,
self.mkldnn_cache_capacity, self.mkldnn_op_list,
self.mkldnn_bf16_op_list, self.is_jump_op(),
self.ir_optim, self.precision, self.use_mkldnn,
self.mkldnn_cache_capacity, self.mkldnn_op_list,
self.mkldnn_bf16_op_list, self.is_jump_op(),
self.get_output_channels_of_jump_ops(),
self.min_subgraph_size, self.dynamic_shape_info,
self.use_calib))
......@@ -873,40 +880,43 @@ class Op(object):
preped_data_dict = collections.OrderedDict()
err_channeldata_dict = collections.OrderedDict()
skip_process_dict = {}
@ErrorCatch
def preprocess_help(self, parsed_data, data_id, logid_dict):
preped_data, is_skip_process, prod_errcode, prod_errinfo = self.preprocess(
parsed_data, data_id, logid_dict.get(data_id))
return preped_data, is_skip_process, prod_errcode, prod_errinfo
for data_id, parsed_data in parsed_data_dict.items():
preped_data, error_channeldata = None, None
is_skip_process = False
prod_errcode, prod_errinfo = None, None
log_id = logid_dict.get(data_id)
process_res, resp = preprocess_help(self, parsed_data, data_id = data_id,
logid_dict = logid_dict)
process_res, resp = preprocess_help(
self, parsed_data, data_id=data_id, logid_dict=logid_dict)
if resp.err_no == CustomExceptionCode.OK.value:
preped_data, is_skip_process, prod_errcode, prod_errinfo = process_res
if is_skip_process is True:
skip_process_dict[data_id] = True
if prod_errcode is not None:
_LOGGER.error("data_id: {} return product error. Product ErrNo:{}, Product ErrMsg: {}".format(data_id, prod_errcode, prod_errinfo))
_LOGGER.error(
"data_id: {} return product error. Product ErrNo:{}, Product ErrMsg: {}".
format(data_id, prod_errcode, prod_errinfo))
error_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
else:
error_channeldata = ChannelData(
error_code=resp.err_no,
error_info=resp.err_msg,
data_id=data_id,
log_id=log_id)
skip_process_dict[data_id] = True
error_code=resp.err_no,
error_info=resp.err_msg,
data_id=data_id,
log_id=log_id)
skip_process_dict[data_id] = True
if error_channeldata is not None:
err_channeldata_dict[data_id] = error_channeldata
......@@ -1086,8 +1096,8 @@ class Op(object):
# 2 kinds of errors
if error_code != ChannelDataErrcode.OK.value or midped_batch is None:
error_info = "[{}] failed to predict. {}. Please check the input dict and checkout PipelineServingLogs/pipeline.log for more details.".format(
self.name, error_info)
self.name, error_info)
_LOGGER.error(error_info)
for data_id in data_ids:
err_channeldata_dict[data_id] = ChannelData(
......@@ -1162,12 +1172,16 @@ class Op(object):
_LOGGER.debug("{} Running postprocess".format(op_info_prefix))
postped_data_dict = collections.OrderedDict()
err_channeldata_dict = collections.OrderedDict()
@ErrorCatch
def postprocess_help(self, parsed_data_dict, midped_data, data_id, logid_dict):
postped_data, prod_errcode, prod_errinfo = self.postprocess(parsed_data_dict[data_id],
midped_data, data_id, logid_dict.get(data_id))
def postprocess_help(self, parsed_data_dict, midped_data, data_id,
logid_dict):
postped_data, prod_errcode, prod_errinfo = self.postprocess(
parsed_data_dict[data_id], midped_data, data_id,
logid_dict.get(data_id))
if not isinstance(postped_data, dict):
raise CustomException(CustomExceptionCode.TYPE_ERROR, "postprocess should return dict", True)
raise CustomException(CustomExceptionCode.TYPE_ERROR,
"postprocess should return dict", True)
return postped_data, prod_errcode, prod_errinfo
for data_id, midped_data in midped_data_dict.items():
......@@ -1175,19 +1189,23 @@ class Op(object):
postped_data, err_channeldata = None, None
prod_errcode, prod_errinfo = None, None
post_res, resp = postprocess_help(self, parsed_data_dict, midped_data, data_id
= data_id, logid_dict = logid_dict)
post_res, resp = postprocess_help(
self,
parsed_data_dict,
midped_data,
data_id=data_id,
logid_dict=logid_dict)
if resp.err_no == CustomExceptionCode.OK.value:
postped_data, prod_errcode, prod_errinfo = post_res
if prod_errcode is not None:
# product errors occured
# product errors occured
err_channeldata = ChannelData(
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
error_info="",
prod_error_code=prod_errcode,
prod_error_info=prod_errinfo,
data_id=data_id,
log_id=log_id)
else:
err_channeldata = ChannelData(
error_code=resp.err_no,
......@@ -1203,16 +1221,16 @@ class Op(object):
err, _ = ChannelData.check_npdata(postped_data)
if err == 0:
output_data = ChannelData(
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id,
log_id=log_id)
ChannelDataType.CHANNEL_NPDATA.value,
npdata=postped_data,
data_id=data_id,
log_id=log_id)
else:
output_data = ChannelData(
ChannelDataType.DICT.value,
dictdata=postped_data,
data_id=data_id,
log_id=log_id)
ChannelDataType.DICT.value,
dictdata=postped_data,
data_id=data_id,
log_id=log_id)
postped_data_dict[data_id] = output_data
_LOGGER.debug("{} Succ postprocess".format(op_info_prefix))
return postped_data_dict, err_channeldata_dict
......@@ -1303,10 +1321,10 @@ class Op(object):
def _run(self, concurrency_idx, input_channel, output_channels,
is_thread_op, trace_buffer, model_config, workdir, thread_num,
device_type, devices, mem_optim, ir_optim, precision,
use_mkldnn, mkldnn_cache_capacity, mkldnn_op_list,
mkldnn_bf16_op_list, is_jump_op, output_channels_of_jump_ops,
min_subgraph_size, dynamic_shape_info, use_calib):
device_type, devices, mem_optim, ir_optim, precision, use_mkldnn,
mkldnn_cache_capacity, mkldnn_op_list, mkldnn_bf16_op_list,
is_jump_op, output_channels_of_jump_ops, min_subgraph_size,
dynamic_shape_info, use_calib):
"""
_run() is the entry function of OP process / thread model.When client
type is local_predictor in process mode, the CUDA environment needs to
......@@ -1344,12 +1362,14 @@ class Op(object):
# init ops
profiler = None
@ErrorCatch
def check_helper(self, is_thread_op, model_config, workdir,
thread_num, device_type, devices, mem_optim, ir_optim,
precision, use_mkldnn, mkldnn_cache_capacity, mkldnn_op_list,
mkldnn_bf16_op_list, min_subgraph_size, dynamic_shape_info):
def check_helper(self, is_thread_op, model_config, workdir, thread_num,
device_type, devices, mem_optim, ir_optim, precision,
use_mkldnn, mkldnn_cache_capacity, mkldnn_op_list,
mkldnn_bf16_op_list, min_subgraph_size,
dynamic_shape_info):
if is_thread_op == False and self.client_type == "local_predictor":
self.service_handler = local_service_handler.LocalServiceHandler(
model_config=model_config,
......@@ -1377,17 +1397,19 @@ class Op(object):
profiler = self._initialize(is_thread_op, concurrency_idx)
return profiler
profiler, resp = check_helper(self, is_thread_op, model_config, workdir,
thread_num, device_type, devices, mem_optim, ir_optim,
precision, use_mkldnn, mkldnn_cache_capacity, mkldnn_op_list,
mkldnn_bf16_op_list, min_subgraph_size, dynamic_shape_info)
profiler, resp = check_helper(
self, is_thread_op, model_config, workdir, thread_num, device_type,
devices, mem_optim, ir_optim, precision, use_mkldnn,
mkldnn_cache_capacity, mkldnn_op_list, mkldnn_bf16_op_list,
min_subgraph_size, dynamic_shape_info)
if resp.err_no != CustomExceptionCode.OK.value:
_LOGGER.critical(
"{} failed to init op: {}".format(op_info_prefix, resp.err_msg),
exc_info=False)
print("{} failed to init op: {}".format(op_info_prefix, resp.err_msg))
print("{} failed to init op: {}".format(op_info_prefix,
resp.err_msg))
kill_stop_process_by_pid("kill", os.getpgid(os.getpid()))
_LOGGER.info("{} Succ init".format(op_info_prefix))
......@@ -1583,6 +1605,7 @@ class Op(object):
Returns:
TimeProfiler
"""
@ErrorCatch
def init_helper(self, is_thread_op, concurrency_idx):
if is_thread_op:
......@@ -1592,7 +1615,7 @@ class Op(object):
self.concurrency_idx = None
# init client
self.client = self.init_client(self._client_config,
self._server_endpoints)
self._server_endpoints)
# user defined
self.init_op()
self._succ_init_op = True
......@@ -1601,10 +1624,10 @@ class Op(object):
self.concurrency_idx = concurrency_idx
# init client
self.client = self.init_client(self._client_config,
self._server_endpoints)
self._server_endpoints)
# user defined
self.init_op()
self.init_op()
init_helper(self, is_thread_op, concurrency_idx)
print("[OP Object] init success")
# use a separate TimeProfiler per thread or process
......@@ -1910,8 +1933,8 @@ class VirtualOp(Op):
\-> E ----------/
DAG view: [[A], [B, E], [C], [D], [F]]
BUILD DAG: [A -> B -> C -> D -> E -> F]
\-> E -> V1-> V2-> V3/
BUILD DAG: [A -> B -> C -> D -> F]
\-> E -> V1-> V2->/
"""
def __init__(self, name, concurrency=1):
......
......@@ -84,7 +84,7 @@ RUN ln -sf /usr/local/bin/python3.6 /usr/local/bin/python3 && ln -sf /usr/local/
RUN rm -r /root/python_build
# Install Go and glide
RUN wget -qO- https://dl.google.com/go/go1.14.linux-amd64.tar.gz | \
RUN wget -qO- https://paddle-ci.cdn.bcebos.com/go1.17.2.linux-amd64.tar.gz | \
tar -xz -C /usr/local && \
mkdir /root/go && \
mkdir /root/go/bin && \
......
# A image for building paddle binaries
# # Use cuda devel base image for both cpu and gpu environment
# # When you modify it, please be aware of cudnn-runtime version
FROM hub.baidubce.com/paddlepaddle/serving:latest-cuda10.2-cudnn8-devel
FROM registry.baidubce.com/paddlepaddle/serving:0.8.0-cuda10.2-cudnn8-devel
MAINTAINER PaddlePaddle Authors <paddle-dev@baidu.com>
......
......@@ -40,7 +40,7 @@ WORKDIR /home
RUN bash /build_scripts/install_trt.sh <<run_env>> && rm -rf /build_scripts
# install go
RUN wget -qO- https://dl.google.com/go/go1.14.linux-amd64.tar.gz | \
RUN wget -qO- https://paddle-ci.cdn.bcebos.com/go1.17.2.linux-amd64.tar.gz | \
tar -xz -C /usr/local && \
mkdir /root/go && \
mkdir /root/go/bin && \
......
......@@ -12,6 +12,7 @@ function usage
echo " --workdir : workdir in image";
echo " --command : command to launch serving"
echo " --port : serving port"
echo " --pod_num : number of pod replicas"
echo " -h | --help : helper";
}
......@@ -20,6 +21,9 @@ function parse_args
# positional args
args=()
# default
pod_num=1
# named args
while [ "$1" != "" ]; do
case "$1" in
......@@ -28,6 +32,7 @@ function parse_args
--workdir ) workdir="$2"; shift;;
--command ) start_command="$2"; shift;;
--port ) port="$2"; shift;;
--pod_num ) pod_num="$2"; shift;;
-h | --help ) usage; exit;; # quit and show usage
* ) args+=("$1") # if no match, add it to the positional args
esac
......@@ -41,7 +46,7 @@ function parse_args
positional_2="${args[1]}"
# validate required args
if [[ -z "${app_name}" || -z "${image_name}" || -z "${workdir}" || -z "${start_command}" || -z "${port}" ]]; then
if [[ -z "${app_name}" || -z "${image_name}" || -z "${workdir}" || -z "${start_command}" || -z "${port}" || -z "${pod_num}"]]; then
echo "Invalid arguments. check your params again."
usage
exit;
......@@ -59,6 +64,7 @@ function run
echo "named arg: workdir: $workdir"
echo "named arg: command: $start_command"
echo "named arg: port: $port"
echo "named arg: pod_num: $pod_num"
sed -e "s/<< APP_NAME >>/$app_name/g" -e "s/<< IMAGE_NAME >>/$(echo $image_name | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< WORKDIR >>/$(echo $workdir | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< COMMAND >>/\"$(echo $start_command | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')\"/g" -e "s/<< PORT >>/$port/g" tools/k8s_serving.yaml_template > k8s_serving.yaml
sed -e "s/<< APP_NAME >>/$app_name/g" -e "s/<< IMAGE_NAME >>/$(echo $image_name | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< WORKDIR >>/$(echo $workdir | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')/g" -e "s/<< COMMAND >>/\"$(echo $start_command | sed -e 's/\\/\\\\/g; s/\//\\\//g; s/&/\\\&/g')\"/g" -e "s/<< PORT >>/$port/g" tools/k8s_ingress.yaml_template > k8s_ingress.yaml
......
......@@ -78,7 +78,8 @@ function run
echo "named arg: image_name: $image_name"
sed -e "s/<<base_image>>/$base_image/g" -e "s/<<python_version>>/$python/g" -e "s/<<run_env>>/$env/g" -e "s/<<serving_version>>/$serving/g" -e "s/<<paddle_version>>/$paddle/g" tools/Dockerfile.runtime_template > Dockerfile.tmp
docker build --network=host --build-arg ftp_proxy=http://172.19.57.45:3128 --build-arg https_proxy=http://172.19.57.45:3128 --build-arg http_proxy=http://172.19.57.45:3128 --build-arg HTTP_PROXY=http://172.19.57.45:3128 --build-arg HTTPS_PROXY=http://172.19.57.45:3128 -t $image_name -f Dockerfile.tmp .
#docker build --network=host --build-arg ftp_proxy=http://172.19.57.45:3128 --build-arg https_proxy=http://172.19.57.45:3128 --build-arg http_proxy=http://172.19.57.45:3128 --build-arg HTTP_PROXY=http://172.19.57.45:3128 --build-arg HTTPS_PROXY=http://172.19.57.45:3128 -t $image_name -f Dockerfile.tmp .
docker build --network=host -t $image_name -f Dockerfile.tmp .
}
run "$@";
......@@ -20,7 +20,7 @@ metadata:
app: << APP_NAME >>
name: << APP_NAME >>
spec:
replicas: 1
replicas: << POD_NUM >>
selector:
matchLabels:
app: << APP_NAME >>
......
......@@ -21,7 +21,7 @@ function env_install()
{
apt install -y libcurl4-openssl-dev libbz2-dev
wget https://paddle-serving.bj.bcebos.com/others/centos_ssl.tar && tar xf centos_ssl.tar && rm -rf centos_ssl.tar && mv libcrypto.so.1.0.2k /usr/lib/libcrypto.so.1.0.2k && mv libssl.so.1.0.2k /usr/lib/libssl.so.1.0.2k && ln -sf /usr/lib/libcrypto.so.1.0.2k /usr/lib/libcrypto.so.10 && ln -sf /usr/lib/libssl.so.1.0.2k /usr/lib/libssl.so.10 && ln -sf /usr/lib/libcrypto.so.10 /usr/lib/libcrypto.so && ln -sf /usr/lib/libssl.so.10 /usr/lib/libssl.so
rm -rf /usr/local/go && wget -qO- https://paddle-ci.gz.bcebos.com/go1.15.12.linux-amd64.tar.gz | \
rm -rf /usr/local/go && wget -qO- https://paddle-ci.cdn.bcebos.com/go1.17.2.linux-amd64.tar.gz | \
tar -xz -C /usr/local && \
mkdir /root/go && \
mkdir /root/go/bin && \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册