Commit cfdc459f authored by W wangjiawei04

reset PR821

Parent 4031781a
...@@ -218,25 +218,15 @@ class PredictorClient { ...@@ -218,25 +218,15 @@ class PredictorClient {
int destroy_predictor(); int destroy_predictor();
int batch_predict(
const std::vector<std::vector<std::vector<float>>>& float_feed_batch,
const std::vector<std::string>& float_feed_name,
const std::vector<std::vector<int>>& float_shape,
const std::vector<std::vector<std::vector<int64_t>>>& int_feed_batch,
const std::vector<std::string>& int_feed_name,
const std::vector<std::vector<int>>& int_shape,
const std::vector<std::string>& fetch_name,
PredictorRes& predict_res_batch, // NOLINT
const int& pid,
const uint64_t log_id);
int numpy_predict( int numpy_predict(
const std::vector<std::vector<py::array_t<float>>>& float_feed_batch, const std::vector<std::vector<py::array_t<float>>>& float_feed_batch,
const std::vector<std::string>& float_feed_name, const std::vector<std::string>& float_feed_name,
const std::vector<std::vector<int>>& float_shape, const std::vector<std::vector<int>>& float_shape,
const std::vector<std::vector<int>>& float_lod_slot_batch,
const std::vector<std::vector<py::array_t<int64_t>>>& int_feed_batch, const std::vector<std::vector<py::array_t<int64_t>>>& int_feed_batch,
const std::vector<std::string>& int_feed_name, const std::vector<std::string>& int_feed_name,
const std::vector<std::vector<int>>& int_shape, const std::vector<std::vector<int>>& int_shape,
const std::vector<std::vector<int>>& int_lod_slot_batch,
const std::vector<std::string>& fetch_name, const std::vector<std::string>& fetch_name,
PredictorRes& predict_res_batch, // NOLINT PredictorRes& predict_res_batch, // NOLINT
const int& pid, const int& pid,
......
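The header change above removes the list-based `batch_predict` entry point and extends `numpy_predict` with per-variable LoD slots (`float_lod_slot_batch`, `int_lod_slot_batch`). As a rough sketch only (the helper below is hypothetical, not part of this PR), the Python wrapper is expected to pass one LoD offset list per feed variable, and an empty list when the variable carries no LoD:

```python
import numpy as np

def build_lod_slots(feed, feed_names):
    # Hypothetical helper: gather per-variable LoD lists from a feed dict that
    # may contain "<name>.lod" entries, e.g. {"words": ..., "words.lod": [0, 7]}.
    lod_slots = []
    for name in feed_names:
        lod_slots.append(list(feed.get("{}.lod".format(name), [])))
    return lod_slots

feed = {"words": np.arange(7).reshape(7, 1), "words.lod": [0, 7]}
print(build_lod_slots(feed, ["words"]))  # [[0, 7]]
```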
...@@ -137,227 +137,15 @@ int PredictorClient::create_predictor() { ...@@ -137,227 +137,15 @@ int PredictorClient::create_predictor() {
return 0; return 0;
} }
int PredictorClient::batch_predict(
const std::vector<std::vector<std::vector<float>>> &float_feed_batch,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int>> &float_shape,
const std::vector<std::vector<std::vector<int64_t>>> &int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid,
const uint64_t log_id) {
int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
predict_res_batch.clear();
Timer timeline;
int64_t preprocess_start = timeline.TimeStampUS();
int fetch_name_num = fetch_name.size();
_api.thrd_initialize();
std::string variant_tag;
_predictor = _api.fetch_predictor("general_model", &variant_tag);
predict_res_batch.set_variant_tag(variant_tag);
VLOG(2) << "fetch general model predictor done.";
VLOG(2) << "float feed name size: " << float_feed_name.size();
VLOG(2) << "int feed name size: " << int_feed_name.size();
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req;
req.set_log_id(log_id);
for (auto &name : fetch_name) {
req.add_fetch_var_names(name);
}
for (int bi = 0; bi < batch_size; bi++) {
VLOG(2) << "prepare batch " << bi;
std::vector<Tensor *> tensor_vec;
FeedInst *inst = req.add_insts();
std::vector<std::vector<float>> float_feed = float_feed_batch[bi];
std::vector<std::vector<int64_t>> int_feed = int_feed_batch[bi];
for (auto &name : float_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
for (auto &name : int_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
VLOG(2) << "batch [" << bi << "] int_feed_name and float_feed_name "
<< "prepared";
int vec_idx = 0;
VLOG(2) << "tensor_vec size " << tensor_vec.size() << " float shape "
<< float_shape.size();
for (auto &name : float_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
VLOG(2) << "prepare float feed " << name << " shape size "
<< float_shape[vec_idx].size();
for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
tensor->add_shape(float_shape[vec_idx][j]);
}
tensor->set_elem_type(1);
for (uint32_t j = 0; j < float_feed[vec_idx].size(); ++j) {
tensor->add_float_data(float_feed[vec_idx][j]);
}
vec_idx++;
}
VLOG(2) << "batch [" << bi << "] "
<< "float feed value prepared";
vec_idx = 0;
for (auto &name : int_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
if (_type[idx] == 0) {
VLOG(2) << "prepare int64 feed " << name << " shape size "
<< int_shape[vec_idx].size();
VLOG(3) << "feed var name " << name << " index " << vec_idx
<< "first data " << int_feed[vec_idx][0];
for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) {
tensor->add_int64_data(int_feed[vec_idx][j]);
}
} else if (_type[idx] == 2) {
VLOG(2) << "prepare int32 feed " << name << " shape size "
<< int_shape[vec_idx].size();
VLOG(3) << "feed var name " << name << " index " << vec_idx
<< "first data " << int32_t(int_feed[vec_idx][0]);
for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) {
tensor->add_int_data(int32_t(int_feed[vec_idx][j]));
}
}
for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) {
tensor->add_shape(int_shape[vec_idx][j]);
}
tensor->set_elem_type(_type[idx]);
vec_idx++;
}
VLOG(2) << "batch [" << bi << "] "
<< "int feed value prepared";
}
int64_t preprocess_end = timeline.TimeStampUS();
int64_t client_infer_start = timeline.TimeStampUS();
Response res;
int64_t client_infer_end = 0;
int64_t postprocess_start = 0;
int64_t postprocess_end = 0;
if (FLAGS_profile_client) {
if (FLAGS_profile_server) {
req.set_profile_server(true);
}
}
res.Clear();
if (_predictor->inference(&req, &res) != 0) {
LOG(ERROR) << "failed call predictor with req: " << req.ShortDebugString();
_api.thrd_clear();
return -1;
} else {
client_infer_end = timeline.TimeStampUS();
postprocess_start = client_infer_end;
VLOG(2) << "get model output num";
uint32_t model_num = res.outputs_size();
VLOG(2) << "model num: " << model_num;
for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
VLOG(2) << "process model output index: " << m_idx;
auto output = res.outputs(m_idx);
ModelRes model;
model.set_engine_name(output.engine_name());
int idx = 0;
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
int shape_size = output.insts(0).tensor_array(idx).shape_size();
VLOG(2) << "fetch var " << name << " index " << idx << " shape size "
<< shape_size;
model._shape_map[name].resize(shape_size);
for (int i = 0; i < shape_size; ++i) {
model._shape_map[name][i] =
output.insts(0).tensor_array(idx).shape(i);
}
int lod_size = output.insts(0).tensor_array(idx).lod_size();
if (lod_size > 0) {
model._lod_map[name].resize(lod_size);
for (int i = 0; i < lod_size; ++i) {
model._lod_map[name][i] = output.insts(0).tensor_array(idx).lod(i);
}
}
idx += 1;
}
idx = 0;
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
if (_fetch_name_to_type[name] == 0) {
VLOG(2) << "ferch var " << name << "type int64";
int size = output.insts(0).tensor_array(idx).int64_data_size();
model._int64_value_map[name] = std::vector<int64_t>(
output.insts(0).tensor_array(idx).int64_data().begin(),
output.insts(0).tensor_array(idx).int64_data().begin() + size);
} else if (_fetch_name_to_type[name] == 1) {
VLOG(2) << "fetch var " << name << "type float";
int size = output.insts(0).tensor_array(idx).float_data_size();
model._float_value_map[name] = std::vector<float>(
output.insts(0).tensor_array(idx).float_data().begin(),
output.insts(0).tensor_array(idx).float_data().begin() + size);
} else if (_fetch_name_to_type[name] == 2) {
VLOG(2) << "fetch var " << name << "type int32";
int size = output.insts(0).tensor_array(idx).int_data_size();
model._int32_value_map[name] = std::vector<int32_t>(
output.insts(0).tensor_array(idx).int_data().begin(),
output.insts(0).tensor_array(idx).int_data().begin() + size);
}
idx += 1;
}
predict_res_batch.add_model_res(std::move(model));
}
postprocess_end = timeline.TimeStampUS();
}
if (FLAGS_profile_client) {
std::ostringstream oss;
oss << "PROFILE\t"
<< "pid:" << pid << "\t"
<< "prepro_0:" << preprocess_start << " "
<< "prepro_1:" << preprocess_end << " "
<< "client_infer_0:" << client_infer_start << " "
<< "client_infer_1:" << client_infer_end << " ";
if (FLAGS_profile_server) {
int op_num = res.profile_time_size() / 2;
for (int i = 0; i < op_num; ++i) {
oss << "op" << i << "_0:" << res.profile_time(i * 2) << " ";
oss << "op" << i << "_1:" << res.profile_time(i * 2 + 1) << " ";
}
}
oss << "postpro_0:" << postprocess_start << " ";
oss << "postpro_1:" << postprocess_end;
fprintf(stderr, "%s\n", oss.str().c_str());
}
_api.thrd_clear();
return 0;
}
int PredictorClient::numpy_predict( int PredictorClient::numpy_predict(
const std::vector<std::vector<py::array_t<float>>> &float_feed_batch, const std::vector<std::vector<py::array_t<float>>> &float_feed_batch,
const std::vector<std::string> &float_feed_name, const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int>> &float_shape, const std::vector<std::vector<int>> &float_shape,
const std::vector<std::vector<int>> &float_lod_slot_batch,
const std::vector<std::vector<py::array_t<int64_t>>> &int_feed_batch, const std::vector<std::vector<py::array_t<int64_t>>> &int_feed_batch,
const std::vector<std::string> &int_feed_name, const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape, const std::vector<std::vector<int>> &int_shape,
const std::vector<std::vector<int>> &int_lod_slot_batch,
const std::vector<std::string> &fetch_name, const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch, PredictorRes &predict_res_batch,
const int &pid, const int &pid,
...@@ -412,6 +200,9 @@ int PredictorClient::numpy_predict( ...@@ -412,6 +200,9 @@ int PredictorClient::numpy_predict(
for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) { for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
tensor->add_shape(float_shape[vec_idx][j]); tensor->add_shape(float_shape[vec_idx][j]);
} }
for (uint32_t j = 0; j < float_lod_slot_batch[vec_idx].size(); ++j) {
tensor->add_lod(float_lod_slot_batch[vec_idx][j]);
}
tensor->set_elem_type(1); tensor->set_elem_type(1);
const int float_shape_size = float_shape[vec_idx].size(); const int float_shape_size = float_shape[vec_idx].size();
switch (float_shape_size) { switch (float_shape_size) {
...@@ -470,6 +261,9 @@ int PredictorClient::numpy_predict( ...@@ -470,6 +261,9 @@ int PredictorClient::numpy_predict(
for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) { for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) {
tensor->add_shape(int_shape[vec_idx][j]); tensor->add_shape(int_shape[vec_idx][j]);
} }
for (uint32_t j = 0; j < int_lod_slot_batch[vec_idx].size(); ++j) {
tensor->add_lod(int_lod_slot_batch[vec_idx][j]);
}
tensor->set_elem_type(_type[idx]); tensor->set_elem_type(_type[idx]);
if (_type[idx] == 0) { if (_type[idx] == 0) {
......
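In the request-building loop above, each `Tensor` now also carries its LoD offsets via `add_lod`. For readers unfamiliar with the convention, a LoD list is a list of cumulative offsets describing how variable-length sequences are packed into one batch tensor; a minimal illustration:

```python
# Offsets [0, 3, 7] describe two sequences (lengths 3 and 4) packed into one tensor.
lod = [0, 3, 7]
seq_lens = [lod[k + 1] - lod[k] for k in range(len(lod) - 1)]
print(seq_lens)  # [3, 4]
```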
...@@ -95,42 +95,18 @@ PYBIND11_MODULE(serving_client, m) { ...@@ -95,42 +95,18 @@ PYBIND11_MODULE(serving_client, m) {
[](PredictorClient &self) { self.create_predictor(); }) [](PredictorClient &self) { self.create_predictor(); })
.def("destroy_predictor", .def("destroy_predictor",
[](PredictorClient &self) { self.destroy_predictor(); }) [](PredictorClient &self) { self.destroy_predictor(); })
.def("batch_predict",
[](PredictorClient &self,
const std::vector<std::vector<std::vector<float>>>
&float_feed_batch,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int>> &float_shape,
const std::vector<std::vector<std::vector<int64_t>>>
&int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid,
const uint64_t log_id) {
return self.batch_predict(float_feed_batch,
float_feed_name,
float_shape,
int_feed_batch,
int_feed_name,
int_shape,
fetch_name,
predict_res_batch,
pid,
log_id);
},
py::call_guard<py::gil_scoped_release>())
.def("numpy_predict", .def("numpy_predict",
[](PredictorClient &self, [](PredictorClient &self,
const std::vector<std::vector<py::array_t<float>>> const std::vector<std::vector<py::array_t<float>>>
&float_feed_batch, &float_feed_batch,
const std::vector<std::string> &float_feed_name, const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int>> &float_shape, const std::vector<std::vector<int>> &float_shape,
const std::vector<std::vector<int>> &float_lod_slot_batch,
const std::vector<std::vector<py::array_t<int64_t>>> const std::vector<std::vector<py::array_t<int64_t>>>
&int_feed_batch, &int_feed_batch,
const std::vector<std::string> &int_feed_name, const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape, const std::vector<std::vector<int>> &int_shape,
const std::vector<std::vector<int>> &int_lod_slot_batch,
const std::vector<std::string> &fetch_name, const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch, PredictorRes &predict_res_batch,
const int &pid, const int &pid,
...@@ -138,9 +114,11 @@ PYBIND11_MODULE(serving_client, m) { ...@@ -138,9 +114,11 @@ PYBIND11_MODULE(serving_client, m) {
return self.numpy_predict(float_feed_batch, return self.numpy_predict(float_feed_batch,
float_feed_name, float_feed_name,
float_shape, float_shape,
float_lod_slot_batch,
int_feed_batch, int_feed_batch,
int_feed_name, int_feed_name,
int_shape, int_shape,
int_lod_slot_batch,
fetch_name, fetch_name,
predict_res_batch, predict_res_batch,
pid, pid,
......
...@@ -73,8 +73,6 @@ int GeneralReaderOp::inference() { ...@@ -73,8 +73,6 @@ int GeneralReaderOp::inference() {
// read request from client
const Request *req = dynamic_cast<const Request *>(get_request_message()); const Request *req = dynamic_cast<const Request *>(get_request_message());
uint64_t log_id = req->log_id(); uint64_t log_id = req->log_id();
int batch_size = req->insts_size();
int input_var_num = 0; int input_var_num = 0;
std::vector<int64_t> elem_type; std::vector<int64_t> elem_type;
std::vector<int64_t> elem_size; std::vector<int64_t> elem_size;
...@@ -83,7 +81,6 @@ int GeneralReaderOp::inference() { ...@@ -83,7 +81,6 @@ int GeneralReaderOp::inference() {
GeneralBlob *res = mutable_data<GeneralBlob>(); GeneralBlob *res = mutable_data<GeneralBlob>();
TensorVector *out = &res->tensor_vector; TensorVector *out = &res->tensor_vector;
res->SetBatchSize(batch_size);
res->SetLogId(log_id); res->SetLogId(log_id);
if (!res) { if (!res) {
...@@ -98,11 +95,11 @@ int GeneralReaderOp::inference() { ...@@ -98,11 +95,11 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id VLOG(2) << "(logid=" << log_id
<< ") start to call load general model_conf op"; << ") start to call load general model_conf op";
baidu::paddle_serving::predictor::Resource &resource = baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance(); baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done."; VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config = std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config(); resource.get_general_model_config();
...@@ -122,13 +119,11 @@ int GeneralReaderOp::inference() { ...@@ -122,13 +119,11 @@ int GeneralReaderOp::inference() {
elem_type.resize(var_num); elem_type.resize(var_num);
elem_size.resize(var_num); elem_size.resize(var_num);
capacity.resize(var_num); capacity.resize(var_num);
// prepare basic information for input // prepare basic information for input
for (int i = 0; i < var_num; ++i) { for (int i = 0; i < var_num; ++i) {
paddle::PaddleTensor lod_tensor; paddle::PaddleTensor lod_tensor;
elem_type[i] = req->insts(0).tensor_array(i).elem_type(); elem_type[i] = req->insts(0).tensor_array(i).elem_type();
VLOG(2) << "(logid=" << log_id << ") var[" << i VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
<< "] has elem type: " << elem_type[i];
if (elem_type[i] == 0) { // int64 if (elem_type[i] == 0) { // int64
elem_size[i] = sizeof(int64_t); elem_size[i] = sizeof(int64_t);
lod_tensor.dtype = paddle::PaddleDType::INT64; lod_tensor.dtype = paddle::PaddleDType::INT64;
...@@ -139,13 +134,24 @@ int GeneralReaderOp::inference() { ...@@ -139,13 +134,24 @@ int GeneralReaderOp::inference() {
elem_size[i] = sizeof(int32_t); elem_size[i] = sizeof(int32_t);
lod_tensor.dtype = paddle::PaddleDType::INT32; lod_tensor.dtype = paddle::PaddleDType::INT32;
} }
// implement lod tensor here
if (model_config->_is_lod_feed[i]) { if (req->insts(0).tensor_array(i).lod_size() > 0) {
lod_tensor.lod.resize(1);
lod_tensor.lod[0].push_back(0);
VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor"; VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
lod_tensor.lod.resize(1);
for (int k = 0; k < req->insts(0).tensor_array(i).lod_size(); ++k) {
lod_tensor.lod[0].push_back(req->insts(0).tensor_array(i).lod(k));
}
capacity[i] = 1;
for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
int dim = req->insts(0).tensor_array(i).shape(k);
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
<< "]: " << dim;
capacity[i] *= dim;
lod_tensor.shape.push_back(dim);
}
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor, capacity: " << capacity[i];
} else { } else {
lod_tensor.shape.push_back(batch_size);
capacity[i] = 1; capacity[i] = 1;
for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) { for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
int dim = req->insts(0).tensor_array(i).shape(k); int dim = req->insts(0).tensor_array(i).shape(k);
...@@ -160,51 +166,40 @@ int GeneralReaderOp::inference() { ...@@ -160,51 +166,40 @@ int GeneralReaderOp::inference() {
lod_tensor.name = model_config->_feed_name[i]; lod_tensor.name = model_config->_feed_name[i];
out->push_back(lod_tensor); out->push_back(lod_tensor);
} }
// specify the memory needed for output tensor_vector // specify the memory needed for output tensor_vector
for (int i = 0; i < var_num; ++i) { for (int i = 0; i < var_num; ++i) {
if (out->at(i).lod.size() == 1) { if (out->at(i).lod.size() == 1) {
int tensor_size = 0; int tensor_size = 0;
for (int j = 0; j < batch_size; ++j) { const Tensor &tensor = req->insts(0).tensor_array(i);
const Tensor &tensor = req->insts(j).tensor_array(i); int data_len = 0;
int data_len = 0; if (tensor.int64_data_size() > 0) {
if (tensor.int64_data_size() > 0) { data_len = tensor.int64_data_size();
data_len = tensor.int64_data_size(); } else if (tensor.float_data_size() > 0) {
} else if (tensor.float_data_size() > 0) { data_len = tensor.float_data_size();
data_len = tensor.float_data_size(); } else if (tensor.int_data_size() > 0) {
} else if (tensor.int_data_size() > 0) { data_len = tensor.int_data_size();
data_len = tensor.int_data_size();
}
VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
<< "]: " << data_len;
tensor_size += data_len;
int cur_len = out->at(i).lod[0].back();
VLOG(2) << "(logid=" << log_id << ") current len: " << cur_len;
int sample_len = 0;
if (tensor.shape_size() == 1) {
sample_len = data_len;
} else {
sample_len = tensor.shape(0);
}
out->at(i).lod[0].push_back(cur_len + sample_len);
VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
}
out->at(i).data.Resize(tensor_size * elem_size[i]);
out->at(i).shape = {out->at(i).lod[0].back()};
for (int j = 1; j < req->insts(0).tensor_array(i).shape_size(); ++j) {
out->at(i).shape.push_back(req->insts(0).tensor_array(i).shape(j));
} }
if (out->at(i).shape.size() == 1) { VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
out->at(i).shape.push_back(1); << "]: " << data_len;
tensor_size += data_len;
int cur_len = out->at(i).lod[0].back();
VLOG(2) << "(logid=" << log_id << ") current len: " << cur_len;
int sample_len = 0;
if (tensor.shape_size() == 1) {
sample_len = data_len;
} else {
sample_len = tensor.shape(0);
} }
VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
out->at(i).data.Resize(tensor_size * elem_size[i]);
VLOG(2) << "(logid=" << log_id << ") var[" << i VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is lod_tensor and len=" << out->at(i).lod[0].back(); << "] is lod_tensor and len=" << out->at(i).lod[0].back();
} else { } else {
out->at(i).data.Resize(batch_size * capacity[i] * elem_size[i]); out->at(i).data.Resize(capacity[i] * elem_size[i]);
VLOG(2) << "(logid=" << log_id << ") var[" << i VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor and capacity=" << batch_size * capacity[i]; << "] is tensor and capacity=" << capacity[i];
} }
} }
...@@ -215,58 +210,36 @@ int GeneralReaderOp::inference() { ...@@ -215,58 +210,36 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).int64_data(0); << "] is " << req->insts(0).tensor_array(i).int64_data(0);
int offset = 0; int offset = 0;
for (int j = 0; j < batch_size; ++j) { int elem_num = req->insts(0).tensor_array(i).int64_data_size();
int elem_num = req->insts(j).tensor_array(i).int64_data_size(); for (int k = 0; k < elem_num; ++k) {
for (int k = 0; k < elem_num; ++k) { dst_ptr[offset + k] = req->insts(0).tensor_array(i).int64_data(k);
dst_ptr[offset + k] = req->insts(j).tensor_array(i).int64_data(k);
}
if (out->at(i).lod.size() == 1) {
offset = out->at(i).lod[0][j + 1];
} else {
offset += capacity[i];
}
} }
} else if (elem_type[i] == 1) { } else if (elem_type[i] == 1) {
float *dst_ptr = static_cast<float *>(out->at(i).data.data()); float *dst_ptr = static_cast<float *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).float_data(0); << "] is " << req->insts(0).tensor_array(i).float_data(0);
int offset = 0; int offset = 0;
for (int j = 0; j < batch_size; ++j) { int elem_num = req->insts(0).tensor_array(i).float_data_size();
int elem_num = req->insts(j).tensor_array(i).float_data_size(); for (int k = 0; k < elem_num; ++k) {
for (int k = 0; k < elem_num; ++k) { dst_ptr[offset + k] = req->insts(0).tensor_array(i).float_data(k);
dst_ptr[offset + k] = req->insts(j).tensor_array(i).float_data(k);
}
if (out->at(i).lod.size() == 1) {
offset = out->at(i).lod[0][j + 1];
} else {
offset += capacity[i];
}
} }
} else if (elem_type[i] == 2) { } else if (elem_type[i] == 2) {
int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data()); int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).int_data(0); << "] is " << req->insts(0).tensor_array(i).int_data(0);
int offset = 0; int offset = 0;
for (int j = 0; j < batch_size; ++j) { int elem_num = req->insts(0).tensor_array(i).int_data_size();
int elem_num = req->insts(j).tensor_array(i).int_data_size(); for (int k = 0; k < elem_num; ++k) {
for (int k = 0; k < elem_num; ++k) { dst_ptr[offset + k] = req->insts(0).tensor_array(i).int_data(k);
dst_ptr[offset + k] = req->insts(j).tensor_array(i).int_data(k);
}
if (out->at(i).lod.size() == 1) {
offset = out->at(i).lod[0][j + 1];
} else {
offset += capacity[i];
}
} }
} }
} }
VLOG(2) << "(logid=" << log_id << ") output size: " << out->size(); VLOG(2) << "(logid=" << log_id << ") output size: " << out->size();
timeline.Pause(); timeline.Pause();
int64_t end = timeline.TimeStampUS(); int64_t end = timeline.TimeStampUS();
res->p_size = 0; res->p_size = 0;
res->_batch_size = batch_size; res->_batch_size = 1;
AddBlobInfo(res, start); AddBlobInfo(res, start);
AddBlobInfo(res, end); AddBlobInfo(res, end);
......
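With the LoD offsets now read from the request itself, `GeneralReaderOp` sizes each input buffer from a single instance instead of looping over a client-side batch: a dense input needs `capacity * elem_size` bytes (capacity being the product of its shape dims), while a LoD input needs the total number of packed elements times `elem_size`. A small sketch of that arithmetic (illustrative only, not code from this PR):

```python
from functools import reduce
import operator

def buffer_bytes(elem_size, shape=None, packed_len=None):
    # LoD input: size by the total number of packed elements;
    # dense input: size by the product of the shape dims.
    if packed_len is not None:
        return packed_len * elem_size
    return reduce(operator.mul, shape, 1) * elem_size

print(buffer_bytes(4, shape=[1, 13]))  # 52 bytes for a float32 [1, 13] tensor
print(buffer_bytes(8, packed_len=7))   # 56 bytes for 7 packed int64 ids
```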
...@@ -18,16 +18,23 @@ import sys ...@@ -18,16 +18,23 @@ import sys
from paddle_serving_client import Client from paddle_serving_client import Client
from paddle_serving_client.utils import benchmark_args from paddle_serving_client.utils import benchmark_args
from paddle_serving_app.reader import ChineseBertReader from paddle_serving_app.reader import ChineseBertReader
import numpy as np
args = benchmark_args() args = benchmark_args()
reader = ChineseBertReader({"max_seq_len": 128}) reader = ChineseBertReader({"max_seq_len": 128})
fetch = ["pooled_output"] fetch = ["pooled_output"]
endpoint_list = ["127.0.0.1:9292"] endpoint_list = ['127.0.0.1:8861']
client = Client() client = Client()
client.load_client_config(args.model) client.load_client_config(args.model)
client.connect(endpoint_list) client.connect(endpoint_list)
for line in sys.stdin: for line in sys.stdin:
feed_dict = reader.process(line) feed_dict = reader.process(line)
for key in feed_dict.keys():
feed_dict[key] = np.array(feed_dict[key]).reshape((128, 1))
#print(feed_dict)
result = client.predict(feed=feed_dict, fetch=fetch) result = client.predict(feed=feed_dict, fetch=fetch)
print(result)
...@@ -13,10 +13,11 @@ ...@@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing # pylint: disable=doc-string-missing
from paddle_serving_server_gpu.web_service import WebService from paddle_serving_server.web_service import WebService
from paddle_serving_app.reader import ChineseBertReader from paddle_serving_app.reader import ChineseBertReader
import sys import sys
import os import os
import numpy as np
class BertService(WebService): class BertService(WebService):
...@@ -27,18 +28,20 @@ class BertService(WebService): ...@@ -27,18 +28,20 @@ class BertService(WebService):
}) })
def preprocess(self, feed=[], fetch=[]): def preprocess(self, feed=[], fetch=[]):
feed_res = [ feed_res = []
self.reader.process(ins["words"].encode("utf-8")) for ins in feed for ins in feed:
] feed_dict = self.reader.process(ins["words"].encode("utf-8"))
for key in feed_dict.keys():
feed_dict[key] = np.array(feed_dict[key]).reshape(
(1, len(feed_dict[key]), 1))
feed_res.append(feed_dict)
return feed_res, fetch return feed_res, fetch
bert_service = BertService(name="bert") bert_service = BertService(name="bert")
bert_service.load() bert_service.load()
bert_service.load_model_config(sys.argv[1]) bert_service.load_model_config(sys.argv[1])
gpu_ids = os.environ["CUDA_VISIBLE_DEVICES"]
bert_service.set_gpus(gpu_ids)
bert_service.prepare_server( bert_service.prepare_server(
workdir="workdir", port=int(sys.argv[2]), device="gpu") workdir="workdir", port=int(sys.argv[2]), device="cpu")
bert_service.run_rpc_service() bert_service.run_rpc_service()
bert_service.run_web_service() bert_service.run_web_service()
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
from paddle_serving_client import Client from paddle_serving_client import Client
from paddle_serving_app.reader import ChineseBertReader from paddle_serving_app.reader import ChineseBertReader
import sys import sys
import numpy as np
client = Client() client = Client()
client.load_client_config("./bert_seq32_client/serving_client_conf.prototxt") client.load_client_config("./bert_seq32_client/serving_client_conf.prototxt")
...@@ -28,12 +29,21 @@ expected_shape = { ...@@ -28,12 +29,21 @@ expected_shape = {
"pooled_output": (4, 768) "pooled_output": (4, 768)
} }
batch_size = 4 batch_size = 4
feed_batch = [] feed_batch = {}
batch_len = 0
for line in sys.stdin: for line in sys.stdin:
feed = reader.process(line) feed = reader.process(line)
if batch_len == 0:
for key in feed.keys():
val_len = len(feed[key])
feed_batch[key] = np.array(feed[key]).reshape((1, val_len, 1))
batch_len += 1
continue
if len(feed_batch) < batch_size: if batch_len < batch_size:
feed_batch.append(feed) for key in feed.keys():
feed_batch[key] = np.concatenate([
feed_batch[key], np.array(feed[key]).reshape((1, val_len, 1))
])
batch_len += 1
else: else:
fetch_map = client.predict(feed=feed_batch, fetch=fetch) fetch_map = client.predict(feed=feed_batch, fetch=fetch)
feed_batch = [] feed_batch = {}
batch_len = 0
......
...@@ -19,6 +19,7 @@ import os ...@@ -19,6 +19,7 @@ import os
import criteo as criteo import criteo as criteo
import time import time
from paddle_serving_client.metric import auc from paddle_serving_client.metric import auc
import numpy as np
py_version = sys.version_info[0] py_version = sys.version_info[0]
...@@ -41,10 +42,15 @@ for ei in range(10000): ...@@ -41,10 +42,15 @@ for ei in range(10000):
else: else:
data = reader().__next__() data = reader().__next__()
feed_dict = {} feed_dict = {}
feed_dict['dense_input'] = data[0][0] feed_dict['dense_input'] = np.array(data[0][0]).astype("float32").reshape(
1, 13)
feed_dict['dense_input.lod'] = [0, 1]
for i in range(1, 27): for i in range(1, 27):
feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][i] tmp_data = np.array(data[0][i]).astype(np.int64)
fetch_map = client.predict(feed=feed_dict, fetch=["prob"]) feed_dict["embedding_{}.tmp_0".format(i - 1)] = tmp_data.reshape(
(1, len(data[0][i])))
feed_dict["embedding_{}.tmp_0.lod".format(i - 1)] = [0, 1]
fetch_map = client.predict(feed=feed_dict, fetch=["prob"], batch=True)
prob_list.append(fetch_map['prob'][0][1]) prob_list.append(fetch_map['prob'][0][1])
label_list.append(data[0][-1][0]) label_list.append(data[0][-1][0])
......
...@@ -27,5 +27,12 @@ test_reader = paddle.batch( ...@@ -27,5 +27,12 @@ test_reader = paddle.batch(
batch_size=1) batch_size=1)
for data in test_reader(): for data in test_reader():
fetch_map = client.predict(feed={"x": data[0][0]}, fetch=["price"]) import numpy as np
new_data = np.zeros((2, 1, 13)).astype("float32")
new_data[0] = data[0][0]
new_data[1] = data[0][0]
print(new_data)
fetch_map = client.predict(
feed={"x": new_data}, fetch=["price"], batch=True)
print("{} {}".format(fetch_map["price"][0], data[0][1][0])) print("{} {}".format(fetch_map["price"][0], data[0][1][0]))
print(fetch_map)
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
from paddle_serving_client import Client from paddle_serving_client import Client
from paddle_serving_client.utils import MultiThreadRunner from paddle_serving_client.utils import MultiThreadRunner
import paddle import paddle
import numpy as np
def single_func(idx, resource): def single_func(idx, resource):
...@@ -26,6 +27,7 @@ def single_func(idx, resource): ...@@ -26,6 +27,7 @@ def single_func(idx, resource):
0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584,
0.6283, 0.4919, 0.1856, 0.0795, -0.0332 0.6283, 0.4919, 0.1856, 0.0795, -0.0332
] ]
x = np.array(x)
for i in range(1000): for i in range(1000):
fetch_map = client.predict(feed={"x": x}, fetch=["price"]) fetch_map = client.predict(feed={"x": x}, fetch=["price"])
if fetch_map is None: if fetch_map is None:
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
import sys import sys
from paddle_serving_client import Client from paddle_serving_client import Client
from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage from paddle_serving_app.reader import Sequential, URL2Image, Resize, CenterCrop, RGB2BGR, Transpose, Div, Normalize, Base64ToImage
if len(sys.argv) != 4: if len(sys.argv) != 4:
...@@ -47,7 +48,7 @@ class ImageService(WebService): ...@@ -47,7 +48,7 @@ class ImageService(WebService):
if "image" not in ins: if "image" not in ins:
raise ("feed data error!") raise ("feed data error!")
img = self.seq(ins["image"]) img = self.seq(ins["image"])
feed_batch.append({"image": img}) feed_batch.append({"image": img[np.newaxis, :]})
return feed_batch, fetch return feed_batch, fetch
def postprocess(self, feed=[], fetch=[], fetch_map={}): def postprocess(self, feed=[], fetch=[], fetch_map={}):
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
from paddle_serving_client import Client from paddle_serving_client import Client
from paddle_serving_app.reader import IMDBDataset from paddle_serving_app.reader import IMDBDataset
import sys import sys
import numpy as np
client = Client() client = Client()
client.load_client_config(sys.argv[1]) client.load_client_config(sys.argv[1])
...@@ -28,7 +29,12 @@ imdb_dataset.load_resource(sys.argv[2]) ...@@ -28,7 +29,12 @@ imdb_dataset.load_resource(sys.argv[2])
for line in sys.stdin: for line in sys.stdin:
word_ids, label = imdb_dataset.get_words_and_label(line) word_ids, label = imdb_dataset.get_words_and_label(line)
feed = {"words": word_ids} word_len = len(word_ids)
feed = {
"words": np.array(word_ids).reshape(word_len, 1),
"words.lod": [0, word_len]
}
#print(feed)
fetch = ["prediction"] fetch = ["prediction"]
fetch_map = client.predict(feed=feed, fetch=fetch) fetch_map = client.predict(feed=feed, fetch=fetch, batch=True)
print("{} {}".format(fetch_map["prediction"][0], label[0])) print("{} {}".format(fetch_map["prediction"][0], label[0]))
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
from paddle_serving_server.web_service import WebService from paddle_serving_server.web_service import WebService
from paddle_serving_app.reader import IMDBDataset from paddle_serving_app.reader import IMDBDataset
import sys import sys
import numpy as np
class IMDBService(WebService): class IMDBService(WebService):
...@@ -26,10 +27,15 @@ class IMDBService(WebService): ...@@ -26,10 +27,15 @@ class IMDBService(WebService):
self.dataset.load_resource(args["dict_file_path"]) self.dataset.load_resource(args["dict_file_path"])
def preprocess(self, feed={}, fetch=[]): def preprocess(self, feed={}, fetch=[]):
res_feed = [{ feed_batch = []
"words": self.dataset.get_words_only(ins["words"]) words_lod = [0]
} for ins in feed] for ins in feed:
return res_feed, fetch words = self.dataset.get_words_only(ins["words"])
words = np.array(words).reshape(len(words), 1)
words_lod.append(words_lod[-1] + len(words))
feed_batch.append(words)
feed = {"words": np.concatenate(feed_batch), "words.lod": words_lod}
return feed, fetch
imdb_service = IMDBService(name="imdb") imdb_service = IMDBService(name="imdb")
......
...@@ -19,6 +19,7 @@ from paddle_serving_app.reader import LACReader ...@@ -19,6 +19,7 @@ from paddle_serving_app.reader import LACReader
import sys import sys
import os import os
import io import io
import numpy as np
client = Client() client = Client()
client.load_client_config(sys.argv[1]) client.load_client_config(sys.argv[1])
...@@ -31,7 +32,17 @@ for line in sys.stdin: ...@@ -31,7 +32,17 @@ for line in sys.stdin:
feed_data = reader.process(line) feed_data = reader.process(line)
if len(feed_data) <= 0: if len(feed_data) <= 0:
continue continue
fetch_map = client.predict(feed={"words": feed_data}, fetch=["crf_decode"]) print(feed_data)
#fetch_map = client.predict(feed={"words": np.array(feed_data).reshape(len(feed_data), 1), "words.lod": [0, len(feed_data)]}, fetch=["crf_decode"], batch=True)
fetch_map = client.predict(
feed={
"words": np.array(feed_data + feed_data).reshape(
len(feed_data) * 2, 1),
"words.lod": [0, len(feed_data), 2 * len(feed_data)]
},
fetch=["crf_decode"],
batch=True)
print(fetch_map)
begin = fetch_map['crf_decode.lod'][0] begin = fetch_map['crf_decode.lod'][0]
end = fetch_map['crf_decode.lod'][1] end = fetch_map['crf_decode.lod'][1]
segs = reader.parse_result(line, fetch_map["crf_decode"][begin:end]) segs = reader.parse_result(line, fetch_map["crf_decode"][begin:end])
......
...@@ -34,9 +34,9 @@ python ocr_web_server.py gpu ...@@ -34,9 +34,9 @@ python ocr_web_server.py gpu
``` ```
python ocr_web_client.py python ocr_web_client.py
``` ```
If you want a faster web service, please try Web Debugger Service If you want a faster web service, please try Web LocalPredictor Service
## Web Debugger Service ## Web LocalPredictor Service
``` ```
#choose one of cpu/gpu commands as following #choose one of cpu/gpu commands as following
#for cpu user #for cpu user
...@@ -45,7 +45,7 @@ python ocr_debugger_server.py cpu ...@@ -45,7 +45,7 @@ python ocr_debugger_server.py cpu
python ocr_debugger_server.py gpu python ocr_debugger_server.py gpu
``` ```
## Web Debugger Client Prediction ## Web LocalPredictor Client Prediction
``` ```
python ocr_web_client.py python ocr_web_client.py
``` ```
...@@ -61,7 +61,7 @@ Dataset: RCTW 500 sample images ...@@ -61,7 +61,7 @@ Dataset: RCTW 500 sample images
| engine | client read image(ms) | client-server tras time(ms) | server read image(ms) | det pre(ms) | det infer(ms) | det post(ms) | rec pre(ms) | rec infer(ms) | rec post(ms) | server-client trans time(ms) | server side time consumption(ms) | server side overhead(ms) | total time(ms) | | engine | client read image(ms) | client-server tras time(ms) | server read image(ms) | det pre(ms) | det infer(ms) | det post(ms) | rec pre(ms) | rec infer(ms) | rec post(ms) | server-client trans time(ms) | server side time consumption(ms) | server side overhead(ms) | total time(ms) |
|------------------------------|----------------|----------------------------|------------------|--------------------|------------------|--------------------|--------------------|------------------|--------------------|--------------------------|--------------------|--------------|---------------| |------------------------------|----------------|----------------------------|------------------|--------------------|------------------|--------------------|--------------------|------------------|--------------------|--------------------------|--------------------|--------------|---------------|
| Serving web service | 8.69 | 13.41 | 109.97 | 2.82 | 87.76 | 4.29 | 3.98 | 78.51 | 3.66 | 4.12 | 181.02 | 136.49 | 317.51 | | Serving web service | 8.69 | 13.41 | 109.97 | 2.82 | 87.76 | 4.29 | 3.98 | 78.51 | 3.66 | 4.12 | 181.02 | 136.49 | 317.51 |
| Serving Debugger web service | 8.73 | 16.42 | 115.27 | 2.93 | 20.63 | 3.97 | 4.48 | 13.84 | 3.60 | 6.91 | 49.45 | 147.33 | 196.78 | | Serving LocalPredictor web service | 8.73 | 16.42 | 115.27 | 2.93 | 20.63 | 3.97 | 4.48 | 13.84 | 3.60 | 6.91 | 49.45 | 147.33 | 196.78 |
## Appendix: For Users who want to launch Det or Rec only ## Appendix: For Users who want to launch Det or Rec only
If you only want to detect text in images without recognizing it, or to recognize words directly from cropped images, we also provide standalone Det and Rec servers.
......
...@@ -34,8 +34,8 @@ python ocr_web_server.py gpu ...@@ -34,8 +34,8 @@ python ocr_web_server.py gpu
python ocr_web_client.py python ocr_web_client.py
``` ```
If you want a faster web service, please try the Web Debugger Service If you want a faster web service, please try the Web LocalPredictor Service
## Launch the Debugger Web Service ## Launch the LocalPredictor Web Service
``` ```
#choose one of the cpu/gpu commands below
#for cpu user #for cpu user
...@@ -60,7 +60,7 @@ GPU: Nvidia Tesla V100, single card
| engine | client read image(ms) | client-server trans time(ms) | server read image(ms) | det preprocess(ms) | det infer(ms) | det postprocess(ms) | rec preprocess(ms) | rec infer(ms) | rec postprocess(ms) | server-client trans time(ms) | server total time(ms) | server overhead(ms) | total time(ms) |
|------------------------------|----------------|----------------------------|------------------|--------------------|------------------|--------------------|--------------------|------------------|--------------------|--------------------------|--------------------|--------------|---------------| |------------------------------|----------------|----------------------------|------------------|--------------------|------------------|--------------------|--------------------|------------------|--------------------|--------------------------|--------------------|--------------|---------------|
| Serving web service | 8.69 | 13.41 | 109.97 | 2.82 | 87.76 | 4.29 | 3.98 | 78.51 | 3.66 | 4.12 | 181.02 | 136.49 | 317.51 | | Serving web service | 8.69 | 13.41 | 109.97 | 2.82 | 87.76 | 4.29 | 3.98 | 78.51 | 3.66 | 4.12 | 181.02 | 136.49 | 317.51 |
| Serving Debugger web service | 8.73 | 16.42 | 115.27 | 2.93 | 20.63 | 3.97 | 4.48 | 13.84 | 3.60 | 6.91 | 49.45 | 147.33 | 196.78 | | Serving LocalPredictor web service | 8.73 | 16.42 | 115.27 | 2.93 | 20.63 | 3.97 | 4.48 | 13.84 | 3.60 | 6.91 | 49.45 | 147.33 | 196.78 |
## Appendix: Launching the Det or Rec service on its own
......
...@@ -26,7 +26,7 @@ if sys.argv[1] == 'gpu': ...@@ -26,7 +26,7 @@ if sys.argv[1] == 'gpu':
from paddle_serving_server_gpu.web_service import WebService from paddle_serving_server_gpu.web_service import WebService
elif sys.argv[1] == 'cpu': elif sys.argv[1] == 'cpu':
from paddle_serving_server.web_service import WebService from paddle_serving_server.web_service import WebService
from paddle_serving_app.local_predict import Debugger from paddle_serving_app.local_predict import LocalPredictor
import time import time
import re import re
import base64 import base64
...@@ -39,7 +39,7 @@ class OCRService(WebService): ...@@ -39,7 +39,7 @@ class OCRService(WebService):
Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose( Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), Transpose(
(2, 0, 1)) (2, 0, 1))
]) ])
self.det_client = Debugger() self.det_client = LocalPredictor()
if sys.argv[1] == 'gpu': if sys.argv[1] == 'gpu':
self.det_client.load_model_config( self.det_client.load_model_config(
det_model_config, gpu=True, profile=False) det_model_config, gpu=True, profile=False)
......
rpc_port: 18085 rpc_port: 18080
worker_num: 4 worker_num: 4
build_dag_each_worker: false build_dag_each_worker: false
http_port: 9999
dag: dag:
is_thread_op: false is_thread_op: true
client_type: brpc
retry: 1 retry: 1
use_profile: false use_profile: false
tracer: op:
interval_s: 10 bow:
concurrency: 2
remote_service_conf:
client_type: brpc
model_config: ocr_det_model
devices: ""
...@@ -4,19 +4,20 @@ build_dag_each_worker: false ...@@ -4,19 +4,20 @@ build_dag_each_worker: false
http_port: 9999 http_port: 9999
dag: dag:
is_thread_op: false is_thread_op: false
client_type: brpc
retry: 1 retry: 1
use_profile: false use_profile: false
op: op:
det: det:
concurrency: 2 concurrency: 2
local_service_conf: local_service_conf:
client_type: local_predictor
model_config: ocr_det_model model_config: ocr_det_model
devices: "0" devices: ""
rec: rec:
concurrency: 1 concurrency: 1
timeout: -1 timeout: -1
retry: 1 retry: 1
local_service_conf: local_service_conf:
client_type: local_predictor
model_config: ocr_rec_model model_config: ocr_rec_model
devices: "0" devices: ""
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
try: try:
from paddle_serving_server_gpu.web_service import WebService, Op from paddle_serving_server.web_service import WebService, Op
except ImportError: except ImportError:
from paddle_serving_server.web_service import WebService, Op from paddle_serving_server.web_service import WebService, Op
import logging import logging
...@@ -52,7 +52,7 @@ class DetOp(Op): ...@@ -52,7 +52,7 @@ class DetOp(Op):
self.ori_h, self.ori_w, _ = self.im.shape self.ori_h, self.ori_w, _ = self.im.shape
det_img = self.det_preprocess(self.im) det_img = self.det_preprocess(self.im)
_, self.new_h, self.new_w = det_img.shape _, self.new_h, self.new_w = det_img.shape
return {"image": det_img} return {"image": det_img[np.newaxis, :]}
def postprocess(self, input_dicts, fetch_dict): def postprocess(self, input_dicts, fetch_dict):
det_out = fetch_dict["concat_1.tmp_0"] det_out = fetch_dict["concat_1.tmp_0"]
...@@ -62,6 +62,7 @@ class DetOp(Op): ...@@ -62,6 +62,7 @@ class DetOp(Op):
dt_boxes_list = self.post_func(det_out, [ratio_list]) dt_boxes_list = self.post_func(det_out, [ratio_list])
dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w]) dt_boxes = self.filter_func(dt_boxes_list[0], [self.ori_h, self.ori_w])
out_dict = {"dt_boxes": dt_boxes, "image": self.im} out_dict = {"dt_boxes": dt_boxes, "image": self.im}
print("out dict", out_dict)
return out_dict return out_dict
...@@ -85,11 +86,14 @@ class RecOp(Op): ...@@ -85,11 +86,14 @@ class RecOp(Op):
h, w = boximg.shape[0:2] h, w = boximg.shape[0:2]
wh_ratio = w * 1.0 / h wh_ratio = w * 1.0 / h
max_wh_ratio = max(max_wh_ratio, wh_ratio) max_wh_ratio = max(max_wh_ratio, wh_ratio)
for img in img_list: _, w, h = self.ocr_reader.resize_norm_img(img_list[0],
max_wh_ratio).shape
imgs = np.zeros((len(img_list), 3, w, h)).astype('float32')
for id, img in enumerate(img_list):
norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio) norm_img = self.ocr_reader.resize_norm_img(img, max_wh_ratio)
feed = {"image": norm_img} imgs[id] = norm_img
feed_list.append(feed) feed = {"image": imgs.copy()}
return feed_list return feed
def postprocess(self, input_dicts, fetch_dict): def postprocess(self, input_dicts, fetch_dict):
rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True) rec_res = self.ocr_reader.postprocess(fetch_dict, with_score=True)
...@@ -108,5 +112,5 @@ class OcrService(WebService): ...@@ -108,5 +112,5 @@ class OcrService(WebService):
uci_service = OcrService(name="ocr") uci_service = OcrService(name="ocr")
uci_service.prepare_pipeline_config("config.yml") uci_service.prepare_pipeline_config("brpc_config.yml")
uci_service.run_service() uci_service.run_service()
...@@ -31,7 +31,8 @@ class UciOp(Op): ...@@ -31,7 +31,8 @@ class UciOp(Op):
x_value = input_dict["x"] x_value = input_dict["x"]
if isinstance(x_value, (str, unicode)): if isinstance(x_value, (str, unicode)):
input_dict["x"] = np.array( input_dict["x"] = np.array(
[float(x.strip()) for x in x_value.split(self.separator)]) [float(x.strip())
for x in x_value.split(self.separator)]).reshape(1, 13)
return input_dict return input_dict
def postprocess(self, input_dicts, fetch_dict): def postprocess(self, input_dicts, fetch_dict):
......
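The `UciOp.preprocess` change above reshapes the parsed request into a `(1, 13)` array so it matches the model's dense input. A standalone illustration of that step, assuming a comma separator and reusing the sample features from the fit_a_line benchmark above:

```python
import numpy as np

x_value = ("0.0137,-0.1136,0.2553,-0.0692,0.0582,-0.0727,-0.1583,"
           "-0.0584,0.6283,0.4919,0.1856,0.0795,-0.0332")
x = np.array([float(v.strip()) for v in x_value.split(",")]).reshape(1, 13)
print(x.shape)  # (1, 13)
```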
...@@ -14,10 +14,10 @@ ...@@ -14,10 +14,10 @@
from paddle_serving_app.reader import Sequential, File2Image, Resize, CenterCrop from paddle_serving_app.reader import Sequential, File2Image, Resize, CenterCrop
from paddle_serving_app.reader import RGB2BGR, Transpose, Div, Normalize from paddle_serving_app.reader import RGB2BGR, Transpose, Div, Normalize
from paddle_serving_app.local_predict import Debugger from paddle_serving_app.local_predict import LocalPredictor
import sys import sys
debugger = Debugger() debugger = LocalPredictor()
debugger.load_model_config(sys.argv[1], gpu=True) debugger.load_model_config(sys.argv[1], gpu=True)
seq = Sequential([ seq = Sequential([
......
...@@ -18,7 +18,7 @@ from paddle_serving_client import Client ...@@ -18,7 +18,7 @@ from paddle_serving_client import Client
from paddle_serving_app.reader import LACReader, SentaReader from paddle_serving_app.reader import LACReader, SentaReader
import os import os
import sys import sys
import numpy as np
#senta_web_service.py #senta_web_service.py
from paddle_serving_server.web_service import WebService from paddle_serving_server.web_service import WebService
from paddle_serving_client import Client from paddle_serving_client import Client
...@@ -36,26 +36,42 @@ class SentaService(WebService): ...@@ -36,26 +36,42 @@ class SentaService(WebService):
# Preprocessing for the senta prediction service. Call order: lac reader -> lac model inference -> lac result postprocessing -> senta reader
def preprocess(self, feed=[], fetch=[]): def preprocess(self, feed=[], fetch=[]):
feed_data = [{
"words": self.lac_reader.process(x["words"])
} for x in feed]
lac_result = self.lac_client.predict(
feed=feed_data, fetch=["crf_decode"])
feed_batch = [] feed_batch = []
words_lod = [0]
for ins in feed:
if "words" not in ins:
raise ("feed data error!")
feed_data = self.lac_reader.process(ins["words"])
words_lod.append(words_lod[-1] + len(feed_data))
feed_batch.append(np.array(feed_data).reshape(len(feed_data), 1))
words = np.concatenate(feed_batch, axis=0)
lac_result = self.lac_client.predict(
feed={"words": words,
"words.lod": words_lod},
fetch=["crf_decode"],
batch=True)
result_lod = lac_result["crf_decode.lod"] result_lod = lac_result["crf_decode.lod"]
feed_batch = []
words_lod = [0]
for i in range(len(feed)): for i in range(len(feed)):
segs = self.lac_reader.parse_result( segs = self.lac_reader.parse_result(
feed[i]["words"], feed[i]["words"],
lac_result["crf_decode"][result_lod[i]:result_lod[i + 1]]) lac_result["crf_decode"][result_lod[i]:result_lod[i + 1]])
feed_data = self.senta_reader.process(segs) feed_data = self.senta_reader.process(segs)
feed_batch.append({"words": feed_data}) feed_batch.append(np.array(feed_data).reshape(len(feed_data), 1))
return feed_batch, fetch words_lod.append(words_lod[-1] + len(feed_data))
return {
"words": np.concatenate(feed_batch),
"words.lod": words_lod
}, fetch
senta_service = SentaService(name="senta") senta_service = SentaService(name="senta")
senta_service.load_model_config("senta_bilstm_model") senta_service.load_model_config("senta_bilstm_model")
senta_service.prepare_server(workdir="workdir") senta_service.prepare_server(workdir="workdir")
senta_service.init_lac_client( senta_service.init_lac_client(
lac_port=9300, lac_client_config="lac_model/serving_server_conf.prototxt") lac_port=9300,
lac_client_config="lac/lac_model/serving_server_conf.prototxt")
senta_service.run_rpc_service() senta_service.run_rpc_service()
senta_service.run_web_service() senta_service.run_web_service()
...@@ -160,10 +160,10 @@ Therefore, a local prediction tool is built into the paddle_serving_app, which i ...@@ -160,10 +160,10 @@ Therefore, a local prediction tool is built into the paddle_serving_app, which i
Taking [fit_a_line prediction service](../examples/fit_a_line) as an example, the following code can be used to run local prediction. Taking [fit_a_line prediction service](../examples/fit_a_line) as an example, the following code can be used to run local prediction.
```python ```python
from paddle_serving_app.local_predict import Debugger from paddle_serving_app.local_predict import LocalPredictor
import numpy as np import numpy as np
debugger = Debugger() debugger = LocalPredictor()
debugger.load_model_config("./uci_housing_model", gpu=False) debugger.load_model_config("./uci_housing_model", gpu=False)
data = [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, data = [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727,
-0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332] -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]
......
...@@ -147,10 +147,10 @@ The server-side prediction op of the Paddle Serving framework uses the Paddle inference framework; when deploying
Taking the [fit_a_line prediction service](../examples/fit_a_line) as an example, local prediction can be run with the following code.
```python ```python
from paddle_serving_app.local_predict import Debugger from paddle_serving_app.local_predict import LocalPredictor
import numpy as np import numpy as np
debugger = Debugger() debugger = LocalPredictor()
debugger.load_model_config("./uci_housing_model", gpu=False) debugger.load_model_config("./uci_housing_model", gpu=False)
data = [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, data = [0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727,
-0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332] -0.1583, -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]
......
...@@ -31,7 +31,7 @@ logger = logging.getLogger("fluid") ...@@ -31,7 +31,7 @@ logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class Debugger(object): class LocalPredictor(object):
def __init__(self): def __init__(self):
self.feed_names_ = [] self.feed_names_ = []
self.fetch_names_ = [] self.fetch_names_ = []
...@@ -76,7 +76,7 @@ class Debugger(object): ...@@ -76,7 +76,7 @@ class Debugger(object):
config.switch_use_feed_fetch_ops(False) config.switch_use_feed_fetch_ops(False)
self.predictor = create_paddle_predictor(config) self.predictor = create_paddle_predictor(config)
def predict(self, feed=None, fetch=None): def predict(self, feed=None, fetch=None, batch=False, log_id=0):
if feed is None or fetch is None: if feed is None or fetch is None:
raise ValueError("You should specify feed and fetch for prediction") raise ValueError("You should specify feed and fetch for prediction")
fetch_list = [] fetch_list = []
...@@ -121,10 +121,19 @@ class Debugger(object): ...@@ -121,10 +121,19 @@ class Debugger(object):
name]) name])
if self.feed_types_[name] == 0: if self.feed_types_[name] == 0:
feed[name] = feed[name].astype("int64") feed[name] = feed[name].astype("int64")
else: elif self.feed_types_[name] == 1:
feed[name] = feed[name].astype("float32") feed[name] = feed[name].astype("float32")
elif self.feed_types_[name] == 2:
feed[name] = feed[name].astype("int32")
else:
raise ValueError("local predictor receives wrong data type")
input_tensor = self.predictor.get_input_tensor(name) input_tensor = self.predictor.get_input_tensor(name)
input_tensor.copy_from_cpu(feed[name]) if "{}.lod".format(name) in feed:
input_tensor.set_lod([feed["{}.lod".format(name)]])
if batch == False:
input_tensor.copy_from_cpu(feed[name][np.newaxis, :])
else:
input_tensor.copy_from_cpu(feed[name])
output_tensors = [] output_tensors = []
output_names = self.predictor.get_output_names() output_names = self.predictor.get_output_names()
for output_name in output_names: for output_name in output_names:
...@@ -139,5 +148,6 @@ class Debugger(object): ...@@ -139,5 +148,6 @@ class Debugger(object):
for i, name in enumerate(fetch): for i, name in enumerate(fetch):
fetch_map[name] = outputs[i] fetch_map[name] = outputs[i]
if len(output_tensors[i].lod()) > 0: if len(output_tensors[i].lod()) > 0:
fetch_map[name + ".lod"] = output_tensors[i].lod()[0] fetch_map[name + ".lod"] = np.array(output_tensors[i].lod()[
0]).astype('int32')
return fetch_map return fetch_map
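Putting the `local_predict.py` changes together: `batch=True` tells the renamed `LocalPredictor` that the leading dimension of each feed is already the batch axis, and a `"<name>.lod"` entry, when present, is applied through `set_lod`. A minimal usage sketch, assuming a fit_a_line model exported to `./uci_housing_model` as in the docs above:

```python
import numpy as np
from paddle_serving_app.local_predict import LocalPredictor

predictor = LocalPredictor()
predictor.load_model_config("./uci_housing_model", gpu=False)  # assumed model path

# One sample with 13 float features; with batch=True no extra np.newaxis is added.
x = np.array([[0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583,
               -0.0584, 0.6283, 0.4919, 0.1856, 0.0795, -0.0332]], dtype="float32")
fetch_map = predictor.predict(feed={"x": x}, fetch=["price"], batch=True)
print(fetch_map["price"])
```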
@@ -233,7 +233,12 @@ class Client(object):
             # key))
             pass

-    def predict(self, feed=None, fetch=None, need_variant_tag=False, log_id=0):
+    def predict(self,
+                feed=None,
+                fetch=None,
+                batch=False,
+                need_variant_tag=False,
+                log_id=0):
         self.profile_.record('py_prepro_0')
         if feed is None or fetch is None:
@@ -260,7 +265,10 @@ class Client(object):
         int_feed_names = []
         float_feed_names = []
         int_shape = []
+        int_lod_slot_batch = []
+        float_lod_slot_batch = []
         float_shape = []
         fetch_names = []
         counter = 0
         batch_size = len(feed_batch)
@@ -277,31 +285,56 @@ class Client(object):
         for i, feed_i in enumerate(feed_batch):
             int_slot = []
             float_slot = []
+            int_lod_slot = []
+            float_lod_slot = []
             for key in feed_i:
-                if key not in self.feed_names_:
+                if ".lod" not in key and key not in self.feed_names_:
                     raise ValueError("Wrong feed name: {}.".format(key))
+                if ".lod" in key:
+                    continue
                 #if not isinstance(feed_i[key], np.ndarray):
                 self.shape_check(feed_i, key)
                 if self.feed_types_[key] in int_type:
                     if i == 0:
                         int_feed_names.append(key)
+                        shape_lst = []
+                        if batch == False:
+                            feed_i[key] = feed_i[key][np.newaxis, :]
                         if isinstance(feed_i[key], np.ndarray):
-                            int_shape.append(list(feed_i[key].shape))
+                            shape_lst.extend(list(feed_i[key].shape))
+                            int_shape.append(shape_lst)
                         else:
                             int_shape.append(self.feed_shapes_[key])
+                        if "{}.lod".format(key) in feed_i:
+                            int_lod_slot_batch.append(feed_i["{}.lod".format(
+                                key)])
+                        else:
+                            int_lod_slot_batch.append([])
                     if isinstance(feed_i[key], np.ndarray):
                         int_slot.append(feed_i[key])
                         self.has_numpy_input = True
                     else:
                         int_slot.append(feed_i[key])
                         self.all_numpy_input = False
                 elif self.feed_types_[key] in float_type:
                     if i == 0:
                         float_feed_names.append(key)
+                        shape_lst = []
+                        if batch == False:
+                            feed_i[key] = feed_i[key][np.newaxis, :]
                         if isinstance(feed_i[key], np.ndarray):
-                            float_shape.append(list(feed_i[key].shape))
+                            shape_lst.extend(list(feed_i[key].shape))
+                            float_shape.append(shape_lst)
                         else:
                             float_shape.append(self.feed_shapes_[key])
+                        if "{}.lod".format(key) in feed_i:
+                            float_lod_slot_batch.append(feed_i["{}.lod".format(
+                                key)])
+                        else:
+                            float_lod_slot_batch.append([])
                     if isinstance(feed_i[key], np.ndarray):
                         float_slot.append(feed_i[key])
                         self.has_numpy_input = True
@@ -310,6 +343,8 @@ class Client(object):
                         self.all_numpy_input = False
             int_slot_batch.append(int_slot)
             float_slot_batch.append(float_slot)
+            int_lod_slot_batch.append(int_lod_slot)
+            float_lod_slot_batch.append(float_lod_slot)

         self.profile_.record('py_prepro_1')
         self.profile_.record('py_client_infer_0')
@@ -317,14 +352,13 @@ class Client(object):
         result_batch_handle = self.predictorres_constructor()
         if self.all_numpy_input:
             res = self.client_handle_.numpy_predict(
-                float_slot_batch, float_feed_names, float_shape, int_slot_batch,
-                int_feed_names, int_shape, fetch_names, result_batch_handle,
-                self.pid, log_id)
+                float_slot_batch, float_feed_names, float_shape,
+                float_lod_slot_batch, int_slot_batch, int_feed_names, int_shape,
+                int_lod_slot_batch, fetch_names, result_batch_handle, self.pid,
+                log_id)
         elif self.has_numpy_input == False:
-            res = self.client_handle_.batch_predict(
-                float_slot_batch, float_feed_names, float_shape, int_slot_batch,
-                int_feed_names, int_shape, fetch_names, result_batch_handle,
-                self.pid, log_id)
+            raise ValueError(
+                "Please make sure all of your inputs are numpy array")
         else:
             raise ValueError(
                 "Please make sure the inputs are all in list type or all in numpy.array type"
...
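On the client side, predict() now takes a batch argument and accepts "<name>.lod" keys in the feed dict; those lod lists are forwarded to the C++ numpy_predict call as per-tensor lod slots, and non-numpy list feeds are rejected instead of falling back to the removed batch_predict. A minimal call sketch; the config path, endpoint, and "words"/"prediction" names are placeholders:

    import numpy as np
    from paddle_serving_client import Client

    client = Client()
    client.load_client_config("serving_client_conf.prototxt")
    client.connect(["127.0.0.1:9393"])

    # Already-batched variable-length input: pass batch=True plus the lod list.
    words = np.array([8, 233, 52, 601, 33], dtype="int64").reshape(-1, 1)
    fetch_map = client.predict(
        feed={"words": words, "words.lod": [0, 3, 5]},
        fetch=["prediction"],
        batch=True)

    # Single sample: leave batch=False and the client adds the batch axis.
    single = np.array([8, 233, 52], dtype="int64")
    fetch_map = client.predict(feed={"words": single}, fetch=["prediction"])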
@@ -118,7 +118,7 @@ class WebService(object):
             del feed["fetch"]
         if len(feed) == 0:
             raise ValueError("empty input")
-        fetch_map = self.client.predict(feed=feed, fetch=fetch)
+        fetch_map = self.client.predict(feed=feed, fetch=fetch, batch=True)
         result = self.postprocess(
             feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map)
         result = {"result": result}
@@ -171,8 +171,8 @@ class WebService(object):
         self.app_instance = app_instance

     def _launch_local_predictor(self):
-        from paddle_serving_app.local_predict import Debugger
-        self.client = Debugger()
+        from paddle_serving_app.local_predict import LocalPredictor
+        self.client = LocalPredictor()
         self.client.load_model_config(
             "{}".format(self.model_config), gpu=False, profile=False)
...
@@ -232,8 +232,8 @@ class WebService(object):
         self.app_instance = app_instance

     def _launch_local_predictor(self, gpu):
-        from paddle_serving_app.local_predict import Debugger
-        self.client = Debugger()
+        from paddle_serving_app.local_predict import LocalPredictor
+        self.client = LocalPredictor()
         self.client.load_model_config(
             "{}".format(self.model_config), gpu=gpu, profile=False)
...
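Both web service variants now build a LocalPredictor (the renamed Debugger) and call it with batch=True, since the HTTP handler has already stacked the request into a batch. Roughly equivalent standalone usage, with a hypothetical model directory and feed/fetch names:

    import numpy as np
    from paddle_serving_app.local_predict import LocalPredictor

    client = LocalPredictor()
    client.load_model_config("uci_housing_model", gpu=False, profile=False)

    x = np.random.rand(2, 13).astype("float32")  # a batch of two samples
    fetch_map = client.predict(feed={"x": x}, fetch=["price"], batch=True)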
@@ -43,7 +43,6 @@ class DAGExecutor(object):
         dag_conf = server_conf["dag"]

         self._retry = dag_conf["retry"]
-        client_type = dag_conf["client_type"]
         self._server_use_profile = dag_conf["use_profile"]
         channel_size = dag_conf["channel_size"]
         self._is_thread_op = dag_conf["is_thread_op"]
@@ -61,8 +60,8 @@ class DAGExecutor(object):
             self._is_thread_op, tracer_interval_s, server_worker_num)

         self._dag = DAG(self.name, response_op, self._server_use_profile,
-                        self._is_thread_op, client_type, channel_size,
-                        build_dag_each_worker, self._tracer)
+                        self._is_thread_op, channel_size, build_dag_each_worker,
+                        self._tracer)
         (in_channel, out_channel, pack_rpc_func,
          unpack_rpc_func) = self._dag.build()
         self._dag.start()
@@ -324,13 +323,12 @@ class DAGExecutor(object):
 class DAG(object):
     def __init__(self, request_name, response_op, use_profile, is_thread_op,
-                 client_type, channel_size, build_dag_each_worker, tracer):
+                 channel_size, build_dag_each_worker, tracer):
         self._request_name = request_name
         self._response_op = response_op
         self._use_profile = use_profile
         self._is_thread_op = is_thread_op
         self._channel_size = channel_size
-        self._client_type = client_type
         self._build_dag_each_worker = build_dag_each_worker
         self._tracer = tracer
         if not self._is_thread_op:
@@ -570,11 +568,9 @@ class DAG(object):
             op.use_profiler(self._use_profile)
             op.set_tracer(self._tracer)
             if self._is_thread_op:
-                self._threads_or_proces.extend(
-                    op.start_with_thread(self._client_type))
+                self._threads_or_proces.extend(op.start_with_thread())
             else:
-                self._threads_or_proces.extend(
-                    op.start_with_process(self._client_type))
+                self._threads_or_proces.extend(op.start_with_process())
         _LOGGER.info("[DAG] start")

         # not join yet
@@ -582,7 +578,8 @@ class DAG(object):
     def join(self):
         for x in self._threads_or_proces:
-            x.join()
+            if x is not None:
+                x.join()

     def stop(self):
         for chl in self._channels:
...
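client_type is no longer a DAG-level setting; each Op reads it from its own local_service_conf (see the operator.py hunks below). A hedged sketch of the resulting configuration, written as the Python dict the server would parse: the dag keys follow this diff, while the op-level nesting, names, and paths are placeholders for illustration only.

    server_conf = {
        "dag": {
            "retry": 1,
            "use_profile": False,
            "channel_size": 0,
            "is_thread_op": True,
            # "client_type" is no longer read here ...
        },
        "op": {
            "example_op": {
                "local_service_conf": {
                    "client_type": "local_predictor",  # ... it moves per op
                    "model_config": "example_model_dir",
                    "workdir": "workdir",
                    "thread_num": 2,
                    "devices": "",
                },
            },
        },
    }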
@@ -38,7 +38,7 @@ from .channel import (ThreadChannel, ProcessChannel, ChannelDataEcode,
                       ChannelTimeoutError)
 from .util import NameGenerator
 from .profiler import UnsafeTimeProfiler as TimeProfiler
-from . import local_rpc_service_handler
+from . import local_service_handler

 _LOGGER = logging.getLogger(__name__)
 _op_name_gen = NameGenerator("Op")
@@ -56,7 +56,7 @@ class Op(object):
                  retry=None,
                  batch_size=None,
                  auto_batching_timeout=None,
-                 local_rpc_service_handler=None):
+                 local_service_handler=None):
         # In __init__, all the parameters are just saved and Op is not initialized
         if name is None:
             name = _op_name_gen.next()
@@ -64,7 +64,7 @@ class Op(object):
         self.concurrency = concurrency  # amount of concurrency
         self.set_input_ops(input_ops)

-        self._local_rpc_service_handler = local_rpc_service_handler
+        self._local_service_handler = local_service_handler
         self._server_endpoints = server_endpoints
         self._fetch_names = fetch_list
         self._client_config = client_config
@@ -123,49 +123,65 @@ class Op(object):
                 self.with_serving = True
                 self._server_endpoints = server_endpoints
             else:
-                if self._local_rpc_service_handler is None:
+                if self._local_service_handler is None:
                     local_service_conf = conf.get("local_service_conf")
                     _LOGGER.info("local_service_conf: {}".format(
                         local_service_conf))
                     model_config = local_service_conf.get("model_config")
+                    self.client_type = local_service_conf.get("client_type")
                     _LOGGER.info("model_config: {}".format(model_config))
                     if model_config is None:
                         self.with_serving = False
                     else:
                         # local rpc service
                         self.with_serving = True
-                        service_handler = local_rpc_service_handler.LocalRpcServiceHandler(
-                            model_config=model_config,
-                            workdir=local_service_conf["workdir"],
-                            thread_num=local_service_conf["thread_num"],
-                            devices=local_service_conf["devices"],
-                            mem_optim=local_service_conf["mem_optim"],
-                            ir_optim=local_service_conf["ir_optim"])
-                        service_handler.prepare_server()  # get fetch_list
-                        serivce_ports = service_handler.get_port_list()
-                        self._server_endpoints = [
-                            "127.0.0.1:{}".format(p) for p in serivce_ports
-                        ]
-                        if self._client_config is None:
-                            self._client_config = service_handler.get_client_config(
-                            )
-                        if self._fetch_names is None:
-                            self._fetch_names = service_handler.get_fetch_list()
-                        self._local_rpc_service_handler = service_handler
+                        if self.client_type == "brpc" or self.client_type == "grpc":
+                            service_handler = local_service_handler.LocalServiceHandler(
+                                model_config=model_config,
+                                workdir=local_service_conf["workdir"],
+                                thread_num=local_service_conf["thread_num"],
+                                devices=local_service_conf["devices"],
+                                mem_optim=local_service_conf["mem_optim"],
+                                ir_optim=local_service_conf["ir_optim"])
+                            service_handler.prepare_server()  # get fetch_list
+                            serivce_ports = service_handler.get_port_list()
+                            self._server_endpoints = [
+                                "127.0.0.1:{}".format(p) for p in serivce_ports
+                            ]
+                            if self._client_config is None:
+                                self._client_config = service_handler.get_client_config(
+                                )
+                            if self._fetch_names is None:
+                                self._fetch_names = service_handler.get_fetch_list(
+                                )
+                        elif self.client_type == "local_predictor":
+                            service_handler = local_service_handler.LocalPredictorServiceHandler(
+                                model_config=model_config,
+                                workdir=local_service_conf["workdir"],
+                                thread_num=local_service_conf["thread_num"],
+                                devices=local_service_conf["devices"])
+                            service_handler.prepare_server()  # get fetch_list
+                            self.local_predictor = service_handler.get_client()
+                            if self._client_config is None:
+                                self._client_config = service_handler.get_client_config(
+                                )
+                            if self._fetch_names is None:
+                                self._fetch_names = service_handler.get_fetch_list(
+                                )
+                        self._local_service_handler = service_handler
                 else:
                     self.with_serving = True
-                    self._local_rpc_service_handler.prepare_server(
+                    self._local_service_handler.prepare_server(
                     )  # get fetch_list
-                    serivce_ports = self._local_rpc_service_handler.get_port_list(
-                    )
+                    serivce_ports = self._local_service_handler.get_port_list()
                     self._server_endpoints = [
                         "127.0.0.1:{}".format(p) for p in serivce_ports
                     ]
                     if self._client_config is None:
-                        self._client_config = self._local_rpc_service_handler.get_client_config(
+                        self._client_config = self._local_service_handler.get_client_config(
                         )
                     if self._fetch_names is None:
-                        self._fetch_names = self._local_rpc_service_handler.get_fetch_list(
+                        self._fetch_names = self._local_service_handler.get_fetch_list(
                         )
         else:
             self.with_serving = True
@@ -188,13 +204,13 @@ class Op(object):
                     self._batch_size, self._auto_batching_timeout)))

     def launch_local_rpc_service(self):
-        if self._local_rpc_service_handler is None:
+        if self._local_service_handler is None:
             _LOGGER.warning(
                 self._log("Failed to launch local rpc"
-                          " service: local_rpc_service_handler is None."))
+                          " service: local_service_handler is None."))
             return
-        port = self._local_rpc_service_handler.get_port_list()
-        self._local_rpc_service_handler.start_server()
+        port = self._local_service_handler.get_port_list()
+        self._local_service_handler.start_server()
         _LOGGER.info("Op({}) use local rpc service at port: {}"
                      .format(self.name, port))
@@ -215,22 +231,25 @@ class Op(object):
     def set_tracer(self, tracer):
         self._tracer = tracer

-    def init_client(self, client_type, client_config, server_endpoints,
-                    fetch_names):
+    def init_client(self, client_config, server_endpoints):
         if self.with_serving == False:
             _LOGGER.info("Op({}) has no client (and it also do not "
                          "run the process function)".format(self.name))
             return None
-        if client_type == 'brpc':
+        if self.client_type == 'brpc':
             client = Client()
             client.load_client_config(client_config)
-        elif client_type == 'grpc':
+        elif self.client_type == 'grpc':
             client = MultiLangClient()
+        elif self.client_type == 'local_predictor':
+            if self.local_predictor is None:
+                raise ValueError("local predictor not yet created")
+            client = self.local_predictor
         else:
             raise ValueError("Failed to init client: unknow client "
-                             "type {}".format(client_type))
-        client.connect(server_endpoints)
-        self._fetch_names = fetch_names
+                             "type {}".format(self.client_type))
+        if self.client_type != "local_predictor":
+            client.connect(server_endpoints)
         return client

     def get_input_ops(self):
@@ -291,15 +310,25 @@ class Op(object):
         (_, input_dict), = input_dicts.items()
         return input_dict

-    def process(self, feed_batch, typical_logid):
+    def process(self, feed_batch, fetch_names, typical_logid):
         err, err_info = ChannelData.check_batch_npdata(feed_batch)
         if err != 0:
             _LOGGER.critical(
                 self._log("Failed to run process: {}. Please override "
                           "preprocess func.".format(err_info)))
             os._exit(-1)
-        call_result = self.client.predict(
-            feed=feed_batch, fetch=self._fetch_names, log_id=typical_logid)
+        if self.client_type == "local_predictor":
+            call_result = self.client.predict(
+                feed=feed_batch[0],
+                fetch=fetch_names,
+                batch=True,
+                log_id=typical_logid)
+        else:
+            call_result = self.client.predict(
+                feed=feed_batch,
+                fetch=fetch_names,
+                batch=True,
+                log_id=typical_logid)
         if isinstance(self.client, MultiLangClient):
             if call_result is None or call_result["serving_status_code"] != 0:
                 return None
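process() now receives fetch_names explicitly and dispatches on the op's client_type: the in-process local_predictor is given the single merged feed dict (feed_batch[0]), while brpc/grpc clients are given the whole batch list, and both calls pass batch=True. A small sketch of that dispatch in isolation; the helper name and its arguments are hypothetical:

    def call_op_client(client, client_type, feed_batch, fetch_names, log_id=0):
        # Mirrors the branch in Op.process(): the local predictor takes one
        # merged feed dict, the rpc clients take the batch list; batch=True
        # because preprocess has already produced batched arrays.
        if client_type == "local_predictor":
            return client.predict(
                feed=feed_batch[0], fetch=fetch_names, batch=True,
                log_id=log_id)
        return client.predict(
            feed=feed_batch, fetch=fetch_names, batch=True, log_id=log_id)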
@@ -347,23 +376,22 @@ class Op(object):
             for channel in channels:
                 channel.push(data, name)

-    def start_with_process(self, client_type):
+    def start_with_process(self):
         trace_buffer = None
         if self._tracer is not None:
             trace_buffer = self._tracer.data_buffer()
-        proces = []
+        process = []
         for concurrency_idx in range(self.concurrency):
             p = multiprocessing.Process(
                 target=self._run,
                 args=(concurrency_idx, self._get_input_channel(),
-                      self._get_output_channels(), client_type, False,
-                      trace_buffer))
+                      self._get_output_channels(), False, trace_buffer))
             p.daemon = True
             p.start()
-            proces.append(p)
-        return proces
+            process.append(p)
+        return process

-    def start_with_thread(self, client_type):
+    def start_with_thread(self):
         trace_buffer = None
         if self._tracer is not None:
             trace_buffer = self._tracer.data_buffer()
@@ -372,8 +400,7 @@ class Op(object):
             t = threading.Thread(
                 target=self._run,
                 args=(concurrency_idx, self._get_input_channel(),
-                      self._get_output_channels(), client_type, True,
-                      trace_buffer))
+                      self._get_output_channels(), True, trace_buffer))
             # When a process exits, it attempts to terminate
             # all of its daemonic child processes.
             t.daemon = True
@@ -652,7 +679,7 @@ class Op(object):
         return parsed_data_dict, need_profile_dict, profile_dict

-    def _run(self, concurrency_idx, input_channel, output_channels, client_type,
+    def _run(self, concurrency_idx, input_channel, output_channels,
              is_thread_op, trace_buffer):
         op_info_prefix = "[{}|{}]".format(self.name, concurrency_idx)
         tid = threading.current_thread().ident
@@ -660,8 +687,7 @@ class Op(object):
         # init op
         profiler = None
         try:
-            profiler = self._initialize(is_thread_op, client_type,
-                                        concurrency_idx)
+            profiler = self._initialize(is_thread_op, concurrency_idx)
         except Exception as e:
             _LOGGER.critical(
                 "{} Failed to init op: {}".format(op_info_prefix, e),
@@ -801,16 +827,16 @@ class Op(object):
                 except Queue.Full:
                     break

-    def _initialize(self, is_thread_op, client_type, concurrency_idx):
+    def _initialize(self, is_thread_op, concurrency_idx):
         if is_thread_op:
             with self._for_init_op_lock:
                 if not self._succ_init_op:
                     # for the threaded version of Op, each thread cannot get its concurrency_idx
                     self.concurrency_idx = None
                     # init client
-                    self.client = self.init_client(
-                        client_type, self._client_config,
-                        self._server_endpoints, self._fetch_names)
+                    self.client = self.init_client(self._client_config,
+                                                   self._server_endpoints,
+                                                   self._fetch_names)
                     # user defined
                     self.init_op()
                     self._succ_init_op = True
@@ -818,9 +844,8 @@ class Op(object):
         else:
             self.concurrency_idx = concurrency_idx
             # init client
-            self.client = self.init_client(client_type, self._client_config,
-                                           self._server_endpoints,
-                                           self._fetch_names)
+            self.client = self.init_client(
+                self._client_config, self._server_endpoints, self._fetch_names)
             # user defined
             self.init_op()
...
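For completeness, a hedged sketch of wiring a pipeline Op against the renamed constructor argument: the keyword is now local_service_handler (per the Op.__init__ change above), and client_type is picked up from local_service_conf when the server config is loaded. The import paths, op names, and config file follow the project's pipeline examples and are assumptions here, not part of this commit:

    from paddle_serving_server.pipeline import Op, RequestOp, ResponseOp
    from paddle_serving_server.pipeline import PipelineServer

    read_op = RequestOp()
    example_op = Op(
        name="example",
        input_ops=[read_op],
        local_service_handler=None)  # filled in from local_service_conf
    response_op = ResponseOp(input_ops=[example_op])

    server = PipelineServer()
    server.set_response_op(response_op)
    server.prepare_server("config.yml")  # hypothetical config with per-op local_service_conf
    server.run_server()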