未验证 提交 79829eb6 编写于 作者: D Dong Daxiang 提交者: GitHub

Merge pull request #450 from barrierye/ensemble-support

Ensemble support
......@@ -217,6 +217,7 @@ int run_m(int argc, char** argv) {
LOG(INFO) << " total_request = " << std::to_string(request_num) << " speed = "
<< std::to_string(1000000 * thread_num / mean_time) // mean_time us
<< " query per second";
return 0;
}
} // namespace mcube
......
......@@ -21,6 +21,7 @@
#include <fstream>
#include <map>
#include <string>
#include <utility> // move
#include <vector>
#include "core/sdk-cpp/builtin_format.pb.h"
......@@ -39,12 +40,32 @@ namespace baidu {
namespace paddle_serving {
namespace general_model {
class PredictorRes {
public:
PredictorRes() {}
~PredictorRes() {}
class ModelRes {
public:
ModelRes() {}
ModelRes(const ModelRes& res) {
_engine_name = res._engine_name;
_int64_value_map.insert(res._int64_value_map.begin(),
res._int64_value_map.end());
_float_value_map.insert(res._float_value_map.begin(),
res._float_value_map.end());
_shape_map.insert(res._shape_map.begin(), res._shape_map.end());
_lod_map.insert(res._lod_map.begin(), res._lod_map.end());
}
ModelRes(ModelRes&& res) {
_engine_name = std::move(res._engine_name);
_int64_value_map.insert(
std::make_move_iterator(std::begin(res._int64_value_map)),
std::make_move_iterator(std::end(res._int64_value_map)));
_float_value_map.insert(
std::make_move_iterator(std::begin(res._float_value_map)),
std::make_move_iterator(std::end(res._float_value_map)));
_shape_map.insert(std::make_move_iterator(std::begin(res._shape_map)),
std::make_move_iterator(std::end(res._shape_map)));
_lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)),
std::make_move_iterator(std::end(res._lod_map)));
}
~ModelRes() {}
const std::vector<int64_t>& get_int64_by_name(const std::string& name) {
return _int64_value_map[name];
}
......@@ -57,19 +78,75 @@ class PredictorRes {
const std::vector<int>& get_lod(const std::string& name) {
return _lod_map[name];
}
void set_variant_tag(const std::string& variant_tag) {
_variant_tag = variant_tag;
void set_engine_name(const std::string& engine_name) {
_engine_name = engine_name;
}
const std::string& engine_name() { return _engine_name; }
ModelRes& operator=(ModelRes&& res) {
if (this != &res) {
_engine_name = std::move(res._engine_name);
_int64_value_map.insert(
std::make_move_iterator(std::begin(res._int64_value_map)),
std::make_move_iterator(std::end(res._int64_value_map)));
_float_value_map.insert(
std::make_move_iterator(std::begin(res._float_value_map)),
std::make_move_iterator(std::end(res._float_value_map)));
_shape_map.insert(std::make_move_iterator(std::begin(res._shape_map)),
std::make_move_iterator(std::end(res._shape_map)));
_lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)),
std::make_move_iterator(std::end(res._lod_map)));
}
return *this;
}
const std::string& variant_tag() { return _variant_tag; }
public:
std::string _engine_name;
std::map<std::string, std::vector<int64_t>> _int64_value_map;
std::map<std::string, std::vector<float>> _float_value_map;
std::map<std::string, std::vector<int>> _shape_map;
std::map<std::string, std::vector<int>> _lod_map;
};
class PredictorRes {
public:
PredictorRes() {}
~PredictorRes() {}
public:
void clear() {
_models.clear();
_engine_names.clear();
}
const std::vector<int64_t>& get_int64_by_name(const int model_idx,
const std::string& name) {
return _models[model_idx].get_int64_by_name(name);
}
const std::vector<float>& get_float_by_name(const int model_idx,
const std::string& name) {
return _models[model_idx].get_float_by_name(name);
}
const std::vector<int>& get_shape(const int model_idx,
const std::string& name) {
return _models[model_idx].get_shape(name);
}
const std::vector<int>& get_lod(const int model_idx,
const std::string& name) {
return _models[model_idx].get_lod(name);
}
void add_model_res(ModelRes&& res) {
_engine_names.push_back(res.engine_name());
_models.emplace_back(std::move(res));
}
void set_variant_tag(const std::string& variant_tag) {
_variant_tag = variant_tag;
}
const std::string& variant_tag() { return _variant_tag; }
const std::vector<std::string>& get_engine_names() { return _engine_names; }
private:
std::vector<ModelRes> _models;
std::string _variant_tag;
std::vector<std::string> _engine_names;
};
class PredictorClient {
......
......@@ -111,6 +111,7 @@ void PredictorClient::set_predictor_conf(const std::string &conf_path,
int PredictorClient::destroy_predictor() {
_api.thrd_finalize();
_api.destroy();
return 0;
}
int PredictorClient::create_predictor_by_desc(const std::string &sdk_desc) {
......@@ -119,6 +120,7 @@ int PredictorClient::create_predictor_by_desc(const std::string &sdk_desc) {
return -1;
}
_api.thrd_initialize();
return 0;
}
int PredictorClient::create_predictor() {
......@@ -129,6 +131,7 @@ int PredictorClient::create_predictor() {
return -1;
}
_api.thrd_initialize();
return 0;
}
int PredictorClient::batch_predict(
......@@ -143,10 +146,7 @@ int PredictorClient::batch_predict(
const int &pid) {
int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
predict_res_batch._int64_value_map.clear();
predict_res_batch._float_value_map.clear();
predict_res_batch._shape_map.clear();
predict_res_batch._lod_map.clear();
predict_res_batch.clear();
Timer timeline;
int64_t preprocess_start = timeline.TimeStampUS();
......@@ -189,11 +189,11 @@ int PredictorClient::batch_predict(
Tensor *tensor = tensor_vec[idx];
VLOG(2) << "prepare float feed " << name << " shape size "
<< float_shape[vec_idx].size();
for (int j = 0; j < float_shape[vec_idx].size(); ++j) {
for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
tensor->add_shape(float_shape[vec_idx][j]);
}
tensor->set_elem_type(1);
for (int j = 0; j < float_feed[vec_idx].size(); ++j) {
for (uint32_t j = 0; j < float_feed[vec_idx].size(); ++j) {
tensor->add_float_data(float_feed[vec_idx][j]);
}
vec_idx++;
......@@ -208,13 +208,13 @@ int PredictorClient::batch_predict(
Tensor *tensor = tensor_vec[idx];
VLOG(2) << "prepare int feed " << name << " shape size "
<< int_shape[vec_idx].size();
for (int j = 0; j < int_shape[vec_idx].size(); ++j) {
for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) {
tensor->add_shape(int_shape[vec_idx][j]);
}
tensor->set_elem_type(0);
VLOG(3) << "feed var name " << name << " index " << vec_idx
<< "first data " << int_feed[vec_idx][0];
for (int j = 0; j < int_feed[vec_idx].size(); ++j) {
for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) {
tensor->add_int64_data(int_feed[vec_idx][j]);
}
vec_idx++;
......@@ -248,23 +248,29 @@ int PredictorClient::batch_predict(
client_infer_end = timeline.TimeStampUS();
postprocess_start = client_infer_end;
uint32_t model_num = res.outputs_size();
for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
VLOG(2) << "process model output index: " << m_idx;
auto output = res.outputs(m_idx);
ModelRes model;
model.set_engine_name(output.engine_name());
for (auto &name : fetch_name) {
// int idx = _fetch_name_to_idx[name];
int idx = 0;
int shape_size = res.insts(0).tensor_array(idx).shape_size();
int shape_size = output.insts(0).tensor_array(idx).shape_size();
VLOG(2) << "fetch var " << name << " index " << idx << " shape size "
<< shape_size;
predict_res_batch._shape_map[name].resize(shape_size);
model._shape_map[name].resize(shape_size);
for (int i = 0; i < shape_size; ++i) {
predict_res_batch._shape_map[name][i] =
res.insts(0).tensor_array(idx).shape(i);
model._shape_map[name][i] =
output.insts(0).tensor_array(idx).shape(i);
}
int lod_size = res.insts(0).tensor_array(idx).lod_size();
int lod_size = output.insts(0).tensor_array(idx).lod_size();
if (lod_size > 0) {
predict_res_batch._lod_map[name].resize(lod_size);
model._lod_map[name].resize(lod_size);
for (int i = 0; i < lod_size; ++i) {
predict_res_batch._lod_map[name][i] =
res.insts(0).tensor_array(idx).lod(i);
model._lod_map[name][i] = output.insts(0).tensor_array(idx).lod(i);
}
}
idx += 1;
......@@ -275,25 +281,27 @@ int PredictorClient::batch_predict(
int idx = 0;
if (_fetch_name_to_type[name] == 0) {
VLOG(2) << "ferch var " << name << "type int";
predict_res_batch._int64_value_map[name].resize(
res.insts(0).tensor_array(idx).int64_data_size());
int size = res.insts(0).tensor_array(idx).int64_data_size();
model._int64_value_map[name].resize(
output.insts(0).tensor_array(idx).int64_data_size());
int size = output.insts(0).tensor_array(idx).int64_data_size();
for (int i = 0; i < size; ++i) {
predict_res_batch._int64_value_map[name][i] =
res.insts(0).tensor_array(idx).int64_data(i);
model._int64_value_map[name][i] =
output.insts(0).tensor_array(idx).int64_data(i);
}
} else {
VLOG(2) << "fetch var " << name << "type float";
predict_res_batch._float_value_map[name].resize(
res.insts(0).tensor_array(idx).float_data_size());
int size = res.insts(0).tensor_array(idx).float_data_size();
model._float_value_map[name].resize(
output.insts(0).tensor_array(idx).float_data_size());
int size = output.insts(0).tensor_array(idx).float_data_size();
for (int i = 0; i < size; ++i) {
predict_res_batch._float_value_map[name][i] =
res.insts(0).tensor_array(idx).float_data(i);
model._float_value_map[name][i] =
output.insts(0).tensor_array(idx).float_data(i);
}
}
idx += 1;
}
predict_res_batch.add_model_res(std::move(model));
}
postprocess_end = timeline.TimeStampUS();
}
......@@ -305,7 +313,6 @@ int PredictorClient::batch_predict(
<< "prepro_1:" << preprocess_end << " "
<< "client_infer_0:" << client_infer_start << " "
<< "client_infer_1:" << client_infer_end << " ";
if (FLAGS_profile_server) {
int op_num = res.profile_time_size() / 2;
for (int i = 0; i < op_num; ++i) {
......
......@@ -31,27 +31,28 @@ PYBIND11_MODULE(serving_client, m) {
py::class_<PredictorRes>(m, "PredictorRes", py::buffer_protocol())
.def(py::init())
.def("get_int64_by_name",
[](PredictorRes &self, std::string &name) {
return self.get_int64_by_name(name);
[](PredictorRes &self, int model_idx, std::string &name) {
return self.get_int64_by_name(model_idx, name);
},
py::return_value_policy::reference)
.def("get_float_by_name",
[](PredictorRes &self, std::string &name) {
return self.get_float_by_name(name);
[](PredictorRes &self, int model_idx, std::string &name) {
return self.get_float_by_name(model_idx, name);
},
py::return_value_policy::reference)
.def("get_shape",
[](PredictorRes &self, std::string &name) {
return self.get_shape(name);
[](PredictorRes &self, int model_idx, std::string &name) {
return self.get_shape(model_idx, name);
},
py::return_value_policy::reference)
.def("get_lod",
[](PredictorRes &self, std::string &name) {
return self.get_lod(name);
[](PredictorRes &self, int model_idx, std::string &name) {
return self.get_lod(model_idx, name);
},
py::return_value_policy::reference)
.def("variant_tag",
[](PredictorRes &self) { return self.variant_tag(); });
.def("variant_tag", [](PredictorRes &self) { return self.variant_tag(); })
.def("get_engine_names",
[](PredictorRes &self) { return self.get_engine_names(); });
py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
.def(py::init())
......
......@@ -35,8 +35,17 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralCopyOp::inference() {
// reade request from client
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
VLOG(2) << "precedent name: " << pre_name();
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name()
<< ") can only have one predecessor op, but received "
<< pre_node_names.size();
return -1;
}
const std::string pre_name = pre_node_names[0];
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
VLOG(2) << "precedent name: " << pre_name;
const TensorVector *in = &input_blob->tensor_vector;
VLOG(2) << "input size: " << in->size();
int batch_size = input_blob->GetBatchSize();
......
......@@ -40,12 +40,21 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralDistKVInferOp::inference() {
VLOG(2) << "Going to run inference";
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
VLOG(2) << "Get precedent op name: " << pre_name();
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name()
<< ") can only have one predecessor op, but received "
<< pre_node_names.size();
return -1;
}
const std::string pre_name = pre_node_names[0];
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
VLOG(2) << "Get precedent op name: " << pre_name;
GeneralBlob *output_blob = mutable_data<GeneralBlob>();
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name();
LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name;
return -1;
}
......@@ -149,8 +158,8 @@ int GeneralDistKVInferOp::inference() {
timeline.Start();
if (InferManager::instance().infer(
GENERAL_MODEL_NAME, &infer_in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
engine_name().c_str(), &infer_in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << engine_name();
return -1;
}
......
......@@ -41,12 +41,21 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralDistKVQuantInferOp::inference() {
VLOG(2) << "Going to run inference";
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
VLOG(2) << "Get precedent op name: " << pre_name();
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name()
<< ") can only have one predecessor op, but received "
<< pre_node_names.size();
return -1;
}
const std::string pre_name = pre_node_names[0];
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
VLOG(2) << "Get precedent op name: " << pre_name;
GeneralBlob *output_blob = mutable_data<GeneralBlob>();
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name();
LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name;
return -1;
}
......@@ -180,8 +189,8 @@ int GeneralDistKVQuantInferOp::inference() {
timeline.Start();
if (InferManager::instance().infer(
GENERAL_MODEL_NAME, &infer_in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
engine_name().c_str(), &infer_in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << engine_name();
return -1;
}
......
......@@ -31,8 +31,6 @@ namespace baidu {
namespace paddle_serving {
namespace serving {
static const char* GENERAL_MODEL_NAME = "general_model";
struct GeneralBlob {
std::vector<paddle::PaddleTensor> tensor_vector;
int64_t time_stamp[20];
......
......@@ -37,12 +37,21 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralInferOp::inference() {
VLOG(2) << "Going to run inference";
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
VLOG(2) << "Get precedent op name: " << pre_name();
const std::vector<std::string> pre_node_names = pre_names();
if (pre_node_names.size() != 1) {
LOG(ERROR) << "This op(" << op_name()
<< ") can only have one predecessor op, but received "
<< pre_node_names.size();
return -1;
}
const std::string pre_name = pre_node_names[0];
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
VLOG(2) << "Get precedent op name: " << pre_name;
GeneralBlob *output_blob = mutable_data<GeneralBlob>();
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name();
LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name;
return -1;
}
......@@ -59,8 +68,9 @@ int GeneralInferOp::inference() {
int64_t start = timeline.TimeStampUS();
timeline.Start();
if (InferManager::instance().infer(GENERAL_MODEL_NAME, in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
if (InferManager::instance().infer(
engine_name().c_str(), in, out, batch_size)) {
LOG(ERROR) << "Failed do infer in fluid model: " << engine_name().c_str();
return -1;
}
......
......@@ -33,23 +33,17 @@ using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::general_model::FetchInst;
using baidu::paddle_serving::predictor::general_model::ModelOutput;
using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralResponseOp::inference() {
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name();
return -1;
}
const TensorVector *in = &input_blob->tensor_vector;
int batch_size = input_blob->GetBatchSize();
VLOG(2) << "input batch size: " << batch_size;
const std::vector<std::string> pre_node_names = pre_names();
VLOG(2) << "pre node names size: " << pre_node_names.size();
const Request *req = dynamic_cast<const Request *>(get_request_message());
// response inst with only fetch_var_names
Response *res = mutable_data<Response>();
Timer timeline;
// double response_time = 0.0;
......@@ -73,9 +67,25 @@ int GeneralResponseOp::inference() {
model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
}
// response inst with only fetch_var_names
Response *res = mutable_data<Response>();
FetchInst *fetch_inst = res->add_insts();
const GeneralBlob *input_blob;
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
const std::string &pre_name = pre_node_names[pi];
VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
<< pre_node_names.size() << ")";
input_blob = get_depend_argument<GeneralBlob>(pre_name);
// fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(),
// input_blob);
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name;
return -1;
}
const TensorVector *in = &input_blob->tensor_vector;
ModelOutput *output = res->add_outputs();
// To get the order of model return values
output->set_engine_name(pre_name);
FetchInst *fetch_inst = output->add_insts();
for (auto &idx : fetch_index) {
Tensor *tensor = fetch_inst->add_tensor_array();
tensor->set_elem_type(1);
......@@ -103,7 +113,7 @@ int GeneralResponseOp::inference() {
if (in->at(idx).dtype == paddle::PaddleDType::INT64) {
int64_t *data_ptr = static_cast<int64_t *>(in->at(idx).data.data());
if (model_config->_is_lod_fetch[idx]) {
FetchInst *fetch_p = res->mutable_insts(0);
FetchInst *fetch_p = output->mutable_insts(0);
for (int j = 0; j < in->at(idx).lod[0].size(); ++j) {
fetch_p->mutable_tensor_array(var_idx)->add_lod(
in->at(idx).lod[0][j]);
......@@ -112,7 +122,7 @@ int GeneralResponseOp::inference() {
fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[j]);
}
} else {
FetchInst *fetch_p = res->mutable_insts(0);
FetchInst *fetch_p = output->mutable_insts(0);
for (int j = 0; j < cap; ++j) {
fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]);
}
......@@ -121,7 +131,7 @@ int GeneralResponseOp::inference() {
} else if (in->at(idx).dtype == paddle::PaddleDType::FLOAT32) {
float *data_ptr = static_cast<float *>(in->at(idx).data.data());
if (model_config->_is_lod_fetch[idx]) {
FetchInst *fetch_p = res->mutable_insts(0);
FetchInst *fetch_p = output->mutable_insts(0);
for (int j = 0; j < in->at(idx).lod[0].size(); ++j) {
fetch_p->mutable_tensor_array(var_idx)->add_lod(
in->at(idx).lod[0][j]);
......@@ -130,7 +140,7 @@ int GeneralResponseOp::inference() {
fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]);
}
} else {
FetchInst *fetch_p = res->mutable_insts(0);
FetchInst *fetch_p = output->mutable_insts(0);
for (int j = 0; j < cap; ++j) {
fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]);
}
......@@ -138,12 +148,26 @@ int GeneralResponseOp::inference() {
var_idx++;
}
}
}
if (req->profile_server()) {
int64_t end = timeline.TimeStampUS();
// TODO(barriery): multi-model profile_time.
// At present, only the response_op is multi-input, so here we get
// the profile_time by hard coding. It needs to be replaced with
// a more elegant way.
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
VLOG(2) << "p size for input blob: " << input_blob->p_size;
for (int i = 0; i < input_blob->p_size; ++i) {
res->add_profile_time(input_blob->time_stamp[i]);
int profile_time_idx = -1;
if (pi == 0) {
profile_time_idx = 0;
} else {
profile_time_idx = input_blob->p_size - 2;
}
for (; profile_time_idx < input_blob->p_size; ++profile_time_idx) {
res->add_profile_time(input_blob->time_stamp[profile_time_idx]);
}
}
// TODO(guru4elephant): find more elegant way to do this
res->add_profile_time(start);
......
......@@ -32,22 +32,18 @@ using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Response;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::general_model::FetchInst;
using baidu::paddle_serving::predictor::general_model::ModelOutput;
using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralTextResponseOp::inference() {
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
VLOG(2) << "Going to run inference";
const std::vector<std::string> pre_node_names = pre_names();
VLOG(2) << "pre node names size: " << pre_node_names.size();
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name();
return -1;
}
const TensorVector *in = &input_blob->tensor_vector;
int batch_size = input_blob->GetBatchSize();
VLOG(2) << "infer batch size: " << batch_size;
const Request *req = dynamic_cast<const Request *>(get_request_message());
// response inst with only fetch_var_names
Response *res = mutable_data<Response>();
Timer timeline;
int64_t start = timeline.TimeStampUS();
......@@ -67,11 +63,26 @@ int GeneralTextResponseOp::inference() {
model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
}
// response inst with only fetch_var_names
Response *res = mutable_data<Response>();
const GeneralBlob *input_blob;
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
const std::string &pre_name = pre_node_names[pi];
VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
<< pre_node_names.size() << ")";
input_blob = get_depend_argument<GeneralBlob>(pre_name);
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name;
return -1;
}
const TensorVector *in = &input_blob->tensor_vector;
int batch_size = input_blob->GetBatchSize();
VLOG(2) << "input batch size: " << batch_size;
ModelOutput *output = res->add_outputs();
output->set_engine_name(
pre_name); // To get the order of model return values
for (int i = 0; i < batch_size; ++i) {
FetchInst *fetch_inst = res->add_insts();
FetchInst *fetch_inst = output->add_insts();
for (auto &idx : fetch_index) {
Tensor *tensor = fetch_inst->add_tensor_array();
// currently only response float tensor or lod_tensor
......@@ -100,26 +111,42 @@ int GeneralTextResponseOp::inference() {
for (int j = 0; j < batch_size; ++j) {
for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1];
k++) {
res->mutable_insts(j)->mutable_tensor_array(var_idx)->add_float_data(
data_ptr[k]);
output->mutable_insts(j)
->mutable_tensor_array(var_idx)
->add_float_data(data_ptr[k]);
}
}
} else {
for (int j = 0; j < batch_size; ++j) {
for (int k = j * cap; k < (j + 1) * cap; ++k) {
res->mutable_insts(j)->mutable_tensor_array(var_idx)->add_float_data(
data_ptr[k]);
output->mutable_insts(j)
->mutable_tensor_array(var_idx)
->add_float_data(data_ptr[k]);
}
}
}
var_idx++;
}
}
if (req->profile_server()) {
int64_t end = timeline.TimeStampUS();
for (int i = 0; i < input_blob->p_size; ++i) {
res->add_profile_time(input_blob->time_stamp[i]);
// TODO(barriery): multi-model profile_time.
// At present, only the response_op is multi-input, so here we get
// the profile_time by hard coding. It needs to be replaced with
// a more elegant way.
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
VLOG(2) << "p size for input blob: " << input_blob->p_size;
int profile_time_idx = -1;
if (pi == 0) {
profile_time_idx = 0;
} else {
profile_time_idx = input_blob->p_size - 2;
}
for (; profile_time_idx < input_blob->p_size; ++profile_time_idx) {
res->add_profile_time(input_blob->time_stamp[profile_time_idx]);
}
}
// TODO(guru4elephant): find more elegant way to do this
res->add_profile_time(start);
......
......@@ -40,10 +40,15 @@ message Request {
};
message Response {
repeated FetchInst insts = 1;
repeated ModelOutput outputs = 1;
repeated int64 profile_time = 2;
};
message ModelOutput {
repeated FetchInst insts = 1;
optional string engine_name = 2;
}
service GeneralModelService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
......
......@@ -14,6 +14,7 @@
#include "core/predictor/framework/dag.h"
#include <string>
#include <utility> // make_pair
#include <vector>
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/predictor_metric.h" // PredictorMetric
......@@ -199,25 +200,81 @@ const DagStage* Dag::stage_by_index(uint32_t index) { return _stages[index]; }
int Dag::topo_sort() {
std::stringstream ss;
for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
uint32_t nodes_size = _index_nodes.size();
std::vector<uint32_t> in_degree(nodes_size, 0);
std::vector<std::vector<uint32_t>> in_egde(nodes_size);
for (uint32_t nid = 0; nid < nodes_size; nid++) {
in_degree[nid] += _index_nodes[nid]->depends.size();
for (auto it = _index_nodes[nid]->depends.begin();
it != _index_nodes[nid]->depends.end();
++it) {
uint32_t pnid = Dag::node_by_name(it->first)->id -
1; // 0 is reserved for begginer-op
in_egde[pnid].push_back(nid);
}
}
for (int i = 0; i < in_degree.size(); ++i) {
LOG(INFO) << "(" << _index_nodes[i]->name << ") in_degree[" << i
<< "]: " << in_degree[i];
}
int sorted_num = 0;
DagStage* stage = new (std::nothrow) DagStage();
if (stage == NULL) {
LOG(ERROR) << "Invalid stage!";
return ERR_MEM_ALLOC_FAILURE;
}
stage->nodes.push_back(_index_nodes[nid]);
ss.str("");
ss << _stages.size();
stage->name = ss.str();
stage->full_name = full_name() + NAME_DELIMITER + stage->name;
for (uint32_t nid = 0; nid < nodes_size; ++nid) {
if (in_degree[nid] == 0) {
++sorted_num;
stage->nodes.push_back(_index_nodes[nid]);
// assign stage number after stage created
_index_nodes[nid]->stage = _stages.size();
// assign dag node full name after stage created
_index_nodes[nid]->full_name =
stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name;
}
}
if (stage->nodes.size() == 0) {
LOG(ERROR) << "Invalid Dag!";
return ERR_INTERNAL_FAILURE;
}
_stages.push_back(stage);
while (sorted_num < nodes_size) {
auto pre_nodes = _stages.back()->nodes;
DagStage* stage = new (std::nothrow) DagStage();
ss.str("");
ss << _stages.size();
stage->name = ss.str();
stage->full_name = full_name() + NAME_DELIMITER + stage->name;
for (uint32_t pi = 0; pi < pre_nodes.size(); ++pi) {
uint32_t pnid = pre_nodes[pi]->id - 1;
for (uint32_t ei = 0; ei < in_egde[pnid].size(); ++ei) {
uint32_t nid = in_egde[pnid][ei];
--in_degree[nid];
if (in_degree[nid] == 0) {
++sorted_num;
stage->nodes.push_back(_index_nodes[nid]);
// assign stage number after stage created
_index_nodes[nid]->stage = nid;
_index_nodes[nid]->stage = _stages.size();
// assign dag node full name after stage created
_index_nodes[nid]->full_name =
stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name;
}
}
}
if (stage->nodes.size() == 0) {
LOG(ERROR) << "Invalid Dag!";
return ERR_INTERNAL_FAILURE;
}
_stages.push_back(stage);
}
return ERR_OK;
}
......
......@@ -76,19 +76,34 @@ int DagView::init(Dag* dag, const std::string& service_name) {
}
op->set_full_name(service_name + NAME_DELIMITER + node->full_name);
// Set the name of the Op as the key of the matching engine.
VLOG(2) << "op->set_engine_name(" << node->name.c_str() << ")";
op->set_engine_name(node->name);
vnode->conf = node;
vnode->op = op;
// Add depends
for (auto it = vnode->conf->depends.begin();
it != vnode->conf->depends.end();
++it) {
std::string pre_node_name = it->first;
VLOG(2) << "add op pre name: \n"
<< "current op name: " << vnode->op->op_name()
<< ", previous op name: " << pre_node_name;
vnode->op->add_pre_node_name(pre_node_name);
}
vstage->nodes.push_back(vnode);
}
// TODO(guru4elephant): this seems buggy, please review later
if (si > 0) {
VLOG(2) << "set op pre name: \n"
<< "current op name: " << vstage->nodes.back()->op->op_name()
<< " previous op name: "
<< _view[si - 1]->nodes.back()->op->op_name();
vstage->nodes.back()->op->set_pre_node_name(
_view[si - 1]->nodes.back()->op->op_name());
}
/*if (si > 0) {*/
// VLOG(2) << "set op pre name: \n"
//<< "current op name: " << vstage->nodes.back()->op->op_name()
//<< " previous op name: "
//<< _view[si - 1]->nodes.back()->op->op_name();
// vstage->nodes.back()->op->set_pre_node_name(
//_view[si - 1]->nodes.back()->op->op_name());
/*}*/
_view.push_back(vstage);
}
......@@ -139,6 +154,7 @@ int DagView::execute_one_stage(ViewStage* vstage,
butil::IOBufBuilder* debug_os) {
butil::Timer stage_time(butil::Timer::STARTED);
uint32_t node_size = vstage->nodes.size();
VLOG(2) << "vstage->nodes.size(): " << node_size;
for (uint32_t ni = 0; ni < node_size; ni++) {
ViewNode* vnode = vstage->nodes[ni];
DagNode* conf = vnode->conf;
......
......@@ -765,6 +765,8 @@ class InferManager {
}
size_t engine_num = model_toolkit_conf.engines_size();
for (size_t ei = 0; ei < engine_num; ++ei) {
LOG(INFO) << "model_toolkit_conf.engines(" << ei
<< ").name: " << model_toolkit_conf.engines(ei).name();
std::string engine_name = model_toolkit_conf.engines(ei).name();
VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
if (!engine) {
......
......@@ -56,11 +56,11 @@ int MempoolWrapper::thread_initialize() {
im::fugue::memory::Region* region = new im::fugue::memory::Region();
region->init();
im::Mempool* mempool = new (std::nothrow) im::Mempool(region);
MempoolRegion* mempool_region = new MempoolRegion(region, mempool);
if (mempool == NULL) {
LOG(ERROR) << "Failed create thread mempool";
return -1;
}
MempoolRegion* mempool_region = new MempoolRegion(region, mempool);
if (THREAD_SETSPECIFIC(_bspec_key, mempool_region) != 0) {
LOG(ERROR) << "unable to set the thrd_data";
......
......@@ -60,6 +60,7 @@ int Op::init(Bus* bus,
return -1;
}
_pre_node_names.clear();
return custom_init();
}
......
......@@ -14,7 +14,9 @@
#pragma once
#include <bvar/bvar.h> // bvar::LatencyRecorder
#include <cstdlib>
#include <string>
#include <vector>
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/channel.h"
#include "core/predictor/framework/op_repository.h"
......@@ -132,18 +134,28 @@ class Op {
const std::string& full_name() const { return _full_name; }
const std::string& pre_name() const { return _pre_node_name; }
const std::vector<std::string>& pre_names() const { return _pre_node_names; }
void set_full_name(const std::string full_name) { _full_name = full_name; }
void set_pre_node_name(const std::string pre_name) {
_pre_node_name = pre_name;
void add_pre_node_name(const std::string pre_name) {
_pre_node_names.push_back(pre_name);
}
const std::string& type() const;
uint32_t id() const;
// Set the name of the Op as the key of the matching engine.
// Notes that this key is only used by infer_op (only the
// infer_op needs to find the corresponding engine).
// At present, there is only general_infer_op.
void set_engine_name(const std::string engine_name) {
_engine_name = engine_name;
}
const std::string& engine_name() const { return _engine_name; }
// --------------- Default implements ----------------
virtual int custom_init() { return 0; }
......@@ -189,13 +201,14 @@ class Op {
Bus* _bus;
Dag* _dag;
uint32_t _id;
std::string _pre_node_name; // only for sequential execution
std::vector<std::string> _pre_node_names; // for DAG execution
std::string _name;
std::string _full_name; // service_workflow_stageindex_opname
std::string _type;
bool _has_calc;
bool _has_init;
TimerFlow* _timer;
std::string _engine_name; // only for infer_op
};
template <typename T>
......@@ -215,7 +228,10 @@ class OpWithChannel : public Op {
return _channel;
}
_channel = butil::get_object<ChannelType>();
// TODO(barriery): There are some problems in using butil::get_object
// _channel = butil::get_object<ChannelType>();
_channel = new ChannelType();
if (!_channel) {
LOG(ERROR) << "Failed mutable channel of type:" << typeid(T).name();
return NULL;
......@@ -229,8 +245,14 @@ class OpWithChannel : public Op {
int release_channel() {
if (_channel) {
_channel->deinit();
butil::return_object<ChannelType>(_channel);
delete _channel;
}
// TODO(barriery): There are some problems in using butil::get_object
/*
if (_channel) {
_channel->deinit();
butil::return_object<ChannelType>(_channel);
} */
_channel = NULL;
return 0;
......
......@@ -40,10 +40,15 @@ message Request {
};
message Response {
repeated FetchInst insts = 1;
repeated ModelOutput outputs = 1;
repeated int64 profile_time = 2;
};
message ModelOutput {
repeated FetchInst insts = 1;
optional string engine_name = 2;
}
service GeneralModelService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
......
# Model Ensemble in Paddle Serving
([简体中文](MODEL_ENSEMBLE_IN_PADDLE_SERVING_CN.md)|English)
In some scenarios, multiple models with the same input may be used to predict in parallel and integrate predicted results for better prediction effect. Paddle Serving also supports this feature.
Next, we will take the text classification task as an example to show model ensemble in Paddle Serving (This feature is still serial prediction for the time being. We will support parallel prediction as soon as possible).
## Simple example
In this example (see the figure below), the server side predict the bow and CNN models with the same input in a service in parallel, The client side fetchs the prediction results of the two models, and processes the prediction results to get the final predict results.
![simple example](model_ensemble_example.png)
It should be noted that at present, only multiple models with the same format input and output in the same service are supported. In this example, the input and output formats of CNN and BOW model are the same.
The code used in the example is saved in the `python/examples/imdb` path:
```shell
.
├── get_data.sh
├── imdb_reader.py
├── test_ensemble_client.py
└── test_ensemble_server.py
```
### Prepare data
Get the pre-trained CNN and BOW models by the following command (you can also run the `get_data.sh` script):
```shell
wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz
tar -zxvf text_classification_data.tar.gz
tar -zxvf imdb_model.tar.gz
```
### Start server
Start server by the following Python code (you can also run the `test_ensemble_server.py` script):
```python
from paddle_serving_server import OpMaker
from paddle_serving_server import OpGraphMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
'general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create(
'general_response', inputs=[cnn_infer_op, bow_infer_op])
op_graph_maker = OpGraphMaker()
op_graph_maker.add_op(read_op)
op_graph_maker.add_op(cnn_infer_op)
op_graph_maker.add_op(bow_infer_op)
op_graph_maker.add_op(response_op)
server = Server()
server.set_op_graph(op_graph_maker.get_op_graph())
model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'}
server.load_model_config(model_config)
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
```
Different from the normal prediction service, here we need to use DAG to describe the logic of the server side.
When creating an Op, you need to specify the predecessor of the current Op (in this example, the predecessor of `cnn_infer_op` and `bow_infer_op` is `read_op`, and the predecessor of `response_op` is `cnn_infer_op` and `bow_infer_op`. For the infer Op `infer_op`, you need to define the prediction engine name `engine_name` (You can also use the default value. It is recommended to set the value to facilitate the client side to obtain the order of prediction results).
At the same time, when configuring the model path, you need to create a model configuration dictionary with the infer Op as the key and the corresponding model path as value to inform Serving which model each infer OP uses.
### Start client
Start client by the following Python code (you can also run the `test_ensemble_client.py` script):
```python
from paddle_serving_client import Client
from imdb_reader import IMDBDataset
client = Client()
# If you have more than one model, make sure that the input
# and output of more than one model are the same.
client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
client.connect(["127.0.0.1:9393"])
# you can define any english sentence or dataset here
# This example reuses imdb reader in training, you
# can define your own data preprocessing easily.
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource('imdb.vocab')
for i in range(3):
line = 'i am very sad | 0'
word_ids, label = imdb_dataset.get_words_and_label(line)
feed = {"words": word_ids}
fetch = ["acc", "cost", "prediction"]
fetch_maps = client.predict(feed=feed, fetch=fetch)
if len(fetch_maps) == 1:
print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1]))
else:
for model, fetch_map in fetch_maps.items():
print("step: {}, model: {}, res: {}".format(i, model, fetch_map[
'prediction'][0][1]))
```
Compared with the normal prediction service, the client side has not changed much. When multiple model predictions are used, the prediction service will return a dictionary with engine name `engine_name`(the value is defined on the server side) as the key, and the corresponding model prediction results as the value.
### Expected result
```shell
step: 0, model: cnn, res: 0.560272455215
step: 0, model: bow, res: 0.633530199528
step: 1, model: cnn, res: 0.560272455215
step: 1, model: bow, res: 0.633530199528
step: 2, model: cnn, res: 0.560272455215
step: 2, model: bow, res: 0.633530199528
```
# Paddle Serving中的集成预测
(简体中文|[English](MODEL_ENSEMBLE_IN_PADDLE_SERVING.md))
在一些场景中,可能使用多个相同输入的模型并行集成预测以获得更好的预测效果,Paddle Serving提供了这项功能。
下面将以文本分类任务为例,来展示Paddle Serving的集成预测功能(暂时还是串行预测,我们会尽快支持并行化)。
## 集成预测样例
该样例中(见下图),Server端在一项服务中并行预测相同输入的BOW和CNN模型,Client端获取两个模型的预测结果并进行后处理,得到最终的预测结果。
![simple example](model_ensemble_example.png)
需要注意的是,目前只支持在同一个服务中使用多个相同格式输入输出的模型。在该例子中,CNN模型和BOW模型的输入输出格式是相同的。
样例中用到的代码保存在`python/examples/imdb`路径下:
```shell
.
├── get_data.sh
├── imdb_reader.py
├── test_ensemble_client.py
└── test_ensemble_server.py
```
### 数据准备
通过下面命令获取预训练的CNN和BOW模型(您也可以直接运行`get_data.sh`脚本):
```shell
wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz
tar -zxvf text_classification_data.tar.gz
tar -zxvf imdb_model.tar.gz
```
### 启动Server
通过下面的Python代码启动Server端(您也可以直接运行`test_ensemble_server.py`脚本):
```python
from paddle_serving_server import OpMaker
from paddle_serving_server import OpGraphMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
'general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create(
'general_response', inputs=[cnn_infer_op, bow_infer_op])
op_graph_maker = OpGraphMaker()
op_graph_maker.add_op(read_op)
op_graph_maker.add_op(cnn_infer_op)
op_graph_maker.add_op(bow_infer_op)
op_graph_maker.add_op(response_op)
server = Server()
server.set_op_graph(op_graph_maker.get_op_graph())
model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'}
server.load_model_config(model_config)
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
```
与普通预测服务不同的是,这里我们需要用DAG来描述Server端的运行逻辑。
在创建Op的时候需要指定当前Op的前继(在该例子中,`cnn_infer_op``bow_infer_op`的前继均是`read_op``response_op`的前继是`cnn_infer_op``bow_infer_op`),对于预测Op`infer_op`还需要定义预测引擎名称`engine_name`(也可以使用默认值,建议设置该值方便Client端获取预测结果)。
同时在配置模型路径时,需要以预测Op为key,对应的模型路径为value,创建模型配置字典,来告知Serving每个预测Op使用哪个模型。
### 启动Client
通过下面的Python代码运行Client端(您也可以直接运行`test_ensemble_client.py`脚本):
```python
from paddle_serving_client import Client
from imdb_reader import IMDBDataset
client = Client()
# If you have more than one model, make sure that the input
# and output of more than one model are the same.
client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
client.connect(["127.0.0.1:9393"])
# you can define any english sentence or dataset here
# This example reuses imdb reader in training, you
# can define your own data preprocessing easily.
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource('imdb.vocab')
for i in range(3):
line = 'i am very sad | 0'
word_ids, label = imdb_dataset.get_words_and_label(line)
feed = {"words": word_ids}
fetch = ["acc", "cost", "prediction"]
fetch_maps = client.predict(feed=feed, fetch=fetch)
if len(fetch_maps) == 1:
print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1]))
else:
for model, fetch_map in fetch_maps.items():
print("step: {}, model: {}, res: {}".format(i, model, fetch_map[
'prediction'][0][1]))
```
Client端与普通预测服务没有发生太大的变化。当使用多个模型预测时,预测服务将返回一个key为Server端定义的引擎名称`engine_name`,value为对应的模型预测结果的字典。
### 预期结果
```txt
step: 0, model: cnn, res: 0.560272455215
step: 0, model: bow, res: 0.633530199528
step: 1, model: cnn, res: 0.560272455215
step: 1, model: bow, res: 0.633530199528
step: 2, model: cnn, res: 0.560272455215
step: 2, model: bow, res: 0.633530199528
```
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_client import Client
from imdb_reader import IMDBDataset
client = Client()
# If you have more than one model, make sure that the input
# and output of more than one model are the same.
client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
client.connect(["127.0.0.1:9393"])
# you can define any english sentence or dataset here
# This example reuses imdb reader in training, you
# can define your own data preprocessing easily.
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource('imdb.vocab')
for i in range(3):
line = 'i am very sad | 0'
word_ids, label = imdb_dataset.get_words_and_label(line)
feed = {"words": word_ids}
fetch = ["acc", "cost", "prediction"]
fetch_maps = client.predict(feed=feed, fetch=fetch)
if len(fetch_maps) == 1:
print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1]))
else:
for model, fetch_map in fetch_maps.items():
print("step: {}, model: {}, res: {}".format(i, model, fetch_map[
'prediction'][0][1]))
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server import OpMaker
from paddle_serving_server import OpGraphMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
'general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create(
'general_response', inputs=[cnn_infer_op, bow_infer_op])
op_graph_maker = OpGraphMaker()
op_graph_maker.add_op(read_op)
op_graph_maker.add_op(cnn_infer_op)
op_graph_maker.add_op(bow_infer_op)
op_graph_maker.add_op(response_op)
server = Server()
server.set_op_graph(op_graph_maker.get_op_graph())
model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'}
server.load_model_config(model_config)
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
......@@ -264,28 +264,45 @@ class Client(object):
if res == -1:
return None
result_map_batch = []
multi_result_map = []
model_engine_names = result_batch.get_engine_names()
for mi, engine_name in enumerate(model_engine_names):
result_map = {}
# result map needs to be a numpy array
for i, name in enumerate(fetch_names):
if self.fetch_names_to_type_[name] == int_type:
result_map[name] = result_batch.get_int64_by_name(name)
shape = result_batch.get_shape(name)
result_map[name] = result_batch.get_int64_by_name(mi, name)
shape = result_batch.get_shape(mi, name)
result_map[name] = np.array(result_map[name])
result_map[name].shape = shape
if name in self.lod_tensor_set:
result_map["{}.lod".format(name)] = result_batch.get_lod(
name)
result_map["{}.lod".format(
name)] = result_batch.get_lod(mi, name)
elif self.fetch_names_to_type_[name] == float_type:
result_map[name] = result_batch.get_float_by_name(name)
shape = result_batch.get_shape(name)
result_map[name] = result_batch.get_float_by_name(mi, name)
shape = result_batch.get_shape(mi, name)
result_map[name] = np.array(result_map[name])
result_map[name].shape = shape
if name in self.lod_tensor_set:
result_map["{}.lod".format(name)] = result_batch.get_lod(
name)
return result_map
result_map["{}.lod".format(
name)] = result_batch.get_lod(mi, name)
multi_result_map.append(result_map)
ret = None
if len(model_engine_names) == 1:
# If only one model result is returned, the format of ret is result_map
ret = multi_result_map[0]
else:
# If multiple model results are returned, the format of ret is {name: result_map}
ret = {
engine_name: multi_result_map[mi]
for mi, engine_name in enumerate(model_engine_names)
}
# When using the A/B test, the tag of variant needs to be returned
return ret if not need_variant_tag else [
ret, self.result_handle_.variant_tag()
]
def release(self):
self.client_handle_.destroy_predictor()
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import os
from .proto import server_configure_pb2 as server_sdk
......@@ -21,6 +22,7 @@ import socket
import paddle_serving_server as paddle_serving_server
from .version import serving_server_version
from contextlib import closing
import collections
class OpMaker(object):
......@@ -36,17 +38,35 @@ class OpMaker(object):
"general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp",
"general_copy": "GeneralCopyOp"
}
self.node_name_suffix_ = collections.defaultdict(int)
# currently, inputs and outputs are not used
# when we have OpGraphMaker, inputs and outputs are necessary
def create(self, name, inputs=[], outputs=[]):
if name not in self.op_dict:
raise Exception("Op name {} is not supported right now".format(
name))
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
if node_type not in self.op_dict:
raise Exception("Op type {} is not supported right now".format(
node_type))
node = server_sdk.DAGNode()
node.name = "{}_op".format(name)
node.type = self.op_dict[name]
return node
# node.name will be used as the infer engine name
if engine_name:
node.name = engine_name
else:
node.name = '{}_{}'.format(node_type,
self.node_name_suffix_[node_type])
self.node_name_suffix_[node_type] += 1
node.type = self.op_dict[node_type]
if inputs:
for dep_node_str in inputs:
dep_node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(dep_node_str, dep_node)
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
# Because the return value will be used as the key value of the
# dict, and the proto object is variable which cannot be hashed,
# so it is processed into a string. This has little effect on
# overall efficiency.
return google.protobuf.text_format.MessageToString(node)
class OpSeqMaker(object):
......@@ -55,12 +75,25 @@ class OpSeqMaker(object):
self.workflow.name = "workflow1"
self.workflow.workflow_type = "Sequence"
def add_op(self, node):
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
if len(node.dependencies) > 1:
raise Exception(
'Set more than one predecessor for op in OpSeqMaker is not allowed.'
)
if len(self.workflow.nodes) >= 1:
if len(node.dependencies) == 0:
dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name
dep.mode = "RO"
node.dependencies.extend([dep])
elif len(node.dependencies) == 1:
if node.dependencies[0].name != self.workflow.nodes[-1].name:
raise Exception(
'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'.
format(node.dependencies[0].name, self.workflow.nodes[
-1].name))
self.workflow.nodes.extend([node])
def get_op_sequence(self):
......@@ -69,13 +102,30 @@ class OpSeqMaker(object):
return workflow_conf
class OpGraphMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
# Currently, SDK only supports "Sequence"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.workflow.nodes.extend([node])
def get_op_graph(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class Server(object):
def __init__(self):
self.server_handle_ = None
self.infer_service_conf = None
self.model_toolkit_conf = None
self.resource_conf = None
self.engine = None
self.memory_optimization = False
self.model_conf = None
self.workflow_fn = "workflow.prototxt"
......@@ -94,6 +144,7 @@ class Server(object):
self.cur_path = os.getcwd()
self.use_local_bin = False
self.mkl_flag = False
self.model_config_paths = None # for multi-model in a workflow
def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency
......@@ -118,6 +169,9 @@ class Server(object):
def set_op_sequence(self, op_seq):
self.workflow_conf = op_seq
def set_op_graph(self, op_graph):
self.workflow_conf = op_graph
def set_memory_optimize(self, flag=False):
self.memory_optimization = flag
......@@ -126,32 +180,30 @@ class Server(object):
self.use_local_bin = True
self.bin_path = os.environ["SERVING_BIN"]
def _prepare_engine(self, model_config_path, device):
def _prepare_engine(self, model_config_paths, device):
if self.model_toolkit_conf == None:
self.model_toolkit_conf = server_sdk.ModelToolkitConf()
if self.engine == None:
self.engine = server_sdk.EngineDesc()
self.model_config_path = model_config_path
self.engine.name = "general_model"
self.engine.reloadable_meta = model_config_path + "/fluid_time_file"
os.system("touch {}".format(self.engine.reloadable_meta))
self.engine.reloadable_type = "timestamp_ne"
self.engine.runtime_thread_num = 0
self.engine.batch_infer_size = 0
self.engine.enable_batch_align = 0
self.engine.model_data_path = model_config_path
self.engine.enable_memory_optimization = self.memory_optimization
self.engine.static_optimization = False
self.engine.force_update_static_cache = False
for engine_name, model_config_path in model_config_paths.items():
engine = server_sdk.EngineDesc()
engine.name = engine_name
engine.reloadable_meta = model_config_path + "/fluid_time_file"
os.system("touch {}".format(engine.reloadable_meta))
engine.reloadable_type = "timestamp_ne"
engine.runtime_thread_num = 0
engine.batch_infer_size = 0
engine.enable_batch_align = 0
engine.model_data_path = model_config_path
engine.enable_memory_optimization = self.memory_optimization
engine.static_optimization = False
engine.force_update_static_cache = False
if device == "cpu":
self.engine.type = "FLUID_CPU_ANALYSIS_DIR"
engine.type = "FLUID_CPU_ANALYSIS_DIR"
elif device == "gpu":
self.engine.type = "FLUID_GPU_ANALYSIS_DIR"
engine.type = "FLUID_GPU_ANALYSIS_DIR"
self.model_toolkit_conf.engines.extend([self.engine])
self.model_toolkit_conf.engines.extend([engine])
def _prepare_infer_service(self, port):
if self.infer_service_conf == None:
......@@ -184,10 +236,49 @@ class Server(object):
with open(filepath, "w") as fout:
fout.write(str(pb_obj))
def load_model_config(self, path):
self.model_config_path = path
def load_model_config(self, model_config_paths):
# At present, Serving needs to configure the model path in
# the resource.prototxt file to determine the input and output
# format of the workflow. To ensure that the input and output
# of multiple models are the same.
workflow_oi_config_path = None
if isinstance(model_config_paths, str):
# If there is only one model path, use the default infer_op.
# Because there are several infer_op type, we need to find
# it from workflow_conf.
default_engine_names = [
'general_infer_0', 'general_dist_kv_infer_0',
'general_dist_kv_quant_infer_0'
]
engine_name = None
for node in self.workflow_conf.workflows[0].nodes:
if node.name in default_engine_names:
engine_name = node.name
break
if engine_name is None:
raise Exception(
"You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
)
self.model_config_paths = {engine_name: model_config_paths}
workflow_oi_config_path = self.model_config_paths[engine_name]
elif isinstance(model_config_paths, dict):
self.model_config_paths = {}
for node_str, path in model_config_paths.items():
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.model_config_paths[node.name] = path
print("You have specified multiple model paths, please ensure "
"that the input and output of multiple models are the same.")
workflow_oi_config_path = self.model_config_paths.items()[0][1]
else:
raise Exception("The type of model_config_paths must be str or "
"dict({op: model_path}), not {}.".format(
type(model_config_paths)))
self.model_conf = m_config.GeneralModelConfig()
f = open("{}/serving_server_conf.prototxt".format(path), 'r')
f = open(
"{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
'r')
self.model_conf = google.protobuf.text_format.Merge(
str(f.read()), self.model_conf)
# check config here
......@@ -258,7 +349,7 @@ class Server(object):
if not self.port_is_available(port):
raise SystemExit("Prot {} is already used".format(port))
self._prepare_resource(workdir)
self._prepare_engine(self.model_config_path, device)
self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port)
self.workdir = workdir
......
......@@ -62,22 +62,14 @@ class WebService(object):
abort(400)
try:
feed, fetch = self.preprocess(request.json, request.json["fetch"])
if isinstance(feed, list):
fetch_map_batch = self.client_service.predict(
feed_batch=feed, fetch=fetch)
fetch_map_batch = self.postprocess(
feed=request.json, fetch=fetch, fetch_map=fetch_map_batch)
for key in fetch_map_batch:
fetch_map_batch[key] = fetch_map_batch[key].tolist()
result = {"result": fetch_map_batch}
elif isinstance(feed, dict):
if "fetch" in feed:
if isinstance(feed, dict) and "fetch" in feed:
del feed["fetch"]
fetch_map = self.client_service.predict(feed=feed, fetch=fetch)
for key in fetch_map:
fetch_map[key] = fetch_map[key][0].tolist()
result = self.postprocess(
feed=request.json, fetch=fetch, fetch_map=fetch_map)
result = {"result": result}
except ValueError:
result = {"result": "Request Value Error"}
return result
......
......@@ -24,6 +24,7 @@ import time
from .version import serving_server_version
from contextlib import closing
import argparse
import collections
def serve_args():
......@@ -66,17 +67,35 @@ class OpMaker(object):
"general_dist_kv_infer": "GeneralDistKVInferOp",
"general_dist_kv": "GeneralDistKVOp"
}
self.node_name_suffix_ = collections.defaultdict(int)
# currently, inputs and outputs are not used
# when we have OpGraphMaker, inputs and outputs are necessary
def create(self, name, inputs=[], outputs=[]):
if name not in self.op_dict:
raise Exception("Op name {} is not supported right now".format(
name))
def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
if node_type not in self.op_dict:
raise Exception("Op type {} is not supported right now".format(
node_type))
node = server_sdk.DAGNode()
node.name = "{}_op".format(name)
node.type = self.op_dict[name]
return node
# node.name will be used as the infer engine name
if engine_name:
node.name = engine_name
else:
node.name = '{}_{}'.format(node_type,
self.node_name_suffix_[node_type])
self.node_name_suffix_[node_type] += 1
node.type = self.op_dict[node_type]
if inputs:
for dep_node_str in inputs:
dep_node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(dep_node_str, dep_node)
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
# Because the return value will be used as the key value of the
# dict, and the proto object is variable which cannot be hashed,
# so it is processed into a string. This has little effect on
# overall efficiency.
return google.protobuf.text_format.MessageToString(node)
class OpSeqMaker(object):
......@@ -85,12 +104,25 @@ class OpSeqMaker(object):
self.workflow.name = "workflow1"
self.workflow.workflow_type = "Sequence"
def add_op(self, node):
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
if len(node.dependencies) > 1:
raise Exception(
'Set more than one predecessor for op in OpSeqMaker is not allowed.'
)
if len(self.workflow.nodes) >= 1:
if len(node.dependencies) == 0:
dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name
dep.mode = "RO"
node.dependencies.extend([dep])
elif len(node.dependencies) == 1:
if node.dependencies[0].name != self.workflow.nodes[-1].name:
raise Exception(
'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'.
format(node.dependencies[0].name, self.workflow.nodes[
-1].name))
self.workflow.nodes.extend([node])
def get_op_sequence(self):
......@@ -99,13 +131,30 @@ class OpSeqMaker(object):
return workflow_conf
class OpGraphMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
# Currently, SDK only supports "Sequence"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.workflow.nodes.extend([node])
def get_op_graph(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class Server(object):
def __init__(self):
self.server_handle_ = None
self.infer_service_conf = None
self.model_toolkit_conf = None
self.resource_conf = None
self.engine = None
self.memory_optimization = False
self.model_conf = None
self.workflow_fn = "workflow.prototxt"
......@@ -125,6 +174,7 @@ class Server(object):
self.check_cuda()
self.use_local_bin = False
self.gpuid = 0
self.model_config_paths = None # for multi-model in a workflow
def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency
......@@ -149,6 +199,9 @@ class Server(object):
def set_op_sequence(self, op_seq):
self.workflow_conf = op_seq
def set_op_graph(self, op_graph):
self.workflow_conf = op_graph
def set_memory_optimize(self, flag=False):
self.memory_optimization = flag
......@@ -167,33 +220,31 @@ class Server(object):
def set_gpuid(self, gpuid=0):
self.gpuid = gpuid
def _prepare_engine(self, model_config_path, device):
def _prepare_engine(self, model_config_paths, device):
if self.model_toolkit_conf == None:
self.model_toolkit_conf = server_sdk.ModelToolkitConf()
if self.engine == None:
self.engine = server_sdk.EngineDesc()
self.model_config_path = model_config_path
self.engine.name = "general_model"
#self.engine.reloadable_meta = model_config_path + "/fluid_time_file"
self.engine.reloadable_meta = self.workdir + "/fluid_time_file"
os.system("touch {}".format(self.engine.reloadable_meta))
self.engine.reloadable_type = "timestamp_ne"
self.engine.runtime_thread_num = 0
self.engine.batch_infer_size = 0
self.engine.enable_batch_align = 0
self.engine.model_data_path = model_config_path
self.engine.enable_memory_optimization = self.memory_optimization
self.engine.static_optimization = False
self.engine.force_update_static_cache = False
for engine_name, model_config_path in model_config_paths.items():
engine = server_sdk.EngineDesc()
engine.name = engine_name
# engine.reloadable_meta = model_config_path + "/fluid_time_file"
engine.reloadable_meta = self.workdir + "/fluid_time_file"
os.system("touch {}".format(engine.reloadable_meta))
engine.reloadable_type = "timestamp_ne"
engine.runtime_thread_num = 0
engine.batch_infer_size = 0
engine.enable_batch_align = 0
engine.model_data_path = model_config_path
engine.enable_memory_optimization = self.memory_optimization
engine.static_optimization = False
engine.force_update_static_cache = False
if device == "cpu":
self.engine.type = "FLUID_CPU_ANALYSIS_DIR"
engine.type = "FLUID_CPU_ANALYSIS_DIR"
elif device == "gpu":
self.engine.type = "FLUID_GPU_ANALYSIS_DIR"
engine.type = "FLUID_GPU_ANALYSIS_DIR"
self.model_toolkit_conf.engines.extend([self.engine])
self.model_toolkit_conf.engines.extend([engine])
def _prepare_infer_service(self, port):
if self.infer_service_conf == None:
......@@ -225,10 +276,49 @@ class Server(object):
with open(filepath, "w") as fout:
fout.write(str(pb_obj))
def load_model_config(self, path):
self.model_config_path = path
def load_model_config(self, model_config_paths):
# At present, Serving needs to configure the model path in
# the resource.prototxt file to determine the input and output
# format of the workflow. To ensure that the input and output
# of multiple models are the same.
workflow_oi_config_path = None
if isinstance(model_config_paths, str):
# If there is only one model path, use the default infer_op.
# Because there are several infer_op type, we need to find
# it from workflow_conf.
default_engine_names = [
'general_infer_0', 'general_dist_kv_infer_0',
'general_dist_kv_quant_infer_0'
]
engine_name = None
for node in self.workflow_conf.workflows[0].nodes:
if node.name in default_engine_names:
engine_name = node.name
break
if engine_name is None:
raise Exception(
"You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
)
self.model_config_paths = {engine_name: model_config_paths}
workflow_oi_config_path = self.model_config_paths[engine_name]
elif isinstance(model_config_paths, dict):
self.model_config_paths = {}
for node_str, path in model_config_paths.items():
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.model_config_paths[node.name] = path
print("You have specified multiple model paths, please ensure "
"that the input and output of multiple models are the same.")
workflow_oi_config_path = self.model_config_paths.items()[0][1]
else:
raise Exception("The type of model_config_paths must be str or "
"dict({op: model_path}), not {}.".format(
type(model_config_paths)))
self.model_conf = m_config.GeneralModelConfig()
f = open("{}/serving_server_conf.prototxt".format(path), 'r')
f = open(
"{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
'r')
self.model_conf = google.protobuf.text_format.Merge(
str(f.read()), self.model_conf)
# check config here
......@@ -291,7 +381,7 @@ class Server(object):
self.set_port(port)
self._prepare_resource(workdir)
self._prepare_engine(self.model_config_path, device)
self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port)
self.workdir = workdir
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from flask import Flask, request, abort
from paddle_serving_server_gpu import OpMaker, OpSeqMaker, Server
......@@ -103,13 +104,18 @@ class WebService(object):
abort(400)
if "fetch" not in request.json:
abort(400)
try:
feed, fetch = self.preprocess(request.json, request.json["fetch"])
fetch_map_batch = self.client.predict(feed=feed, fetch=fetch)
fetch_map_batch = self.postprocess(
feed=request.json, fetch=fetch, fetch_map=fetch_map_batch)
for key in fetch_map_batch:
fetch_map_batch[key] = fetch_map_batch[key].tolist()
result = {"result": fetch_map_batch}
if isinstance(feed, dict) and "fetch" in feed:
del feed["fetch"]
fetch_map = self.client.predict(feed=feed, fetch=fetch)
for key in fetch_map:
fetch_map[key] = fetch_map[key][0].tolist()
result = self.postprocess(
feed=request.json, fetch=fetch, fetch_map=fetch_map)
result = {"result": result}
except ValueError:
result = {"result": "Request Value Error"}
return result
def run_server(self):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册