Commit e54b5ad9, authored by wangjiawei04

local predictor and rpc unified

Parent 15cba56f
@@ -217,19 +217,7 @@ class PredictorClient {
   int create_predictor();
   int destroy_predictor();
-  int batch_predict(
-      const std::vector<std::vector<std::vector<float>>>& float_feed_batch,
-      const std::vector<std::string>& float_feed_name,
-      const std::vector<std::vector<int>>& float_shape,
-      const std::vector<std::vector<std::vector<int64_t>>>& int_feed_batch,
-      const std::vector<std::string>& int_feed_name,
-      const std::vector<std::vector<int>>& int_shape,
-      const std::vector<std::string>& fetch_name,
-      PredictorRes& predict_res_batch,  // NOLINT
-      const int& pid,
-      const uint64_t log_id);
   int numpy_predict(
       const std::vector<std::vector<py::array_t<float>>>& float_feed_batch,
       const std::vector<std::string>& float_feed_name,
@@ -237,6 +225,7 @@ class PredictorClient {
       const std::vector<std::vector<py::array_t<int64_t>>>& int_feed_batch,
       const std::vector<std::string>& int_feed_name,
       const std::vector<std::vector<int>>& int_shape,
+      const std::vector<std::vector<int>>& lod_slot_batch,
       const std::vector<std::string>& fetch_name,
       PredictorRes& predict_res_batch,  // NOLINT
       const int& pid,
...
@@ -135,219 +135,6 @@ int PredictorClient::create_predictor() {
   return 0;
 }
-int PredictorClient::batch_predict(
-    const std::vector<std::vector<std::vector<float>>> &float_feed_batch,
-    const std::vector<std::string> &float_feed_name,
-    const std::vector<std::vector<int>> &float_shape,
-    const std::vector<std::vector<std::vector<int64_t>>> &int_feed_batch,
-    const std::vector<std::string> &int_feed_name,
-    const std::vector<std::vector<int>> &int_shape,
-    const std::vector<std::string> &fetch_name,
-    PredictorRes &predict_res_batch,
-    const int &pid,
-    const uint64_t log_id) {
-  int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
-  predict_res_batch.clear();
-  Timer timeline;
-  int64_t preprocess_start = timeline.TimeStampUS();
-  int fetch_name_num = fetch_name.size();
-  _api.thrd_initialize();
-  std::string variant_tag;
-  _predictor = _api.fetch_predictor("general_model", &variant_tag);
-  predict_res_batch.set_variant_tag(variant_tag);
-  VLOG(2) << "fetch general model predictor done.";
-  VLOG(2) << "float feed name size: " << float_feed_name.size();
-  VLOG(2) << "int feed name size: " << int_feed_name.size();
-  VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
-  Request req;
-  req.set_log_id(log_id);
-  for (auto &name : fetch_name) {
-    req.add_fetch_var_names(name);
-  }
-  for (int bi = 0; bi < batch_size; bi++) {
-    VLOG(2) << "prepare batch " << bi;
-    std::vector<Tensor *> tensor_vec;
-    FeedInst *inst = req.add_insts();
-    std::vector<std::vector<float>> float_feed = float_feed_batch[bi];
-    std::vector<std::vector<int64_t>> int_feed = int_feed_batch[bi];
-    for (auto &name : float_feed_name) {
-      tensor_vec.push_back(inst->add_tensor_array());
-    }
-    for (auto &name : int_feed_name) {
-      tensor_vec.push_back(inst->add_tensor_array());
-    }
-    VLOG(2) << "batch [" << bi << "] int_feed_name and float_feed_name "
-            << "prepared";
-    int vec_idx = 0;
-    VLOG(2) << "tensor_vec size " << tensor_vec.size() << " float shape "
-            << float_shape.size();
-    for (auto &name : float_feed_name) {
-      int idx = _feed_name_to_idx[name];
-      Tensor *tensor = tensor_vec[idx];
-      VLOG(2) << "prepare float feed " << name << " shape size "
-              << float_shape[vec_idx].size();
-      for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
-        tensor->add_shape(float_shape[vec_idx][j]);
-      }
-      tensor->set_elem_type(1);
-      for (uint32_t j = 0; j < float_feed[vec_idx].size(); ++j) {
-        tensor->add_float_data(float_feed[vec_idx][j]);
-      }
-      vec_idx++;
-    }
-    VLOG(2) << "batch [" << bi << "] "
-            << "float feed value prepared";
-    vec_idx = 0;
-    for (auto &name : int_feed_name) {
-      int idx = _feed_name_to_idx[name];
-      Tensor *tensor = tensor_vec[idx];
-      if (_type[idx] == 0) {
-        VLOG(2) << "prepare int64 feed " << name << " shape size "
-                << int_shape[vec_idx].size();
-        VLOG(3) << "feed var name " << name << " index " << vec_idx
-                << "first data " << int_feed[vec_idx][0];
-        for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) {
-          tensor->add_int64_data(int_feed[vec_idx][j]);
-        }
-      } else if (_type[idx] == 2) {
-        VLOG(2) << "prepare int32 feed " << name << " shape size "
-                << int_shape[vec_idx].size();
-        VLOG(3) << "feed var name " << name << " index " << vec_idx
-                << "first data " << int32_t(int_feed[vec_idx][0]);
-        for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) {
-          tensor->add_int_data(int32_t(int_feed[vec_idx][j]));
-        }
-      }
-      for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) {
-        tensor->add_shape(int_shape[vec_idx][j]);
-      }
-      tensor->set_elem_type(_type[idx]);
-      vec_idx++;
-    }
-    VLOG(2) << "batch [" << bi << "] "
-            << "int feed value prepared";
-  }
-  int64_t preprocess_end = timeline.TimeStampUS();
-  int64_t client_infer_start = timeline.TimeStampUS();
-  Response res;
-  int64_t client_infer_end = 0;
-  int64_t postprocess_start = 0;
-  int64_t postprocess_end = 0;
-  if (FLAGS_profile_client) {
-    if (FLAGS_profile_server) {
-      req.set_profile_server(true);
-    }
-  }
-  res.Clear();
-  if (_predictor->inference(&req, &res) != 0) {
-    LOG(ERROR) << "failed call predictor with req: " << req.ShortDebugString();
-    _api.thrd_clear();
-    return -1;
-  } else {
-    client_infer_end = timeline.TimeStampUS();
-    postprocess_start = client_infer_end;
-    VLOG(2) << "get model output num";
-    uint32_t model_num = res.outputs_size();
-    VLOG(2) << "model num: " << model_num;
-    for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
-      VLOG(2) << "process model output index: " << m_idx;
-      auto output = res.outputs(m_idx);
-      ModelRes model;
-      model.set_engine_name(output.engine_name());
-      int idx = 0;
-      for (auto &name : fetch_name) {
-        // int idx = _fetch_name_to_idx[name];
-        int shape_size = output.insts(0).tensor_array(idx).shape_size();
-        VLOG(2) << "fetch var " << name << " index " << idx << " shape size "
-                << shape_size;
-        model._shape_map[name].resize(shape_size);
-        for (int i = 0; i < shape_size; ++i) {
-          model._shape_map[name][i] =
-              output.insts(0).tensor_array(idx).shape(i);
-        }
-        int lod_size = output.insts(0).tensor_array(idx).lod_size();
-        if (lod_size > 0) {
-          model._lod_map[name].resize(lod_size);
-          for (int i = 0; i < lod_size; ++i) {
-            model._lod_map[name][i] = output.insts(0).tensor_array(idx).lod(i);
-          }
-        }
-        idx += 1;
-      }
-      idx = 0;
-      for (auto &name : fetch_name) {
-        // int idx = _fetch_name_to_idx[name];
-        if (_fetch_name_to_type[name] == 0) {
-          VLOG(2) << "ferch var " << name << "type int64";
-          int size = output.insts(0).tensor_array(idx).int64_data_size();
-          model._int64_value_map[name] = std::vector<int64_t>(
-              output.insts(0).tensor_array(idx).int64_data().begin(),
-              output.insts(0).tensor_array(idx).int64_data().begin() + size);
-        } else if (_fetch_name_to_type[name] == 1) {
-          VLOG(2) << "fetch var " << name << "type float";
-          int size = output.insts(0).tensor_array(idx).float_data_size();
-          model._float_value_map[name] = std::vector<float>(
-              output.insts(0).tensor_array(idx).float_data().begin(),
-              output.insts(0).tensor_array(idx).float_data().begin() + size);
-        } else if (_fetch_name_to_type[name] == 2) {
-          VLOG(2) << "fetch var " << name << "type int32";
-          int size = output.insts(0).tensor_array(idx).int_data_size();
-          model._int32_value_map[name] = std::vector<int32_t>(
-              output.insts(0).tensor_array(idx).int_data().begin(),
-              output.insts(0).tensor_array(idx).int_data().begin() + size);
-        }
-        idx += 1;
-      }
-      predict_res_batch.add_model_res(std::move(model));
-    }
-    postprocess_end = timeline.TimeStampUS();
-  }
-  if (FLAGS_profile_client) {
-    std::ostringstream oss;
-    oss << "PROFILE\t"
-        << "pid:" << pid << "\t"
-        << "prepro_0:" << preprocess_start << " "
-        << "prepro_1:" << preprocess_end << " "
-        << "client_infer_0:" << client_infer_start << " "
-        << "client_infer_1:" << client_infer_end << " ";
-    if (FLAGS_profile_server) {
-      int op_num = res.profile_time_size() / 2;
-      for (int i = 0; i < op_num; ++i) {
-        oss << "op" << i << "_0:" << res.profile_time(i * 2) << " ";
-        oss << "op" << i << "_1:" << res.profile_time(i * 2 + 1) << " ";
-      }
-    }
-    oss << "postpro_0:" << postprocess_start << " ";
-    oss << "postpro_1:" << postprocess_end;
-    fprintf(stderr, "%s\n", oss.str().c_str());
-  }
-  _api.thrd_clear();
-  return 0;
-}
 int PredictorClient::numpy_predict(
     const std::vector<std::vector<py::array_t<float>>> &float_feed_batch,
@@ -356,6 +143,7 @@ int PredictorClient::numpy_predict(
     const std::vector<std::vector<py::array_t<int64_t>>> &int_feed_batch,
     const std::vector<std::string> &int_feed_name,
     const std::vector<std::vector<int>> &int_shape,
+    const std::vector<std::vector<int>>& lod_slot_batch,
     const std::vector<std::string> &fetch_name,
     PredictorRes &predict_res_batch,
     const int &pid,
@@ -409,6 +197,10 @@ int PredictorClient::numpy_predict(
             << float_shape[vec_idx].size();
     for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
       tensor->add_shape(float_shape[vec_idx][j]);
+      std::cout << "shape " << j << " : " << float_shape[vec_idx][j] << std::endl;
+    }
+    for (uint32_t j = 0; j < lod_slot_batch[vec_idx].size(); ++j) {
+      tensor->add_lod(lod_slot_batch[vec_idx][j]);
     }
     tensor->set_elem_type(1);
     const int float_shape_size = float_shape[vec_idx].size();
...
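The `lod_slot_batch` argument added to `numpy_predict` carries one offset vector per feed slot, which the client now serializes into the request tensor through `tensor->add_lod(...)`. The offsets are cumulative row positions. A minimal sketch of the encoding, with `split_by_lod` as a purely illustrative helper (not part of the codebase):

```python
import numpy as np

# Two variable-length sequences (lengths 3 and 5) flattened into one tensor.
# The lod vector stores cumulative offsets: sequence i spans rows lod[i]:lod[i+1].
data = np.arange(8, dtype=np.int64).reshape(8, 1)
lod = [0, 3, 8]

def split_by_lod(tensor, lod):
    # Illustrative helper: recover the original sequences from a LoD tensor.
    return [tensor[lod[i]:lod[i + 1]] for i in range(len(lod) - 1)]

for seq in split_by_lod(data, lod):
    print(seq.shape)  # (3, 1) then (5, 1)
```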
@@ -95,32 +95,6 @@ PYBIND11_MODULE(serving_client, m) {
            [](PredictorClient &self) { self.create_predictor(); })
       .def("destroy_predictor",
            [](PredictorClient &self) { self.destroy_predictor(); })
-      .def("batch_predict",
-           [](PredictorClient &self,
-              const std::vector<std::vector<std::vector<float>>>
-                  &float_feed_batch,
-              const std::vector<std::string> &float_feed_name,
-              const std::vector<std::vector<int>> &float_shape,
-              const std::vector<std::vector<std::vector<int64_t>>>
-                  &int_feed_batch,
-              const std::vector<std::string> &int_feed_name,
-              const std::vector<std::vector<int>> &int_shape,
-              const std::vector<std::string> &fetch_name,
-              PredictorRes &predict_res_batch,
-              const int &pid,
-              const uint64_t log_id) {
-             return self.batch_predict(float_feed_batch,
-                                       float_feed_name,
-                                       float_shape,
-                                       int_feed_batch,
-                                       int_feed_name,
-                                       int_shape,
-                                       fetch_name,
-                                       predict_res_batch,
-                                       pid,
-                                       log_id);
-           },
-           py::call_guard<py::gil_scoped_release>())
       .def("numpy_predict",
            [](PredictorClient &self,
               const std::vector<std::vector<py::array_t<float>>>
@@ -131,6 +105,7 @@ PYBIND11_MODULE(serving_client, m) {
                   &int_feed_batch,
               const std::vector<std::string> &int_feed_name,
               const std::vector<std::vector<int>> &int_shape,
+              const std::vector<std::vector<int>>& lod_slot_batch,
               const std::vector<std::string> &fetch_name,
               PredictorRes &predict_res_batch,
               const int &pid,
@@ -141,6 +116,7 @@ PYBIND11_MODULE(serving_client, m) {
                                        int_feed_batch,
                                        int_feed_name,
                                        int_shape,
+                                       lod_slot_batch,
                                        fetch_name,
                                        predict_res_batch,
                                        pid,
...
@@ -37,9 +37,9 @@ int conf_check(const Request *req,
                const std::shared_ptr<PaddleGeneralModelConfig> &model_config) {
   int var_num = req->insts(0).tensor_array_size();
   if (var_num != model_config->_feed_type.size()) {
-    LOG(ERROR) << "feed var number not match: model config["
-               << model_config->_feed_type.size() << "] vs. actual[" << var_num
-               << "]";
+    VLOG(2) << "var num: " << var_num;
+    VLOG(2) << "model config var num: " << model_config->_feed_type.size();
+    LOG(ERROR) << "feed var number not match.";
     return -1;
   }
@@ -72,7 +72,6 @@ int conf_check(const Request *req,
 int GeneralReaderOp::inference() {
   // reade request from client
   const Request *req = dynamic_cast<const Request *>(get_request_message());
-  uint64_t log_id = req->log_id();
   int batch_size = req->insts_size();
   int input_var_num = 0;
@@ -84,28 +83,25 @@ int GeneralReaderOp::inference() {
   TensorVector *out = &res->tensor_vector;
   res->SetBatchSize(batch_size);
-  res->SetLogId(log_id);
   if (!res) {
-    LOG(ERROR) << "(logid=" << log_id
-               << ") Failed get op tls reader object output";
+    LOG(ERROR) << "Failed get op tls reader object output";
   }
   Timer timeline;
   int64_t start = timeline.TimeStampUS();
   int var_num = req->insts(0).tensor_array_size();
-  VLOG(2) << "(logid=" << log_id << ") var num: " << var_num;
-  VLOG(2) << "(logid=" << log_id
-          << ") start to call load general model_conf op";
+  VLOG(2) << "var num: " << var_num;
+  VLOG(2) << "start to call load general model_conf op";
   baidu::paddle_serving::predictor::Resource &resource =
       baidu::paddle_serving::predictor::Resource::instance();
-  VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
+  VLOG(2) << "get resource pointer done.";
   std::shared_ptr<PaddleGeneralModelConfig> model_config =
       resource.get_general_model_config();
-  VLOG(2) << "(logid=" << log_id << ") print general model config done.";
+  VLOG(2) << "print general model config done.";
   // TODO(guru4elephant): how to do conditional check?
   /*
@@ -121,13 +117,22 @@ int GeneralReaderOp::inference() {
   elem_type.resize(var_num);
   elem_size.resize(var_num);
   capacity.resize(var_num);
+  /*
+    message Tensor {
+      repeated bytes data = 1;
+      repeated int32 int_data = 2;
+      repeated int64 int64_data = 3;
+      repeated float float_data = 4;
+      optional int32 elem_type = 5;
+      repeated int32 shape = 6;
+      repeated int32 lod = 7;  // only for fetch tensor currently
+    };
+  */
   // prepare basic information for input
   for (int i = 0; i < var_num; ++i) {
     paddle::PaddleTensor lod_tensor;
     elem_type[i] = req->insts(0).tensor_array(i).elem_type();
-    VLOG(2) << "(logid=" << log_id << ") var[" << i
-            << "] has elem type: " << elem_type[i];
+    VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
     if (elem_type[i] == 0) {  // int64
       elem_size[i] = sizeof(int64_t);
       lod_tensor.dtype = paddle::PaddleDType::INT64;
@@ -138,23 +143,30 @@ int GeneralReaderOp::inference() {
       elem_size[i] = sizeof(int32_t);
       lod_tensor.dtype = paddle::PaddleDType::INT32;
     }
-    if (model_config->_is_lod_feed[i]) {
+    // implement lod tensor here
+    if (req->insts(0).tensor_array(i).lod_size() > 0) {
       lod_tensor.lod.resize(1);
-      lod_tensor.lod[0].push_back(0);
-      VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
-    } else {
-      lod_tensor.shape.push_back(batch_size);
+      for (int k = 0; k < req->insts(0).tensor_array(i).lod_size(); ++k) {
+        lod_tensor.lod[0].push_back(req->insts(0).tensor_array(i).lod(k));
+      }
+      capacity[i] = 1;
+      for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
+        int dim = req->insts(0).tensor_array(i).shape(k);
+        VLOG(2) << "shape for var[" << i << "]: " << dim;
+        capacity[i] *= dim;
+        lod_tensor.shape.push_back(dim);
+      }
+      VLOG(2) << "var[" << i << "] is tensor, capacity: " << capacity[i];
+    } else {
       capacity[i] = 1;
       for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
         int dim = req->insts(0).tensor_array(i).shape(k);
-        VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
-                << "]: " << dim;
+        VLOG(2) << "shape for var[" << i << "]: " << dim;
         capacity[i] *= dim;
         lod_tensor.shape.push_back(dim);
       }
-      VLOG(2) << "(logid=" << log_id << ") var[" << i
-              << "] is tensor, capacity: " << capacity[i];
+      VLOG(2) << "var[" << i << "] is tensor, capacity: " << capacity[i];
     }
     lod_tensor.name = model_config->_feed_name[i];
     out->push_back(lod_tensor);
@@ -174,12 +186,11 @@ int GeneralReaderOp::inference() {
       } else if (tensor.int_data_size() > 0) {
         data_len = tensor.int_data_size();
       }
-      VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
-              << "]: " << data_len;
+      VLOG(2) << "tensor size for var[" << i << "]: " << data_len;
       tensor_size += data_len;
       int cur_len = out->at(i).lod[0].back();
-      VLOG(2) << "(logid=" << log_id << ") current len: " << cur_len;
+      VLOG(2) << "current len: " << cur_len;
       int sample_len = 0;
       if (tensor.shape_size() == 1) {
@@ -188,21 +199,21 @@ int GeneralReaderOp::inference() {
         sample_len = tensor.shape(0);
       }
       out->at(i).lod[0].push_back(cur_len + sample_len);
-      VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
+      VLOG(2) << "new len: " << cur_len + sample_len;
     }
     out->at(i).data.Resize(tensor_size * elem_size[i]);
-    out->at(i).shape = {out->at(i).lod[0].back()};
+    out->at(i).shape = {};
     for (int j = 1; j < req->insts(0).tensor_array(i).shape_size(); ++j) {
       out->at(i).shape.push_back(req->insts(0).tensor_array(i).shape(j));
     }
-    if (out->at(i).shape.size() == 1) {
-      out->at(i).shape.push_back(1);
-    }
+    //if (out->at(i).shape.size() == 1) {
+    //  out->at(i).shape.push_back(1);
+    //}
-    VLOG(2) << "(logid=" << log_id << ") var[" << i
-            << "] is lod_tensor and len=" << out->at(i).lod[0].back();
+    VLOG(2) << "var[" << i
+            << "] is lod_tensor and len=" << out->at(i).lod[0].back();
   } else {
     out->at(i).data.Resize(batch_size * capacity[i] * elem_size[i]);
-    VLOG(2) << "(logid=" << log_id << ") var[" << i
-            << "] is tensor and capacity=" << batch_size * capacity[i];
+    VLOG(2) << "var[" << i
+            << "] is tensor and capacity=" << batch_size * capacity[i];
   }
 }
@@ -211,56 +222,41 @@ int GeneralReaderOp::inference() {
   for (int i = 0; i < var_num; ++i) {
     if (elem_type[i] == 0) {
       int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
-      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
-              << "] is " << req->insts(0).tensor_array(i).int64_data(0);
+      VLOG(2) << "first element data in var[" << i << "] is "
+              << req->insts(0).tensor_array(i).int64_data(0);
       int offset = 0;
       for (int j = 0; j < batch_size; ++j) {
         int elem_num = req->insts(j).tensor_array(i).int64_data_size();
         for (int k = 0; k < elem_num; ++k) {
           dst_ptr[offset + k] = req->insts(j).tensor_array(i).int64_data(k);
         }
-        if (out->at(i).lod.size() == 1) {
-          offset = out->at(i).lod[0][j + 1];
-        } else {
-          offset += capacity[i];
-        }
       }
     } else if (elem_type[i] == 1) {
       float *dst_ptr = static_cast<float *>(out->at(i).data.data());
-      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
-              << "] is " << req->insts(0).tensor_array(i).float_data(0);
+      VLOG(2) << "first element data in var[" << i << "] is "
              << req->insts(0).tensor_array(i).float_data(0);
       int offset = 0;
       for (int j = 0; j < batch_size; ++j) {
         int elem_num = req->insts(j).tensor_array(i).float_data_size();
         for (int k = 0; k < elem_num; ++k) {
           dst_ptr[offset + k] = req->insts(j).tensor_array(i).float_data(k);
         }
-        if (out->at(i).lod.size() == 1) {
-          offset = out->at(i).lod[0][j + 1];
-        } else {
-          offset += capacity[i];
-        }
       }
     } else if (elem_type[i] == 2) {
       int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
-      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
-              << "] is " << req->insts(0).tensor_array(i).int_data(0);
+      VLOG(2) << "first element data in var[" << i << "] is "
              << req->insts(0).tensor_array(i).int_data(0);
       int offset = 0;
       for (int j = 0; j < batch_size; ++j) {
         int elem_num = req->insts(j).tensor_array(i).int_data_size();
         for (int k = 0; k < elem_num; ++k) {
           dst_ptr[offset + k] = req->insts(j).tensor_array(i).int_data(k);
         }
-        if (out->at(i).lod.size() == 1) {
-          offset = out->at(i).lod[0][j + 1];
-        } else {
-          offset += capacity[i];
-        }
       }
     }
   }
-  VLOG(2) << "(logid=" << log_id << ") output size: " << out->size();
+  VLOG(2) << "output size: " << out->size();
   timeline.Pause();
   int64_t end = timeline.TimeStampUS();
@@ -268,7 +264,7 @@ int GeneralReaderOp::inference() {
   AddBlobInfo(res, start);
   AddBlobInfo(res, end);
-  VLOG(2) << "(logid=" << log_id << ") read data from client success";
+  VLOG(2) << "read data from client success";
   return 0;
 }
 DEFINE_OP(GeneralReaderOp);
...
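With this change, `GeneralReaderOp` decides whether a feed slot is a LoD tensor from the request itself (`lod_size() > 0`) instead of the model config's `_is_lod_feed` flag. A rough Python sketch of what the op assembles per slot, using the `elem_type` mapping visible in the diff (0: int64, 1: float32, 2: int32); `build_feed_tensor` is an illustrative name only:

```python
import numpy as np

# elem_type values come from the request proto, as in the diff above.
DTYPE = {0: np.int64, 1: np.float32, 2: np.int32}

def build_feed_tensor(elem_type, shape, lod, flat_data):
    # Sketch of the per-slot tensor GeneralReaderOp assembles.
    arr = np.asarray(flat_data, dtype=DTYPE[elem_type]).reshape(shape)
    # A non-empty lod marks the slot as a variable-length (LoD) batch.
    return {"data": arr, "lod": [list(lod)] if lod else []}

t = build_feed_tensor(0, [8, 1], [0, 3, 8], list(range(8)))
print(t["data"].shape, t["lod"])  # (8, 1) [[0, 3, 8]]
```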
# How to Configure the Ops of a Web Service

## RPC and Local Predictor

Serving currently supports two run modes, RPC and local predictor, each with its own strengths.

| Mode | Strengths | Suitable scenarios |
| --------------- | -------------------- | ---------------------------------------- |
| RPC | High stability, distributed deployment | High-throughput services that need to span data centers |
| local predictor | Easy deployment, fast prediction | Latency-sensitive services and fast iteration |

In RPC mode, an RPC service is started; the client packs the prediction request in protobuf format, and the prediction runs on the RPC server. This suits scenarios with high stability requirements: the web service and the prediction service are decoupled and can be deployed in different locations.
In local predictor mode, a Python predictor is started and called directly by the client. This suits fast iteration and smaller-scale scenarios; the web service and the prediction service must run on the same machine.
In web mode, each Serving model is configured through an Op.
Taking the OCR recognition model as an example, here is the implementation of RecOp.
```python
class RecOp(Op):
    def init_op(self):
        self.ocr_reader = OCRReader()
        self.get_rotate_crop_image = GetRotateCropImage()
        self.sorted_boxes = SortedBoxes()

    def preprocess(self, input_dicts):
        (_, input_dict), = input_dicts.items()
        im = input_dict["image"]
        dt_boxes = input_dict["dt_boxes"]
        dt_boxes = self.sorted_boxes(dt_boxes)
        feed_list = []
        img_list = []
        max_wh_ratio = 0
        for i, dtbox in enumerate(dt_boxes):
            boximg = self.get_rotate_crop_image(im, dt_boxes[i])
            img_list.append(boximg)
            h, w = boximg.shape[0:2]
            wh_ratio = w * 1.0 / h
            max_wh_ratio = max(max_wh_ratio, wh_ratio)
        for img in img_list:
            norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
            feed = {"image": norm_img}
            feed_list.append(feed)
        return feed_list

    def postprocess(self, input_dicts, fetch_dict):
        rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
        res_lst = []
        for res in rec_res:
            res_lst.append(res[0])
        res = {"res": str(res_lst)}
        return res
```
After overriding init_op, preprocess, and postprocess, we control whether the Op uses RPC or the local predictor at the place where the Op is constructed.
```python
# RPC
rec_op = RecOp(
    name="rec",
    input_ops=[det_op],
    server_endpoints=["127.0.0.1:12001"],  # if server endpoints exist, use rpc
    fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
    client_config="ocr_rec_client/serving_client_conf.prototxt",
    concurrency=1)

# local predictor
rec_op = RecOp(
    name="rec",
    input_ops=[det_op],
    fetch_list=["ctc_greedy_decoder_0.tmp_0", "softmax_0.tmp_0"],
    model_config="ocr_rec_server/serving_server_conf.prototxt",
    concurrency=1)
```
As the example above shows, when server_endpoints and client_config are specified in the Op's constructor, the RPC mode is used. Executing such an Op runs preprocess first, then sends the prediction request to the RPC service, and finally runs postprocess. The prediction service may be local or remote, single-node or distributed, so the corresponding addresses must be supplied as server_endpoints.
For the local predictor, no endpoints need to be specified; the selection rule is sketched after the table below.
| Attribute | Definition | Notes |
| ------------------- | --------------- | ------------------------------------------------------------ |
| name | Op name | |
| input_ops | upstream input Ops | There can be several; the results of the upstream Ops become this Op's input |
| fetch_list | names of fetch variables | The result dict returned by the prediction service contains every fetch variable defined here |
| RPC only | | |
| server_endpoints | list of RPC service addresses | A distributed deployment can have multiple RPC addresses |
| concurrency | degree of parallelism | Number of parallel threads |
| client_config | client configuration file | The Op accesses the RPC service as a client, so it needs the client-side configuration file |
| local predictor only | | |
| model_config | model configuration file | The local predictor runs on the same machine as the Op, so the model configuration is needed to start it |
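A minimal sketch of this selection rule, with illustrative names only (the framework's internal dispatch is more involved):

```python
# Assumption: this mirrors the documented rule, not the actual Op internals.
def choose_mode(server_endpoints=None, client_config=None, model_config=None):
    if server_endpoints:
        # rpc mode: the Op acts as a client, so a client config is required
        assert client_config is not None, "rpc mode needs client_config"
        return "rpc"
    # local predictor mode: the model config is needed to start the predictor
    assert model_config is not None, "local predictor needs model_config"
    return "local_predictor"

print(choose_mode(server_endpoints=["127.0.0.1:12001"],
                  client_config="ocr_rec_client/serving_client_conf.prototxt"))
print(choose_mode(model_config="ocr_rec_server/serving_server_conf.prototxt"))
```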
@@ -76,7 +76,7 @@ class Debugger(object):
         config.switch_use_feed_fetch_ops(False)
         self.predictor = create_paddle_predictor(config)
-    def predict(self, feed=None, fetch=None):
+    def predict(self, feed=None, fetch=None, batch=False):
         if feed is None or fetch is None:
             raise ValueError("You should specify feed and fetch for prediction")
         fetch_list = []
@@ -121,10 +121,19 @@ class Debugger(object):
                     name])
             if self.feed_types_[name] == 0:
                 feed[name] = feed[name].astype("int64")
-            else:
+            elif self.feed_types_[name] == 1:
                 feed[name] = feed[name].astype("float32")
+            elif self.feed_types_[name] == 2:
+                feed[name] = feed[name].astype("int32")
+            else:
+                raise ValueError("local predictor receives wrong data type")
             input_tensor = self.predictor.get_input_tensor(name)
-            input_tensor.copy_from_cpu(feed[name])
+            if "{}.lod".format(name) in feed:
+                input_tensor.set_lod([feed["{}.lod".format(name)]])
+            if batch == False:
+                input_tensor.copy_from_cpu(feed[name][np.newaxis, :])
+            else:
+                input_tensor.copy_from_cpu(feed[name])
         output_tensors = []
         output_names = self.predictor.get_output_names()
         for output_name in output_names:
...
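With the new `batch` flag, `Debugger.predict` can take a single sample and prepend the batch axis itself, and a companion `"{name}.lod"` feed entry is forwarded to `input_tensor.set_lod`. A usage sketch; the model directory and feed/fetch names are placeholders borrowed from the OCR example:

```python
import numpy as np
from paddle_serving_app.local_predict import Debugger  # assumed module path

debugger = Debugger()
debugger.load_model_config("ocr_rec_server")  # placeholder model directory

img = np.random.rand(3, 32, 320).astype("float32")  # one sample, no batch dim
# batch=False asks predict() to add the batch axis via np.newaxis itself.
fetch_map = debugger.predict(
    feed={"image": img}, fetch=["softmax_0.tmp_0"], batch=False)
```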
@@ -233,7 +233,7 @@ class Client(object):
         #                 key))
         pass
-    def predict(self, feed=None, fetch=None, need_variant_tag=False, log_id=0):
+    def predict(self, feed=None, fetch=None, batch=False, need_variant_tag=False, log_id=0):
         self.profile_.record('py_prepro_0')
         if feed is None or fetch is None:
@@ -260,7 +260,9 @@ class Client(object):
         int_feed_names = []
         float_feed_names = []
         int_shape = []
+        lod_slot_batch = []
         float_shape = []
         fetch_names = []
         counter = 0
         batch_size = len(feed_batch)
@@ -277,31 +279,59 @@ class Client(object):
         for i, feed_i in enumerate(feed_batch):
             int_slot = []
             float_slot = []
+            lod_slot = []
             for key in feed_i:
-                if key not in self.feed_names_:
+                if ".lod" not in key and key not in self.feed_names_:
                     raise ValueError("Wrong feed name: {}.".format(key))
+                if ".lod" in key:
+                    continue
                 #if not isinstance(feed_i[key], np.ndarray):
                 self.shape_check(feed_i, key)
                 if self.feed_types_[key] in int_type:
                     if i == 0:
                         int_feed_names.append(key)
+                        shape_lst = []
+                        if batch == False:
+                            feed_i[key] = feed_i[key][np.newaxis, :]
+                            shape_lst.append(1)
                         if isinstance(feed_i[key], np.ndarray):
-                            int_shape.append(list(feed_i[key].shape))
+                            print("feed_i_key shape", feed_i[key].shape)
+                            shape_lst.extend(list(feed_i[key].shape))
+                            print("shape list", shape_lst)
+                            int_shape.append(shape_lst)
                         else:
                             int_shape.append(self.feed_shapes_[key])
+                        if "{}.lod".format(key) in feed_i:
+                            lod_slot_batch.append(feed_i["{}.lod".format(key)])
+                        else:
+                            lod_slot_batch.append([])
                     if isinstance(feed_i[key], np.ndarray):
                         int_slot.append(feed_i[key])
                         self.has_numpy_input = True
                     else:
                         int_slot.append(feed_i[key])
                         self.all_numpy_input = False
                 elif self.feed_types_[key] in float_type:
                     if i == 0:
                         float_feed_names.append(key)
+                        shape_lst = []
+                        if batch == False:
+                            feed_i[key] = feed_i[key][np.newaxis, :]
+                            shape_lst.append(1)
                         if isinstance(feed_i[key], np.ndarray):
-                            float_shape.append(list(feed_i[key].shape))
+                            print("feed_i_key shape", feed_i[key].shape)
+                            shape_lst.extend(list(feed_i[key].shape))
+                            print("shape list", shape_lst)
+                            float_shape.append(shape_lst)
                         else:
                             float_shape.append(self.feed_shapes_[key])
+                        if "{}.lod".format(key) in feed_i:
+                            lod_slot_batch.append(feed_i["{}.lod".format(key)])
+                        else:
+                            lod_slot_batch.append([])
                     if isinstance(feed_i[key], np.ndarray):
                         float_slot.append(feed_i[key])
                         self.has_numpy_input = True
@@ -310,6 +340,7 @@ class Client(object):
                         self.all_numpy_input = False
             int_slot_batch.append(int_slot)
             float_slot_batch.append(float_slot)
+            lod_slot_batch.append(lod_slot)
         self.profile_.record('py_prepro_1')
         self.profile_.record('py_client_infer_0')
@@ -318,13 +349,10 @@ class Client(object):
         if self.all_numpy_input:
             res = self.client_handle_.numpy_predict(
                 float_slot_batch, float_feed_names, float_shape, int_slot_batch,
-                int_feed_names, int_shape, fetch_names, result_batch_handle,
+                int_feed_names, int_shape, lod_slot_batch, fetch_names, result_batch_handle,
                 self.pid, log_id)
         elif self.has_numpy_input == False:
-            res = self.client_handle_.batch_predict(
-                float_slot_batch, float_feed_names, float_shape, int_slot_batch,
-                int_feed_names, int_shape, fetch_names, result_batch_handle,
-                self.pid, log_id)
+            raise ValueError("Please make sure all of your inputs are numpy array")
         else:
             raise ValueError(
                 "Please make sure the inputs are all in list type or all in numpy.array type"
...
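The client-side contract after this commit: every input must be a numpy array (list inputs now raise `ValueError`), `batch=False` makes `predict` insert the batch dimension, and variable-length inputs ride along as `"{name}.lod"` keys that end up in `lod_slot_batch`. A usage sketch with placeholder endpoint, config path, and feed/fetch names:

```python
import numpy as np
from paddle_serving_client import Client

client = Client()
client.load_client_config("serving_client_conf.prototxt")  # placeholder path
client.connect(["127.0.0.1:9292"])  # placeholder endpoint

words = np.array([8, 233, 52], dtype=np.int64)  # one variable-length sample
fetch_map = client.predict(
    feed={"words": words, "words.lod": [0, 3]},  # ".lod" keys skip name checks
    fetch=["ctc_greedy_decoder_0.tmp_0"],
    batch=False)  # predict() adds the batch dim itself
```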
@@ -93,11 +93,13 @@ class WebService(object):
             del feed["fetch"]
             if len(feed) == 0:
                 raise ValueError("empty input")
-            fetch_map = self.client.predict(feed=feed, fetch=fetch)
+            fetch_map = self.client.predict(feed=feed, fetch=fetch, batch=True)
             result = self.postprocess(
                 feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
             result = {"result": result}
         except ValueError as err:
+            import traceback
+            print(traceback.format_exc())
             result = {"result": err}
         return result
...