Commit cb2e00d9 authored by barrierye

get engine name in Client

Parent 3de8d893
@@ -27,6 +27,9 @@
#include "core/sdk-cpp/general_model_service.pb.h"
#include "core/sdk-cpp/include/common.h"
#include "core/sdk-cpp/include/predictor_sdk.h"
+#define BLOG(fmt, ...) \
+  printf(              \
+      "[%s:%s]:%d " fmt "\n", __FILE__, __FUNCTION__, __LINE__, ##__VA_ARGS__)
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
@@ -42,8 +45,19 @@ namespace general_model {
class ModelRes {
 public:
  ModelRes() {}
+  ModelRes(const ModelRes& res) {
+    _engine_name = res._engine_name;
+    _int64_map.insert(res._int64_map.begin(), res._int64_map.end());
+    _float_map.insert(res._float_map.begin(), res._float_map.end());
+  }
+  ModelRes(ModelRes&& res) {
+    _engine_name = std::move(res._engine_name);
+    _int64_map.insert(std::make_move_iterator(std::begin(res._int64_map)),
+                      std::make_move_iterator(std::end(res._int64_map)));
+    _float_map.insert(std::make_move_iterator(std::begin(res._float_map)),
+                      std::make_move_iterator(std::end(res._float_map)));
+  }
  ~ModelRes() {}
- public:
  const std::vector<std::vector<int64_t>>& get_int64_by_name(
      const std::string& name) {
    return _int64_map[name];
@@ -55,19 +69,18 @@ class ModelRes {
  void set_engine_name(const std::string& engine_name) {
    _engine_name = engine_name;
  }
-  const std::string& engine_name() {
-    return engine_name;
-  }
-  ModelRes& operator = (ModelRes&& res) {
-    std::cout << "move ++++++++>";
+  const std::string& engine_name() { return _engine_name; }
+  ModelRes& operator=(ModelRes&& res) {
    if (this != &res) {
-      _int64_map = res._int64_map;
-      _float_map = res._float_map;
-      res._int64_map = nullptr;
-      res._float_map = nullptr;
+      _engine_name = std::move(res._engine_name);
+      _int64_map.insert(std::make_move_iterator(std::begin(res._int64_map)),
+                        std::make_move_iterator(std::end(res._int64_map)));
+      _float_map.insert(std::make_move_iterator(std::begin(res._float_map)),
+                        std::make_move_iterator(std::end(res._float_map)));
    }
    return *this;
  }
 public:
  std::string _engine_name;
  std::map<std::string, std::vector<std::vector<int64_t>>> _int64_map;
@@ -82,7 +95,7 @@ class PredictorRes {
 public:
  void clear() {
    _models.clear();
    _engine_names.clear();
  }
  const std::vector<std::vector<int64_t>>& get_int64_by_name(
      const int model_idx, const std::string& name) {
@@ -94,16 +107,13 @@ class PredictorRes {
  }
  void add_model_res(ModelRes&& res) {
    _engine_names.push_back(res.engine_name());
-    _models.emplace_back(res);
+    _models.emplace_back(std::move(res));
  }
  void set_variant_tag(const std::string& variant_tag) {
    _variant_tag = variant_tag;
  }
  const std::string& variant_tag() { return _variant_tag; }
-  int model_num() {return _models.size();}
-  const std::vector<std::string>& get_engine_names() {
-    return _engine_names;
-  }
+  const std::vector<std::string>& get_engine_names() { return _engine_names; }
 private:
  std::vector<ModelRes> _models;
......
@@ -219,35 +219,39 @@ int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
  postprocess_start = client_infer_end;
  // multi-model output
  uint32_t model_num = res.outputs_size();
-  predict_res._models.resize(model_num);
+  // predict_res._models.resize(model_num);
  for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
    VLOG(2) << "process model output index: " << m_idx;
    auto output = res.outputs(m_idx);
+    ModelRes model;
+    model.set_engine_name(output.engine_name());
    for (auto &name : fetch_name) {
      int idx = _fetch_name_to_idx[name];
      VLOG(2) << "fetch name: " << name;
      if (_fetch_name_to_type[name] == 0) {
        int len = output.insts(0).tensor_array(idx).int64_data_size();
        VLOG(2) << "fetch tensor : " << name << " type: int64 len : " << len;
-        predict_res._models[m_idx]._int64_map[name].resize(1);
-        predict_res._models[m_idx]._int64_map[name][0].resize(len);
+        model._int64_map[name].resize(1);
+        model._int64_map[name][0].resize(len);
        for (int i = 0; i < len; ++i) {
-          predict_res._models[m_idx]._int64_map[name][0][i] =
+          model._int64_map[name][0][i] =
              output.insts(0).tensor_array(idx).int64_data(i);
        }
      } else if (_fetch_name_to_type[name] == 1) {
        int len = output.insts(0).tensor_array(idx).float_data_size();
-        VLOG(2) << "fetch tensor : " << name << " type: float32 len : " << len;
-        predict_res._models[m_idx]._float_map[name].resize(1);
-        predict_res._models[m_idx]._float_map[name][0].resize(len);
+        VLOG(2) << "fetch tensor : " << name
+                << " type: float32 len : " << len;
+        model._float_map[name].resize(1);
+        model._float_map[name][0].resize(len);
        for (int i = 0; i < len; ++i) {
-          predict_res._models[m_idx]._float_map[name][0][i] =
+          model._float_map[name][0][i] =
              output.insts(0).tensor_array(idx).float_data(i);
        }
      }
-      //TODO
+      // TODO
      postprocess_end = timeline.TimeStampUS();
    }
+    predict_res.add_model_res(std::move(model));
  }
}
@@ -259,7 +263,7 @@ int PredictorClient::predict(const std::vector<std::vector<float>> &float_feed,
            << "prepro_1:" << preprocess_end << " "
            << "client_infer_0:" << client_infer_start << " "
            << "client_infer_1:" << client_infer_end << " ";
-    //TODO: multi-model
+    // TODO: multi-model
    if (FLAGS_profile_server) {
      int op_num = res.profile_time_size() / 2;
      for (int i = 0; i < op_num; ++i) {
@@ -303,7 +307,7 @@ int PredictorClient::batch_predict(
  for (auto &name : fetch_name) {
    req.add_fetch_var_names(name);
  }
  for (int bi = 0; bi < batch_size; bi++) {
    VLOG(2) << "prepare batch " << bi;
    std::vector<Tensor *> tensor_vec;
@@ -382,13 +386,15 @@ int PredictorClient::batch_predict(
    postprocess_start = client_infer_end;
    uint32_t model_num = res.outputs_size();
-    predict_res_batch._models.resize(model_num);
+    // predict_res_batch._models.resize(model_num);
    for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
      VLOG(2) << "process model output index: " << m_idx;
      auto output = res.outputs(m_idx);
+      ModelRes model;
+      model.set_engine_name(output.engine_name());
      for (auto &name : fetch_name) {
-        predict_res_batch._models[m_idx]._int64_map[name].resize(batch_size);
-        predict_res_batch._models[m_idx]._float_map[name].resize(batch_size);
+        model._int64_map[name].resize(batch_size);
+        model._float_map[name].resize(batch_size);
      }
      VLOG(2) << "response batch size " << output.insts_size();
      VLOG(2) << "response var nmae " << output.insts(0).tensor_array_size();
@@ -398,29 +404,33 @@ int PredictorClient::batch_predict(
          int len = output.insts(bi).tensor_array(idx).data_size();
          if (_fetch_name_to_type[name] == 0) {
            int len = output.insts(bi).tensor_array(idx).int64_data_size();
-            VLOG(2) << "fetch tensor : " << name << " type: int64 len : " << len;
-            predict_res_batch._models[m_idx]._int64_map[name][bi].resize(len);
-            VLOG(2) << "fetch name " << name << " index " << idx << " first data "
+            VLOG(2) << "fetch tensor : " << name
+                    << " type: int64 len : " << len;
+            model._int64_map[name][bi].resize(len);
+            VLOG(2) << "fetch name " << name << " index " << idx
+                    << " first data "
                    << output.insts(bi).tensor_array(idx).int64_data(0);
            for (int i = 0; i < len; ++i) {
-              predict_res_batch._models[m_idx]._int64_map[name][bi][i] =
+              model._int64_map[name][bi][i] =
                  output.insts(bi).tensor_array(idx).int64_data(i);
            }
          } else if (_fetch_name_to_type[name] == 1) {
            int len = output.insts(bi).tensor_array(idx).float_data_size();
            VLOG(2) << "fetch tensor : " << name
                    << " type: float32 len : " << len;
-            predict_res_batch._models[m_idx]._float_map[name][bi].resize(len);
-            VLOG(2) << "fetch name " << name << " index " << idx << " first data "
+            model._float_map[name][bi].resize(len);
+            VLOG(2) << "fetch name " << name << " index " << idx
+                    << " first data "
                    << output.insts(bi).tensor_array(idx).float_data(0);
            for (int i = 0; i < len; ++i) {
-              predict_res_batch._models[m_idx]._float_map[name][bi][i] =
+              model._float_map[name][bi][i] =
                  output.insts(bi).tensor_array(idx).float_data(i);
            }
          }
          idx += 1;
        }
      }
+      predict_res_batch.add_model_res(std::move(model));
      postprocess_end = timeline.TimeStampUS();
    }
  }
@@ -433,7 +443,7 @@ int PredictorClient::batch_predict(
            << "prepro_1:" << preprocess_end << " "
            << "client_infer_0:" << client_infer_start << " "
            << "client_infer_1:" << client_infer_end << " ";
-    //TODO: multi-models
+    // TODO: multi-models
    if (FLAGS_profile_server) {
      int op_num = res.profile_time_size() / 2;
      for (int i = 0; i < op_num; ++i) {
......
@@ -40,10 +40,9 @@ PYBIND11_MODULE(serving_client, m) {
            return self.get_float_by_name(model_idx, name);
          },
          py::return_value_policy::reference)
-      .def("variant_tag",
-           [](PredictorRes &self) { return self.variant_tag(); })
-      .def("model_num",
-           [](PredictorRes &self) {return self.model_num(); });
+      .def("variant_tag", [](PredictorRes &self) { return self.variant_tag(); })
+      .def("get_engine_names",
+           [](PredictorRes &self) { return self.get_engine_names(); });
  py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
      .def(py::init())
......
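The binding above swaps model_num() for get_engine_names(), so the raw result handle can be walked per engine from Python. A minimal sketch of that access pattern (the populated result object and the fetch name "prob" are placeholders; get_float_by_name/variant_tag follow the lambdas shown above):

# result is a PredictorRes-backed handle already filled by a predict call
engine_names = result.get_engine_names()  # e.g. ["general_infer_0", "general_infer_1"]
tag = result.variant_tag()
for model_idx, engine_name in enumerate(engine_names):
    # values for one fetch variable of one model in the workflow
    print(engine_name, tag, result.get_float_by_name(model_idx, "prob"))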
@@ -37,8 +37,11 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralInferOp::inference() {
  VLOG(2) << "Going to run inference";
+  const std::vector<std::string> pre_node_names = pre_names();
  if (pre_node_names.size() != 1) {
-    LOG(ERROR) << "This op(" << op_name() <<") can only have one predecessor op, but received " << pre_node_names.size();
+    LOG(ERROR) << "This op(" << op_name()
+               << ") can only have one predecessor op, but received "
+               << pre_node_names.size();
    return -1;
  }
  const std::string pre_name = pre_node_names[0];
@@ -65,8 +68,9 @@ int GeneralInferOp::inference() {
  int64_t start = timeline.TimeStampUS();
  timeline.Start();
-  if (InferManager::instance().infer(GENERAL_MODEL_NAME, in, out, batch_size)) {
-    LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
+  if (InferManager::instance().infer(
+          engine_name().c_str(), in, out, batch_size)) {
+    LOG(ERROR) << "Failed do infer in fluid model: " << engine_name().c_str();
    return -1;
  }
......
@@ -40,7 +40,7 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralResponseOp::inference() {
  const std::vector<std::string> pre_node_names = pre_names();
  VLOG(2) << "pre node names size: " << pre_node_names.size();
  const Request *req = dynamic_cast<const Request *>(get_request_message());
  // response inst with only fetch_var_names
  Response *res = mutable_data<Response>();
@@ -67,13 +67,14 @@ int GeneralResponseOp::inference() {
  const GeneralBlob *input_blob;
  for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
-    VLOG(2) << "pre names[" << pi << "]: "
-            << pre_node_names[pi] << " ("
+    const std::string &pre_name = pre_node_names[pi];
+    VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
            << pre_node_names.size() << ")";
-    input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
-    fprintf(stderr, "input(%s) blob address %x\n", pre_node_names[pi].c_str(), input_blob);
+    input_blob = get_depend_argument<GeneralBlob>(pre_name);
+    // fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(),
+    //         input_blob);
    if (!input_blob) {
-      LOG(ERROR) << "Failed mutable depended argument, op: " << pre_node_names[pi];
+      LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name;
      return -1;
    }
@@ -82,6 +83,8 @@ int GeneralResponseOp::inference() {
    VLOG(2) << "input batch size: " << batch_size;
    ModelOutput *output = res->add_outputs();
+    output->set_engine_name(
+        pre_name);  // To get the order of model return values
    for (int i = 0; i < batch_size; ++i) {
      FetchInst *fetch_inst = output->add_insts();
      for (auto &idx : fetch_index) {
@@ -114,7 +117,8 @@ int GeneralResponseOp::inference() {
          for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1];
               k++) {
            FetchInst *fetch_p = output->mutable_insts(j);
-            fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[k]);
+            fetch_p->mutable_tensor_array(var_idx)->add_int64_data(
+                data_ptr[k]);
          }
        }
      } else {
@@ -130,7 +134,8 @@ int GeneralResponseOp::inference() {
      } else {
        for (int j = 0; j < batch_size; ++j) {
          FetchInst *fetch_p = output->mutable_insts(j);
-          fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[0]);
+          fetch_p->mutable_tensor_array(var_idx)->add_int64_data(
+              data_ptr[0]);
        }
      }
    }
@@ -142,7 +147,8 @@ int GeneralResponseOp::inference() {
          for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1];
               k++) {
            FetchInst *fetch_p = output->mutable_insts(j);
-            fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[k]);
+            fetch_p->mutable_tensor_array(var_idx)->add_float_data(
+                data_ptr[k]);
          }
        }
      } else {
@@ -158,7 +164,8 @@ int GeneralResponseOp::inference() {
      } else {
        for (int j = 0; j < batch_size; ++j) {
          FetchInst *fetch_p = output->mutable_insts(j);
-          fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[0]);
+          fetch_p->mutable_tensor_array(var_idx)->add_float_data(
+              data_ptr[0]);
        }
      }
    }
@@ -169,10 +176,10 @@ int GeneralResponseOp::inference() {
  if (req->profile_server()) {
    int64_t end = timeline.TimeStampUS();
-    for (uint32_t i = 0; i< pre_node_names.size(); ++i) {
+    for (uint32_t i = 0; i < pre_node_names.size(); ++i) {
      input_blob = get_depend_argument<GeneralBlob>(pre_node_names[i]);
      VLOG(2) << "p size for input blob: " << input_blob->p_size;
-      ModelOutput* output = res->mutable_outputs(i);
+      ModelOutput *output = res->mutable_outputs(i);
      for (int i = 0; i < input_blob->p_size; ++i) {
        output->add_profile_time(input_blob->time_stamp[i]);
      }
......
@@ -37,9 +37,11 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralTextResponseOp::inference() {
  VLOG(2) << "Going to run inference";
-  //TODO: multi-predecessor
+  const std::vector<std::string> pre_node_names = pre_names();
  if (pre_node_names.size() != 1) {
-    LOG(ERROR) << "This op(" << op_name() <<") can only have one predecessor op, but received " << pre_node_names.size();
+    LOG(ERROR) << "This op(" << op_name()
+               << ") can only have one predecessor op, but received "
+               << pre_node_names.size();
    return -1;
  }
  const std::string pre_name = pre_node_names[0];
@@ -51,6 +53,8 @@ int GeneralTextResponseOp::inference() {
    return -1;
  }
+  // TODO: multi-predecessor
+  /*
  const TensorVector *in = &input_blob->tensor_vector;
  int batch_size = input_blob->GetBatchSize();
@@ -133,7 +137,7 @@ int GeneralTextResponseOp::inference() {
    res->add_profile_time(start);
    res->add_profile_time(end);
  }
+  */
  return 0;
}
DEFINE_OP(GeneralTextResponseOp);
......
@@ -46,6 +46,7 @@ message Response {
message ModelOutput {
  repeated FetchInst insts = 1;
  repeated int64 profile_time = 2;
+  optional string engine_name = 3;
}
service GeneralModelService {
......
@@ -46,6 +46,7 @@ message Response {
message ModelOutput {
  repeated FetchInst insts = 1;
  repeated int64 profile_time = 2;
+  optional string engine_name = 3;
}
service GeneralModelService {
......
@@ -247,8 +247,8 @@ class Client(object):
            return None
        multi_result_map_batch = []
-        model_num = result_batch.model_num()
-        for mi in range(model_num):
+        model_engine_names = result_batch.get_engine_names()
+        for mi, engine_name in enumerate(model_engine_names):
            result_map_batch = []
            result_map = {}
            for i, name in enumerate(fetch_names):
@@ -263,21 +263,23 @@ class Client(object):
                result_map_batch.append(single_result)
            multi_result_map_batch.append(result_map_batch)
-        if model_num == 1:
+        ret = None
+        if len(model_engine_names) == 1:
            if batch_size == 1:
-                return [multi_result_map_batch[0][0], self.result_handle_.variant_tag()
-                        ] if need_variant_tag else multi_result_map_batch[0][0]
+                ret = multi_result_map_batch[0][0]
            else:
-                return [multi_result_map_batch[0], self.result_handle_.variant_tag()
-                        ] if need_variant_tag else multi_result_map_batch[0]
+                ret = multi_result_map_batch[0]
        else:
+            ret = {}
            if batch_size == 1:
-                multi_result_map = [result_map_batch[0] for result_map_batch in multi_result_map_batch]
-                return [multi_result_map, self.result_handle_.variant_tag()
-                        ] if need_variant_tag else multi_result_map
+                for mi, result_map_batch in enumerate(multi_result_map_batch):
+                    ret[model_engine_names[mi]] = result_map_batch[0]
            else:
-                return [multi_result_map_batch, self.result_handle_.variant_tag()
-                        ] if need_variant_tag else multi_result_map_batch
+                for mi, result_map_batch in enumerate(multi_result_map_batch):
+                    ret[model_engine_names[mi]] = result_map_batch
+        return [ret, self.result_handle_.variant_tag()
+                ] if need_variant_tag else ret

    def release(self):
        self.client_handle_.destroy_predictor()
        self.client_handle_ = None
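With the change above, a single-model workflow still returns the usual fetch map (optionally paired with the variant tag), while a workflow with several models returns a dict keyed by engine name. A hedged usage sketch of the new return layout (the endpoint, feed/fetch names, and word_ids are placeholders, and the usual Client setup calls are assumed):

from paddle_serving_client import Client

client = Client()
client.load_client_config("serving_client_conf.prototxt")
client.connect(["127.0.0.1:9292"])

result = client.predict(feed={"words": word_ids}, fetch=["prediction"])
# single model in the workflow:   result == {"prediction": ...}
# multiple models in the workflow: result == {"general_infer_0": {"prediction": ...},
#                                             "general_infer_1": {"prediction": ...}}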
@@ -21,6 +21,7 @@ import socket
import paddle_serving_server as paddle_serving_server
from .version import serving_server_version
from contextlib import closing
+import collections


class OpMaker(object):
@@ -36,18 +37,37 @@ class OpMaker(object):
            "general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp",
            "general_copy": "GeneralCopyOp"
        }
+        self.node_name_suffix_ = collections.defaultdict(int)

    # currently, inputs and outputs are not used
    # when we have OpGraphMaker, inputs and outputs are necessary
-    def create(self, node_type, node_name=None, inputs=[], outputs=[]):
+    def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
        if node_type not in self.op_dict:
            raise Exception("Op type {} is not supported right now".format(
                node_type))
        node = server_sdk.DAGNode()
-        node.name = node_name if node_name is not None else "{}_op".format(
-            node_type)
+        # node.name will be used as the infer engine name
+        if engine_name:
+            node.name = engine_name
+        else:
+            node.name = '{}_{}'.format(node_type,
+                                       self.node_name_suffix_[node_type])
+            self.node_name_suffix_[node_type] += 1
        node.type = self.op_dict[node_type]
-        return node
+        if inputs:
+            for dep_node_str in inputs:
+                dep_node = server_sdk.DAGNode()
+                google.protobuf.text_format.Parse(dep_node_str, dep_node)
+                dep = server_sdk.DAGNodeDependency()
+                dep.name = dep_node.name
+                dep.mode = "RO"
+                node.dependencies.extend([dep])
+        # Because the return value will be used as the key value of the
+        # dict, and the proto object is variable which cannot be hashed,
+        # so it is processed into a string. This has little effect on
+        # overall efficiency.
+        return google.protobuf.text_format.MessageToString(node)


class OpSeqMaker(object):
@@ -56,19 +76,25 @@ class OpSeqMaker(object):
        self.workflow.name = "workflow1"
        self.workflow.workflow_type = "Sequence"

-    def add_op(self, node, dependent_nodes=None):
-        if dependent_nodes is None:
-            if len(self.workflow.nodes) >= 1:
+    def add_op(self, node_str):
+        node = server_sdk.DAGNode()
+        google.protobuf.text_format.Parse(node_str, node)
+        if len(node.dependencies) > 1:
+            raise Exception(
+                'Set more than one predecessor for op in OpSeqMaker is not allowed.'
+            )
+        if len(self.workflow.nodes) >= 1:
+            if len(node.dependencies) == 0:
                dep = server_sdk.DAGNodeDependency()
                dep.name = self.workflow.nodes[-1].name
                dep.mode = "RO"
                node.dependencies.extend([dep])
-        else:
-            for dep_node in dependent_nodes:
-                dep = server_sdk.DAGNodeDependency()
-                dep.name = dep_node.name
-                dep.mode = "RO"
-                node.dependencies.extend([dep])
+            elif len(node.dependencies) == 1:
+                if node.dependencies[0].name != self.workflow.nodes[-1].name:
+                    raise Exception(
+                        'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'.
+                        format(node.dependencies[0].name, self.workflow.nodes[
                            -1].name))
        self.workflow.nodes.extend([node])

    def get_op_sequence(self):
@@ -77,6 +103,24 @@ class OpSeqMaker(object):
        return workflow_conf


+class OpGraphMaker(object):
+    def __init__(self):
+        self.workflow = server_sdk.Workflow()
+        self.workflow.name = "workflow1"
+        # Currently, SDK only supports "Sequence"
+        self.workflow.workflow_type = "Sequence"
+
+    def add_op(self, node_str):
+        node = server_sdk.DAGNode()
+        google.protobuf.text_format.Parse(node_str, node)
+        self.workflow.nodes.extend([node])
+
+    def get_op_graph(self):
+        workflow_conf = server_sdk.WorkflowConf()
+        workflow_conf.workflows.extend([self.workflow])
+        return workflow_conf
+
+
class Server(object):
    def __init__(self):
        self.server_handle_ = None
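OpMaker.create() now returns a text-format DAGNode string named after its engine, and OpSeqMaker.add_op() consumes those strings. A rough sketch of the sequential case (the general_reader/general_infer/general_response op types are taken from the usual op_dict and are assumptions here):

op_maker = OpMaker()
read_op = op_maker.create('general_reader')  # auto-named "general_reader_0"
infer_op = op_maker.create('general_infer', engine_name='general_infer_0')
response_op = op_maker.create('general_response')

op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(read_op)      # each argument is a serialized DAGNode
op_seq_maker.add_op(infer_op)     # a node with no dependency is chained to the previous one
op_seq_maker.add_op(response_op)
workflow_conf = op_seq_maker.get_op_sequence()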
@@ -100,7 +144,7 @@ class Server(object):
        self.cur_path = os.getcwd()
        self.use_local_bin = False
        self.mkl_flag = False
-        self.model_config_paths = None
+        self.model_config_paths = None  # for multi-model in a workflow

    def set_max_concurrency(self, concurrency):
        self.max_concurrency = concurrency
@@ -117,6 +161,9 @@ class Server(object):
    def set_op_sequence(self, op_seq):
        self.workflow_conf = op_seq

+    def set_op_graph(self, op_graph):
+        self.workflow_conf = op_graph
+
    def set_memory_optimize(self, flag=False):
        self.memory_optimization = flag
@@ -129,12 +176,6 @@ class Server(object):
        if self.model_toolkit_conf == None:
            self.model_toolkit_conf = server_sdk.ModelToolkitConf()
-        if isinstance(model_config_paths, str):
-            model_config_paths = {"general_infer_op": model_config_paths}
-        elif not isinstance(model_config_paths, dict):
-            raise Exception("model_config_paths can not be {}".format(
-                type(model_config_paths)))
        for engine_name, model_config_path in model_config_paths.items():
            engine = server_sdk.EngineDesc()
            engine.name = engine_name
@@ -188,11 +229,33 @@ class Server(object):
            fout.write(str(pb_obj))

    def load_model_config(self, model_config_paths):
-        self.model_config_paths = model_config_paths
-        path = model_config_paths.items()[0][1]
-        self.model_config_path = path
+        # At present, Serving needs to configure the model path in
+        # the resource.prototxt file to determine the input and output
+        # format of the workflow. To ensure that the input and output
+        # of multiple models are the same
+        workflow_oi_config_path = None
+        if isinstance(model_config_paths, str):
+            # the default engine name is "general_infer"
+            self.model_config_paths = {"general_infer_0": model_config_paths}
+            workflow_oi_config_path = self.model_config_paths["general_infer_0"]
+        elif isinstance(model_config_paths, dict):
+            self.model_config_paths = {}
+            for node_str, path in model_config_paths.items():
+                node = server_sdk.DAGNode()
+                google.protobuf.text_format.Parse(node_str, node)
+                self.model_config_paths[node.name] = path
+            print("You have specified multiple model paths, please ensure "
+                  "that the input and output of multiple models are the same.")
+            workflow_oi_config_path = self.model_config_paths.items()[0][1]
+        else:
+            raise Exception("The type of model_config_paths must be str or "
+                            "dict({op: model_path}), not {}.".format(
+                                type(model_config_paths)))
        self.model_conf = m_config.GeneralModelConfig()
-        f = open("{}/serving_server_conf.prototxt".format(path), 'r')
+        f = open(
+            "{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
+            'r')
        self.model_conf = google.protobuf.text_format.Merge(
            str(f.read()), self.model_conf)
        # check config here
......
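Putting the server-side pieces together, a multi-model workflow would be described with OpGraphMaker plus the dict form of load_model_config, whose keys are the node strings returned by OpMaker.create(). A hedged sketch only; the engine names ("bow", "cnn") and model paths are illustrative, not part of this commit:

op_maker = OpMaker()
read_op = op_maker.create('general_reader')
bow_infer_op = op_maker.create('general_infer', engine_name='bow', inputs=[read_op])
cnn_infer_op = op_maker.create('general_infer', engine_name='cnn', inputs=[read_op])
response_op = op_maker.create('general_response', inputs=[bow_infer_op, cnn_infer_op])

op_graph_maker = OpGraphMaker()
for node_str in [read_op, bow_infer_op, cnn_infer_op, response_op]:
    op_graph_maker.add_op(node_str)

server = Server()
server.set_op_graph(op_graph_maker.get_op_graph())
# keys are the serialized DAGNode strings; their node names become engine names
server.load_model_config({bow_infer_op: "./bow_model", cnn_infer_op: "./cnn_model"})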