提交 a6655a6b 编写于 作者: M MRXLT 提交者: GitHub

Merge pull request #496 from MRXLT/numpy-input

improve client performance for numpy input
......@@ -17,18 +17,17 @@
#include <sys/types.h>
#include <unistd.h>
#include <pybind11/numpy.h>
#include <algorithm>
#include <fstream>
#include <map>
#include <string>
#include <utility> // move
#include <vector>
#include "core/sdk-cpp/builtin_format.pb.h"
#include "core/sdk-cpp/general_model_service.pb.h"
#include "core/sdk-cpp/include/common.h"
#include "core/sdk-cpp/include/predictor_sdk.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
......@@ -36,6 +35,7 @@ DECLARE_bool(profile_client);
DECLARE_bool(profile_server);
// given some input data, pack into pb, and send request
namespace py = pybind11;
namespace baidu {
namespace paddle_serving {
namespace general_model {
......@@ -178,6 +178,17 @@ class PredictorClient {
PredictorRes& predict_res_batch, // NOLINT
const int& pid);
int numpy_predict(
const std::vector<std::vector<py::array_t<float>>>& float_feed_batch,
const std::vector<std::string>& float_feed_name,
const std::vector<std::vector<int>>& float_shape,
const std::vector<std::vector<py::array_t<int64_t>>>& int_feed_batch,
const std::vector<std::string>& int_feed_name,
const std::vector<std::vector<int>>& int_shape,
const std::vector<std::string>& fetch_name,
PredictorRes& predict_res_batch, // NOLINT
const int& pid);
private:
PredictorApi _api;
Predictor* _predictor;
......
......@@ -30,6 +30,7 @@ using baidu::paddle_serving::predictor::general_model::FeedInst;
using baidu::paddle_serving::predictor::general_model::FetchInst;
std::once_flag gflags_init_flag;
namespace py = pybind11;
namespace baidu {
namespace paddle_serving {
......@@ -332,6 +333,284 @@ int PredictorClient::batch_predict(
return 0;
}
int PredictorClient::numpy_predict(
const std::vector<std::vector<py::array_t<float>>> &float_feed_batch,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int>> &float_shape,
const std::vector<std::vector<py::array_t<int64_t>>> &int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid) {
int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
predict_res_batch.clear();
Timer timeline;
int64_t preprocess_start = timeline.TimeStampUS();
int fetch_name_num = fetch_name.size();
_api.thrd_initialize();
std::string variant_tag;
_predictor = _api.fetch_predictor("general_model", &variant_tag);
predict_res_batch.set_variant_tag(variant_tag);
VLOG(2) << "fetch general model predictor done.";
VLOG(2) << "float feed name size: " << float_feed_name.size();
VLOG(2) << "int feed name size: " << int_feed_name.size();
VLOG(2) << "max body size : " << brpc::fLU64::FLAGS_max_body_size;
Request req;
for (auto &name : fetch_name) {
req.add_fetch_var_names(name);
}
for (int bi = 0; bi < batch_size; bi++) {
VLOG(2) << "prepare batch " << bi;
std::vector<Tensor *> tensor_vec;
FeedInst *inst = req.add_insts();
std::vector<py::array_t<float>> float_feed = float_feed_batch[bi];
std::vector<py::array_t<int64_t>> int_feed = int_feed_batch[bi];
for (auto &name : float_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
for (auto &name : int_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
VLOG(2) << "batch [" << bi << "] int_feed_name and float_feed_name "
<< "prepared";
int vec_idx = 0;
VLOG(2) << "tensor_vec size " << tensor_vec.size() << " float shape "
<< float_shape.size();
for (auto &name : float_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
VLOG(2) << "prepare float feed " << name << " shape size "
<< float_shape[vec_idx].size();
for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
tensor->add_shape(float_shape[vec_idx][j]);
}
tensor->set_elem_type(1);
const int float_shape_size = float_shape[vec_idx].size();
switch (float_shape_size) {
case 4: {
auto float_array = float_feed[vec_idx].unchecked<4>();
for (ssize_t i = 0; i < float_array.shape(0); i++) {
for (ssize_t j = 0; j < float_array.shape(1); j++) {
for (ssize_t k = 0; k < float_array.shape(2); k++) {
for (ssize_t l = 0; l < float_array.shape(3); l++) {
tensor->add_float_data(float_array(i, j, k, l));
}
}
}
}
break;
}
case 3: {
auto float_array = float_feed[vec_idx].unchecked<3>();
for (ssize_t i = 0; i < float_array.shape(0); i++) {
for (ssize_t j = 0; j < float_array.shape(1); j++) {
for (ssize_t k = 0; k < float_array.shape(2); k++) {
tensor->add_float_data(float_array(i, j, k));
}
}
}
break;
}
case 2: {
auto float_array = float_feed[vec_idx].unchecked<2>();
for (ssize_t i = 0; i < float_array.shape(0); i++) {
for (ssize_t j = 0; j < float_array.shape(1); j++) {
tensor->add_float_data(float_array(i, j));
}
}
break;
}
case 1: {
auto float_array = float_feed[vec_idx].unchecked<1>();
for (ssize_t i = 0; i < float_array.shape(0); i++) {
tensor->add_float_data(float_array(i));
}
break;
}
}
vec_idx++;
}
VLOG(2) << "batch [" << bi << "] "
<< "float feed value prepared";
vec_idx = 0;
for (auto &name : int_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
VLOG(2) << "prepare int feed " << name << " shape size "
<< int_shape[vec_idx].size();
for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) {
tensor->add_shape(int_shape[vec_idx][j]);
}
tensor->set_elem_type(0);
const int int_shape_size = int_shape[vec_idx].size();
switch (int_shape_size) {
case 4: {
auto int_array = int_feed[vec_idx].unchecked<4>();
for (ssize_t i = 0; i < int_array.shape(0); i++) {
for (ssize_t j = 0; j < int_array.shape(1); j++) {
for (ssize_t k = 0; k < int_array.shape(2); k++) {
for (ssize_t l = 0; k < int_array.shape(3); l++) {
tensor->add_float_data(int_array(i, j, k, l));
}
}
}
}
break;
}
case 3: {
auto int_array = int_feed[vec_idx].unchecked<3>();
for (ssize_t i = 0; i < int_array.shape(0); i++) {
for (ssize_t j = 0; j < int_array.shape(1); j++) {
for (ssize_t k = 0; k < int_array.shape(2); k++) {
tensor->add_float_data(int_array(i, j, k));
}
}
}
break;
}
case 2: {
auto int_array = int_feed[vec_idx].unchecked<2>();
for (ssize_t i = 0; i < int_array.shape(0); i++) {
for (ssize_t j = 0; j < int_array.shape(1); j++) {
tensor->add_float_data(int_array(i, j));
}
}
break;
}
case 1: {
auto int_array = int_feed[vec_idx].unchecked<1>();
for (ssize_t i = 0; i < int_array.shape(0); i++) {
tensor->add_float_data(int_array(i));
}
break;
}
}
vec_idx++;
}
VLOG(2) << "batch [" << bi << "] "
<< "int feed value prepared";
}
int64_t preprocess_end = timeline.TimeStampUS();
int64_t client_infer_start = timeline.TimeStampUS();
Response res;
int64_t client_infer_end = 0;
int64_t postprocess_start = 0;
int64_t postprocess_end = 0;
if (FLAGS_profile_client) {
if (FLAGS_profile_server) {
req.set_profile_server(true);
}
}
res.Clear();
if (_predictor->inference(&req, &res) != 0) {
LOG(ERROR) << "failed call predictor with req: " << req.ShortDebugString();
return -1;
} else {
client_infer_end = timeline.TimeStampUS();
postprocess_start = client_infer_end;
VLOG(2) << "get model output num";
uint32_t model_num = res.outputs_size();
VLOG(2) << "model num: " << model_num;
for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
VLOG(2) << "process model output index: " << m_idx;
auto output = res.outputs(m_idx);
ModelRes model;
model.set_engine_name(output.engine_name());
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
int idx = 0;
int shape_size = output.insts(0).tensor_array(idx).shape_size();
VLOG(2) << "fetch var " << name << " index " << idx << " shape size "
<< shape_size;
model._shape_map[name].resize(shape_size);
for (int i = 0; i < shape_size; ++i) {
model._shape_map[name][i] =
output.insts(0).tensor_array(idx).shape(i);
}
int lod_size = output.insts(0).tensor_array(idx).lod_size();
if (lod_size > 0) {
model._lod_map[name].resize(lod_size);
for (int i = 0; i < lod_size; ++i) {
model._lod_map[name][i] = output.insts(0).tensor_array(idx).lod(i);
}
}
idx += 1;
}
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
int idx = 0;
if (_fetch_name_to_type[name] == 0) {
VLOG(2) << "ferch var " << name << "type int";
model._int64_value_map[name].resize(
output.insts(0).tensor_array(idx).int64_data_size());
int size = output.insts(0).tensor_array(idx).int64_data_size();
for (int i = 0; i < size; ++i) {
model._int64_value_map[name][i] =
output.insts(0).tensor_array(idx).int64_data(i);
}
} else {
VLOG(2) << "fetch var " << name << "type float";
model._float_value_map[name].resize(
output.insts(0).tensor_array(idx).float_data_size());
int size = output.insts(0).tensor_array(idx).float_data_size();
for (int i = 0; i < size; ++i) {
model._float_value_map[name][i] =
output.insts(0).tensor_array(idx).float_data(i);
}
}
idx += 1;
}
predict_res_batch.add_model_res(std::move(model));
}
postprocess_end = timeline.TimeStampUS();
}
if (FLAGS_profile_client) {
std::ostringstream oss;
oss << "PROFILE\t"
<< "pid:" << pid << "\t"
<< "prepro_0:" << preprocess_start << " "
<< "prepro_1:" << preprocess_end << " "
<< "client_infer_0:" << client_infer_start << " "
<< "client_infer_1:" << client_infer_end << " ";
if (FLAGS_profile_server) {
int op_num = res.profile_time_size() / 2;
for (int i = 0; i < op_num; ++i) {
oss << "op" << i << "_0:" << res.profile_time(i * 2) << " ";
oss << "op" << i << "_1:" << res.profile_time(i * 2 + 1) << " ";
}
}
oss << "postpro_0:" << postprocess_start << " ";
oss << "postpro_1:" << postprocess_end;
fprintf(stderr, "%s\n", oss.str().c_str());
}
_api.thrd_clear();
return 0;
}
} // namespace general_model
} // namespace paddle_serving
} // namespace baidu
......@@ -100,6 +100,29 @@ PYBIND11_MODULE(serving_client, m) {
fetch_name,
predict_res_batch,
pid);
})
.def("numpy_predict",
[](PredictorClient &self,
const std::vector<std::vector<py::array_t<float>>>
&float_feed_batch,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int>> &float_shape,
const std::vector<std::vector<py::array_t<int64_t>>>
&int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::vector<int>> &int_shape,
const std::vector<std::string> &fetch_name,
PredictorRes &predict_res_batch,
const int &pid) {
return self.numpy_predict(float_feed_batch,
float_feed_name,
float_shape,
int_feed_batch,
int_feed_name,
int_shape,
fetch_name,
predict_res_batch,
pid);
},
py::call_guard<py::gil_scoped_release>());
}
......
......@@ -118,6 +118,8 @@ class Client(object):
self.producers = []
self.consumer = None
self.profile_ = _Profiler()
self.all_numpy_input = True
self.has_numpy_input = False
def rpath(self):
lib_path = os.path.dirname(paddle_serving_client.__file__)
......@@ -269,9 +271,12 @@ class Client(object):
else:
int_shape.append(self.feed_shapes_[key])
if isinstance(feed_i[key], np.ndarray):
int_slot.append(np.reshape(feed_i[key], (-1)).tolist())
#int_slot.append(np.reshape(feed_i[key], (-1)).tolist())
int_slot.append(feed_i[key])
self.has_numpy_input = True
else:
int_slot.append(feed_i[key])
self.all_numpy_input = False
elif self.feed_types_[key] == float_type:
if i == 0:
float_feed_names.append(key)
......@@ -280,10 +285,12 @@ class Client(object):
else:
float_shape.append(self.feed_shapes_[key])
if isinstance(feed_i[key], np.ndarray):
float_slot.append(
np.reshape(feed_i[key], (-1)).tolist())
#float_slot.append(np.reshape(feed_i[key], (-1)).tolist())
float_slot.append(feed_i[key])
self.has_numpy_input = True
else:
float_slot.append(feed_i[key])
self.all_numpy_input = False
int_slot_batch.append(int_slot)
float_slot_batch.append(float_slot)
......@@ -291,9 +298,18 @@ class Client(object):
self.profile_.record('py_client_infer_0')
result_batch = self.result_handle_
res = self.client_handle_.batch_predict(
float_slot_batch, float_feed_names, float_shape, int_slot_batch,
int_feed_names, int_shape, fetch_names, result_batch, self.pid)
if self.all_numpy_input:
res = self.client_handle_.numpy_predict(
float_slot_batch, float_feed_names, float_shape, int_slot_batch,
int_feed_names, int_shape, fetch_names, result_batch, self.pid)
elif self.has_numpy_input == False:
res = self.client_handle_.batch_predict(
float_slot_batch, float_feed_names, float_shape, int_slot_batch,
int_feed_names, int_shape, fetch_names, result_batch, self.pid)
else:
raise SystemExit(
"Please make sure the inputs are all in list type or all in numpy.array type"
)
self.profile_.record('py_client_infer_1')
self.profile_.record('py_postpro_0')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册