提交 c3f7f006 编写于 作者: W wangjiawei04

merge with local

......@@ -218,18 +218,6 @@ class PredictorClient {
int destroy_predictor();
int batch_predict(
const std::vector<std::vector<std::vector<float>>>& float_feed_batch,
const std::vector<std::string>& float_feed_name,
const std::vector<std::vector<int>>& float_shape,
const std::vector<std::vector<std::vector<int64_t>>>& int_feed_batch,
const std::vector<std::string>& int_feed_name,
const std::vector<std::vector<int>>& int_shape,
const std::vector<std::string>& fetch_name,
PredictorRes& predict_res_batch, // NOLINT
const int& pid,
const uint64_t log_id);
int numpy_predict(
const std::vector<std::vector<py::array_t<float>>>& float_feed_batch,
const std::vector<std::string>& float_feed_name,
......@@ -237,6 +225,7 @@ class PredictorClient {
const std::vector<std::vector<py::array_t<int64_t>>>& int_feed_batch,
const std::vector<std::string>& int_feed_name,
const std::vector<std::vector<int>>& int_shape,
const std::vector<std::vector<int>>& lod_slot_batch,
const std::vector<std::string>& fetch_name,
PredictorRes& predict_res_batch, // NOLINT
const int& pid,
......
......@@ -137,220 +137,6 @@ int PredictorClient::create_predictor() {
return 0;
}
int PredictorClient::batch_predict(
const std::vector<std::vector<std::vector<float>>> &float_feed_batch,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int>> &float_shape,
const std::vector<std::vector<std::vector<int64_t>>> &int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid,
const uint64_t log_id) {
int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
predict_res_batch.clear();
Timer timeline;
int64_t preprocess_start = timeline.TimeStampUS();
int fetch_name_num = fetch_name.size();
_api.thrd_initialize();
std::string variant_tag;
_predictor = _api.fetch_predictor("general_model", &variant_tag);
predict_res_batch.set_variant_tag(variant_tag);
VLOG(2) << "fetch general model predictor done.";
VLOG(2) << "float feed name size: " << float_feed_name.size();
VLOG(2) << "int feed name size: " << int_feed_name.size();
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req;
req.set_log_id(log_id);
for (auto &name : fetch_name) {
req.add_fetch_var_names(name);
}
for (int bi = 0; bi < batch_size; bi++) {
VLOG(2) << "prepare batch " << bi;
std::vector<Tensor *> tensor_vec;
FeedInst *inst = req.add_insts();
std::vector<std::vector<float>> float_feed = float_feed_batch[bi];
std::vector<std::vector<int64_t>> int_feed = int_feed_batch[bi];
for (auto &name : float_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
for (auto &name : int_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
VLOG(2) << "batch [" << bi << "] int_feed_name and float_feed_name "
<< "prepared";
int vec_idx = 0;
VLOG(2) << "tensor_vec size " << tensor_vec.size() << " float shape "
<< float_shape.size();
for (auto &name : float_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
VLOG(2) << "prepare float feed " << name << " shape size "
<< float_shape[vec_idx].size();
for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
tensor->add_shape(float_shape[vec_idx][j]);
}
tensor->set_elem_type(1);
for (uint32_t j = 0; j < float_feed[vec_idx].size(); ++j) {
tensor->add_float_data(float_feed[vec_idx][j]);
}
vec_idx++;
}
VLOG(2) << "batch [" << bi << "] "
<< "float feed value prepared";
vec_idx = 0;
for (auto &name : int_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
if (_type[idx] == 0) {
VLOG(2) << "prepare int64 feed " << name << " shape size "
<< int_shape[vec_idx].size();
VLOG(3) << "feed var name " << name << " index " << vec_idx
<< "first data " << int_feed[vec_idx][0];
for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) {
tensor->add_int64_data(int_feed[vec_idx][j]);
}
} else if (_type[idx] == 2) {
VLOG(2) << "prepare int32 feed " << name << " shape size "
<< int_shape[vec_idx].size();
VLOG(3) << "feed var name " << name << " index " << vec_idx
<< "first data " << int32_t(int_feed[vec_idx][0]);
for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) {
tensor->add_int_data(int32_t(int_feed[vec_idx][j]));
}
}
for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) {
tensor->add_shape(int_shape[vec_idx][j]);
}
tensor->set_elem_type(_type[idx]);
vec_idx++;
}
VLOG(2) << "batch [" << bi << "] "
<< "int feed value prepared";
}
int64_t preprocess_end = timeline.TimeStampUS();
int64_t client_infer_start = timeline.TimeStampUS();
Response res;
int64_t client_infer_end = 0;
int64_t postprocess_start = 0;
int64_t postprocess_end = 0;
if (FLAGS_profile_client) {
if (FLAGS_profile_server) {
req.set_profile_server(true);
}
}
res.Clear();
if (_predictor->inference(&req, &res) != 0) {
LOG(ERROR) << "failed call predictor with req: " << req.ShortDebugString();
_api.thrd_clear();
return -1;
} else {
client_infer_end = timeline.TimeStampUS();
postprocess_start = client_infer_end;
VLOG(2) << "get model output num";
uint32_t model_num = res.outputs_size();
VLOG(2) << "model num: " << model_num;
for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
VLOG(2) << "process model output index: " << m_idx;
auto output = res.outputs(m_idx);
ModelRes model;
model.set_engine_name(output.engine_name());
int idx = 0;
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
int shape_size = output.insts(0).tensor_array(idx).shape_size();
VLOG(2) << "fetch var " << name << " index " << idx << " shape size "
<< shape_size;
model._shape_map[name].resize(shape_size);
for (int i = 0; i < shape_size; ++i) {
model._shape_map[name][i] =
output.insts(0).tensor_array(idx).shape(i);
}
int lod_size = output.insts(0).tensor_array(idx).lod_size();
if (lod_size > 0) {
model._lod_map[name].resize(lod_size);
for (int i = 0; i < lod_size; ++i) {
model._lod_map[name][i] = output.insts(0).tensor_array(idx).lod(i);
}
}
idx += 1;
}
idx = 0;
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
if (_fetch_name_to_type[name] == 0) {
VLOG(2) << "ferch var " << name << "type int64";
int size = output.insts(0).tensor_array(idx).int64_data_size();
model._int64_value_map[name] = std::vector<int64_t>(
output.insts(0).tensor_array(idx).int64_data().begin(),
output.insts(0).tensor_array(idx).int64_data().begin() + size);
} else if (_fetch_name_to_type[name] == 1) {
VLOG(2) << "fetch var " << name << "type float";
int size = output.insts(0).tensor_array(idx).float_data_size();
model._float_value_map[name] = std::vector<float>(
output.insts(0).tensor_array(idx).float_data().begin(),
output.insts(0).tensor_array(idx).float_data().begin() + size);
} else if (_fetch_name_to_type[name] == 2) {
VLOG(2) << "fetch var " << name << "type int32";
int size = output.insts(0).tensor_array(idx).int_data_size();
model._int32_value_map[name] = std::vector<int32_t>(
output.insts(0).tensor_array(idx).int_data().begin(),
output.insts(0).tensor_array(idx).int_data().begin() + size);
}
idx += 1;
}
predict_res_batch.add_model_res(std::move(model));
}
postprocess_end = timeline.TimeStampUS();
}
if (FLAGS_profile_client) {
std::ostringstream oss;
oss << "PROFILE\t"
<< "pid:" << pid << "\t"
<< "prepro_0:" << preprocess_start << " "
<< "prepro_1:" << preprocess_end << " "
<< "client_infer_0:" << client_infer_start << " "
<< "client_infer_1:" << client_infer_end << " ";
if (FLAGS_profile_server) {
int op_num = res.profile_time_size() / 2;
for (int i = 0; i < op_num; ++i) {
oss << "op" << i << "_0:" << res.profile_time(i * 2) << " ";
oss << "op" << i << "_1:" << res.profile_time(i * 2 + 1) << " ";
}
}
oss << "postpro_0:" << postprocess_start << " ";
oss << "postpro_1:" << postprocess_end;
fprintf(stderr, "%s\n", oss.str().c_str());
}
_api.thrd_clear();
return 0;
}
int PredictorClient::numpy_predict(
const std::vector<std::vector<py::array_t<float>>> &float_feed_batch,
const std::vector<std::string> &float_feed_name,
......@@ -358,6 +144,7 @@ int PredictorClient::numpy_predict(
const std::vector<std::vector<py::array_t<int64_t>>> &int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::vector<int>> &lod_slot_batch,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid,
......@@ -411,6 +198,11 @@ int PredictorClient::numpy_predict(
<< float_shape[vec_idx].size();
for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
tensor->add_shape(float_shape[vec_idx][j]);
std::cout << "shape " << j << " : " << float_shape[vec_idx][j]
<< std::endl;
}
for (uint32_t j = 0; j < lod_slot_batch[vec_idx].size(); ++j) {
tensor->add_lod(lod_slot_batch[vec_idx][j]);
}
tensor->set_elem_type(1);
const int float_shape_size = float_shape[vec_idx].size();
......
......@@ -95,32 +95,6 @@ PYBIND11_MODULE(serving_client, m) {
[](PredictorClient &self) { self.create_predictor(); })
.def("destroy_predictor",
[](PredictorClient &self) { self.destroy_predictor(); })
.def("batch_predict",
[](PredictorClient &self,
const std::vector<std::vector<std::vector<float>>>
&float_feed_batch,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int>> &float_shape,
const std::vector<std::vector<std::vector<int64_t>>>
&int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid,
const uint64_t log_id) {
return self.batch_predict(float_feed_batch,
float_feed_name,
float_shape,
int_feed_batch,
int_feed_name,
int_shape,
fetch_name,
predict_res_batch,
pid,
log_id);
},
py::call_guard<py::gil_scoped_release>())
.def("numpy_predict",
[](PredictorClient &self,
const std::vector<std::vector<py::array_t<float>>>
......@@ -131,6 +105,7 @@ PYBIND11_MODULE(serving_client, m) {
&int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::vector<int>> &lod_slot_batch,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid,
......@@ -141,6 +116,7 @@ PYBIND11_MODULE(serving_client, m) {
int_feed_batch,
int_feed_name,
int_shape,
lod_slot_batch,
fetch_name,
predict_res_batch,
pid,
......
......@@ -73,8 +73,6 @@ int GeneralReaderOp::inference() {
// reade request from client
const Request *req = dynamic_cast<const Request *>(get_request_message());
uint64_t log_id = req->log_id();
int batch_size = req->insts_size();
int input_var_num = 0;
std::vector<int64_t> elem_type;
std::vector<int64_t> elem_size;
......@@ -83,7 +81,6 @@ int GeneralReaderOp::inference() {
GeneralBlob *res = mutable_data<GeneralBlob>();
TensorVector *out = &res->tensor_vector;
res->SetBatchSize(batch_size);
res->SetLogId(log_id);
if (!res) {
......@@ -98,11 +95,11 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id
<< ") start to call load general model_conf op";
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
......@@ -122,13 +119,11 @@ int GeneralReaderOp::inference() {
elem_type.resize(var_num);
elem_size.resize(var_num);
capacity.resize(var_num);
// prepare basic information for input
for (int i = 0; i < var_num; ++i) {
paddle::PaddleTensor lod_tensor;
elem_type[i] = req->insts(0).tensor_array(i).elem_type();
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] has elem type: " << elem_type[i];
VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
if (elem_type[i] == 0) { // int64
elem_size[i] = sizeof(int64_t);
lod_tensor.dtype = paddle::PaddleDType::INT64;
......@@ -139,13 +134,24 @@ int GeneralReaderOp::inference() {
elem_size[i] = sizeof(int32_t);
lod_tensor.dtype = paddle::PaddleDType::INT32;
}
if (model_config->_is_lod_feed[i]) {
lod_tensor.lod.resize(1);
lod_tensor.lod[0].push_back(0);
// implement lod tensor here
if (req->insts(0).tensor_array(i).lod_size() > 0) {
VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
lod_tensor.lod.resize(1);
for (int k = 0; k < req->insts(0).tensor_array(i).lod_size(); ++k) {
lod_tensor.lod[0].push_back(req->insts(0).tensor_array(i).lod(k));
}
capacity[i] = 1;
for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
int dim = req->insts(0).tensor_array(i).shape(k);
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
<< "]: " << dim;
capacity[i] *= dim;
lod_tensor.shape.push_back(dim);
}
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor, capacity: " << capacity[i];
} else {
lod_tensor.shape.push_back(batch_size);
capacity[i] = 1;
for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
int dim = req->insts(0).tensor_array(i).shape(k);
......@@ -160,7 +166,7 @@ int GeneralReaderOp::inference() {
lod_tensor.name = model_config->_feed_name[i];
out->push_back(lod_tensor);
}
int batch_size = 1;
// specify the memory needed for output tensor_vector
for (int i = 0; i < var_num; ++i) {
if (out->at(i).lod.size() == 1) {
......@@ -192,13 +198,13 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
}
out->at(i).data.Resize(tensor_size * elem_size[i]);
out->at(i).shape = {out->at(i).lod[0].back()};
out->at(i).shape = {};
for (int j = 1; j < req->insts(0).tensor_array(i).shape_size(); ++j) {
out->at(i).shape.push_back(req->insts(0).tensor_array(i).shape(j));
}
if (out->at(i).shape.size() == 1) {
out->at(i).shape.push_back(1);
}
// if (out->at(i).shape.size() == 1) {
// out->at(i).shape.push_back(1);
//}
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is lod_tensor and len=" << out->at(i).lod[0].back();
} else {
......@@ -220,11 +226,6 @@ int GeneralReaderOp::inference() {
for (int k = 0; k < elem_num; ++k) {
dst_ptr[offset + k] = req->insts(j).tensor_array(i).int64_data(k);
}
if (out->at(i).lod.size() == 1) {
offset = out->at(i).lod[0][j + 1];
} else {
offset += capacity[i];
}
}
} else if (elem_type[i] == 1) {
float *dst_ptr = static_cast<float *>(out->at(i).data.data());
......@@ -236,11 +237,6 @@ int GeneralReaderOp::inference() {
for (int k = 0; k < elem_num; ++k) {
dst_ptr[offset + k] = req->insts(j).tensor_array(i).float_data(k);
}
if (out->at(i).lod.size() == 1) {
offset = out->at(i).lod[0][j + 1];
} else {
offset += capacity[i];
}
}
} else if (elem_type[i] == 2) {
int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
......@@ -252,21 +248,15 @@ int GeneralReaderOp::inference() {
for (int k = 0; k < elem_num; ++k) {
dst_ptr[offset + k] = req->insts(j).tensor_array(i).int_data(k);
}
if (out->at(i).lod.size() == 1) {
offset = out->at(i).lod[0][j + 1];
} else {
offset += capacity[i];
}
}
}
}
VLOG(2) << "(logid=" << log_id << ") output size: " << out->size();
timeline.Pause();
int64_t end = timeline.TimeStampUS();
res->p_size = 0;
res->_batch_size = batch_size;
res->_batch_size = 1;
AddBlobInfo(res, start);
AddBlobInfo(res, end);
......
# 如何配置Web Service的Op
## ## rpc和local predictor
目前一共支持两种Serving的运行方式,一种是rpc,一种是local predictor,二者各有优劣。
| 版本 | 特点 | 适用场景 |
| --------------- | -------------------- | ---------------------------------------- |
| Rpc | 稳定性高,分布式部署 | 适用于吞吐量大,需要跨机房部署的情况 |
| local predictor | 部署方便,预测速度快 | 适用于对预测速度要求高,迭代速度快的场景 |
rpc模式的原理是启动一个rpc服务,客户端用protobuf格式打包预测请求的内容,在rpc服务端完成预测。适合稳定性较高的场景,web服务和预测服务可以解耦合实现多地部署。
local predictor的原理是启动一个python的predictor,客户端可以直接调用python的predictor来实现。适合快速迭代以及规模较小的场景,web服务和预测服务需要在同一台机器上,
在web模式下,我们通过配置Op的方式来配置每一个Serving模型。
我们以OCR的识别模型作为例子,以下是RecOp的实现。
```python
class RecOp(Op):
def init_op(self):
self.ocr_reader = OCRReader()
self.get_rotate_crop_image = GetRotateCropImage()
self.sorted_boxes = SortedBoxes()
def preprocess(self, input_dicts):
(_, input_dict), = input_dicts.items()
im = input_dict["image"]
dt_boxes = input_dict["dt_boxes"]
dt_boxes = self.sorted_boxes(dt_boxes)
feed_list = []
img_list = []
max_wh_ratio = 0
for i, dtbox in enumerate(dt_boxes):
boximg = self.get_rotate_crop_image(im, dt_boxes[i])
img_list.append(boximg)
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
res_lst = []
for res in rec_res:
res_lst.append(res[0])
res = {"res": str(res_lst)}
return res
```
在做好init_op,preprocess和postprocess这些函数的重载之后,我们就在调用这个Op的地方去控制rpc和local predictor。
```python
#RPC
rec_op = RecOp(
name="rec",
input_ops=[det_op],
server_endpoints=["127.0.0.1:12001"], #if server endpoint exist, use rpc
fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
client_config="ocr_rec_client/serving_client_conf.prototxt",
concurrency=1)
# local predictor
rec_op = RecOp(
name="rec",
input_ops=[det_op],
fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
model_config="ocr_rec_server/serving_server_conf.prototxt",
concurrency=1)
```
在上面的例子可以看到,当我们在Op的构造函数里,指定了server_endpoints和client_config时,就会采用rpc的方式。因为这些在运算Op的时候需要先执行preprocess,然后访问rpc服务请求预测,最后再执行postprocess。请求预测的过程,可能预测服务在本地,也可能在远端,可能是单点可能是分布式,需要给出对应的IP地址作为server_endpoints
如果是local predictor,我们就不需要指定endpoints。
| 属性名 | 定义 | 其他 |
| ------------------- | --------------- | ------------------------------------------------------------ |
| name | op名 | |
| input_ops | 前向输入 op | 可以为多个,前向Op的结果会作为此Op的输入 |
| fetch_list | fetch字段名 | 模型预测服务的结果字典包含所有在此定义的fetch字段 |
| rpc限定 | | |
| server_endpoints | rpc服务地址列表 | 分布式部署时可以有多个rpc地址 |
| concurrency | 并行度 | 并行线程数 |
| client_config | 客户端配置文件 | Op接收请求作为客户端访问rpc服务,因此需要客户端的配置文件 |
| local predictor限定 | | |
| model_config | 模型配置文件 | 由于local predictor和Op运行在一台机器上,因此需要模型配置来启动local predictor |
......@@ -52,7 +52,7 @@ class DetOp(Op):
self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape
return {"image": det_img}
return {"image": det_img[np.newaxis,:]}
def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"]
......@@ -62,6 +62,7 @@ class DetOp(Op):
dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im}
print("out dict", out_dict)
return out_dict
......@@ -85,11 +86,14 @@ class RecOp(Op):
h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list:
_, w, h = self.ocr_reader.resize_norm_img(img_list[0],
max_wh_ratio).shape
imgs = np.zeros((len(img_list), 3, w, h)).astype('float32')
for id, img in enumerate(img_list):
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img}
feed_list.append(feed)
return feed_list
imgs[id] = norm_img
feed = {"image": imgs.copy()}
return feed
def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
......
......@@ -123,10 +123,19 @@ class Debugger(object):
name])
if self.feed_types_[name] == 0:
feed[name] = feed[name].astype("int64")
else:
elif self.feed_types_[name] == 1:
feed[name] = feed[name].astype("float32")
elif self.feed_types_[name] == 2:
feed[name] = feed[name].astype("int32")
else:
raise ValueError("local predictor receives wrong data type")
input_tensor = self.predictor.get_input_tensor(name)
input_tensor.copy_from_cpu(feed[name])
if "{}.lod".format(name) in feed:
input_tensor.set_lod([feed["{}.lod".format(name)]])
if batch == False:
input_tensor.copy_from_cpu(feed[name][np.newaxis, :])
else:
input_tensor.copy_from_cpu(feed[name])
output_tensors = []
output_names = self.predictor.get_output_names()
for output_name in output_names:
......
......@@ -233,7 +233,12 @@ class Client(object):
# key))
pass
def predict(self, feed=None, fetch=None, need_variant_tag=False, log_id=0):
def predict(self,
feed=None,
fetch=None,
batch=False,
need_variant_tag=False,
log_id=0):
self.profile_.record('py_prepro_0')
if feed is None or fetch is None:
......@@ -260,7 +265,9 @@ class Client(object):
int_feed_names = []
float_feed_names = []
int_shape = []
lod_slot_batch = []
float_shape = []
fetch_names = []
counter = 0
batch_size = len(feed_batch)
......@@ -277,31 +284,59 @@ class Client(object):
for i, feed_i in enumerate(feed_batch):
int_slot = []
float_slot = []
lod_slot = []
for key in feed_i:
if key not in self.feed_names_:
if ".lod" not in key and key not in self.feed_names_:
raise ValueError("Wrong feed name: {}.".format(key))
if ".lod" in key:
continue
#if not isinstance(feed_i[key], np.ndarray):
self.shape_check(feed_i, key)
if self.feed_types_[key] in int_type:
if i == 0:
int_feed_names.append(key)
shape_lst = []
if batch == False:
feed_i[key] = feed_i[key][np.newaxis, :]
shape_lst.append(1)
if isinstance(feed_i[key], np.ndarray):
int_shape.append(list(feed_i[key].shape))
print("feed_i_key shape", feed_i[key].shape)
shape_lst.extend(list(feed_i[key].shape))
print("shape list", shape_lst)
int_shape.append(shape_lst)
else:
int_shape.append(self.feed_shapes_[key])
if "{}.lod".format(key) in feed_i:
lod_slot_batch.append(feed_i["{}.lod".format(key)])
else:
lod_slot_batch.append([])
if isinstance(feed_i[key], np.ndarray):
int_slot.append(feed_i[key])
self.has_numpy_input = True
else:
int_slot.append(feed_i[key])
self.all_numpy_input = False
elif self.feed_types_[key] in float_type:
if i == 0:
float_feed_names.append(key)
shape_lst = []
if batch == False:
feed_i[key] = feed_i[key][np.newaxis, :]
shape_lst.append(1)
if isinstance(feed_i[key], np.ndarray):
float_shape.append(list(feed_i[key].shape))
print("feed_i_key shape", feed_i[key].shape)
shape_lst.extend(list(feed_i[key].shape))
print("shape list", shape_lst)
float_shape.append(shape_lst)
else:
float_shape.append(self.feed_shapes_[key])
if "{}.lod".format(key) in feed_i:
lod_slot_batch.append(feed_i["{}.lod".format(key)])
else:
lod_slot_batch.append([])
if isinstance(feed_i[key], np.ndarray):
float_slot.append(feed_i[key])
self.has_numpy_input = True
......@@ -310,6 +345,7 @@ class Client(object):
self.all_numpy_input = False
int_slot_batch.append(int_slot)
float_slot_batch.append(float_slot)
lod_slot_batch.append(lod_slot)
self.profile_.record('py_prepro_1')
self.profile_.record('py_client_infer_0')
......@@ -318,13 +354,11 @@ class Client(object):
if self.all_numpy_input:
res = self.client_handle_.numpy_predict(
float_slot_batch, float_feed_names, float_shape, int_slot_batch,
int_feed_names, int_shape, fetch_names, result_batch_handle,
self.pid, log_id)
int_feed_names, int_shape, lod_slot_batch, fetch_names,
result_batch_handle, self.pid, log_id)
elif self.has_numpy_input == False:
res = self.client_handle_.batch_predict(
float_slot_batch, float_feed_names, float_shape, int_slot_batch,
int_feed_names, int_shape, fetch_names, result_batch_handle,
self.pid, log_id)
raise ValueError(
"Please make sure all of your inputs are numpy array")
else:
raise ValueError(
"Please make sure the inputs are all in list type or all in numpy.array type"
......
......@@ -118,11 +118,13 @@ class WebService(object):
del feed["fetch"]
if len(feed) == 0:
raise ValueError("empty input")
fetch_map = self.client.predict(feed=feed, fetch=fetch)
fetch_map = self.client.predict(feed=feed, fetch=fetch, batch=True)
result = self.postprocess(
feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
result = {"result": result}
except ValueError as err:
import traceback
print(traceback.format_exc())
result = {"result": err}
return result
......
......@@ -157,7 +157,7 @@ function python_test_fit_a_line() {
cd fit_a_line # pwd: /Serving/python/examples/fit_a_line
sh get_data.sh
local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
case $TYPE in
CPU)
# test rpc
......@@ -225,7 +225,7 @@ function python_test_fit_a_line() {
esac
echo "test fit_a_line $TYPE part finished as expected."
rm -rf image kvdb log uci_housing* work*
unset SERVING_BIN
#unset SERVING_BIN
cd .. # pwd: /Serving/python/examples
}
......@@ -234,7 +234,7 @@ function python_run_criteo_ctr_with_cube() {
local TYPE=$1
yum install -y bc >/dev/null
cd criteo_ctr_with_cube # pwd: /Serving/python/examples/criteo_ctr_with_cube
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
case $TYPE in
CPU)
check_cmd "wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz"
......@@ -293,7 +293,7 @@ function python_run_criteo_ctr_with_cube() {
exit 1
;;
esac
unset SERVING_BIN
#unset SERVING_BIN
echo "test criteo_ctr_with_cube $TYPE part finished as expected."
cd .. # pwd: /Serving/python/examples
}
......@@ -301,7 +301,7 @@ function python_run_criteo_ctr_with_cube() {
function python_test_bert() {
# pwd: /Serving/python/examples
local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
cd bert # pwd: /Serving/python/examples/bert
case $TYPE in
CPU)
......@@ -335,14 +335,14 @@ function python_test_bert() {
;;
esac
echo "test bert $TYPE finished as expected."
unset SERVING_BIN
#unset SERVING_BIN
cd ..
}
function python_test_multi_fetch() {
# pwd: /Serving/python/examples
local TYPT=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
cd bert # pwd: /Serving/python/examples/bert
case $TYPE in
CPU)
......@@ -371,14 +371,14 @@ function python_test_multi_fetch() {
;;
esac
echo "test multi fetch $TYPE finished as expected."
unset SERVING_BIN
#unset SERVING_BIN
cd ..
}
function python_test_multi_process(){
# pwd: /Serving/python/examples
local TYPT=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
cd fit_a_line # pwd: /Serving/python/examples/fit_a_line
sh get_data.sh
case $TYPE in
......@@ -405,14 +405,14 @@ function python_test_multi_process(){
;;
esac
echo "test multi process $TYPE finished as expected."
unset SERVING_BIN
#unset SERVING_BIN
cd ..
}
function python_test_imdb() {
# pwd: /Serving/python/examples
local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
cd imdb # pwd: /Serving/python/examples/imdb
case $TYPE in
CPU)
......@@ -452,14 +452,14 @@ function python_test_imdb() {
;;
esac
echo "test imdb $TYPE finished as expected."
unset SERVING_BIN
#unset SERVING_BIN
cd ..
}
function python_test_lac() {
# pwd: /Serving/python/examples
local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
cd lac # pwd: /Serving/python/examples/lac
case $TYPE in
CPU)
......@@ -504,14 +504,14 @@ function python_test_lac() {
;;
esac
echo "test lac $TYPE finished as expected."
unset SERVING_BIN
#unset SERVING_BIN
cd ..
}
function java_run_test() {
# pwd: /Serving
local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
unsetproxy
case $TYPE in
CPU)
......@@ -563,14 +563,14 @@ function java_run_test() {
esac
echo "java-sdk $TYPE part finished as expected."
setproxy
unset SERVING_BIN
#unset SERVING_BIN
}
function python_test_grpc_impl() {
# pwd: /Serving/python/examples
cd grpc_impl_example # pwd: /Serving/python/examples/grpc_impl_example
local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
unsetproxy
case $TYPE in
CPU)
......@@ -702,7 +702,7 @@ function python_test_grpc_impl() {
esac
echo "test grpc impl $TYPE part finished as expected."
setproxy
unset SERVING_BIN
#unset SERVING_BIN
cd .. # pwd: /Serving/python/examples
}
......@@ -710,7 +710,7 @@ function python_test_grpc_impl() {
function python_test_yolov4(){
#pwd:/ Serving/python/examples
local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
cd yolov4
case $TYPE in
CPU)
......@@ -731,14 +731,14 @@ function python_test_yolov4(){
;;
esac
echo "test yolov4 $TYPE finished as expected."
unset SERVING_BIN
#unset SERVING_BIN
cd ..
}
function python_test_resnet50(){
#pwd:/ Serving/python/examples
local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
cd imagenet
case $TYPE in
CPU)
......@@ -758,14 +758,14 @@ function python_test_resnet50(){
;;
esac
echo "test resnet $TYPE finished as expected"
unset SERVING_BIN
#unset SERVING_BIN
cd ..
}
function python_test_pipeline(){
# pwd: /Serving/python/examples
local TYPE=$1
export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
#export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving
unsetproxy
cd pipeline # pwd: /Serving/python/examples/pipeline
case $TYPE in
......@@ -915,7 +915,7 @@ EOF
esac
cd ..
setproxy
unset SERVING_BIN
#unset SERVING_BIN
}
function python_app_api_test(){
......@@ -944,16 +944,16 @@ function python_run_test() {
local TYPE=$1 # pwd: /Serving
cd python/examples # pwd: /Serving/python/examples
python_test_fit_a_line $TYPE # pwd: /Serving/python/examples
python_run_criteo_ctr_with_cube $TYPE # pwd: /Serving/python/examples
python_test_bert $TYPE # pwd: /Serving/python/examples
python_test_imdb $TYPE # pwd: /Serving/python/examples
python_test_lac $TYPE # pwd: /Serving/python/examples
python_test_multi_process $TYPE # pwd: /Serving/python/examples
python_test_multi_fetch $TYPE # pwd: /Serving/python/examples
python_test_yolov4 $TYPE # pwd: /Serving/python/examples
python_test_grpc_impl $TYPE # pwd: /Serving/python/examples
python_test_resnet50 $TYPE # pwd: /Serving/python/examples
python_test_pipeline $TYPE # pwd: /Serving/python/examples
#python_run_criteo_ctr_with_cube $TYPE # pwd: /Serving/python/examples
#python_test_bert $TYPE # pwd: /Serving/python/examples
#python_test_imdb $TYPE # pwd: /Serving/python/examples
#python_test_lac $TYPE # pwd: /Serving/python/examples
#python_test_multi_process $TYPE # pwd: /Serving/python/examples
#python_test_multi_fetch $TYPE # pwd: /Serving/python/examples
#python_test_yolov4 $TYPE # pwd: /Serving/python/examples
#python_test_grpc_impl $TYPE # pwd: /Serving/python/examples
#python_test_resnet50 $TYPE # pwd: /Serving/python/examples
#python_test_pipeline $TYPE # pwd: /Serving/python/examples
echo "test python $TYPE part finished as expected."
cd ../.. # pwd: /Serving
}
......@@ -1092,11 +1092,11 @@ function monitor_test() {
function main() {
local TYPE=$1 # pwd: /
init # pwd: /Serving
build_client $TYPE # pwd: /Serving
build_server $TYPE # pwd: /Serving
build_app $TYPE # pwd: /Serving
java_run_test $TYPE # pwd: /Serving
#init # pwd: /Serving
#build_client $TYPE # pwd: /Serving
#build_server $TYPE # pwd: /Serving
#build_app $TYPE # pwd: /Serving
#java_run_test $TYPE # pwd: /Serving
python_run_test $TYPE # pwd: /Serving
monitor_test $TYPE # pwd: /Serving
echo "serving $TYPE part finished as expected."
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册