Unverified commit 984f969b authored by TeslaZhao, committed by GitHub

Merge pull request #1753 from TeslaZhao/develop

Large-model distributed inference
......@@ -30,7 +30,7 @@ message( "WITH_GPU = ${WITH_GPU}")
# Paddle Version should be one of:
# latest: latest develop build
# version number like 1.5.2
SET(PADDLE_VERSION "2.2.2")
SET(PADDLE_VERSION "2.3.0-rc0")
if (WITH_GPU)
message("CUDA: ${CUDA_VERSION}, CUDNN_MAJOR_VERSION: ${CUDNN_MAJOR_VERSION}")
# cuda 11.0 is not supported, 11.2 would be added.
......@@ -177,7 +177,7 @@ if (NOT WITH_MKLML)
endif()
ADD_LIBRARY(paddle_inference STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.a)
SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.so)
if (WITH_ASCEND_CL)
SET_PROPERTY(TARGET paddle_inference PROPERTY IMPORTED_LOCATION ${PADDLE_INSTALL_DIR}/lib/libpaddle_inference.so)
endif()
......
......@@ -66,9 +66,27 @@ message EngineDesc {
optional bool enable_overrun = 32 [ default = false ];
optional bool allow_split_request = 33 [ default = true ];
optional int32 min_subgraph_size = 34 [ default = 3 ];
map<string,string> min_input_shape = 35;
map<string,string> max_input_shape = 36;
map<string,string> opt_input_shape = 37;
map<string, string> min_input_shape = 35;
map<string, string> max_input_shape = 36;
map<string, string> opt_input_shape = 37;
/*
* Distributed inference params
* "enable_dist_model": enable distributed model, false by default.
* "dist_carrier_id": mark of the carrier.
* "dist_cfg_file": file name of the distributed configuration.
* "dist_nranks": number of distributed nodes.
* "dist_endpoints": all endpoints (ip:port) of the distributed nodes.
* "dist_subgraph_index": distributed subgraph index, auto-incremented from 0.
*     It is used to select the endpoint of the current shard in the
*     distributed model.
*/
optional bool enable_dist_model = 40 [ default = false ];
optional string dist_carrier_id = 41 [ default = "inference" ];
optional string dist_cfg_file = 42;
optional int32 dist_nranks = 43 [ default = 0 ];
repeated string dist_endpoints = 44;
optional int32 dist_subgraph_index = 45 [ default = 0 ];
};
// model_toolkit conf
......@@ -100,7 +118,8 @@ message DAGNodeDependency {
message DAGNode {
required string name = 1;
required string type = 2;
repeated DAGNodeDependency dependencies = 3;
repeated string address = 3;
repeated DAGNodeDependency dependencies = 4;
};
// workflow entry
......
......@@ -80,8 +80,8 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") var num: " << var_num
<< ") start to call load general model_conf op";
if (var_num < 1) {
LOG(ERROR) << "(logid=" << log_id << ") Failed get feed_var, var_num="
<< var_num;
LOG(ERROR) << "(logid=" << log_id
<< ") Failed get feed_var, var_num=" << var_num;
return -1;
}
......@@ -98,7 +98,7 @@ int GeneralReaderOp::inference() {
int64_t elem_type = 0;
int64_t elem_size = 0;
int64_t databuf_size = 0;
const void* src_ptr = nullptr;
const void *src_ptr = nullptr;
for (int i = 0; i < var_num; ++i) {
paddle::PaddleTensor paddleTensor;
const Tensor &tensor = req->tensor(i);
......@@ -107,7 +107,7 @@ int GeneralReaderOp::inference() {
elem_size = 0;
databuf_size = 0;
elem_type = tensor.elem_type();
src_ptr = nullptr ;
src_ptr = nullptr;
if (elem_type == P_INT64) { // int64
elem_size = sizeof(int64_t);
paddleTensor.dtype = paddle::PaddleDType::INT64;
......@@ -157,8 +157,8 @@ int GeneralReaderOp::inference() {
<< "dtype=" << paddleTensor.dtype << ";"
<< "data_len=" << data_len;
if (src_ptr == nullptr) {
LOG(ERROR) << "Not support var[" << i << "] with elem_type["
<< elem_type << "]";
LOG(ERROR) << "Not support var[" << i << "] with elem_type[" << elem_type
<< "]";
continue;
}
// implement lod tensor here
......@@ -166,9 +166,15 @@ int GeneralReaderOp::inference() {
// TODO(HexToString): support 2-D lod
if (tensor.lod_size() > 0) {
VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
paddleTensor.lod.resize(1);
int lod_index = -1;
for (int k = 0; k < tensor.lod_size(); ++k) {
paddleTensor.lod[0].push_back(tensor.lod(k));
if (tensor.lod(k) == 0) {
lod_index++;
paddleTensor.lod.resize(lod_index + 1);
}
paddleTensor.lod[lod_index].push_back(tensor.lod(k));
VLOG(2) << "(logid=" << log_id << ") lod[" << lod_index
<< "]=" << tensor.lod(k);
}
}
......@@ -191,7 +197,7 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] has lod_tensor and len=" << out->at(i).lod[0].back();
}
void* dst_ptr = out->at(i).data.data();
void *dst_ptr = out->at(i).data.data();
if (!dst_ptr) {
LOG(ERROR) << "dst_ptr is nullptr";
return -1;
......
......@@ -92,11 +92,13 @@ message Request {
message Response {
repeated ModelOutput outputs = 1;
repeated int64 profile_time = 2;
bool profile_server = 3;
uint64 log_id = 4;
// Error code
int32 err_no = 3;
int32 err_no = 5;
// Error messages
string err_msg = 4;
string err_msg = 6;
};
message ModelOutput {
......
......@@ -55,7 +55,7 @@ int CubeCache::reload_data(const std::string& cache_path) {
// loading data from cache files
if (stat(cache_path.c_str(), &st) < 0 || !S_ISDIR(st.st_mode)) {
LOG(ERROR) << "invalid cache path " << cache_path;
LOG(WARNING) << "No cube cache directory " << cache_path << " provided, ignore it";
return -1;
}
if ((dp = opendir(cache_path.c_str())) == nullptr) {
......
......@@ -129,6 +129,10 @@ int Dag::init(const configure::Workflow& conf, const std::string& name) {
node->id = i + 1; // 0 is reserved for begginer-op
node->name = conf.nodes(i).name();
node->type = conf.nodes(i).type();
for (int add_index = 0; add_index < conf.nodes(i).address_size();
++add_index) {
node->address.push_back(conf.nodes(i).address(add_index));
}
uint32_t depend_size = conf.nodes(i).dependencies_size();
for (uint32_t j = 0; j < depend_size; j++) {
const configure::DAGNodeDependency& depend =
......@@ -159,7 +163,8 @@ int Dag::init(const configure::Workflow& conf, const std::string& name) {
for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
DagNode* node = _index_nodes[nid];
LOG(INFO) << "OP-" << node->id << "-" << node->name << "-" << node->type
<< " depends: " << node->depends.size();
<< " depends: " << node->depends.size()
<< " address: " << node->address.size();
boost::unordered_map<std::string, EdgeMode>::iterator it;
for (it = node->depends.begin(); it != node->depends.end(); it++) {
......
......@@ -29,6 +29,7 @@ struct DagNode {
std::string name; // opname
std::string full_name; // workflow_stageindex_opname
std::string type;
std::vector<std::string> address;
void* conf;
boost::unordered_map<std::string, EdgeMode> depends;
};
......
......@@ -90,6 +90,7 @@ int DagView::init(Dag* dag,
node->name,
node->type,
node->conf,
node->address,
log_id) != 0) {
LOG(WARNING) << "(logid=" << log_id
<< ") Failed init op, type:" << node->type;
......
......@@ -32,7 +32,8 @@
#include "core/predictor/framework/memory.h"
#include "core/predictor/framework/predictor_metric.h"
#include "paddle_inference_api.h" // NOLINT
#include "experimental/float16.h"
//#include "experimental/float16.h"
#include "experimental/phi/common/float16.h"
namespace baidu {
namespace paddle_serving {
namespace predictor {
......@@ -548,9 +549,9 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
int8_t* data = static_cast<int8_t*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else if ((*tensorVector_in_pointer)[i].dtype ==
paddle::PaddleDType::FLOAT16) {
paddle::platform::float16* data =
static_cast<paddle::platform::float16*>(origin_data);
paddle::PaddleDType::FLOAT16) {
phi::dtype::float16* data =
static_cast<phi::dtype::float16*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else {
LOG(ERROR) << "Inference not support type["
......@@ -646,14 +647,14 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<EngineCore> {
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
} else if (dataType == paddle::PaddleDType::FLOAT16) {
databuf_size = out_num * sizeof(paddle::platform::float16);
databuf_size = out_num * sizeof(phi::dtype::float16);
databuf_data = MempoolWrapper::instance().malloc(databuf_size);
if (!databuf_data) {
LOG(ERROR) << "Malloc failed, size: " << databuf_size;
return -1;
}
paddle::platform::float16* data_out =
reinterpret_cast<paddle::platform::float16*>(databuf_data);
phi::dtype::float16* data_out =
reinterpret_cast<phi::dtype::float16*>(databuf_data);
lod_tensor_out->CopyToCpu(data_out);
databuf_char = reinterpret_cast<char*>(data_out);
}
......
......@@ -96,6 +96,10 @@ int ServerManager::start_and_wait() {
LOG(ERROR) << "Failed to start Paddle Inference Server";
return -1;
}
std::cout << "C++ Serving service started successfully!" << std::endl;
LOG(INFO) << "C++ Serving service started successfully!";
_server.RunUntilAskedToQuit();
ServerManager::stop_reloader();
......
......@@ -36,12 +36,14 @@ int Op::init(Bus* bus,
const std::string& name,
const std::string& type,
void* conf,
const std::vector<std::string>& address,
const uint64_t log_id) {
_bus = bus;
_dag = dag;
_id = id;
_name = name;
_type = type;
_address = address;
set_config(conf);
_timer = butil::get_object<TimerFlow>();
......@@ -110,11 +112,13 @@ int Op::process(const uint64_t log_id, bool debug) {
return ERR_INTERNAL_FAILURE;
}
/*
if (_has_calc) {
LOG(INFO) << "(logid=" << log_id << ") Op: " << _name
<< " already processed before";
return ERR_OK;
}
*/
// 1. dependency inference
/*
......@@ -147,8 +151,10 @@ int Op::process(const uint64_t log_id, bool debug) {
}
// 3. share output to bus
Channel* channel = mutable_channel();
channel->share_to_bus(_bus, log_id);
if (!_has_calc) {
Channel* channel = mutable_channel();
channel->share_to_bus(_bus, log_id);
}
// 4. mark has calculated
_has_calc = true;
......
......@@ -114,6 +114,7 @@ class Op {
const std::string& name,
const std::string& type,
void* conf,
const std::vector<std::string>& address,
const uint64_t log_id);
int deinit();
......@@ -135,6 +136,8 @@ class Op {
const std::string& full_name() const { return _full_name; }
const std::vector<std::string>& address() const { return _address; }
const std::vector<std::string>& pre_names() const { return _pre_node_names; }
void set_full_name(const std::string full_name) { _full_name = full_name; }
......@@ -206,6 +209,7 @@ class Op {
std::string _name;
std::string _full_name; // service_workflow_stageindex_opname
std::string _type;
std::vector<std::string> _address;
bool _has_calc;
bool _has_init;
TimerFlow* _timer;
......
......@@ -92,11 +92,13 @@ message Request {
message Response {
repeated ModelOutput outputs = 1;
repeated int64 profile_time = 2;
bool profile_server = 3;
uint64 log_id = 4;
// Error code
int32 err_no = 3;
int32 err_no = 5;
// Error messages
string err_msg = 4;
string err_msg = 6;
};
message ModelOutput {
......
......@@ -6,7 +6,7 @@ Paddle Serving 实现了一套通用的多模型组合服务编程框架 Python
For a Python Pipeline usage example, read [Python Pipeline Quick Deployment Example](./3-2_QuickStart_Pipeline_OCR_CN.md)
Read the following to master the Python Pipeline design, advanced usage, and optimization guide.
Read the following to master the core features and usage of Python Pipeline, advanced feature usage, and the performance optimization guide.
- [Python Pipeline Framework Design](7-1_Python_Pipeline_Design_CN.md)
- [Python Pipeline Advanced Usage](7-2_Python_Pipeline_Senior_CN.md)
- [Python Pipeline Optimization Guide](7-3_Python_Pipeline_Optimize_CN.md)
......@@ -42,7 +42,7 @@ Request 是输入结构,`key` 与 `value` 是配对的 string 数组。 `name`
Response is the output structure: `err_no` and `err_msg` express the correctness of the processing and the error message, while `key` and `value` carry the results.
The Pipeline service wraps a class derived from WebService. Taking the OCR example, an OcrService class is derived; the DAG topology is implemented in the get_pipeline_response function. The default service entry is read_op, and the Op returned by the function is the last processing node; this final Op must be unique.
The Pipeline service wraps a class derived from WebService. Taking the [OCR example](https://github.com/PaddlePaddle/Serving/tree/develop/examples/Pipeline/PaddleOCR/ocr) as an example, an OcrService class is derived; the DAG topology is implemented in the get_pipeline_response function. The default service entry is read_op, and the Op returned by the function is the last processing node; this final Op must be unique.
All configuration of the services and models is recorded in `config.yml`. The name field of the URL is defined when OcrService is initialized; the run_service function starts the service.
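A minimal client-side request matching these two structures is sketched below. It mirrors the pipeline_http_client pattern that appears later in this PR; the URL and the "imagenet" service name are illustrative, not part of the OCR example.

```python
# Hedged sketch: build a Request (paired key/value string arrays) and read the
# Response fields (err_no, err_msg, key, value). URL and service name are illustrative.
import base64
import json
import requests

with open("daisy.jpg", "rb") as f:
    image = base64.b64encode(f.read()).decode("utf8")

data = {"key": ["image"], "value": [image]}  # paired string arrays
resp = requests.post("http://127.0.0.1:18080/imagenet/prediction",
                     data=json.dumps(data)).json()
print(resp["err_no"], resp["err_msg"])       # correctness / error message
print(resp.get("key"), resp.get("value"))    # result payload
```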
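A minimal sketch of this wrapper pattern is shown below, assuming placeholder DetOp/RecOp implementations; see the linked OCR example for the real Ops, and the ImageService example later in this PR for the same structure.

```python
# Hedged sketch of the WebService wrapper described above; DetOp/RecOp bodies are omitted.
from paddle_serving_server.web_service import WebService, Op

class DetOp(Op):
    pass  # detection model: preprocess / process / postprocess

class RecOp(Op):
    pass  # recognition model: preprocess / process / postprocess

class OcrService(WebService):
    def get_pipeline_response(self, read_op):
        # read_op is the default service entry; chain the DAG behind it.
        det_op = DetOp(name="det", input_ops=[read_op])
        rec_op = RecOp(name="rec", input_ops=[det_op])
        # The Op returned here is the last node of the DAG and must be unique.
        return rec_op

ocr_service = OcrService(name="ocr")  # "ocr" becomes the name field of the URL
ocr_service.prepare_pipeline_config("config.yml")
ocr_service.run_service()
```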
......@@ -177,6 +177,8 @@ Pipeline 的日志模块在 `logger.py` 中定义,使用了 `logging.handlers.
```
**4. Service Timeout and Retry**
## Custom Information
......@@ -297,7 +299,6 @@ def init_op(self):
```
RequestOp and ResponseOp are two special Ops in Python Pipeline: the former unpacks the RPC request data and feeds it into the graph execution engine, and the latter takes the prediction results from the graph execution engine and packs them into the RPC response for the client.
The design of the RequestOp class is shown below. Its core is parsing the request data in the unpack_request_package function; therefore, after modifying the Request structure, override this function to implement the new unpacking logic.
| Interface | Description |
......@@ -334,7 +335,6 @@ class RequestOp(Op):
return dict_data, log_id, None, ""
```
The design of the ResponseOp class is shown below. Its core is packing the returned structure in pack_response_package; therefore, after modifying the Response structure, override this function to implement the new packing format.
| Interface | Description |
......@@ -381,3 +381,35 @@ class ProductErrCode(enum.Enum):
"""
pass
```
Its usage is shown below: an error type `Product_Error` is defined, and the error information is set in the return value of the `preprocess` function; it can also be set in `postprocess`.
```python
class ProductErrCode(enum.Enum):
"""
ProductErrCode is a base class for recording business error code.
product developers inherit this class and extend more error codes.
"""
Product_Error = 100001
def preprocess(self, input_dicts, data_id, log_id):
"""
In preprocess stage, assembling data for process stage. users can
override this function for model feed features.
Args:
input_dicts: input data to be preprocessed
data_id: inner unique id
log_id: global unique id for RTT
Return:
input_dict: data for process stage
is_skip_process: skip process stage or not, False default
prod_errcode: None default, otherwise, product errores occured.
It is handled in the same way as exception.
prod_errinfo: "" default
"""
(_, input_dict), = input_dicts.items()
if input_dict.get("product_error"):
return input_dict, False, ProductErrCode.Product_Error, "Product Error Occurred"
return input_dict, False, None, ""
```
......@@ -10,113 +10,347 @@
- MKLDNN inference acceleration
**1. Skipping an Op in the DAG **
**1. Skipping an Op in the DAG**
This scenario typically arises when there is an if condition in an Op's pre/post-processing: when the condition is not met, the following processing is skipped. In practice, the process stage of the Op is skipped; make the decision in preprocess, skip the process stage, and return directly after postprocess.
The second element of the result list returned by preprocess, `is_skip_process=True`, indicates whether to skip the process stage of the current Op and go directly to postprocess.
```python
## Op::preprocess() implementation
def preprocess(self, input_dicts, data_id, log_id):
"""
In preprocess stage, assembling data for process stage. users can
override this function for model feed features.
Args:
input_dicts: input data to be preprocessed
data_id: inner unique id
log_id: global unique id for RTT
Return:
input_dict: data for process stage
is_skip_process: skip process stage or not, False default
prod_errcode: None default, otherwise, product errores occured.
It is handled in the same way as exception.
prod_errinfo: "" default
"""
# multiple previous Op
if len(input_dicts) != 1:
_LOGGER.critical(
self._log(
"Failed to run preprocess: this Op has multiple previous "
"inputs. Please override this func."))
os._exit(-1)
(_, input_dict), = input_dicts.items()
return input_dict, False, None, ""
"""
In preprocess stage, assembling data for process stage. users can
override this function for model feed features.
Args:
input_dicts: input data to be preprocessed
data_id: inner unique id
log_id: global unique id for RTT
Return:
input_dict: data for process stage
is_skip_process: skip process stage or not, False default
prod_errcode: None default, otherwise, product errores occured.
It is handled in the same way as exception.
prod_errinfo: "" default
"""
# multiple previous Op
if len(input_dicts) != 1:
_LOGGER.critical(
self._log(
"Failed to run preprocess: this Op has multiple previous "
"inputs. Please override this func."))
os._exit(-1)
(_, input_dict), = input_dicts.items()
return input_dict, False, None, ""
```
The following example, JumpOp::preprocess(), overrides the original function and returns True for the is_skip_process field.
```python
class JumpOp(Op):
## Overload func JumpOp::preprocess
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
if "jump" in input_dict:
return input_dict, True, None, ""
else:
return input_dict, False, None, ""
```
** 2. Batch Inference **
**2. Batch Inference**
Pipeline supports batch inference; increasing the batch size improves GPU utilization. Python Pipeline supports 3 batching forms, with the applicable scenarios as follows:
- Scenario 1: one inference request contains a batch of data (batch)
- each sample is fixed-length, the batch is variable-length, and the data is converted to BCHW format
- each sample is variable-length and is padded to a fixed length in preprocessing
- Scenario 2: the batch data of one inference request is split into multiple mini-batches (mini-batch)
- since padding aligns to the longest sample, an extremely large sample in a batch slows down inference
- specifying a block size limits the impact of such an extremely large sample
- Scenario 3: the data of multiple requests is merged for batch inference (auto-batching)
- when inference takes noticeably longer than pre/post-processing, merging multiple requests into one inference improves throughput and GPU utilization
- the shapes of the merged requests must be identical
- Scenario 1: the client packs a batch of data (Client Batch)
- Scenario 2: the server dynamically merges multiple requests into a batch (Server auto-batching)
- Scenario 3: the server splits one batched inference request into multiple mini-batches (Server mini-batch)
1. Client packs a batch of data
When the input is a numpy array, e.g. a numpy array with shape [4, 3, 512, 512] (i.e. 4 images), it can be used directly as the input data.
When the shapes of the input samples differ, pad them to the size of the largest shape before sending them to the server, as sketched below.
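A minimal sketch of this client-side padding, assuming CHW images and zero padding on the bottom/right:

```python
# Hedged sketch: pad variable-sized (C, H, W) images to the largest H and W in the
# batch and stack them into one [N, C, H, W] array that can be sent as a single batch.
import numpy as np

def pad_to_batch(images):
    max_h = max(img.shape[1] for img in images)
    max_w = max(img.shape[2] for img in images)
    batch = np.zeros((len(images), images[0].shape[0], max_h, max_w), dtype=np.float32)
    for i, img in enumerate(images):
        _, h, w = img.shape
        batch[i, :, :h, :w] = img  # zero padding on the bottom/right
    return batch

batch = pad_to_batch([np.ones((3, 480, 512)), np.ones((3, 512, 500))])
print(batch.shape)  # (2, 3, 512, 512)
```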
2. Server dynamically merges multiple requests into a batch
This helps improve throughput and the utilization of computing resources; merging is not supported when the shapes of the requests differ. There are currently 2 merging strategies:
- Waiting time combined with a maximum batch size (recommended): use `batch_size` together with `auto_batching_timeout`. When the number of pending requests reaches `batch_size`, inference runs immediately; otherwise the server waits up to `auto_batching_timeout` before running. A sketch of this logic follows the config below.
```
op:
bow:
# concurrency; threads when is_thread_op=True, otherwise processes
concurrency: 1
# client connection type: brpc, grpc or local_predictor
client_type: brpc
# Serving IPs
server_endpoints: ["127.0.0.1:9393"]
# client-side config of the bow model
client_config: "imdb_bow_client_conf/serving_client_conf.prototxt"
# number of samples batched per query to Serving, default 1. When batch_size>1, set auto_batching_timeout, otherwise requests block until batch_size is reached
batch_size: 2
# batching timeout, used together with batch_size
auto_batching_timeout: 2000
```
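A minimal sketch of the batch_size + auto_batching_timeout strategy; the real logic lives inside the Python Pipeline framework, and the names below are illustrative:

```python
# Hedged sketch: collect up to batch_size requests, waiting at most
# auto_batching_timeout milliseconds before running inference with whatever arrived.
import queue
import time

def collect_batch(request_queue, batch_size=2, auto_batching_timeout_ms=2000):
    batch = []
    deadline = time.time() + auto_batching_timeout_ms / 1000.0
    while len(batch) < batch_size:
        remaining = deadline - time.time()
        if remaining <= 0:
            break  # timeout reached: run with a partial batch
        try:
            batch.append(request_queue.get(timeout=remaining))
        except queue.Empty:
            break
    return batch  # may contain fewer than batch_size requests
```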
- Blocking wait: set only `batch_size` and leave `auto_batching_timeout` unset (or set `auto_batching_timeout=0`); the server keeps waiting until `batch_size` requests have been received before running inference.
```
op:
bow:
# concurrency; threads when is_thread_op=True, otherwise processes
concurrency: 1
# client connection type: brpc, grpc or local_predictor
client_type: brpc
# Serving IPs
server_endpoints: ["127.0.0.1:9393"]
| Interface | Description |
| :------------------------------------------: | :-----------------------------------------: |
| batch | the client sends batch data; set batch=True in client.predict |
| mini-batch | preprocess returns a list; see RecOp's preprocess in the OCR example |
| auto-batching | set batch_size and auto_batching_timeout at the Op level in config.yml |
# client-side config of the bow model
client_config: "imdb_bow_client_conf/serving_client_conf.prototxt"
# number of samples batched per query to Serving, default 1. When batch_size>1, set auto_batching_timeout, otherwise requests block until batch_size is reached
batch_size: 2
# batching timeout, used together with batch_size
auto_batching_timeout: 2000
```
** 3. Single-Machine Multi-Card Inference **
For single-machine multi-card inference, M Op processes are bound to N GPU cards. Three parameters in `config.yml` are involved: first choose the process mode, then the concurrency (i.e. the number of processes); devices is the list of GPU card IDs. Binding iterates over the GPU card IDs as processes start. For example, starting 7 Op processes with devices:0,1,2 in `config.yml` binds the 1st, 4th and 7th started processes to card 0, the 2nd and 5th to card 1, and the 3rd and 6th to card 2.
- Process ID: 0 bound to GPU card 0
- Process ID: 1 bound to GPU card 1
- Process ID: 2 bound to GPU card 2
- Process ID: 3 bound to GPU card 0
- Process ID: 4 bound to GPU card 1
- Process ID: 5 bound to GPU card 2
- Process ID: 6 bound to GPU card 0
Hardware configuration in `config.yml`:
3. Server splits one batched inference request into multiple mini-batches: this reduces the padding size of the batch and thus improves speed. See the [OCR example](); the core idea is to split the data into several mini-batches, put them into a list object feed_list, and return it.
```
# computing hardware IDs; when devices is "" or omitted, prediction runs on CPU; when devices is "0" or "0,1,2", prediction runs on GPU with the listed cards
devices: "0,1,2"
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
raw_im = input_dict["image"]
data = np.frombuffer(raw_im, np.uint8)
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
## Many mini-batchs, the type of feed_data is list.
max_batch_size = len(dt_boxes)
# If max_batch_size is 0, skipping predict stage
if max_batch_size == 0:
return {}, True, None, ""
boxes_size = len(dt_boxes)
batch_size = boxes_size // max_batch_size
rem = boxes_size % max_batch_size
for bt_idx in range(0, batch_size + 1):
imgs = None
boxes_num_in_one_batch = 0
if bt_idx == batch_size:
if rem == 0:
continue
else:
boxes_num_in_one_batch = rem
elif bt_idx < batch_size:
boxes_num_in_one_batch = max_batch_size
else:
_LOGGER.error("batch_size error, bt_idx={}, batch_size={}".
format(bt_idx, batch_size))
break
start = bt_idx * max_batch_size
end = start + boxes_num_in_one_batch
img_list = []
for box_idx in range(start, end):
boximg = self.get_rotate_crop_image(im, dt_boxes[box_idx])
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
_, w, h = self.ocr_reader.resize_norm_img(img_list[0],
max_wh_ratio).shape
imgs = np.zeros((boxes_num_in_one_batch, 3, w, h)).astype('float32')
for id, img in enumerate(img_list):
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
imgs[id] = norm_img
feed = {"x": imgs.copy()}
feed_list.append(feed)
return feed_list, False, None, ""
```
** 4. Inference on Multiple Types of Compute Hardware **
**3. Single-Machine Multi-Card Inference**
Single-machine multi-card inference is closely related to 4 parameters in `config.yml`: `is_thread_op`, `concurrency`, `device_type` and `devices`. It requires process mode and GPU mode; multiple processes can be assigned to each card, i.e. M Op processes are bound to N GPU cards.
```
dag:
# Op resource type: True for thread model, False for process model
is_thread_op: False
op:
det:
# concurrency; threads when is_thread_op=True, otherwise processes
concurrency: 6
# when the Op config has no server_endpoints, the local service config is read from local_service_conf
local_service_conf:
client_type: local_predictor
# device_type, 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0
# computing hardware IDs; when devices is "" or omitted, prediction runs on CPU; when devices is "0" or "0,1,2", prediction runs on GPU with the listed cards
devices: "0,1,2"
```
Taking the example above, `concurrency:6` starts 6 processes, and with `devices:0,1,2` the round-robin assignment gives the following bindings:
- Process ID: 0 bound to GPU card 0
- Process ID: 1 bound to GPU card 1
- Process ID: 2 bound to GPU card 2
- Process ID: 3 bound to GPU card 0
- Process ID: 4 bound to GPU card 1
- Process ID: 5 bound to GPU card 2
A sketch of this round-robin assignment is shown below. More flexible ways of binding processes to GPU cards will be developed in the future.
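A sketch of the round-robin assignment above (the actual binding is performed by the framework when it forks the Op processes):

```python
# Hedged sketch: process i is bound to card devices[i % len(devices)].
def assign_gpu_cards(concurrency, devices="0,1,2"):
    cards = devices.split(",")
    return {proc_id: cards[proc_id % len(cards)] for proc_id in range(concurrency)}

print(assign_gpu_cards(6))  # {0: '0', 1: '1', 2: '2', 3: '0', 4: '1', 5: '2'}
```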
Besides CPU and GPU inference, Pipeline also supports inference deployment on many kinds of compute hardware, configured in `config.yml` by `device_type` and `devices`. `device_type` has priority in specifying the type; when it is left empty, the type is deduced from `devices`. `device_type` is described as follows:
**4. Inference on Multiple Types of Compute Hardware**
Besides CPU and GPU inference, Python Pipeline also supports inference on many kinds of compute hardware. The inference hardware and acceleration library are selected by `device_type` and `devices` in `config.yml` as follows:
- CPU(Intel) : 0
- GPU(Jetson/Hygon DCU) : 1
- GPU(GPU / Jetson / Hygon DCU) : 1
- TensorRT : 2
- CPU(Arm) : 3
- XPU : 4
- Ascend310 : 5
- Ascend910 : 6
Hardware configuration in config.yml:
When `device_type` is not set, the hardware is determined by `devices`: when `devices` is "" or empty, inference runs on CPU; when it is set, e.g. "0,1,2", inference runs on GPU with the specified cards. A sketch of this fallback logic follows the config below.
Taking GPU cards 0 and 1 with TensorRT enabled as an example (TensorRT must be enabled together with `ir_optim`), the detailed `config.yml` is:
```
# compute hardware type: when empty, decided by devices (CPU/GPU); 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0
# compute hardware type
device_type: 2
# compute hardware IDs; device_type has priority in deciding the hardware type
devices: "0,1"
# enable IR optimization
ir_optim: True
# compute hardware IDs; device_type has priority in deciding the hardware type. When devices is "" or omitted, prediction runs on CPU; when it is "0" or "0,1,2", prediction runs on GPU with the listed cards
devices: "" # "0,1"
```
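A sketch of the device_type/devices fallback described above; the mapping mirrors the table in this section, and the function name is illustrative:

```python
# Hedged sketch: device_type has priority; when it is empty, fall back to devices.
DEVICE_TYPES = {0: "cpu", 1: "gpu", 2: "tensorrt", 3: "arm cpu",
                4: "kunlun xpu", 5: "ascend310", 6: "ascend910"}

def resolve_device(device_type, devices):
    if device_type in (None, ""):
        # device_type empty: "" or missing devices means CPU, otherwise GPU cards
        return ("gpu", devices.split(",")) if devices else ("cpu", [])
    return (DEVICE_TYPES[int(device_type)], devices.split(",") if devices else [])

print(resolve_device(2, "0,1"))  # ('tensorrt', ['0', '1'])
print(resolve_device("", ""))    # ('cpu', [])
```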
** 5. Low-Precision Inference **
**5. Low-Precision Inference**
Pipeline Serving supports low-precision inference. The precision types supported by CPU, GPU and TensorRT are listed below:
- CPU
- fp32(default)
- fp16
- bf16(mkldnn)
- GPU
- fp32(default)
- fp16
- int8
- TensorRT
- fp32(default)
- fp16
- int8
When using int8, use_calib: True must be enabled.
See the [simple_web_service](../../examples/Pipeline/simple_web_service) example.
Low-precision inference requires a quantized model and works together with the `config.yml` configuration; take the [low-precision example]() as an example.
1. CPU low-precision inference configuration
Use CPU inference by setting the `device_type` and `devices` fields, and choose the precision and tune performance with the `precision`, `thread_num` and `use_mkldnn` parameters.
```
op:
imagenet:
# concurrency; threads when is_thread_op=True, otherwise processes
concurrency: 1
# when the Op config has no server_endpoints, the local service config is read from local_service_conf
local_service_conf:
# model directory
model_config: serving_server/
# compute hardware type: when empty, decided by devices (CPU/GPU); 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 0
# compute hardware IDs; when devices is "" or omitted, prediction runs on CPU; when it is "0" or "0,1,2", prediction runs on GPU with the listed cards
devices: ""
# client type: brpc, grpc or local_predictor; local_predictor does not start a Serving service and predicts in-process
client_type: local_predictor
# fetch list, based on the alias_name of fetch_var in client_config
fetch_list: ["score"]
# precision; CPU supports: "fp32"(default), "bf16"(mkldnn); "int8" is not supported
precision: "bf16"
# number of CPU math threads, default 4
thread_num: 10
# enable MKLDNN
use_mkldnn: True
```
2. GPU + TensorRT low-precision inference
Use native GPU or TensorRT inference by setting the `device_type` and `devices` fields, and choose the precision and tune performance with the `precision`, `ir_optim` and `use_calib` parameters. When TensorRT is enabled, `ir_optim` must be enabled as well; `use_calib` is only used together with int8.
```
op:
imagenet:
# concurrency; threads when is_thread_op=True, otherwise processes
concurrency: 1
# when the Op config has no server_endpoints, the local service config is read from local_service_conf
local_service_conf:
# model directory
model_config: serving_server/
# compute hardware type: when empty, decided by devices (CPU/GPU); 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 2
# compute hardware IDs; when devices is "" or omitted, prediction runs on CPU; when it is "0" or "0,1,2", prediction runs on GPU with the listed cards
devices: "1" # "0,1"
# client type: brpc, grpc or local_predictor; local_predictor does not start a Serving service and predicts in-process
client_type: local_predictor
# fetch list, based on the alias_name of fetch_var in client_config
fetch_list: ["score"]
# precision; GPU supports: "fp32"(default), "fp16", "int8"
precision: "int8"
# enable TensorRT int8 calibration
use_calib: True
# enable ir_optim
ir_optim: True
```
3. Performance test
The test environment was:
- GPU model: A100-40GB
- CPU model: Intel(R) Xeon(R) Gold 6148 CPU @ 2.40GHz * 160
- CUDA: CUDA Version: 11.2
- CuDNN: 8.0
Test method:
- Model: quantized ResNet50 model
- Deployment: Python Pipeline
- Timing: excluding the first (initialization) run, average over 100 runs
Testing inference at different precisions in this environment, the best-performing GPU configurations were:
- GPU + int8 + ir_optim + TensorRT + use_calib : 15.1 ms
- GPU + fp16 + ir_optim + TensorRT : 17.2 ms
The best-performing CPU configurations were:
- CPU + bf16 + MKLDNN : 18.2 ms
- CPU + fp32 + thread_num=10 : 18.4 ms
The full performance metrics are shown below:
<div align=center>
<img src='../images/low_precision_profile.png' height = "600" align="middle"/>
</div>
# Imagenet Pipeline WebService
This document takes the Imagenet service as an example to introduce how to use Pipeline WebService.
## Get model
```
wget https://paddle-inference-dist.bj.bcebos.com/inference_demo/python/resnet50/ResNet50_quant.tar.gz
tar zxvf ResNet50_quant.tar.gz
```
## Start server
```
python3 resnet50_web_service.py &>log.txt &
```
## RPC test
```
python3 pipeline_rpc_client.py
```
# Imagenet Pipeline WebService
This document takes the Imagenet service as an example to introduce how to use Pipeline WebService.
## Get model
```
wget https://paddle-inference-dist.bj.bcebos.com/inference_demo/python/resnet50/ResNet50_quant.tar.gz
tar zxvf ResNet50_quant.tar.gz
```
## Start server
```
python3 resnet50_web_service.py &>log.txt &
```
## Test
```
python3 pipeline_rpc_client.py
```
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import os
import base64
import yaml
import requests
import time
import json
from paddle_serving_server.pipeline import PipelineClient
import numpy as np
from paddle_serving_client.utils import MultiThreadRunner
from paddle_serving_client.utils import benchmark_args, show_latency
def parse_benchmark(filein, fileout):
with open(filein, "r") as fin:
res = yaml.load(fin, yaml.FullLoader)
del_list = []
for key in res["DAG"].keys():
if "call" in key:
del_list.append(key)
for key in del_list:
del res["DAG"][key]
with open(fileout, "w") as fout:
yaml.dump(res, fout, default_flow_style=False)
def gen_yml(device, gpu_id):
fin = open("config.yml", "r")
config = yaml.load(fin, yaml.FullLoader)
fin.close()
config["dag"]["tracer"] = {"interval_s": 10}
if device == "gpu":
config["op"]["imagenet"]["local_service_conf"]["device_type"] = 1
config["op"]["imagenet"]["local_service_conf"]["devices"] = gpu_id
else:
config["op"]["imagenet"]["local_service_conf"]["device_type"] = 0
with open("config2.yml", "w") as fout:
yaml.dump(config, fout, default_flow_style=False)
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
def run_http(idx, batch_size):
print("start thread ({})".format(idx))
url = "http://127.0.0.1:18000/imagenet/prediction"
start = time.time()
with open(os.path.join(".", "daisy.jpg"), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
keys, values = [], []
for i in range(batch_size):
keys.append("image_{}".format(i))
values.append(image)
data = {"key": keys, "value": values}
latency_list = []
start_time = time.time()
total_num = 0
while True:
l_start = time.time()
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
l_end = time.time()
latency_list.append(l_end * 1000 - l_start * 1000)
total_num += 1
if time.time() - start_time > 20:
break
end = time.time()
return [[end - start], latency_list, [total_num]]
def multithread_http(thread, batch_size):
multi_thread_runner = MultiThreadRunner()
start = time.time()
result = multi_thread_runner.run(run_http, thread, batch_size)
end = time.time()
total_cost = end - start
avg_cost = 0
total_number = 0
for i in range(thread):
avg_cost += result[0][i]
total_number += result[2][i]
avg_cost = avg_cost / thread
print("Total cost: {}s".format(total_cost))
print("Each thread cost: {}s. ".format(avg_cost))
print("Total count: {}. ".format(total_number))
print("AVG QPS: {} samples/s".format(batch_size * total_number /
total_cost))
show_latency(result[1])
def run_rpc(thread, batch_size):
client = PipelineClient()
client.connect(['127.0.0.1:18080'])
start = time.time()
test_img_dir = "imgs/"
for img_file in os.listdir(test_img_dir):
with open(os.path.join(test_img_dir, img_file), 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
start_time = time.time()
while True:
ret = client.predict(feed_dict={"image": image}, fetch=["res"])
if time.time() - start_time > 10:
break
end = time.time()
return [[end - start]]
def multithread_rpc(thread, batch_size):
multi_thread_runner = MultiThreadRunner()
result = multi_thread_runner.run(run_rpc, thread, batch_size)
if __name__ == "__main__":
if sys.argv[1] == "yaml":
mode = sys.argv[2] # brpc/ local predictor
thread = int(sys.argv[3])
device = sys.argv[4]
if device == "gpu":
gpu_id = sys.argv[5]
else:
gpu_id = None
gen_yml(device, gpu_id)
elif sys.argv[1] == "run":
mode = sys.argv[2] # http/ rpc
thread = int(sys.argv[3])
batch_size = int(sys.argv[4])
if mode == "http":
multithread_http(thread, batch_size)
elif mode == "rpc":
multithread_rpc(thread, batch_size)
elif sys.argv[1] == "dump":
filein = sys.argv[2]
fileout = sys.argv[3]
parse_benchmark(filein, fileout)
export FLAGS_profile_pipeline=1
alias python3="python3.6"
modelname="clas-ResNet_v2_50"
# HTTP
#ps -ef | grep web_service | awk '{print $2}' | xargs kill -9
sleep 3
# Create the yaml config. If you already have config.yml, you can skip this step.
#python3 benchmark.py yaml local_predictor 1 gpu
rm -rf profile_log_$modelname
echo "Starting HTTP Clients..."
# Start a client in each thread, testing the case of multiple threads.
for thread_num in 1 2 4 8 12 16
do
for batch_size in 1
do
echo "----${modelname} thread num: ${thread_num} batch size: ${batch_size} mode:http ----" >>profile_log_$modelname
# Start one web service. If you start the service yourself, you can ignore this.
#python3 web_service.py >web.log 2>&1 &
#sleep 3
# --id is the serial number of the GPU card; it must be the same as the GPU id used by the server.
nvidia-smi --id=3 --query-gpu=memory.used --format=csv -lms 1000 > gpu_use.log 2>&1 &
nvidia-smi --id=3 --query-gpu=utilization.gpu --format=csv -lms 1000 > gpu_utilization.log 2>&1 &
echo "import psutil\ncpu_utilization=psutil.cpu_percent(1,False)\nprint('CPU_UTILIZATION:', cpu_utilization)\n" > cpu_utilization.py
# Start http client
python3 benchmark.py run http $thread_num $batch_size > profile 2>&1
# Collect CPU metrics, filter out momentary zero readings, and record the maximum GPU memory and the average GPU utilization.
python3 cpu_utilization.py >> profile_log_$modelname
grep -av '^0 %' gpu_utilization.log > gpu_utilization.log.tmp
awk 'BEGIN {max = 0} {if(NR>1){if ($modelname > max) max=$modelname}} END {print "MAX_GPU_MEMORY:", max}' gpu_use.log >> profile_log_$modelname
awk -F' ' '{sum+=$1} END {print "GPU_UTILIZATION:", sum/NR, sum, NR }' gpu_utilization.log.tmp >> profile_log_$modelname
# Show profiles
python3 ../../../util/show_profile.py profile $thread_num >> profile_log_$modelname
tail -n 8 profile >> profile_log_$modelname
echo '' >> profile_log_$modelname
done
done
# Kill all nvidia-smi background task.
pkill nvidia-smi
# worker_num, the maximum concurrency. When build_dag_each_worker=True, the framework creates worker_num processes, each building a grpcServer and a DAG
## When build_dag_each_worker=False, the framework sets max_workers=worker_num for the gRPC thread pool of the main thread
worker_num: 1
# HTTP port; rpc_port and http_port cannot both be empty. When rpc_port is available and http_port is empty, http_port is not generated automatically
http_port: 18080
rpc_port: 9993
dag:
# Op resource type: True for thread model, False for process model
is_thread_op: False
op:
imagenet:
# concurrency; threads when is_thread_op=True, otherwise processes
concurrency: 1
# when the Op config has no server_endpoints, the local service config is read from local_service_conf
local_service_conf:
# model directory
model_config: serving_server/
# compute hardware type: when empty, decided by devices (CPU/GPU); 0=cpu, 1=gpu, 2=tensorRT, 3=arm cpu, 4=kunlun xpu
device_type: 1
# compute hardware IDs; when devices is "" or omitted, prediction runs on CPU; when it is "0" or "0,1,2", prediction runs on GPU with the listed cards
devices: "0" # "0,1"
# client type: brpc, grpc or local_predictor; local_predictor does not start a Serving service and predicts in-process
client_type: local_predictor
# fetch list, based on the alias_name of fetch_var in client_config
fetch_list: ["score"]
# precision, inference precision; lowering the precision can speed up inference
# GPU supports: "fp32"(default), "fp16", "int8";
# CPU supports: "fp32"(default), "fp16", "bf16"(mkldnn); "int8" is not supported
precision: "fp32"
# enable TensorRT calibration
use_calib: True
# enable ir_optim
ir_optim: True
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import requests
import json
import cv2
import base64
import os
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
if __name__ == "__main__":
url = "http://127.0.0.1:18080/imagenet/prediction"
with open(os.path.join(".", "daisy.jpg"), 'rb') as file:
image_data1 = file.read()
image = cv2_to_base64(image_data1)
data = {"key": ["image"], "value": [image]}
for i in range(1):
r = requests.post(url=url, data=json.dumps(data))
print(r.json())
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_server.pipeline import PipelineClient
import numpy as np
import requests
import json
import cv2
import base64
import os
client = PipelineClient()
client.connect(['127.0.0.1:9993'])
def cv2_to_base64(image):
return base64.b64encode(image).decode('utf8')
with open("daisy.jpg", 'rb') as file:
image_data = file.read()
image = cv2_to_base64(image_data)
for i in range(1):
ret = client.predict(feed_dict={"image": image}, fetch=["label", "prob"])
print(ret)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage
from paddle_serving_server.web_service import WebService, Op
import logging
import numpy as np
import base64, cv2
class ImagenetOp(Op):
def init_op(self):
self.seq = Sequential([
Resize(256), CenterCrop(224), RGB2BGR(), Transpose((2, 0, 1)),
Div(255), Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225],
True)
])
self.label_dict = {}
label_idx = 0
with open("imagenet.label") as fin:
for line in fin:
self.label_dict[label_idx] = line.strip()
label_idx += 1
def preprocess(self, input_dicts, data_id, log_id):
(_, input_dict), = input_dicts.items()
batch_size = len(input_dict.keys())
imgs = []
for key in input_dict.keys():
data = base64.b64decode(input_dict[key].encode('utf8'))
data = np.fromstring(data, np.uint8)
im = cv2.imdecode(data, cv2.IMREAD_COLOR)
img = self.seq(im)
imgs.append(img[np.newaxis, :].copy())
input_imgs = np.concatenate(imgs, axis=0)
return {"image": input_imgs}, False, None, ""
def postprocess(self, input_dicts, fetch_dict, data_id, log_id):
score_list = fetch_dict["score"]
result = {"label": [], "prob": []}
for score in score_list:
score = score.tolist()
max_score = max(score)
result["label"].append(self.label_dict[score.index(max_score)]
.strip().replace(",", ""))
result["prob"].append(max_score)
result["label"] = str(result["label"])
result["prob"] = str(result["prob"])
return result, None, ""
class ImageService(WebService):
def get_pipeline_response(self, read_op):
image_op = ImagenetOp(name="imagenet", input_ops=[read_op])
return image_op
uci_service = ImageService(name="imagenet")
uci_service.prepare_pipeline_config("config.yml")
uci_service.run_service()
......@@ -37,6 +37,7 @@ using paddle_infer::PrecisionType;
using paddle_infer::Predictor;
using paddle_infer::Tensor;
using paddle_infer::CreatePredictor;
using paddle_infer::DistConfig;
DECLARE_int32(gpuid);
DECLARE_string(precision);
......@@ -206,6 +207,39 @@ class PaddleInferenceEngine : public EngineCore {
config.SetModel(model_path);
}
// Enable distributed model inferencing
DistConfig distCfg;
if (engine_conf.has_enable_dist_model() &&
engine_conf.enable_dist_model()) {
int ep_size = engine_conf.dist_endpoints_size();
int cur_index = engine_conf.dist_subgraph_index();
if (ep_size <= cur_index) {
LOG(ERROR) << "create paddle predictor failed, Distributed model error."
<< " dist_endpoints_size=" << ep_size
<< " is not bigger than dist_subgraph_index=" << cur_index;
return -1;
}
std::vector<std::string> vec_eps;
for (int i = 0; i < ep_size; ++i) {
vec_eps.emplace_back(engine_conf.dist_endpoints(i));
}
distCfg.EnableDistModel(true);
distCfg.SetCarrierId(engine_conf.dist_carrier_id());
distCfg.SetRanks(engine_conf.dist_nranks(), cur_index);
distCfg.SetEndpoints(vec_eps, engine_conf.dist_endpoints(cur_index));
distCfg.SetCommInitConfig(engine_conf.dist_cfg_file());
config.SetDistConfig(distCfg);
LOG(INFO) << "Create Distributed predictor! dist_carrier_id="
<< engine_conf.dist_carrier_id()
<< ", Ranks=" << engine_conf.dist_nranks()
<< ", current index of ranks=" << cur_index
<< ", current endpoint="
<< engine_conf.dist_endpoints(cur_index)
<< ", communicate init config file="
<< engine_conf.dist_cfg_file();
}
config.SwitchSpecifyInputNames(true);
config.SetCpuMathLibraryNumThreads(1);
if (engine_conf.has_use_gpu() && engine_conf.use_gpu()) {
......@@ -239,7 +273,7 @@ class PaddleInferenceEngine : public EngineCore {
config.EnableGpuMultiStream();
}
}
config.EnableTensorRtEngine(1 << 20,
config.EnableTensorRtEngine(1 << 25,
max_batch,
local_min_subgraph_size,
precision_type,
......@@ -255,7 +289,7 @@ class PaddleInferenceEngine : public EngineCore {
std::istringstream ss(value);
std::string word;
std::vector<int> arr;
while(ss >> word) {
while (ss >> word) {
arr.push_back(std::stoi(word));
}
min_input_shape[key] = arr;
......@@ -268,7 +302,7 @@ class PaddleInferenceEngine : public EngineCore {
std::istringstream ss(value);
std::string word;
std::vector<int> arr;
while(ss >> word) {
while (ss >> word) {
arr.push_back(std::stoi(word));
}
max_input_shape[key] = arr;
......@@ -281,15 +315,14 @@ class PaddleInferenceEngine : public EngineCore {
std::istringstream ss(value);
std::string word;
std::vector<int> arr;
while(ss >> word) {
while (ss >> word) {
arr.push_back(std::stoi(word));
}
optim_input_shape[key] = arr;
}
}
config.SetTRTDynamicShapeInfo(min_input_shape,
max_input_shape,
optim_input_shape);
config.SetTRTDynamicShapeInfo(
min_input_shape, max_input_shape, optim_input_shape);
LOG(INFO) << "create TensorRT predictor";
}
......@@ -364,6 +397,24 @@ class PaddleInferenceEngine : public EngineCore {
return -1;
}
LOG(INFO) << "paddle_engine params : enable_dist_model:"
<< engine_conf.enable_dist_model()
<< ", use_gpu: " << engine_conf.has_use_gpu()
<< ", gpu_id: " << gpu_id
<< ", use_gpu_multi_stream: " << engine_conf.gpu_multi_stream()
<< ", precision: " << FLAGS_precision
<< ", enable_ir_optimization: "
<< engine_conf.enable_ir_optimization()
<< ", use_trt: " << engine_conf.use_trt()
<< ", trt_max_batch: " << max_batch
<< ", trt_min_subgraph_size: " << min_subgraph_size
<< ", use_calib: " << FLAGS_use_calib
<< ", use_lite: " << engine_conf.use_lite()
<< ", use_ascend_cl: " << engine_conf.has_use_ascend_cl()
<< ", use_xpu: " << engine_conf.use_xpu()
<< ", enable_memory_optimization: "
<< engine_conf.enable_memory_optimization();
VLOG(2) << "create paddle predictor sucess, path: " << model_path;
return 0;
}
......
......@@ -30,10 +30,16 @@ class OpMaker(object):
"GeneralDistKVOp",
"GeneralCopyOp",
"GeneralDetectionOp",
"GeneralRemoteOp",
]
self.node_name_suffix_ = collections.defaultdict(int)
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
def create(self,
node_type,
engine_name=None,
inputs=[],
outputs=[],
addresses=[]):
if node_type not in self.op_list:
raise Exception("Op type {} is not supported right now".format(
node_type))
......@@ -55,6 +61,11 @@ class OpMaker(object):
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
# for general_remote op.
if addresses:
node.address.extend(addresses)
# Because the return value will be used as the key value of the
# dict, and the proto object is variable which cannot be hashed,
# so it is processed into a string. This has little effect on
......
......@@ -21,21 +21,27 @@ Usage: export PYTHON_EXECUTABLE=/usr/local/bin/python3.6
python3.6 -m paddle_serving_server.serve check
'''
import sys
import os
import pytest
inference_test_cases = ["test_fit_a_line.py::TestFitALine::test_inference"]
cpp_test_cases = ["test_fit_a_line.py::TestFitALine::test_cpu", "test_fit_a_line.py::TestFitALine::test_gpu"]
pipeline_test_cases = ["test_uci_pipeline.py::TestUCIPipeline::test_cpu", "test_uci_pipeline.py::TestUCIPipeline::test_gpu"]
cpp_test_cases = [
"test_fit_a_line.py::TestFitALine::test_cpu",
"test_fit_a_line.py::TestFitALine::test_gpu"
]
pipeline_test_cases = [
"test_uci_pipeline.py::TestUCIPipeline::test_cpu",
"test_uci_pipeline.py::TestUCIPipeline::test_gpu"
]
log_files = ["PipelineServingLogs", "log", "stderr.log", "stdout.log"]
def set_serving_log_path():
if 'SERVING_LOG_PATH' not in os.environ:
serving_log_path = os.path.expanduser(os.getcwd()) + '/'
os.environ['SERVING_LOG_PATH']=serving_log_path
os.environ['SERVING_LOG_PATH'] = serving_log_path
def mv_log_to_new_dir(dir_path):
import shutil
......@@ -46,8 +52,8 @@ def mv_log_to_new_dir(dir_path):
file_path = os.path.join(serving_log_path, file_name)
dir_path_temp = os.path.join(dir_path, file_name)
if os.path.exists(file_path):
shutil.move(file_path, dir_path_temp)
shutil.move(file_path, dir_path_temp)
def run_test_cases(cases_list, case_type, is_open_std):
old_stdout, old_stderr = sys.stdout, sys.stderr
......@@ -66,33 +72,41 @@ def run_test_cases(cases_list, case_type, is_open_std):
new_dir_path = os.path.join(serving_log_path, dir_name)
mv_log_to_new_dir(new_dir_path)
if res == 0:
print("{} {} environment running success".format(case_type, case_name))
print("{} {} environment running success".format(case_type,
case_name))
elif res == 1:
if case_name == "inference":
print("{} {} environment running failure. Please refer to https://www.paddlepaddle.org.cn/install/quick?docurl=/documentation/docs/zh/install/pip/linux-pip.html to configure environment".format(case_type, case_name))
print(
"{} {} environment running failure. Please refer to https://www.paddlepaddle.org.cn/install/quick?docurl=/documentation/docs/zh/install/pip/linux-pip.html to configure environment".
format(case_type, case_name))
os._exit(0)
else:
print("{} {} environment running failure, if you need this environment, please refer to https://github.com/PaddlePaddle/Serving/blob/develop/doc/Install_CN.md".format(case_type, case_name))
print(
"{} {} environment running failure, if you need this environment, please refer to https://github.com/PaddlePaddle/Serving/blob/develop/doc/Install_CN.md".
format(case_type, case_name))
def unset_env(key):
del os.environ[key]
def check_env(mode):
set_serving_log_path()
if 'https_proxy' in os.environ or 'http_proxy' in os.environ:
unset_env("https_proxy")
unset_env("http_proxy")
unset_env("https_proxy")
unset_env("http_proxy")
if 'GREP_OPTIONS' in os.environ:
unset_env("GREP_OPTIONS")
is_open_std = False
if mode is "debug":
unset_env("GREP_OPTIONS")
is_open_std = False
if mode == "debug":
is_open_std = True
if mode is "all" or mode is "inference" or mode is "debug":
if mode == "all" or mode == "inference" or mode == "debug":
run_test_cases(inference_test_cases, "PaddlePaddle", is_open_std)
if mode is "all" or mode is "cpp" or mode is "debug":
if mode == "all" or mode == "cpp" or mode == "debug":
run_test_cases(cpp_test_cases, "C++", is_open_std)
if mode is "all" or mode is "pipeline" or mode is "debug":
if mode == "all" or mode == "pipeline" or mode == "debug":
run_test_cases(pipeline_test_cases, "Pipeline", is_open_std)
if __name__ == '__main__':
check_env("debug")
......@@ -37,12 +37,15 @@ from paddle_serving_server.util import *
from paddle_serving_server.env_check.run import check_env
import cmd
def signal_handler(signal, frame):
print('Process stopped')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
# web_service.py is still used by Pipeline.
def port_is_available(port):
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
......@@ -185,10 +188,10 @@ def serve_args():
action="store_true",
help="Use encryption model")
parser.add_argument(
"--encryption_rpc_port",
type=int,
required=False,
default=12000,
"--encryption_rpc_port",
type=int,
required=False,
default=12000,
help="Port of encryption model, only valid for arg.use_encryption_model")
parser.add_argument(
"--use_trt", default=False, action="store_true", help="Use TensorRT")
......@@ -217,13 +220,66 @@ def serve_args():
action="store_true",
help="Use gpu_multi_stream")
parser.add_argument(
"--enable_prometheus", default=False, action="store_true", help="Use Prometheus")
"--enable_prometheus",
default=False,
action="store_true",
help="Use Prometheus")
parser.add_argument(
"--prometheus_port",
type=int,
default=19393,
help="Port of the Prometheus")
parser.add_argument(
"--request_cache_size",
type=int,
default=0,
help="Max request cache size")
parser.add_argument(
"--use_dist_model",
default=False,
action="store_true",
help="Use distributed model")
parser.add_argument(
"--dist_carrier_id",
type=str,
default="",
help="carrier id of distributed model")
parser.add_argument(
"--dist_cfg_file",
type=str,
default="",
help="config file of distributed model")
parser.add_argument(
"--dist_endpoints",
type=str,
default="+",
help="endpoints of distributed model. splited by comma")
parser.add_argument(
"--dist_nranks",
type=int,
default=0,
help="nranks of distributed model")
parser.add_argument(
"--dist_subgraph_index",
type=int,
default=-1,
help="index of distributed model")
parser.add_argument(
"--prometheus_port", type=int, default=19393, help="Port of the Prometheus")
"--dist_worker_serving_endpoints",
type=str,
default=None,
help="endpoints of worker serving endpoints")
parser.add_argument(
"--request_cache_size", type=int, default=0, help="Port of the Prometheus")
"--dist_master_serving",
default=False,
action="store_true",
help="The master serving of distributed inference")
parser.add_argument(
"--min_subgraph_size", type=str, default="", nargs="+", help="min_subgraph_size")
"--min_subgraph_size",
type=str,
default="",
nargs="+",
help="min_subgraph_size")
return parser.parse_args()
......@@ -247,7 +303,7 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi
workdir = "{}_{}".format(args.workdir, port)
dag_list_op = []
if model == "":
if model == "" and not args.dist_master_serving:
print("You must specify your serving model")
exit(-1)
for single_model_config in args.model:
......@@ -272,34 +328,55 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi
dag_list_op.append(temp_str_list[0])
read_op = op_maker.create('GeneralReaderOp')
op_seq_maker.add_op(read_op)
is_ocr = False
# If dag_list_op is not empty, custom OPs or a custom DAG chain were passed in via --op.
# In that case, the DAG chain is assembled in the order given by --op.
if len(dag_list_op) > 0:
for single_op in dag_list_op:
op_seq_maker.add_op(op_maker.create(single_op))
if single_op == "GeneralDetectionOp":
is_ocr = True
# Otherwise, the DAG is still chained according to --model as before.
else:
for idx, single_model in enumerate(model):
infer_op_name = "GeneralInferOp"
# Currently the OCR Det node model depends on the third-party opencv library.
# Only when OCR is used is the opencv library linked in and GeneralDetectionOp compiled.
# So this is special-cased here: when the condition below is not met, the added op defaults to GeneralInferOp.
# In the future we may stop generating the config with a Python script.
if len(model) == 2 and idx == 0 and single_model == "ocr_det_model":
infer_op_name = "GeneralDetectionOp"
is_ocr = True
else:
# The workflow of the master serving in a distributed model is different from
# that of the worker servings. The workflow of a worker serving is the same as in the
# non-distributed model, but the workflow of the master serving needs to add the IP
# addresses of the other worker servings on the machine.
if not args.dist_master_serving:
read_op = op_maker.create('GeneralReaderOp')
op_seq_maker.add_op(read_op)
is_ocr = False
# If dag_list_op is not empty, custom OPs or a custom DAG chain were passed in via --op.
# In that case, the DAG chain is assembled in the order given by --op.
if len(dag_list_op) > 0:
for single_op in dag_list_op:
op_seq_maker.add_op(op_maker.create(single_op))
if single_op == "GeneralDetectionOp":
is_ocr = True
# Otherwise, the DAG is still chained according to --model as before.
else:
for idx, single_model in enumerate(model):
infer_op_name = "GeneralInferOp"
general_infer_op = op_maker.create(infer_op_name)
op_seq_maker.add_op(general_infer_op)
# Currently the OCR Det node model depends on the third-party opencv library.
# Only when OCR is used is the opencv library linked in and GeneralDetectionOp compiled.
# So this is special-cased here: when the condition below is not met, the added op defaults to GeneralInferOp.
# In the future we may stop generating the config with a Python script.
if len(model
) == 2 and idx == 0 and single_model == "ocr_det_model":
infer_op_name = "GeneralDetectionOp"
is_ocr = True
else:
infer_op_name = "GeneralInferOp"
general_infer_op = op_maker.create(infer_op_name)
op_seq_maker.add_op(general_infer_op)
general_response_op = op_maker.create('GeneralResponseOp')
op_seq_maker.add_op(general_response_op)
general_response_op = op_maker.create('GeneralResponseOp')
op_seq_maker.add_op(general_response_op)
else:
# for the master serving of distributed model only add one general_remote op.
if args.dist_worker_serving_endpoints is None:
raise ValueError(
"Params Error!. dist_worker_serving_endpoints is empty when dist_master_serving is set"
)
worker_serving_endpoints = args.dist_worker_serving_endpoints.split(",")
if len(worker_serving_endpoints) == 0:
raise ValueError(
"Params Error!. dist_worker_serving_endpoints is empty when dist_master_serving is set"
)
general_remote_op = op_maker.create(
'GeneralRemoteOp', None, [], [], addresses=worker_serving_endpoints)
op_seq_maker.add_op(general_remote_op, )
server.set_op_sequence(op_seq_maker.get_op_sequence())
server.set_num_threads(thread_num)
......@@ -312,6 +389,12 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi
server.set_enable_prometheus(args.enable_prometheus)
server.set_prometheus_port(args.prometheus_port)
server.set_request_cache_size(args.request_cache_size)
server.set_enable_dist_model(args.use_dist_model)
server.set_dist_carrier_id(args.dist_carrier_id)
server.set_dist_cfg_file(args.dist_cfg_file)
server.set_dist_nranks(args.dist_nranks)
server.set_dist_endpoints(args.dist_endpoints.split(","))
server.set_dist_subgraph_index(args.dist_subgraph_index)
server.set_min_subgraph_size(args.min_subgraph_size)
if args.use_trt and device == "gpu":
......@@ -354,6 +437,7 @@ def start_gpu_card_model(gpu_mode, port, args): # pylint: disable=doc-string-mi
use_encryption_model=args.use_encryption_model)
server.run_server()
def set_ocr_dynamic_shape_info():
info = []
min_input_shape = {
......@@ -387,10 +471,7 @@ def set_ocr_dynamic_shape_info():
}
info.append(det_info)
min_input_shape = {"x": [1, 3, 32, 10], "lstm_1.tmp_0": [1, 1, 128]}
max_input_shape = {
"x": [50, 3, 32, 1000],
"lstm_1.tmp_0": [500, 50, 128]
}
max_input_shape = {"x": [50, 3, 32, 1000], "lstm_1.tmp_0": [500, 50, 128]}
opt_input_shape = {"x": [6, 3, 32, 100], "lstm_1.tmp_0": [25, 5, 128]}
rec_info = {
"min_input_shape": min_input_shape,
......@@ -400,6 +481,7 @@ def set_ocr_dynamic_shape_info():
info.append(rec_info)
return info
def start_multi_card(args, serving_port=None): # pylint: disable=doc-string-missing
if serving_port == None:
......@@ -544,8 +626,10 @@ def stop_serving(command: str, port: int=None):
os.remove(filepath)
return True
class Check_Env_Shell(cmd.Cmd):
intro = "Welcome to the check env shell.Type help to list commands.\n"
# ----- basic commands -----
def do_help(self, arg):
print("\nCommand list\t\tDescription\n"\
......@@ -562,23 +646,23 @@ class Check_Env_Shell(cmd.Cmd):
def do_check_all(self, arg):
"Check Environment of Paddle Inference, Pipeline Serving, C++ Serving"
check_env("all")
check_env("all")
def do_check_pipeline(self, arg):
"Check Environment of Pipeline Serving"
check_env("pipeline")
check_env("pipeline")
def do_check_cpp(self, arg):
"Check Environment of C++ Serving"
check_env("cpp")
check_env("cpp")
def do_check_inference(self, arg):
"Check Environment of Paddle Inference"
check_env("inference")
check_env("inference")
def do_debug(self, arg):
"Open pytest log to debug"
check_env("debug")
check_env("debug")
def do_exit(self, arg):
"Exit Check Env Shell"
......@@ -586,6 +670,7 @@ class Check_Env_Shell(cmd.Cmd):
os._exit(0)
return True
if __name__ == "__main__":
# args.device is not used at all.
# just keep the interface.
......@@ -602,7 +687,7 @@ if __name__ == "__main__":
else:
os._exit(-1)
elif args.server == "check":
Check_Env_Shell().cmdloop()
Check_Env_Shell().cmdloop()
for single_model_config in args.model:
if os.path.isdir(single_model_config):
pass
......
......@@ -53,6 +53,14 @@ class Server(object):
self.general_model_config_fn:'list'=[] # ["GeneralInferOp_0/general_model.prototxt"]The quantity is equal to the InferOp quantity,Feed and Fetch --OP
self.subdirectory:'list'=[] # The quantity is equal to the InferOp quantity, and name = node.name = engine.name
self.model_config_paths:'collections.OrderedDict()' # Save the serving_server_conf.prototxt path (feed and fetch information) this is a map for multi-model in a workflow
self.enable_dist_model: bool, enable distributed model, False by default.
self.dist_carrier_id: string, carrier name of the distributed model, "" by default.
self.dist_cfg_file: string, file name of the distributed configuration, "" by default.
self.dist_nranks: int, number of distributed nodes, 0 by default.
self.dist_endpoints: list of string, all endpoints (ip:port) of the distributed nodes, [] by default.
self.dist_subgraph_index: int, index of the distributed subgraph model, -1 by default. It is used to select the endpoint of the current shard in the distributed model.
self.dist_worker_serving_endpoints: list of string, all endpoints of the worker servings on the same machine, [] by default.
self.dist_master_serving: bool, whether this is the master serving that receives client requests (only pp0 of pipeline parallelism), False by default.
"""
self.server_handle_ = None
self.infer_service_conf = None
......@@ -101,6 +109,14 @@ class Server(object):
self.enable_prometheus = False
self.prometheus_port = 19393
self.request_cache_size = 0
self.enable_dist_model = False
self.dist_carrier_id = ""
self.dist_cfg_file = ""
self.dist_nranks = 0
self.dist_endpoints = []
self.dist_subgraph_index = -1
self.dist_worker_serving_endpoints = []
self.dist_master_serving = False
self.min_subgraph_size = []
self.trt_dynamic_shape_info = []
......@@ -213,6 +229,55 @@ class Server(object):
def set_request_cache_size(self, request_cache_size):
self.request_cache_size = request_cache_size
def set_enable_dist_model(self, status):
self.enable_dist_model = status
def set_dist_carrier_id(self, carrier_id):
if isinstance(carrier_id, int):
carrier_id = str(carrier_id)
self.dist_carrier_id = carrier_id
def set_dist_cfg_file(self, dist_cfg_file):
self.dist_cfg_file = dist_cfg_file
def set_dist_nranks(self, nranks):
if isinstance(nranks, str):
nranks = int(nranks)
elif not isinstance(nranks, int):
raise ValueError("dist_nranks type error! must be int or string")
self.dist_nranks = nranks
def set_dist_endpoints(self, endpoints):
if isinstance(endpoints, list):
self.dist_endpoints = endpoints
elif isinstance(endpoints, str):
self.dist_endpoints = [endpoints]
else:
raise ValueError(
"dist_endpoints type error! must be list or string")
def set_dist_subgraph_index(self, subgraph_index):
if isinstance(subgraph_index, str):
subgraph_index = int(subgraph_index)
elif not isinstance(subgraph_index, int):
raise ValueError("subgraph type error! must be int or string")
self.dist_subgraph_index = subgraph_index
def set_dist_worker_serving_endpoint(self, serving_endpoints):
if isinstance(serving_endpoints, list):
self.dist_worker_serving_endpoints = serving_endpoints
elif isinstance(serving_endpoints, str):
self.dist_worker_serving_endpoints = [serving_endpoints]
else:
raise ValueError(
"dist_worker_serving_endpoint type error! must be list or string"
)
def set_dist_master_serving(self, is_master):
self.dist_master_serving = is_master
def set_min_subgraph_size(self, min_subgraph_size):
for s in min_subgraph_size:
try:
......@@ -220,7 +285,7 @@ class Server(object):
except:
size = 3
self.min_subgraph_size.append(size)
def set_trt_dynamic_shape_info(self, info):
self.trt_dynamic_shape_info = info
......@@ -278,6 +343,15 @@ class Server(object):
engine.use_ascend_cl = self.use_ascend_cl
engine.use_gpu = False
# use distributed model.
if self.dist_subgraph_index >= 0:
engine.enable_dist_model = True
engine.dist_carrier_id = self.dist_carrier_id
engine.dist_cfg_file = self.dist_cfg_file
engine.dist_nranks = self.dist_nranks
engine.dist_endpoints.extend(self.dist_endpoints)
engine.dist_subgraph_index = self.dist_subgraph_index
if len(self.gpuid) == 0:
raise ValueError("CPU: self.gpuid = -1, GPU: must set it ")
op_gpu_list = self.gpuid[index % len(self.gpuid)].split(",")
......@@ -310,7 +384,7 @@ class Server(object):
if len(self.trt_dynamic_shape_info) > index:
dynamic_shape_info = self.trt_dynamic_shape_info[index]
try:
for key,value in dynamic_shape_info.items():
for key, value in dynamic_shape_info.items():
shape_type = key
if shape_type == "min_input_shape":
local_map = engine.min_input_shape
......@@ -318,12 +392,12 @@ class Server(object):
local_map = engine.max_input_shape
if shape_type == "opt_input_shape":
local_map = engine.opt_input_shape
for name,shape in value.items():
for name, shape in value.items():
local_value = ' '.join(str(i) for i in shape)
local_map[name] = local_value
except:
raise ValueError("Set TRT dynamic shape info error!")
self.model_toolkit_conf.append(server_sdk.ModelToolkitConf())
self.model_toolkit_conf[-1].engines.extend([engine])
index = index + 1
......