diff --git a/README.md b/README.md index 46b97be4236a9f2316c97b47396187fbce2cb22b..7c6df8d5ab4463c59c1ad250383f63ac1d01529e 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ pip install paddle-serving-server-gpu # GPU ``` You may need to use a domestic mirror source (in China, you can use the Tsinghua mirror source, add `-i https://pypi.tuna.tsinghua.edu.cn/simple` to pip command) to speed up the download. - + Client package support Centos 7 and Ubuntu 18, or you can use HTTP service without install client.

Quick Start Example

@@ -256,6 +256,7 @@ curl -H "Content-Type:application/json" -X POST -d '{"url": "https://paddle-serv ### Developers - [How to config Serving native operators on server side?](doc/SERVER_DAG.md) - [How to develop a new Serving operator?](doc/NEW_OPERATOR.md) +- [How to develop a new Web Service?](doc/NEW_WEB_SERVICE.md) - [Golang client](doc/IMDB_GO_CLIENT.md) - [Compile from source code](doc/COMPILE.md) diff --git a/README_CN.md b/README_CN.md index 4cafb499ee36168b93a244c66f7d5af4ea831160..e7f976098bf10476ed8bfb1d9d031ed4854acae6 100644 --- a/README_CN.md +++ b/README_CN.md @@ -262,6 +262,7 @@ curl -H "Content-Type:application/json" -X POST -d '{"url": "https://paddle-serv ### 开发者教程 - [如何配置Server端的计算图?](doc/SERVER_DAG_CN.md) - [如何开发一个新的General Op?](doc/NEW_OPERATOR_CN.md) +- [如何开发一个新的Web Service?](doc/NEW_WEB_SERVICE_CN.md) - [如何在Paddle Serving使用Go Client?](doc/IMDB_GO_CLIENT_CN.md) - [如何编译PaddleServing?](doc/COMPILE_CN.md) diff --git a/core/cube/cube-api/src/cube_cli.cpp b/core/cube/cube-api/src/cube_cli.cpp index 7f45e436a821af3db06d4a90fe4cc8cc4a4936b6..eee4b0c31ad83ca69d242e81bae3ce4ecfb5bf1a 100644 --- a/core/cube/cube-api/src/cube_cli.cpp +++ b/core/cube/cube-api/src/cube_cli.cpp @@ -217,6 +217,7 @@ int run_m(int argc, char** argv) { LOG(INFO) << " total_request = " << std::to_string(request_num) << " speed = " << std::to_string(1000000 * thread_num / mean_time) // mean_time us << " query per second"; + return 0; } } // namespace mcube diff --git a/core/general-client/include/general_model.h b/core/general-client/include/general_model.h index 703593f8333c75386ab8a16fc1cf71b803158d9e..9d0cc8b66cfe2e3fe2f4d012c7920f518d32ef5a 100644 --- a/core/general-client/include/general_model.h +++ b/core/general-client/include/general_model.h @@ -21,6 +21,7 @@ #include #include #include +#include // move #include #include "core/sdk-cpp/builtin_format.pb.h" @@ -39,12 +40,32 @@ namespace baidu { namespace paddle_serving { namespace general_model { -class PredictorRes { - public: - PredictorRes() {} - ~PredictorRes() {} - +class ModelRes { public: + ModelRes() {} + ModelRes(const ModelRes& res) { + _engine_name = res._engine_name; + _int64_value_map.insert(res._int64_value_map.begin(), + res._int64_value_map.end()); + _float_value_map.insert(res._float_value_map.begin(), + res._float_value_map.end()); + _shape_map.insert(res._shape_map.begin(), res._shape_map.end()); + _lod_map.insert(res._lod_map.begin(), res._lod_map.end()); + } + ModelRes(ModelRes&& res) { + _engine_name = std::move(res._engine_name); + _int64_value_map.insert( + std::make_move_iterator(std::begin(res._int64_value_map)), + std::make_move_iterator(std::end(res._int64_value_map))); + _float_value_map.insert( + std::make_move_iterator(std::begin(res._float_value_map)), + std::make_move_iterator(std::end(res._float_value_map))); + _shape_map.insert(std::make_move_iterator(std::begin(res._shape_map)), + std::make_move_iterator(std::end(res._shape_map))); + _lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)), + std::make_move_iterator(std::end(res._lod_map))); + } + ~ModelRes() {} const std::vector& get_int64_by_name(const std::string& name) { return _int64_value_map[name]; } @@ -57,19 +78,75 @@ class PredictorRes { const std::vector& get_lod(const std::string& name) { return _lod_map[name]; } - void set_variant_tag(const std::string& variant_tag) { - _variant_tag = variant_tag; + void set_engine_name(const std::string& engine_name) { + _engine_name = engine_name; + } + const std::string& engine_name() { return _engine_name; } + ModelRes& operator=(ModelRes&& res) { + if (this != &res) { + _engine_name = std::move(res._engine_name); + _int64_value_map.insert( + std::make_move_iterator(std::begin(res._int64_value_map)), + std::make_move_iterator(std::end(res._int64_value_map))); + _float_value_map.insert( + std::make_move_iterator(std::begin(res._float_value_map)), + std::make_move_iterator(std::end(res._float_value_map))); + _shape_map.insert(std::make_move_iterator(std::begin(res._shape_map)), + std::make_move_iterator(std::end(res._shape_map))); + _lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)), + std::make_move_iterator(std::end(res._lod_map))); + } + return *this; } - const std::string& variant_tag() { return _variant_tag; } public: + std::string _engine_name; std::map> _int64_value_map; std::map> _float_value_map; std::map> _shape_map; std::map> _lod_map; +}; + +class PredictorRes { + public: + PredictorRes() {} + ~PredictorRes() {} + + public: + void clear() { + _models.clear(); + _engine_names.clear(); + } + const std::vector& get_int64_by_name(const int model_idx, + const std::string& name) { + return _models[model_idx].get_int64_by_name(name); + } + const std::vector& get_float_by_name(const int model_idx, + const std::string& name) { + return _models[model_idx].get_float_by_name(name); + } + const std::vector& get_shape(const int model_idx, + const std::string& name) { + return _models[model_idx].get_shape(name); + } + const std::vector& get_lod(const int model_idx, + const std::string& name) { + return _models[model_idx].get_lod(name); + } + void add_model_res(ModelRes&& res) { + _engine_names.push_back(res.engine_name()); + _models.emplace_back(std::move(res)); + } + void set_variant_tag(const std::string& variant_tag) { + _variant_tag = variant_tag; + } + const std::string& variant_tag() { return _variant_tag; } + const std::vector& get_engine_names() { return _engine_names; } private: + std::vector _models; std::string _variant_tag; + std::vector _engine_names; }; class PredictorClient { diff --git a/core/general-client/src/general_model.cpp b/core/general-client/src/general_model.cpp index 0270caf91441745ab02d460de8c6396462a3a3fe..65fa6587ecb68f18b72a03c7f54433252ea1608a 100644 --- a/core/general-client/src/general_model.cpp +++ b/core/general-client/src/general_model.cpp @@ -111,6 +111,7 @@ void PredictorClient::set_predictor_conf(const std::string &conf_path, int PredictorClient::destroy_predictor() { _api.thrd_finalize(); _api.destroy(); + return 0; } int PredictorClient::create_predictor_by_desc(const std::string &sdk_desc) { @@ -119,6 +120,7 @@ int PredictorClient::create_predictor_by_desc(const std::string &sdk_desc) { return -1; } _api.thrd_initialize(); + return 0; } int PredictorClient::create_predictor() { @@ -129,6 +131,7 @@ int PredictorClient::create_predictor() { return -1; } _api.thrd_initialize(); + return 0; } int PredictorClient::batch_predict( @@ -143,10 +146,7 @@ int PredictorClient::batch_predict( const int &pid) { int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size()); - predict_res_batch._int64_value_map.clear(); - predict_res_batch._float_value_map.clear(); - predict_res_batch._shape_map.clear(); - predict_res_batch._lod_map.clear(); + predict_res_batch.clear(); Timer timeline; int64_t preprocess_start = timeline.TimeStampUS(); @@ -189,11 +189,11 @@ int PredictorClient::batch_predict( Tensor *tensor = tensor_vec[idx]; VLOG(2) << "prepare float feed " << name << " shape size " << float_shape[vec_idx].size(); - for (int j = 0; j < float_shape[vec_idx].size(); ++j) { + for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) { tensor->add_shape(float_shape[vec_idx][j]); } tensor->set_elem_type(1); - for (int j = 0; j < float_feed[vec_idx].size(); ++j) { + for (uint32_t j = 0; j < float_feed[vec_idx].size(); ++j) { tensor->add_float_data(float_feed[vec_idx][j]); } vec_idx++; @@ -208,13 +208,13 @@ int PredictorClient::batch_predict( Tensor *tensor = tensor_vec[idx]; VLOG(2) << "prepare int feed " << name << " shape size " << int_shape[vec_idx].size(); - for (int j = 0; j < int_shape[vec_idx].size(); ++j) { + for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) { tensor->add_shape(int_shape[vec_idx][j]); } tensor->set_elem_type(0); VLOG(3) << "feed var name " << name << " index " << vec_idx << "first data " << int_feed[vec_idx][0]; - for (int j = 0; j < int_feed[vec_idx].size(); ++j) { + for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) { tensor->add_int64_data(int_feed[vec_idx][j]); } vec_idx++; @@ -248,51 +248,59 @@ int PredictorClient::batch_predict( client_infer_end = timeline.TimeStampUS(); postprocess_start = client_infer_end; - for (auto &name : fetch_name) { - // int idx = _fetch_name_to_idx[name]; - int idx = 0; - int shape_size = res.insts(0).tensor_array(idx).shape_size(); - VLOG(2) << "fetch var " << name << " index " << idx << " shape size " - << shape_size; - predict_res_batch._shape_map[name].resize(shape_size); - for (int i = 0; i < shape_size; ++i) { - predict_res_batch._shape_map[name][i] = - res.insts(0).tensor_array(idx).shape(i); - } - int lod_size = res.insts(0).tensor_array(idx).lod_size(); - if (lod_size > 0) { - predict_res_batch._lod_map[name].resize(lod_size); - for (int i = 0; i < lod_size; ++i) { - predict_res_batch._lod_map[name][i] = - res.insts(0).tensor_array(idx).lod(i); + uint32_t model_num = res.outputs_size(); + for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) { + VLOG(2) << "process model output index: " << m_idx; + auto output = res.outputs(m_idx); + ModelRes model; + model.set_engine_name(output.engine_name()); + + for (auto &name : fetch_name) { + // int idx = _fetch_name_to_idx[name]; + int idx = 0; + int shape_size = output.insts(0).tensor_array(idx).shape_size(); + VLOG(2) << "fetch var " << name << " index " << idx << " shape size " + << shape_size; + model._shape_map[name].resize(shape_size); + for (int i = 0; i < shape_size; ++i) { + model._shape_map[name][i] = + output.insts(0).tensor_array(idx).shape(i); } + int lod_size = output.insts(0).tensor_array(idx).lod_size(); + if (lod_size > 0) { + model._lod_map[name].resize(lod_size); + for (int i = 0; i < lod_size; ++i) { + model._lod_map[name][i] = output.insts(0).tensor_array(idx).lod(i); + } + } + idx += 1; } - idx += 1; - } - for (auto &name : fetch_name) { - // int idx = _fetch_name_to_idx[name]; - int idx = 0; - if (_fetch_name_to_type[name] == 0) { - VLOG(2) << "ferch var " << name << "type int"; - predict_res_batch._int64_value_map[name].resize( - res.insts(0).tensor_array(idx).int64_data_size()); - int size = res.insts(0).tensor_array(idx).int64_data_size(); - for (int i = 0; i < size; ++i) { - predict_res_batch._int64_value_map[name][i] = - res.insts(0).tensor_array(idx).int64_data(i); - } - } else { - VLOG(2) << "fetch var " << name << "type float"; - predict_res_batch._float_value_map[name].resize( - res.insts(0).tensor_array(idx).float_data_size()); - int size = res.insts(0).tensor_array(idx).float_data_size(); - for (int i = 0; i < size; ++i) { - predict_res_batch._float_value_map[name][i] = - res.insts(0).tensor_array(idx).float_data(i); + for (auto &name : fetch_name) { + // int idx = _fetch_name_to_idx[name]; + int idx = 0; + if (_fetch_name_to_type[name] == 0) { + VLOG(2) << "ferch var " << name << "type int"; + model._int64_value_map[name].resize( + output.insts(0).tensor_array(idx).int64_data_size()); + int size = output.insts(0).tensor_array(idx).int64_data_size(); + for (int i = 0; i < size; ++i) { + model._int64_value_map[name][i] = + output.insts(0).tensor_array(idx).int64_data(i); + } + } else { + VLOG(2) << "fetch var " << name << "type float"; + model._float_value_map[name].resize( + output.insts(0).tensor_array(idx).float_data_size()); + int size = output.insts(0).tensor_array(idx).float_data_size(); + for (int i = 0; i < size; ++i) { + model._float_value_map[name][i] = + output.insts(0).tensor_array(idx).float_data(i); + } } + idx += 1; } - idx += 1; + predict_res_batch.add_model_res(std::move(model)); } postprocess_end = timeline.TimeStampUS(); } @@ -305,7 +313,6 @@ int PredictorClient::batch_predict( << "prepro_1:" << preprocess_end << " " << "client_infer_0:" << client_infer_start << " " << "client_infer_1:" << client_infer_end << " "; - if (FLAGS_profile_server) { int op_num = res.profile_time_size() / 2; for (int i = 0; i < op_num; ++i) { diff --git a/core/general-client/src/pybind_general_model.cpp b/core/general-client/src/pybind_general_model.cpp index f30a3a859cb562e1f477defaeb23b5b8c7866cb0..5eab58989d23c6ab95a8351d76f11316bc28c76a 100644 --- a/core/general-client/src/pybind_general_model.cpp +++ b/core/general-client/src/pybind_general_model.cpp @@ -31,27 +31,28 @@ PYBIND11_MODULE(serving_client, m) { py::class_(m, "PredictorRes", py::buffer_protocol()) .def(py::init()) .def("get_int64_by_name", - [](PredictorRes &self, std::string &name) { - return self.get_int64_by_name(name); + [](PredictorRes &self, int model_idx, std::string &name) { + return self.get_int64_by_name(model_idx, name); }, py::return_value_policy::reference) .def("get_float_by_name", - [](PredictorRes &self, std::string &name) { - return self.get_float_by_name(name); + [](PredictorRes &self, int model_idx, std::string &name) { + return self.get_float_by_name(model_idx, name); }, py::return_value_policy::reference) .def("get_shape", - [](PredictorRes &self, std::string &name) { - return self.get_shape(name); + [](PredictorRes &self, int model_idx, std::string &name) { + return self.get_shape(model_idx, name); }, py::return_value_policy::reference) .def("get_lod", - [](PredictorRes &self, std::string &name) { - return self.get_lod(name); + [](PredictorRes &self, int model_idx, std::string &name) { + return self.get_lod(model_idx, name); }, py::return_value_policy::reference) - .def("variant_tag", - [](PredictorRes &self) { return self.variant_tag(); }); + .def("variant_tag", [](PredictorRes &self) { return self.variant_tag(); }) + .def("get_engine_names", + [](PredictorRes &self) { return self.get_engine_names(); }); py::class_(m, "PredictorClient", py::buffer_protocol()) .def(py::init()) @@ -77,7 +78,6 @@ PYBIND11_MODULE(serving_client, m) { [](PredictorClient &self) { self.create_predictor(); }) .def("destroy_predictor", [](PredictorClient &self) { self.destroy_predictor(); }) - .def("batch_predict", [](PredictorClient &self, const std::vector>> diff --git a/core/general-server/op/general_copy_op.cpp b/core/general-server/op/general_copy_op.cpp index a7f7d2904343bb9203fada6872994ed8916346bd..322bcc07795f1b053847991eae17cb3922dd7a7b 100644 --- a/core/general-server/op/general_copy_op.cpp +++ b/core/general-server/op/general_copy_op.cpp @@ -35,8 +35,17 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig; int GeneralCopyOp::inference() { // reade request from client - const GeneralBlob *input_blob = get_depend_argument(pre_name()); - VLOG(2) << "precedent name: " << pre_name(); + const std::vector pre_node_names = pre_names(); + if (pre_node_names.size() != 1) { + LOG(ERROR) << "This op(" << op_name() + << ") can only have one predecessor op, but received " + << pre_node_names.size(); + return -1; + } + const std::string pre_name = pre_node_names[0]; + + const GeneralBlob *input_blob = get_depend_argument(pre_name); + VLOG(2) << "precedent name: " << pre_name; const TensorVector *in = &input_blob->tensor_vector; VLOG(2) << "input size: " << in->size(); int batch_size = input_blob->GetBatchSize(); diff --git a/core/general-server/op/general_dist_kv_infer_op.cpp b/core/general-server/op/general_dist_kv_infer_op.cpp old mode 100755 new mode 100644 index ac4e7bb23e9410aede4fd353099d3c90ce91bcd3..9c6c70352b5387fab95acd16cdf79aa2b46f6122 --- a/core/general-server/op/general_dist_kv_infer_op.cpp +++ b/core/general-server/op/general_dist_kv_infer_op.cpp @@ -40,12 +40,21 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig; int GeneralDistKVInferOp::inference() { VLOG(2) << "Going to run inference"; - const GeneralBlob *input_blob = get_depend_argument(pre_name()); - VLOG(2) << "Get precedent op name: " << pre_name(); + const std::vector pre_node_names = pre_names(); + if (pre_node_names.size() != 1) { + LOG(ERROR) << "This op(" << op_name() + << ") can only have one predecessor op, but received " + << pre_node_names.size(); + return -1; + } + const std::string pre_name = pre_node_names[0]; + + const GeneralBlob *input_blob = get_depend_argument(pre_name); + VLOG(2) << "Get precedent op name: " << pre_name; GeneralBlob *output_blob = mutable_data(); if (!input_blob) { - LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name(); + LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name; return -1; } @@ -149,8 +158,8 @@ int GeneralDistKVInferOp::inference() { timeline.Start(); if (InferManager::instance().infer( - GENERAL_MODEL_NAME, &infer_in, out, batch_size)) { - LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME; + engine_name().c_str(), &infer_in, out, batch_size)) { + LOG(ERROR) << "Failed do infer in fluid model: " << engine_name(); return -1; } diff --git a/core/general-server/op/general_dist_kv_quant_infer_op.cpp b/core/general-server/op/general_dist_kv_quant_infer_op.cpp index 583384b79ed5ec69d14cb31b7c8239c3f786c33d..8752e8a72085c946b097cecf62a0bdbf90d682c4 100644 --- a/core/general-server/op/general_dist_kv_quant_infer_op.cpp +++ b/core/general-server/op/general_dist_kv_quant_infer_op.cpp @@ -41,12 +41,21 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig; int GeneralDistKVQuantInferOp::inference() { VLOG(2) << "Going to run inference"; - const GeneralBlob *input_blob = get_depend_argument(pre_name()); - VLOG(2) << "Get precedent op name: " << pre_name(); + const std::vector pre_node_names = pre_names(); + if (pre_node_names.size() != 1) { + LOG(ERROR) << "This op(" << op_name() + << ") can only have one predecessor op, but received " + << pre_node_names.size(); + return -1; + } + const std::string pre_name = pre_node_names[0]; + + const GeneralBlob *input_blob = get_depend_argument(pre_name); + VLOG(2) << "Get precedent op name: " << pre_name; GeneralBlob *output_blob = mutable_data(); if (!input_blob) { - LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name(); + LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name; return -1; } @@ -180,8 +189,8 @@ int GeneralDistKVQuantInferOp::inference() { timeline.Start(); if (InferManager::instance().infer( - GENERAL_MODEL_NAME, &infer_in, out, batch_size)) { - LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME; + engine_name().c_str(), &infer_in, out, batch_size)) { + LOG(ERROR) << "Failed do infer in fluid model: " << engine_name(); return -1; } diff --git a/core/general-server/op/general_infer_helper.h b/core/general-server/op/general_infer_helper.h index b09ef6d55b8aef3dc54dd5ce2921c27eb6ad86c5..6a6b24329ed73cbb10e467f366ff04c4e2ac8031 100644 --- a/core/general-server/op/general_infer_helper.h +++ b/core/general-server/op/general_infer_helper.h @@ -31,8 +31,6 @@ namespace baidu { namespace paddle_serving { namespace serving { -static const char* GENERAL_MODEL_NAME = "general_model"; - struct GeneralBlob { std::vector tensor_vector; int64_t time_stamp[20]; diff --git a/core/general-server/op/general_infer_op.cpp b/core/general-server/op/general_infer_op.cpp index 6cec9d1cb4d87e13c566c24c90cef11f148749c8..a9ff2e7226b25842889e391d82217b3b6a140170 100644 --- a/core/general-server/op/general_infer_op.cpp +++ b/core/general-server/op/general_infer_op.cpp @@ -37,12 +37,21 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig; int GeneralInferOp::inference() { VLOG(2) << "Going to run inference"; - const GeneralBlob *input_blob = get_depend_argument(pre_name()); - VLOG(2) << "Get precedent op name: " << pre_name(); + const std::vector pre_node_names = pre_names(); + if (pre_node_names.size() != 1) { + LOG(ERROR) << "This op(" << op_name() + << ") can only have one predecessor op, but received " + << pre_node_names.size(); + return -1; + } + const std::string pre_name = pre_node_names[0]; + + const GeneralBlob *input_blob = get_depend_argument(pre_name); + VLOG(2) << "Get precedent op name: " << pre_name; GeneralBlob *output_blob = mutable_data(); if (!input_blob) { - LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name(); + LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name; return -1; } @@ -59,8 +68,9 @@ int GeneralInferOp::inference() { int64_t start = timeline.TimeStampUS(); timeline.Start(); - if (InferManager::instance().infer(GENERAL_MODEL_NAME, in, out, batch_size)) { - LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME; + if (InferManager::instance().infer( + engine_name().c_str(), in, out, batch_size)) { + LOG(ERROR) << "Failed do infer in fluid model: " << engine_name().c_str(); return -1; } diff --git a/core/general-server/op/general_response_op.cpp b/core/general-server/op/general_response_op.cpp index b84cb199001a864ba55de7fe278262c650e98c22..e00984f42b36d708825622b39fef63a449ba147e 100644 --- a/core/general-server/op/general_response_op.cpp +++ b/core/general-server/op/general_response_op.cpp @@ -33,23 +33,17 @@ using baidu::paddle_serving::predictor::general_model::Tensor; using baidu::paddle_serving::predictor::general_model::Response; using baidu::paddle_serving::predictor::general_model::Request; using baidu::paddle_serving::predictor::general_model::FetchInst; +using baidu::paddle_serving::predictor::general_model::ModelOutput; using baidu::paddle_serving::predictor::InferManager; using baidu::paddle_serving::predictor::PaddleGeneralModelConfig; int GeneralResponseOp::inference() { - const GeneralBlob *input_blob = get_depend_argument(pre_name()); - - if (!input_blob) { - LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name(); - return -1; - } - - const TensorVector *in = &input_blob->tensor_vector; - int batch_size = input_blob->GetBatchSize(); - - VLOG(2) << "input batch size: " << batch_size; + const std::vector pre_node_names = pre_names(); + VLOG(2) << "pre node names size: " << pre_node_names.size(); const Request *req = dynamic_cast(get_request_message()); + // response inst with only fetch_var_names + Response *res = mutable_data(); Timer timeline; // double response_time = 0.0; @@ -73,77 +67,107 @@ int GeneralResponseOp::inference() { model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)]; } - // response inst with only fetch_var_names - Response *res = mutable_data(); - FetchInst *fetch_inst = res->add_insts(); - for (auto &idx : fetch_index) { - Tensor *tensor = fetch_inst->add_tensor_array(); - tensor->set_elem_type(1); - if (model_config->_is_lod_fetch[idx]) { - VLOG(2) << "out[" << idx << "] is lod_tensor"; - for (int k = 0; k < in->at(idx).shape.size(); ++k) { - VLOG(2) << "shape[" << k << "]: " << in->at(idx).shape[k]; - tensor->add_shape(in->at(idx).shape[k]); - } - } else { - VLOG(2) << "out[" << idx << "] is tensor"; - for (int k = 0; k < in->at(idx).shape.size(); ++k) { - VLOG(2) << "shape[" << k << "]: " << in->at(idx).shape[k]; - tensor->add_shape(in->at(idx).shape[k]); - } + const GeneralBlob *input_blob; + for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) { + const std::string &pre_name = pre_node_names[pi]; + VLOG(2) << "pre names[" << pi << "]: " << pre_name << " (" + << pre_node_names.size() << ")"; + input_blob = get_depend_argument(pre_name); + // fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(), + // input_blob); + if (!input_blob) { + LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name; + return -1; } - } - int var_idx = 0; - for (auto &idx : fetch_index) { - int cap = 1; - for (int j = 0; j < in->at(idx).shape.size(); ++j) { - cap *= in->at(idx).shape[j]; - } - if (in->at(idx).dtype == paddle::PaddleDType::INT64) { - int64_t *data_ptr = static_cast(in->at(idx).data.data()); + const TensorVector *in = &input_blob->tensor_vector; + + ModelOutput *output = res->add_outputs(); + // To get the order of model return values + output->set_engine_name(pre_name); + FetchInst *fetch_inst = output->add_insts(); + for (auto &idx : fetch_index) { + Tensor *tensor = fetch_inst->add_tensor_array(); + tensor->set_elem_type(1); if (model_config->_is_lod_fetch[idx]) { - FetchInst *fetch_p = res->mutable_insts(0); - for (int j = 0; j < in->at(idx).lod[0].size(); ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_lod( - in->at(idx).lod[0][j]); - } - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[j]); + VLOG(2) << "out[" << idx << "] is lod_tensor"; + for (int k = 0; k < in->at(idx).shape.size(); ++k) { + VLOG(2) << "shape[" << k << "]: " << in->at(idx).shape[k]; + tensor->add_shape(in->at(idx).shape[k]); } } else { - FetchInst *fetch_p = res->mutable_insts(0); - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]); + VLOG(2) << "out[" << idx << "] is tensor"; + for (int k = 0; k < in->at(idx).shape.size(); ++k) { + VLOG(2) << "shape[" << k << "]: " << in->at(idx).shape[k]; + tensor->add_shape(in->at(idx).shape[k]); } } - var_idx++; - } else if (in->at(idx).dtype == paddle::PaddleDType::FLOAT32) { - float *data_ptr = static_cast(in->at(idx).data.data()); - if (model_config->_is_lod_fetch[idx]) { - FetchInst *fetch_p = res->mutable_insts(0); - for (int j = 0; j < in->at(idx).lod[0].size(); ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_lod( - in->at(idx).lod[0][j]); - } - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]); + } + + int var_idx = 0; + for (auto &idx : fetch_index) { + int cap = 1; + for (int j = 0; j < in->at(idx).shape.size(); ++j) { + cap *= in->at(idx).shape[j]; + } + if (in->at(idx).dtype == paddle::PaddleDType::INT64) { + int64_t *data_ptr = static_cast(in->at(idx).data.data()); + if (model_config->_is_lod_fetch[idx]) { + FetchInst *fetch_p = output->mutable_insts(0); + for (int j = 0; j < in->at(idx).lod[0].size(); ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_lod( + in->at(idx).lod[0][j]); + } + for (int j = 0; j < cap; ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[j]); + } + } else { + FetchInst *fetch_p = output->mutable_insts(0); + for (int j = 0; j < cap; ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]); + } } - } else { - FetchInst *fetch_p = res->mutable_insts(0); - for (int j = 0; j < cap; ++j) { - fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]); + var_idx++; + } else if (in->at(idx).dtype == paddle::PaddleDType::FLOAT32) { + float *data_ptr = static_cast(in->at(idx).data.data()); + if (model_config->_is_lod_fetch[idx]) { + FetchInst *fetch_p = output->mutable_insts(0); + for (int j = 0; j < in->at(idx).lod[0].size(); ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_lod( + in->at(idx).lod[0][j]); + } + for (int j = 0; j < cap; ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]); + } + } else { + FetchInst *fetch_p = output->mutable_insts(0); + for (int j = 0; j < cap; ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]); + } } + var_idx++; } - var_idx++; } } if (req->profile_server()) { int64_t end = timeline.TimeStampUS(); - VLOG(2) << "p size for input blob: " << input_blob->p_size; - for (int i = 0; i < input_blob->p_size; ++i) { - res->add_profile_time(input_blob->time_stamp[i]); + // TODO(barriery): multi-model profile_time. + // At present, only the response_op is multi-input, so here we get + // the profile_time by hard coding. It needs to be replaced with + // a more elegant way. + for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) { + input_blob = get_depend_argument(pre_node_names[pi]); + VLOG(2) << "p size for input blob: " << input_blob->p_size; + int profile_time_idx = -1; + if (pi == 0) { + profile_time_idx = 0; + } else { + profile_time_idx = input_blob->p_size - 2; + } + for (; profile_time_idx < input_blob->p_size; ++profile_time_idx) { + res->add_profile_time(input_blob->time_stamp[profile_time_idx]); + } } // TODO(guru4elephant): find more elegant way to do this res->add_profile_time(start); diff --git a/core/general-server/op/general_text_response_op.cpp b/core/general-server/op/general_text_response_op.cpp index 43c7af774fd939a8fa1ca14456285cc75dbd7f8d..ae194119f1fc3edad01662041035f7011873998a 100644 --- a/core/general-server/op/general_text_response_op.cpp +++ b/core/general-server/op/general_text_response_op.cpp @@ -32,22 +32,18 @@ using baidu::paddle_serving::predictor::general_model::Tensor; using baidu::paddle_serving::predictor::general_model::Response; using baidu::paddle_serving::predictor::general_model::Request; using baidu::paddle_serving::predictor::general_model::FetchInst; +using baidu::paddle_serving::predictor::general_model::ModelOutput; using baidu::paddle_serving::predictor::InferManager; using baidu::paddle_serving::predictor::PaddleGeneralModelConfig; int GeneralTextResponseOp::inference() { - const GeneralBlob *input_blob = get_depend_argument(pre_name()); + VLOG(2) << "Going to run inference"; + const std::vector pre_node_names = pre_names(); + VLOG(2) << "pre node names size: " << pre_node_names.size(); - if (!input_blob) { - LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name(); - return -1; - } - - const TensorVector *in = &input_blob->tensor_vector; - int batch_size = input_blob->GetBatchSize(); - - VLOG(2) << "infer batch size: " << batch_size; const Request *req = dynamic_cast(get_request_message()); + // response inst with only fetch_var_names + Response *res = mutable_data(); Timer timeline; int64_t start = timeline.TimeStampUS(); @@ -67,59 +63,90 @@ int GeneralTextResponseOp::inference() { model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)]; } - // response inst with only fetch_var_names - Response *res = mutable_data(); + const GeneralBlob *input_blob; + for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) { + const std::string &pre_name = pre_node_names[pi]; + VLOG(2) << "pre names[" << pi << "]: " << pre_name << " (" + << pre_node_names.size() << ")"; + input_blob = get_depend_argument(pre_name); + if (!input_blob) { + LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name; + return -1; + } - for (int i = 0; i < batch_size; ++i) { - FetchInst *fetch_inst = res->add_insts(); - for (auto &idx : fetch_index) { - Tensor *tensor = fetch_inst->add_tensor_array(); - // currently only response float tensor or lod_tensor - tensor->set_elem_type(1); - if (model_config->_is_lod_fetch[idx]) { - VLOG(2) << "out[" << idx << " is lod_tensor"; - tensor->add_shape(-1); - } else { - VLOG(2) << "out[" << idx << "] is tensor"; - for (int k = 1; k < in->at(idx).shape.size(); ++k) { - VLOG(2) << "shape[" << k - 1 << "]: " << in->at(idx).shape[k]; - tensor->add_shape(in->at(idx).shape[k]); + const TensorVector *in = &input_blob->tensor_vector; + int batch_size = input_blob->GetBatchSize(); + VLOG(2) << "input batch size: " << batch_size; + + ModelOutput *output = res->add_outputs(); + output->set_engine_name( + pre_name); // To get the order of model return values + for (int i = 0; i < batch_size; ++i) { + FetchInst *fetch_inst = output->add_insts(); + for (auto &idx : fetch_index) { + Tensor *tensor = fetch_inst->add_tensor_array(); + // currently only response float tensor or lod_tensor + tensor->set_elem_type(1); + if (model_config->_is_lod_fetch[idx]) { + VLOG(2) << "out[" << idx << " is lod_tensor"; + tensor->add_shape(-1); + } else { + VLOG(2) << "out[" << idx << "] is tensor"; + for (int k = 1; k < in->at(idx).shape.size(); ++k) { + VLOG(2) << "shape[" << k - 1 << "]: " << in->at(idx).shape[k]; + tensor->add_shape(in->at(idx).shape[k]); + } } } } - } - int var_idx = 0; - for (auto &idx : fetch_index) { - float *data_ptr = static_cast(in->at(idx).data.data()); - int cap = 1; - for (int j = 1; j < in->at(idx).shape.size(); ++j) { - cap *= in->at(idx).shape[j]; - } - if (model_config->_is_lod_fetch[idx]) { - for (int j = 0; j < batch_size; ++j) { - for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1]; - k++) { - res->mutable_insts(j)->mutable_tensor_array(var_idx)->add_float_data( - data_ptr[k]); - } + int var_idx = 0; + for (auto &idx : fetch_index) { + float *data_ptr = static_cast(in->at(idx).data.data()); + int cap = 1; + for (int j = 1; j < in->at(idx).shape.size(); ++j) { + cap *= in->at(idx).shape[j]; } - } else { - for (int j = 0; j < batch_size; ++j) { - for (int k = j * cap; k < (j + 1) * cap; ++k) { - res->mutable_insts(j)->mutable_tensor_array(var_idx)->add_float_data( - data_ptr[k]); + if (model_config->_is_lod_fetch[idx]) { + for (int j = 0; j < batch_size; ++j) { + for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1]; + k++) { + output->mutable_insts(j) + ->mutable_tensor_array(var_idx) + ->add_float_data(data_ptr[k]); + } + } + } else { + for (int j = 0; j < batch_size; ++j) { + for (int k = j * cap; k < (j + 1) * cap; ++k) { + output->mutable_insts(j) + ->mutable_tensor_array(var_idx) + ->add_float_data(data_ptr[k]); + } } } + var_idx++; } - var_idx++; } if (req->profile_server()) { int64_t end = timeline.TimeStampUS(); - - for (int i = 0; i < input_blob->p_size; ++i) { - res->add_profile_time(input_blob->time_stamp[i]); + // TODO(barriery): multi-model profile_time. + // At present, only the response_op is multi-input, so here we get + // the profile_time by hard coding. It needs to be replaced with + // a more elegant way. + for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) { + input_blob = get_depend_argument(pre_node_names[pi]); + VLOG(2) << "p size for input blob: " << input_blob->p_size; + int profile_time_idx = -1; + if (pi == 0) { + profile_time_idx = 0; + } else { + profile_time_idx = input_blob->p_size - 2; + } + for (; profile_time_idx < input_blob->p_size; ++profile_time_idx) { + res->add_profile_time(input_blob->time_stamp[profile_time_idx]); + } } // TODO(guru4elephant): find more elegant way to do this res->add_profile_time(start); diff --git a/core/general-server/proto/general_model_service.proto b/core/general-server/proto/general_model_service.proto index ad1128c373491ac82e3d7042f0693aa574ac92c5..8581ecb2a2e10deced910a20ce26c2beaca956fa 100644 --- a/core/general-server/proto/general_model_service.proto +++ b/core/general-server/proto/general_model_service.proto @@ -40,10 +40,15 @@ message Request { }; message Response { - repeated FetchInst insts = 1; + repeated ModelOutput outputs = 1; repeated int64 profile_time = 2; }; +message ModelOutput { + repeated FetchInst insts = 1; + optional string engine_name = 2; +} + service GeneralModelService { rpc inference(Request) returns (Response); rpc debug(Request) returns (Response); diff --git a/core/predictor/framework/dag.cpp b/core/predictor/framework/dag.cpp index 9aea595e1281ccac9367f89acde3f8b19b98cc5e..f039ac70ffe2e55a59f926d754ca411a034058f4 100644 --- a/core/predictor/framework/dag.cpp +++ b/core/predictor/framework/dag.cpp @@ -14,6 +14,7 @@ #include "core/predictor/framework/dag.h" #include +#include // make_pair #include #include "core/predictor/common/inner_common.h" #include "core/predictor/framework/predictor_metric.h" // PredictorMetric @@ -199,25 +200,81 @@ const DagStage* Dag::stage_by_index(uint32_t index) { return _stages[index]; } int Dag::topo_sort() { std::stringstream ss; - for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) { - DagStage* stage = new (std::nothrow) DagStage(); - if (stage == NULL) { - LOG(ERROR) << "Invalid stage!"; - return ERR_MEM_ALLOC_FAILURE; + uint32_t nodes_size = _index_nodes.size(); + std::vector in_degree(nodes_size, 0); + std::vector> in_egde(nodes_size); + for (uint32_t nid = 0; nid < nodes_size; nid++) { + in_degree[nid] += _index_nodes[nid]->depends.size(); + for (auto it = _index_nodes[nid]->depends.begin(); + it != _index_nodes[nid]->depends.end(); + ++it) { + uint32_t pnid = Dag::node_by_name(it->first)->id - + 1; // 0 is reserved for begginer-op + in_egde[pnid].push_back(nid); + } + } + for (int i = 0; i < in_degree.size(); ++i) { + LOG(INFO) << "(" << _index_nodes[i]->name << ") in_degree[" << i + << "]: " << in_degree[i]; + } + int sorted_num = 0; + DagStage* stage = new (std::nothrow) DagStage(); + if (stage == NULL) { + LOG(ERROR) << "Invalid stage!"; + return ERR_MEM_ALLOC_FAILURE; + } + ss.str(""); + ss << _stages.size(); + stage->name = ss.str(); + stage->full_name = full_name() + NAME_DELIMITER + stage->name; + for (uint32_t nid = 0; nid < nodes_size; ++nid) { + if (in_degree[nid] == 0) { + ++sorted_num; + stage->nodes.push_back(_index_nodes[nid]); + // assign stage number after stage created + _index_nodes[nid]->stage = _stages.size(); + // assign dag node full name after stage created + _index_nodes[nid]->full_name = + stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name; } - stage->nodes.push_back(_index_nodes[nid]); + } + + if (stage->nodes.size() == 0) { + LOG(ERROR) << "Invalid Dag!"; + return ERR_INTERNAL_FAILURE; + } + _stages.push_back(stage); + + while (sorted_num < nodes_size) { + auto pre_nodes = _stages.back()->nodes; + DagStage* stage = new (std::nothrow) DagStage(); ss.str(""); ss << _stages.size(); stage->name = ss.str(); stage->full_name = full_name() + NAME_DELIMITER + stage->name; + for (uint32_t pi = 0; pi < pre_nodes.size(); ++pi) { + uint32_t pnid = pre_nodes[pi]->id - 1; + for (uint32_t ei = 0; ei < in_egde[pnid].size(); ++ei) { + uint32_t nid = in_egde[pnid][ei]; + --in_degree[nid]; + if (in_degree[nid] == 0) { + ++sorted_num; + stage->nodes.push_back(_index_nodes[nid]); + // assign stage number after stage created + _index_nodes[nid]->stage = _stages.size(); + // assign dag node full name after stage created + _index_nodes[nid]->full_name = + stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name; + } + } + } + if (stage->nodes.size() == 0) { + LOG(ERROR) << "Invalid Dag!"; + return ERR_INTERNAL_FAILURE; + } _stages.push_back(stage); - - // assign stage number after stage created - _index_nodes[nid]->stage = nid; - // assign dag node full name after stage created - _index_nodes[nid]->full_name = - stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name; } + return ERR_OK; } diff --git a/core/predictor/framework/dag_view.cpp b/core/predictor/framework/dag_view.cpp index 743e73418b535776e651bd1500509ffcad0e0618..bde8084b41fee00bc95d2a35444a15258d2a12a8 100644 --- a/core/predictor/framework/dag_view.cpp +++ b/core/predictor/framework/dag_view.cpp @@ -76,19 +76,34 @@ int DagView::init(Dag* dag, const std::string& service_name) { } op->set_full_name(service_name + NAME_DELIMITER + node->full_name); + + // Set the name of the Op as the key of the matching engine. + VLOG(2) << "op->set_engine_name(" << node->name.c_str() << ")"; + op->set_engine_name(node->name); + vnode->conf = node; vnode->op = op; + // Add depends + for (auto it = vnode->conf->depends.begin(); + it != vnode->conf->depends.end(); + ++it) { + std::string pre_node_name = it->first; + VLOG(2) << "add op pre name: \n" + << "current op name: " << vnode->op->op_name() + << ", previous op name: " << pre_node_name; + vnode->op->add_pre_node_name(pre_node_name); + } vstage->nodes.push_back(vnode); } // TODO(guru4elephant): this seems buggy, please review later - if (si > 0) { - VLOG(2) << "set op pre name: \n" - << "current op name: " << vstage->nodes.back()->op->op_name() - << " previous op name: " - << _view[si - 1]->nodes.back()->op->op_name(); - vstage->nodes.back()->op->set_pre_node_name( - _view[si - 1]->nodes.back()->op->op_name()); - } + /*if (si > 0) {*/ + // VLOG(2) << "set op pre name: \n" + //<< "current op name: " << vstage->nodes.back()->op->op_name() + //<< " previous op name: " + //<< _view[si - 1]->nodes.back()->op->op_name(); + // vstage->nodes.back()->op->set_pre_node_name( + //_view[si - 1]->nodes.back()->op->op_name()); + /*}*/ _view.push_back(vstage); } @@ -139,6 +154,7 @@ int DagView::execute_one_stage(ViewStage* vstage, butil::IOBufBuilder* debug_os) { butil::Timer stage_time(butil::Timer::STARTED); uint32_t node_size = vstage->nodes.size(); + VLOG(2) << "vstage->nodes.size(): " << node_size; for (uint32_t ni = 0; ni < node_size; ni++) { ViewNode* vnode = vstage->nodes[ni]; DagNode* conf = vnode->conf; diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h index f8bd7843706187c13d8f38c7d33c87b51369e5a0..4bb3be9ad2c3dc7ef94a32200b014325aceedf45 100644 --- a/core/predictor/framework/infer.h +++ b/core/predictor/framework/infer.h @@ -765,6 +765,8 @@ class InferManager { } size_t engine_num = model_toolkit_conf.engines_size(); for (size_t ei = 0; ei < engine_num; ++ei) { + LOG(INFO) << "model_toolkit_conf.engines(" << ei + << ").name: " << model_toolkit_conf.engines(ei).name(); std::string engine_name = model_toolkit_conf.engines(ei).name(); VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine(); if (!engine) { diff --git a/core/predictor/framework/memory.cpp b/core/predictor/framework/memory.cpp index ec5967c85630c549fdc19eb6ce1773bfd541a05c..9e7f028e80d42ad855de006131540fece845afe3 100644 --- a/core/predictor/framework/memory.cpp +++ b/core/predictor/framework/memory.cpp @@ -56,11 +56,11 @@ int MempoolWrapper::thread_initialize() { im::fugue::memory::Region* region = new im::fugue::memory::Region(); region->init(); im::Mempool* mempool = new (std::nothrow) im::Mempool(region); - MempoolRegion* mempool_region = new MempoolRegion(region, mempool); if (mempool == NULL) { LOG(ERROR) << "Failed create thread mempool"; return -1; } + MempoolRegion* mempool_region = new MempoolRegion(region, mempool); if (THREAD_SETSPECIFIC(_bspec_key, mempool_region) != 0) { LOG(ERROR) << "unable to set the thrd_data"; diff --git a/core/predictor/op/op.cpp b/core/predictor/op/op.cpp index d2e512eb69af0b70cbe07b5bc75c3acb88fea918..59ef6aed71977a3f762ff4fbe9480db19cb4057e 100644 --- a/core/predictor/op/op.cpp +++ b/core/predictor/op/op.cpp @@ -60,6 +60,7 @@ int Op::init(Bus* bus, return -1; } + _pre_node_names.clear(); return custom_init(); } diff --git a/core/predictor/op/op.h b/core/predictor/op/op.h index 84bcf44575826a6ab00e037ce57e119ffbe4f3f3..ae52975fe6f2506fb0bf483318f607df137c8a96 100644 --- a/core/predictor/op/op.h +++ b/core/predictor/op/op.h @@ -14,7 +14,9 @@ #pragma once #include // bvar::LatencyRecorder +#include #include +#include #include "core/predictor/common/inner_common.h" #include "core/predictor/framework/channel.h" #include "core/predictor/framework/op_repository.h" @@ -132,18 +134,28 @@ class Op { const std::string& full_name() const { return _full_name; } - const std::string& pre_name() const { return _pre_node_name; } + const std::vector& pre_names() const { return _pre_node_names; } void set_full_name(const std::string full_name) { _full_name = full_name; } - void set_pre_node_name(const std::string pre_name) { - _pre_node_name = pre_name; + void add_pre_node_name(const std::string pre_name) { + _pre_node_names.push_back(pre_name); } const std::string& type() const; uint32_t id() const; + // Set the name of the Op as the key of the matching engine. + // Notes that this key is only used by infer_op (only the + // infer_op needs to find the corresponding engine). + // At present, there is only general_infer_op. + void set_engine_name(const std::string engine_name) { + _engine_name = engine_name; + } + + const std::string& engine_name() const { return _engine_name; } + // --------------- Default implements ---------------- virtual int custom_init() { return 0; } @@ -189,13 +201,14 @@ class Op { Bus* _bus; Dag* _dag; uint32_t _id; - std::string _pre_node_name; // only for sequential execution + std::vector _pre_node_names; // for DAG execution std::string _name; std::string _full_name; // service_workflow_stageindex_opname std::string _type; bool _has_calc; bool _has_init; TimerFlow* _timer; + std::string _engine_name; // only for infer_op }; template @@ -215,7 +228,10 @@ class OpWithChannel : public Op { return _channel; } - _channel = butil::get_object(); + // TODO(barriery): There are some problems in using butil::get_object + // _channel = butil::get_object(); + _channel = new ChannelType(); + if (!_channel) { LOG(ERROR) << "Failed mutable channel of type:" << typeid(T).name(); return NULL; @@ -229,8 +245,14 @@ class OpWithChannel : public Op { int release_channel() { if (_channel) { _channel->deinit(); - butil::return_object(_channel); + delete _channel; } + // TODO(barriery): There are some problems in using butil::get_object + /* + if (_channel) { + _channel->deinit(); + butil::return_object(_channel); + } */ _channel = NULL; return 0; diff --git a/core/sdk-cpp/proto/general_model_service.proto b/core/sdk-cpp/proto/general_model_service.proto index 39c09f09f1fa56fcc8572dde9981b48b76214917..51c0335a9db896e1260e83915de81e51451a904b 100644 --- a/core/sdk-cpp/proto/general_model_service.proto +++ b/core/sdk-cpp/proto/general_model_service.proto @@ -40,10 +40,15 @@ message Request { }; message Response { - repeated FetchInst insts = 1; + repeated ModelOutput outputs = 1; repeated int64 profile_time = 2; }; +message ModelOutput { + repeated FetchInst insts = 1; + optional string engine_name = 2; +} + service GeneralModelService { rpc inference(Request) returns (Response); rpc debug(Request) returns (Response); diff --git a/doc/INFERNCE_TO_SERVING.md b/doc/INFERNCE_TO_SERVING.md new file mode 100644 index 0000000000000000000000000000000000000000..8334159ea255ca65241a2b567e43682a148bb775 --- /dev/null +++ b/doc/INFERNCE_TO_SERVING.md @@ -0,0 +1,14 @@ +# How to Convert Paddle Inference Model To Paddle Serving Format + +([简体中文](./INFERENCE_TO_SERVING_CN.md)|English) + +## Example + +``` python +from paddle_serving_client.io import inference_model_to_serving +inference_model_dir = "your_inference_model" +serving_client_dir = "serving_client_dir" +serving_server_dir = "serving_server_dir" +feed_var_names, fetch_var_names = inference_model_to_serving( + inference_model_dir, serving_client_dir, serving_server_dir) +``` diff --git a/doc/INFERNCE_TO_SERVING_CN.md b/doc/INFERNCE_TO_SERVING_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..94d1def424db467e200020c69fbd6d1599a5ffde --- /dev/null +++ b/doc/INFERNCE_TO_SERVING_CN.md @@ -0,0 +1,14 @@ +# 如何从Paddle保存的预测模型转为Paddle Serving格式可部署的模型 + +([English](./INFERENCE_TO_SERVING.md)|简体中文) + +## 示例 + +``` python +from paddle_serving_client.io import inference_model_to_serving +inference_model_dir = "your_inference_model" +serving_client_dir = "serving_client_dir" +serving_server_dir = "serving_server_dir" +feed_var_names, fetch_var_names = inference_model_to_serving( + inference_model_dir, serving_client_dir, serving_server_dir) +``` diff --git a/doc/MODEL_ENSEMBLE_IN_PADDLE_SERVING.md b/doc/MODEL_ENSEMBLE_IN_PADDLE_SERVING.md new file mode 100644 index 0000000000000000000000000000000000000000..7f583342cf2437b29916f6711c7bd0701206bf8d --- /dev/null +++ b/doc/MODEL_ENSEMBLE_IN_PADDLE_SERVING.md @@ -0,0 +1,121 @@ +# Model Ensemble in Paddle Serving + +([简体中文](MODEL_ENSEMBLE_IN_PADDLE_SERVING_CN.md)|English) + +In some scenarios, multiple models with the same input may be used to predict in parallel and integrate predicted results for better prediction effect. Paddle Serving also supports this feature. + +Next, we will take the text classification task as an example to show model ensemble in Paddle Serving (This feature is still serial prediction for the time being. We will support parallel prediction as soon as possible). + +## Simple example + +In this example (see the figure below), the server side predict the bow and CNN models with the same input in a service in parallel, The client side fetchs the prediction results of the two models, and processes the prediction results to get the final predict results. + +![simple example](model_ensemble_example.png) + +It should be noted that at present, only multiple models with the same format input and output in the same service are supported. In this example, the input and output formats of CNN and BOW model are the same. + +The code used in the example is saved in the `python/examples/imdb` path: + +```shell +. +├── get_data.sh +├── imdb_reader.py +├── test_ensemble_client.py +└── test_ensemble_server.py +``` + +### Prepare data + +Get the pre-trained CNN and BOW models by the following command (you can also run the `get_data.sh` script): + +```shell +wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz +tar -zxvf text_classification_data.tar.gz +tar -zxvf imdb_model.tar.gz +``` + +### Start server + +Start server by the following Python code (you can also run the `test_ensemble_server.py` script): + +```python +from paddle_serving_server import OpMaker +from paddle_serving_server import OpGraphMaker +from paddle_serving_server import Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +cnn_infer_op = op_maker.create( + 'general_infer', engine_name='cnn', inputs=[read_op]) +bow_infer_op = op_maker.create( + 'general_infer', engine_name='bow', inputs=[read_op]) +response_op = op_maker.create( + 'general_response', inputs=[cnn_infer_op, bow_infer_op]) + +op_graph_maker = OpGraphMaker() +op_graph_maker.add_op(read_op) +op_graph_maker.add_op(cnn_infer_op) +op_graph_maker.add_op(bow_infer_op) +op_graph_maker.add_op(response_op) + +server = Server() +server.set_op_graph(op_graph_maker.get_op_graph()) +model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'} +server.load_model_config(model_config) +server.prepare_server(workdir="work_dir1", port=9393, device="cpu") +server.run_server() +``` + +Different from the normal prediction service, here we need to use DAG to describe the logic of the server side. + +When creating an Op, you need to specify the predecessor of the current Op (in this example, the predecessor of `cnn_infer_op` and `bow_infer_op` is `read_op`, and the predecessor of `response_op` is `cnn_infer_op` and `bow_infer_op`. For the infer Op `infer_op`, you need to define the prediction engine name `engine_name` (You can also use the default value. It is recommended to set the value to facilitate the client side to obtain the order of prediction results). + +At the same time, when configuring the model path, you need to create a model configuration dictionary with the infer Op as the key and the corresponding model path as value to inform Serving which model each infer OP uses. + +### Start client + +Start client by the following Python code (you can also run the `test_ensemble_client.py` script): + +```python +from paddle_serving_client import Client +from imdb_reader import IMDBDataset + +client = Client() +# If you have more than one model, make sure that the input +# and output of more than one model are the same. +client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt') +client.connect(["127.0.0.1:9393"]) + +# you can define any english sentence or dataset here +# This example reuses imdb reader in training, you +# can define your own data preprocessing easily. +imdb_dataset = IMDBDataset() +imdb_dataset.load_resource('imdb.vocab') + +for i in range(3): + line = 'i am very sad | 0' + word_ids, label = imdb_dataset.get_words_and_label(line) + feed = {"words": word_ids} + fetch = ["acc", "cost", "prediction"] + fetch_maps = client.predict(feed=feed, fetch=fetch) + if len(fetch_maps) == 1: + print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1])) + else: + for model, fetch_map in fetch_maps.items(): + print("step: {}, model: {}, res: {}".format(i, model, fetch_map[ + 'prediction'][0][1])) +``` + +Compared with the normal prediction service, the client side has not changed much. When multiple model predictions are used, the prediction service will return a dictionary with engine name `engine_name`(the value is defined on the server side) as the key, and the corresponding model prediction results as the value. + +### Expected result + +```shell +step: 0, model: cnn, res: 0.560272455215 +step: 0, model: bow, res: 0.633530199528 +step: 1, model: cnn, res: 0.560272455215 +step: 1, model: bow, res: 0.633530199528 +step: 2, model: cnn, res: 0.560272455215 +step: 2, model: bow, res: 0.633530199528 +``` diff --git a/doc/MODEL_ENSEMBLE_IN_PADDLE_SERVING_CN.md b/doc/MODEL_ENSEMBLE_IN_PADDLE_SERVING_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..ee816aa5a441610e845a933217041a774fad8129 --- /dev/null +++ b/doc/MODEL_ENSEMBLE_IN_PADDLE_SERVING_CN.md @@ -0,0 +1,121 @@ +# Paddle Serving中的集成预测 + +(简体中文|[English](MODEL_ENSEMBLE_IN_PADDLE_SERVING.md)) + +在一些场景中,可能使用多个相同输入的模型并行集成预测以获得更好的预测效果,Paddle Serving提供了这项功能。 + +下面将以文本分类任务为例,来展示Paddle Serving的集成预测功能(暂时还是串行预测,我们会尽快支持并行化)。 + +## 集成预测样例 + +该样例中(见下图),Server端在一项服务中并行预测相同输入的BOW和CNN模型,Client端获取两个模型的预测结果并进行后处理,得到最终的预测结果。 + +![simple example](model_ensemble_example.png) + +需要注意的是,目前只支持在同一个服务中使用多个相同格式输入输出的模型。在该例子中,CNN模型和BOW模型的输入输出格式是相同的。 + +样例中用到的代码保存在`python/examples/imdb`路径下: + +```shell +. +├── get_data.sh +├── imdb_reader.py +├── test_ensemble_client.py +└── test_ensemble_server.py +``` + +### 数据准备 + +通过下面命令获取预训练的CNN和BOW模型(您也可以直接运行`get_data.sh`脚本): + +```shell +wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz +tar -zxvf text_classification_data.tar.gz +tar -zxvf imdb_model.tar.gz +``` + +### 启动Server + +通过下面的Python代码启动Server端(您也可以直接运行`test_ensemble_server.py`脚本): + +```python +from paddle_serving_server import OpMaker +from paddle_serving_server import OpGraphMaker +from paddle_serving_server import Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +cnn_infer_op = op_maker.create( + 'general_infer', engine_name='cnn', inputs=[read_op]) +bow_infer_op = op_maker.create( + 'general_infer', engine_name='bow', inputs=[read_op]) +response_op = op_maker.create( + 'general_response', inputs=[cnn_infer_op, bow_infer_op]) + +op_graph_maker = OpGraphMaker() +op_graph_maker.add_op(read_op) +op_graph_maker.add_op(cnn_infer_op) +op_graph_maker.add_op(bow_infer_op) +op_graph_maker.add_op(response_op) + +server = Server() +server.set_op_graph(op_graph_maker.get_op_graph()) +model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'} +server.load_model_config(model_config) +server.prepare_server(workdir="work_dir1", port=9393, device="cpu") +server.run_server() +``` + +与普通预测服务不同的是,这里我们需要用DAG来描述Server端的运行逻辑。 + +在创建Op的时候需要指定当前Op的前继(在该例子中,`cnn_infer_op`与`bow_infer_op`的前继均是`read_op`,`response_op`的前继是`cnn_infer_op`和`bow_infer_op`),对于预测Op`infer_op`还需要定义预测引擎名称`engine_name`(也可以使用默认值,建议设置该值方便Client端获取预测结果)。 + +同时在配置模型路径时,需要以预测Op为key,对应的模型路径为value,创建模型配置字典,来告知Serving每个预测Op使用哪个模型。 + +### 启动Client + +通过下面的Python代码运行Client端(您也可以直接运行`test_ensemble_client.py`脚本): + +```python +from paddle_serving_client import Client +from imdb_reader import IMDBDataset + +client = Client() +# If you have more than one model, make sure that the input +# and output of more than one model are the same. +client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt') +client.connect(["127.0.0.1:9393"]) + +# you can define any english sentence or dataset here +# This example reuses imdb reader in training, you +# can define your own data preprocessing easily. +imdb_dataset = IMDBDataset() +imdb_dataset.load_resource('imdb.vocab') + +for i in range(3): + line = 'i am very sad | 0' + word_ids, label = imdb_dataset.get_words_and_label(line) + feed = {"words": word_ids} + fetch = ["acc", "cost", "prediction"] + fetch_maps = client.predict(feed=feed, fetch=fetch) + if len(fetch_maps) == 1: + print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1])) + else: + for model, fetch_map in fetch_maps.items(): + print("step: {}, model: {}, res: {}".format(i, model, fetch_map[ + 'prediction'][0][1])) +``` + +Client端与普通预测服务没有发生太大的变化。当使用多个模型预测时,预测服务将返回一个key为Server端定义的引擎名称`engine_name`,value为对应的模型预测结果的字典。 + +### 预期结果 + +```txt +step: 0, model: cnn, res: 0.560272455215 +step: 0, model: bow, res: 0.633530199528 +step: 1, model: cnn, res: 0.560272455215 +step: 1, model: bow, res: 0.633530199528 +step: 2, model: cnn, res: 0.560272455215 +step: 2, model: bow, res: 0.633530199528 +``` diff --git a/doc/NEW_WEB_SERVICE.md b/doc/NEW_WEB_SERVICE.md new file mode 100644 index 0000000000000000000000000000000000000000..63f62a774d914c7271bfed1508881e04f74f2ca8 --- /dev/null +++ b/doc/NEW_WEB_SERVICE.md @@ -0,0 +1,64 @@ +# How to develop a new Web service? + +([简体中文](NEW_WEB_SERVICE_CN.md)|English) + +This document will take the image classification service based on the Imagenet data set as an example to introduce how to develop a new web service. The complete code can be visited at [here](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/imagenet/image_classification_service.py). + +## WebService base class + +Paddle Serving implements the [WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23) base class. You need to override its `preprocess` and `postprocess` method. The default implementation is as follows: + +```python +class WebService(object): + + def preprocess(self, feed={}, fetch=[]): + return feed, fetch + def postprocess(self, feed={}, fetch=[], fetch_map=None): + return fetch_map +``` + +### preprocess + +The preprocess method has two input parameters, `feed` and `fetch`. For an HTTP request `request`: + +- The value of `feed` is request data `request.json` +- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data + +The return values are the feed and fetch values used in the prediction. + +### postprocess + +The postprocess method has three input parameters, `feed`, `fetch` and `fetch_map`: + +- The value of `feed` is request data `request.json` +- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data +- The value of `fetch_map` is the model output value. + +The return value will be processed as `{"reslut": fetch_map}` as the return of the HTTP request. + +## Develop ImageService class + +```python +class ImageService(WebService): + def preprocess(self, feed={}, fetch=[]): + reader = ImageReader() + if "image" not in feed: + raise ("feed data error!") + if isinstance(feed["image"], list): + feed_batch = [] + for image in feed["image"]: + sample = base64.b64decode(image) + img = reader.process_image(sample) + res_feed = {} + res_feed["image"] = img.reshape(-1) + feed_batch.append(res_feed) + return feed_batch, fetch + else: + sample = base64.b64decode(feed["image"]) + img = reader.process_image(sample) + res_feed = {} + res_feed["image"] = img.reshape(-1) + return res_feed, fetch +``` + +For the above `ImageService`, only the `preprocess` method is rewritten to process the image data in Base64 format into the data format required by prediction. diff --git a/doc/NEW_WEB_SERVICE_CN.md b/doc/NEW_WEB_SERVICE_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..e1a21d8a0e91a114c9d94b09ef0afa9a0d29de89 --- /dev/null +++ b/doc/NEW_WEB_SERVICE_CN.md @@ -0,0 +1,64 @@ +# 如何开发一个新的Web Service? + +(简体中文|[English](NEW_WEB_SERVICE.md)) + +本文档将以Imagenet图像分类服务为例,来介绍如何开发一个新的Web Service。您可以在[这里](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/imagenet/image_classification_service.py)查阅完整的代码。 + +## WebService基类 + +Paddle Serving实现了[WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23)基类,您需要重写它的`preprocess`方法和`postprocess`方法,默认实现如下: + +```python +class WebService(object): + + def preprocess(self, feed={}, fetch=[]): + return feed, fetch + def postprocess(self, feed={}, fetch=[], fetch_map=None): + return fetch_map +``` + +### preprocess方法 + +preprocess方法有两个输入参数,`feed`和`fetch`。对于一个HTTP请求`request`: + +- `feed`的值为请求数据`request.json` +- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]` + +返回值分别是预测过程中用到的feed和fetch值。 + +### postprocess方法 + +postprocess方法有三个输入参数,`feed`、`fetch`和`fetch_map`: + +- `feed`的值为请求数据`request.json` +- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]` +- `fetch_map`的值为fetch到的模型输出值 + +返回值将会被处理成`{"reslut": fetch_map}`作为HTTP请求的返回。 + +## 开发ImageService类 + +```python +class ImageService(WebService): + def preprocess(self, feed={}, fetch=[]): + reader = ImageReader() + if "image" not in feed: + raise ("feed data error!") + if isinstance(feed["image"], list): + feed_batch = [] + for image in feed["image"]: + sample = base64.b64decode(image) + img = reader.process_image(sample) + res_feed = {} + res_feed["image"] = img.reshape(-1) + feed_batch.append(res_feed) + return feed_batch, fetch + else: + sample = base64.b64decode(feed["image"]) + img = reader.process_image(sample) + res_feed = {} + res_feed["image"] = img.reshape(-1) + return res_feed, fetch +``` + +对于上述的`ImageService`,只重写了前处理方法,将base64格式的图片数据处理成模型预测需要的数据格式。 diff --git a/doc/SERVER_DAG.md b/doc/SERVER_DAG.md index 5a5c851efacc28e5419d262ca671c83ec61e2015..dbf277ccbccc2a06838d65bfbf75e514b4d9a1ed 100644 --- a/doc/SERVER_DAG.md +++ b/doc/SERVER_DAG.md @@ -14,13 +14,19 @@ Deep neural nets often have some preprocessing steps on input data, and postproc ## How to define Node +### Simple series structure + PaddleServing has some predefined Computation Node in the framework. A very commonly used Computation Graph is the simple reader-inference-response mode that can cover most of the single model inference scenarios. A example graph and the corresponding DAG definition code is as follows. +
``` python import paddle_serving_server as serving +from paddle_serving_server import OpMaker +from paddle_serving_server import OpSeqMaker + op_maker = serving.OpMaker() read_op = op_maker.create('general_reader') general_infer_op = op_maker.create('general_infer') @@ -32,18 +38,54 @@ op_seq_maker.add_op(general_infer_op) op_seq_maker.add_op(general_response_op) ``` +For simple series logic, we simplify it and build it with `OpSeqMaker`. You can determine the successor by default according to the order of joining `OpSeqMaker` without specifying the successor of each node. + Since the code will be commonly used and users do not have to change the code, PaddleServing releases a easy-to-use launching command for service startup. An example is as follows: ``` python python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9292 ``` +### Nodes with multiple inputs + +An example containing multiple input nodes is given in the [MODEL_ENSEMBLE_IN_PADDLE_SERVING](MODEL_ENSEMBLE_IN_PADDLE_SERVING.md). A example graph and the corresponding DAG definition code is as follows. + +
+ +
+ +```python +from paddle_serving_server import OpMaker +from paddle_serving_server import OpGraphMaker +from paddle_serving_server import Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +cnn_infer_op = op_maker.create( + 'general_infer', engine_name='cnn', inputs=[read_op]) +bow_infer_op = op_maker.create( + 'general_infer', engine_name='bow', inputs=[read_op]) +response_op = op_maker.create( + 'general_response', inputs=[cnn_infer_op, bow_infer_op]) + +op_graph_maker = OpGraphMaker() +op_graph_maker.add_op(read_op) +op_graph_maker.add_op(cnn_infer_op) +op_graph_maker.add_op(bow_infer_op) +op_graph_maker.add_op(response_op) +``` + +For a graph with multiple input nodes, we need to use `OpGraphMaker` to build it, and you must give the predecessor of each node. + ## More Examples If a user has sparse features as inputs, and the model will do embedding lookup for each feature, we can do distributed embedding lookup operation which is not in the Paddle training computation graph. An example is as follows: ``` python import paddle_serving_server as serving +from paddle_serving_server import OpMaker +from paddle_serving_server import OpSeqMaker + op_maker = serving.OpMaker() read_op = op_maker.create('general_reader') dist_kv_op = op_maker.create('general_dist_kv') diff --git a/doc/SERVER_DAG_CN.md b/doc/SERVER_DAG_CN.md index 3bf42ef8e3fbcb8c509a69bfe6aea12f78dc4567..80d01f0287c5f721f093e96c7bcd1827f0601496 100644 --- a/doc/SERVER_DAG_CN.md +++ b/doc/SERVER_DAG_CN.md @@ -14,6 +14,8 @@ ## 如何定义节点 +### 简单的串联结构 + PaddleServing在框架中具有一些预定义的计算节点。 一种非常常用的计算图是简单的reader-infer-response模式,可以涵盖大多数单一模型推理方案。 示例图和相应的DAG定义代码如下。
@@ -21,6 +23,9 @@ PaddleServing在框架中具有一些预定义的计算节点。 一种非常常 ``` python import paddle_serving_server as serving +from paddle_serving_server import OpMaker +from paddle_serving_server import OpSeqMaker + op_maker = serving.OpMaker() read_op = op_maker.create('general_reader') general_infer_op = op_maker.create('general_infer') @@ -32,18 +37,54 @@ op_seq_maker.add_op(general_infer_op) op_seq_maker.add_op(general_response_op) ``` +对于简单的串联逻辑,我们将其简化为`Sequence`,使用`OpSeqMaker`进行构建。用户可以不指定每个节点的前继,默认按加入`OpSeqMaker`的顺序来确定前继。 + 由于该代码在大多数情况下都会被使用,并且用户不必更改代码,因此PaddleServing会发布一个易于使用的启动命令来启动服务。 示例如下: ``` python python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9292 ``` +### 包含多个输入的节点 + +在[Paddle Serving中的集成预测](MODEL_ENSEMBLE_IN_PADDLE_SERVING_CN.md)文档中给出了一个包含多个输入节点的样例,示意图和代码如下。 + +
+ +
+ +```python +from paddle_serving_server import OpMaker +from paddle_serving_server import OpGraphMaker +from paddle_serving_server import Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +cnn_infer_op = op_maker.create( + 'general_infer', engine_name='cnn', inputs=[read_op]) +bow_infer_op = op_maker.create( + 'general_infer', engine_name='bow', inputs=[read_op]) +response_op = op_maker.create( + 'general_response', inputs=[cnn_infer_op, bow_infer_op]) + +op_graph_maker = OpGraphMaker() +op_graph_maker.add_op(read_op) +op_graph_maker.add_op(cnn_infer_op) +op_graph_maker.add_op(bow_infer_op) +op_graph_maker.add_op(response_op) +``` + +对于含有多输入节点的计算图,需要使用`OpGraphMaker`来构建,同时必须给出每个节点的前继。 + ## 更多示例 如果用户将稀疏特征作为输入,并且模型将对每个特征进行嵌入查找,则我们可以进行分布式嵌入查找操作,该操作不在Paddle训练计算图中。 示例如下: ``` python import paddle_serving_server as serving +from paddle_serving_server import OpMaker +from paddle_serving_server import OpSeqMaker + op_maker = serving.OpMaker() read_op = op_maker.create('general_reader') dist_kv_op = op_maker.create('general_dist_kv') diff --git a/doc/complex_dag.png b/doc/complex_dag.png new file mode 100644 index 0000000000000000000000000000000000000000..4e844d9fc3915579ec44bb981e9e2bfc3e4f7675 Binary files /dev/null and b/doc/complex_dag.png differ diff --git a/doc/model_ensemble_example.png b/doc/model_ensemble_example.png new file mode 100644 index 0000000000000000000000000000000000000000..823e91ee9ea6e2b10c3bd2c0ca119f088582c685 Binary files /dev/null and b/doc/model_ensemble_example.png differ diff --git a/python/examples/faster_rcnn_model/000000570688.jpg b/python/examples/faster_rcnn_model/000000570688.jpg new file mode 100755 index 0000000000000000000000000000000000000000..cb304bd56c4010c08611a30dcca58ea9140cea54 Binary files /dev/null and b/python/examples/faster_rcnn_model/000000570688.jpg differ diff --git a/python/examples/faster_rcnn_model/000000570688_bbox.jpg b/python/examples/faster_rcnn_model/000000570688_bbox.jpg new file mode 100644 index 0000000000000000000000000000000000000000..61bc11c02c92b92cffac91a6c3533a90a45c4e14 Binary files /dev/null and b/python/examples/faster_rcnn_model/000000570688_bbox.jpg differ diff --git a/python/examples/faster_rcnn_model/README.md b/python/examples/faster_rcnn_model/README.md new file mode 100644 index 0000000000000000000000000000000000000000..aca6e5183daf9a46096587fab8276b4e7346f746 --- /dev/null +++ b/python/examples/faster_rcnn_model/README.md @@ -0,0 +1,70 @@ +# Faster RCNN model on Paddle Serving + +([简体中文](./README_CN.md)|English) + +This article requires [Paddle Detection](https://github.com/PaddlePaddle/PaddleDetection) trained models and configuration files. If users want to quickly deploy on Paddle Serving, please read the Chapter 2 directly. + +## 1. Train an object detection model + +Users can read [Paddle Detection Getting Started](https://github.com/PaddlePaddle/PaddleDetection/blob/release/0.2/docs/tutorials/GETTING_STARTED_cn.md) to understand the background of Paddle Detection. The purpose of PaddleDetection is to provide a rich and easy-to-use object detection model for industry and academia. Not only is it superior in performance and easy to deploy, but it can also flexibly meet the needs of algorithm research. + +### Environmental requirements + +CPU version: No special requirements + +GPU version: CUDA 9.0 and above + +``` +git clone https://github.com/PaddlePaddle/PaddleDetection +cd PaddleDetection +``` +Next, you can train the faster rcnn model +``` +python tools/train.py -c configs/faster_rcnn_r50_1x.yml +``` +The time for training the model depends on the situation and is related to the computing power of the training equipment and the number of iterations. +In the training process, `faster_rcnn_r50_1x.yml` defines the snapshot of the saved model. After the final training, the model with the best effect will be saved as `best_model.pdmodel`, which is a compressed PaddleDetection Exclusive model files. + +**If we want the model to be used by Paddle Serving, we must do export_model.** + +Output model +``` +python export_model.py +``` +## 2. Start the model and predict +If users do not use the Paddle Detection project to train models, we are here to provide you with sample model downloads. If you trained the model with Paddle Detection, you can skip the ** Download Model ** section. + +### Download model +``` +wget https://paddle-serving.bj.bcebos.com/pddet_demo/faster_rcnn_model.tar.gz +wget https://paddle-serving.bj.bcebos.com/pddet_demo/paddle_serving_app-0.0.1-py2-none-any.whl +wget https://paddle-serving.bj.bcebos.com/pddet_demo/infer_cfg.yml +tar xf faster_rcnn_model.tar.gz +mv faster_rcnn_model/pddet *. +``` + +### Start the service +``` +GLOG_v = 2 python -m paddle_serving_server_gpu.serve --model pddet_serving_model --port 9494 --gpu_id 0 +``` + +### Perform prediction +``` +python test_client.py --config_path = infer_cfg.yml --infer_img = 000000570688.jpg --dump_result --visualize +``` + +## 3. Result analysis +

+    
+ +    
+

+This is the input picture +   +

+    
+ +    
+

+ +This is the picture after adding bbox. You can see that the client has done post-processing for the picture. In addition, the output/bbox.json also has the number and coordinate information of each box. diff --git a/python/examples/faster_rcnn_model/README_CN.md b/python/examples/faster_rcnn_model/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..a1ac36ff93f5d75a4d8874b89f3cb1509589c4d0 --- /dev/null +++ b/python/examples/faster_rcnn_model/README_CN.md @@ -0,0 +1,70 @@ +# Faster RCNN模型 + +(简体中文|[English](./README.md)) + +本文需要[Paddle Detection](https://github.com/PaddlePaddle/PaddleDetection)训练的模型和配置文件。如果用户想要快速部署在Paddle Serving上,请直接阅读第二章节。 + +## 1. 训练物体检测模型 + +用户可以阅读 [Paddle Detection入门使用](https://github.com/PaddlePaddle/PaddleDetection/blob/release/0.2/docs/tutorials/GETTING_STARTED_cn.md)来了解Paddle Detection的背景。PaddleDetection的目的是为工业界和学术界提供丰富、易用的目标检测模型。不仅性能优越、易于部署,而且能够灵活的满足算法研究的需求。 + +### 环境要求 + +CPU版: 没有特别要求 + +GPU版: CUDA 9.0及以上 + +``` +git clone https://github.com/PaddlePaddle/PaddleDetection +cd PaddleDetection +``` +接下来可以训练faster rcnn模型 +``` +python tools/train.py -c configs/faster_rcnn_r50_1x.yml +``` +训练模型的时间视情况而定,与训练的设备算力和迭代轮数相关。 +在训练的过程中,`faster_rcnn_r50_1x.yml`当中定义了保存模型的`snapshot`,在最终训练完成后,效果最好的模型,会被保存为`best_model.pdmodel`,这是一个经过压缩的PaddleDetection的专属模型文件。 + +**如果我们要让模型可被Paddle Serving所使用,必须做export_model。** + +输出模型 +``` +python export_model.py +``` + +## 2. 启动模型并预测 +如果用户没有用Paddle Detection项目训练模型,我们也在此为您提供示例模型下载。如果您用Paddle Detection训练了模型,可以跳过 **下载模型** 部分。 + +### 下载模型 +``` +wget https://paddle-serving.bj.bcebos.com/pddet_demo/faster_rcnn_model.tar.gz +wget https://paddle-serving.bj.bcebos.com/pddet_demo/paddle_serving_app-0.0.1-py2-none-any.whl +wget https://paddle-serving.bj.bcebos.com/pddet_demo/infer_cfg.yml +tar xf faster_rcnn_model.tar.gz +mv faster_rcnn_model/pddet* . +``` + +### 启动服务 +``` +GLOG_v=2 python -m paddle_serving_server_gpu.serve --model pddet_serving_model --port 9494 --gpu_id 0 +``` + +### 执行预测 +``` +python test_client.py --config_path=infer_cfg.yml --infer_img=000000570688.jpg --dump_result --visualize +``` + +## 3. 结果分析 +

+
+ +
+

+这是输入图片 + +

+
+ +
+

+这是实现添加了bbox之后的图片,可以看到客户端已经为图片做好了后处理,此外在output/bbox.json也有各个框的编号和坐标信息。 diff --git a/python/examples/faster_rcnn_model/test_client.py b/python/examples/faster_rcnn_model/test_client.py new file mode 100755 index 0000000000000000000000000000000000000000..ae2e5b8f6e961d965555d8f268f38be14c0263d0 --- /dev/null +++ b/python/examples/faster_rcnn_model/test_client.py @@ -0,0 +1,33 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from paddle_serving_client import Client +import sys +import os +import time +from paddle_serving_app.reader.pddet import Detection +import numpy as np + +py_version = sys.version_info[0] + +feed_var_names = ['image', 'im_shape', 'im_info'] +fetch_var_names = ['multiclass_nms'] +pddet = Detection(config_path=sys.argv[2], output_dir="./output") +feed_dict = pddet.preprocess(feed_var_names, sys.argv[3]) +client = Client() +client.load_client_config(sys.argv[1]) +client.connect(['127.0.0.1:9494']) +fetch_map = client.predict(feed=feed_dict, fetch=fetch_var_names) +outs = fetch_map.values() +pddet.postprocess(fetch_map, fetch_var_names) diff --git a/python/examples/imdb/test_ensemble_client.py b/python/examples/imdb/test_ensemble_client.py new file mode 100644 index 0000000000000000000000000000000000000000..6cafb3389fff5a25103bcb2b3a867b73b35b9e8e --- /dev/null +++ b/python/examples/imdb/test_ensemble_client.py @@ -0,0 +1,42 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +from paddle_serving_client import Client +from imdb_reader import IMDBDataset + +client = Client() +# If you have more than one model, make sure that the input +# and output of more than one model are the same. +client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt') +client.connect(["127.0.0.1:9393"]) + +# you can define any english sentence or dataset here +# This example reuses imdb reader in training, you +# can define your own data preprocessing easily. +imdb_dataset = IMDBDataset() +imdb_dataset.load_resource('imdb.vocab') + +for i in range(3): + line = 'i am very sad | 0' + word_ids, label = imdb_dataset.get_words_and_label(line) + feed = {"words": word_ids} + fetch = ["acc", "cost", "prediction"] + fetch_maps = client.predict(feed=feed, fetch=fetch) + if len(fetch_maps) == 1: + print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1])) + else: + for model, fetch_map in fetch_maps.items(): + print("step: {}, model: {}, res: {}".format(i, model, fetch_map[ + 'prediction'][0][1])) diff --git a/python/examples/imdb/test_ensemble_server.py b/python/examples/imdb/test_ensemble_server.py new file mode 100644 index 0000000000000000000000000000000000000000..464288a0a167d8487f787d12c4b44a138da86f88 --- /dev/null +++ b/python/examples/imdb/test_ensemble_server.py @@ -0,0 +1,40 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +from paddle_serving_server import OpMaker +from paddle_serving_server import OpGraphMaker +from paddle_serving_server import Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +cnn_infer_op = op_maker.create( + 'general_infer', engine_name='cnn', inputs=[read_op]) +bow_infer_op = op_maker.create( + 'general_infer', engine_name='bow', inputs=[read_op]) +response_op = op_maker.create( + 'general_response', inputs=[cnn_infer_op, bow_infer_op]) + +op_graph_maker = OpGraphMaker() +op_graph_maker.add_op(read_op) +op_graph_maker.add_op(cnn_infer_op) +op_graph_maker.add_op(bow_infer_op) +op_graph_maker.add_op(response_op) + +server = Server() +server.set_op_graph(op_graph_maker.get_op_graph()) +model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'} +server.load_model_config(model_config) +server.prepare_server(workdir="work_dir1", port=9393, device="cpu") +server.run_server() diff --git a/python/paddle_serving_app/reader/pddet/__init__.py b/python/paddle_serving_app/reader/pddet/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..c4e356387b5855cb60606f7bdebe7d8c6d091814 --- /dev/null +++ b/python/paddle_serving_app/reader/pddet/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +import argparse +from .image_tool import Resize, Detection diff --git a/python/paddle_serving_app/reader/pddet/image_tool.py b/python/paddle_serving_app/reader/pddet/image_tool.py new file mode 100644 index 0000000000000000000000000000000000000000..4b461bc491a90d25a3259cf6db806beae6dbf593 --- /dev/null +++ b/python/paddle_serving_app/reader/pddet/image_tool.py @@ -0,0 +1,620 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time + +import numpy as np +from PIL import Image, ImageDraw +import cv2 +import yaml +import copy +import argparse +import logging +import paddle.fluid as fluid +import json + +FORMAT = '%(asctime)s-%(levelname)s: %(message)s' +logging.basicConfig(level=logging.INFO, format=FORMAT) +logger = logging.getLogger(__name__) + +precision_map = { + 'trt_int8': fluid.core.AnalysisConfig.Precision.Int8, + 'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32, + 'trt_fp16': fluid.core.AnalysisConfig.Precision.Half +} + + +class Resize(object): + def __init__(self, + target_size, + max_size=0, + interp=cv2.INTER_LINEAR, + use_cv2=True, + image_shape=None): + super(Resize, self).__init__() + self.target_size = target_size + self.max_size = max_size + self.interp = interp + self.use_cv2 = use_cv2 + self.image_shape = image_shape + + def __call__(self, im): + origin_shape = im.shape[:2] + im_c = im.shape[2] + if self.max_size != 0: + im_size_min = np.min(origin_shape[0:2]) + im_size_max = np.max(origin_shape[0:2]) + im_scale = float(self.target_size) / float(im_size_min) + if np.round(im_scale * im_size_max) > self.max_size: + im_scale = float(self.max_size) / float(im_size_max) + im_scale_x = im_scale + im_scale_y = im_scale + resize_w = int(im_scale_x * float(origin_shape[1])) + resize_h = int(im_scale_y * float(origin_shape[0])) + else: + im_scale_x = float(self.target_size) / float(origin_shape[1]) + im_scale_y = float(self.target_size) / float(origin_shape[0]) + resize_w = self.target_size + resize_h = self.target_size + if self.use_cv2: + im = cv2.resize( + im, + None, + None, + fx=im_scale_x, + fy=im_scale_y, + interpolation=self.interp) + else: + if self.max_size != 0: + raise TypeError( + 'If you set max_size to cap the maximum size of image,' + 'please set use_cv2 to True to resize the image.') + im = im.astype('uint8') + im = Image.fromarray(im) + im = im.resize((int(resize_w), int(resize_h)), self.interp) + im = np.array(im) + # padding im + if self.max_size != 0 and self.image_shape is not None: + padding_im = np.zeros( + (self.max_size, self.max_size, im_c), dtype=np.float32) + im_h, im_w = im.shape[:2] + padding_im[:im_h, :im_w, :] = im + im = padding_im + return im, im_scale_x + + +class Normalize(object): + def __init__(self, mean, std, is_scale=True, is_channel_first=False): + super(Normalize, self).__init__() + self.mean = mean + self.std = std + self.is_scale = is_scale + self.is_channel_first = is_channel_first + + def __call__(self, im): + im = im.astype(np.float32, copy=False) + if self.is_channel_first: + mean = np.array(self.mean)[:, np.newaxis, np.newaxis] + std = np.array(self.std)[:, np.newaxis, np.newaxis] + else: + mean = np.array(self.mean)[np.newaxis, np.newaxis, :] + std = np.array(self.std)[np.newaxis, np.newaxis, :] + if self.is_scale: + im = im / 255.0 + im -= mean + im /= std + return im + + +class Permute(object): + def __init__(self, to_bgr=False, channel_first=True): + self.to_bgr = to_bgr + self.channel_first = channel_first + + def __call__(self, im): + if self.channel_first: + im = im.transpose((2, 0, 1)) + if self.to_bgr: + im = im[[2, 1, 0], :, :] + return im.copy() + + +class PadStride(object): + def __init__(self, stride=0): + assert stride >= 0, "Unsupported stride: {}," + " the stride in PadStride must be greater " + "or equal to 0".format(stride) + self.coarsest_stride = stride + + def __call__(self, im): + coarsest_stride = self.coarsest_stride + if coarsest_stride == 0: + return im + im_c, im_h, im_w = im.shape + pad_h = int(np.ceil(float(im_h) / coarsest_stride) * coarsest_stride) + pad_w = int(np.ceil(float(im_w) / coarsest_stride) * coarsest_stride) + padding_im = np.zeros((im_c, pad_h, pad_w), dtype=np.float32) + padding_im[:, :im_h, :im_w] = im + return padding_im + + +class Detection(): + def __init__(self, config_path, output_dir): + self.config_path = config_path + self.if_visualize = True + self.if_dump_result = True + self.output_dir = output_dir + + def DecodeImage(self, im_path): + assert os.path.exists(im_path), "Image path {} can not be found".format( + im_path) + with open(im_path, 'rb') as f: + im = f.read() + data = np.frombuffer(im, dtype='uint8') + im = cv2.imdecode(data, 1) # BGR mode, but need RGB mode + im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB) + return im + + def Preprocess(self, img_path, arch, config): + img = self.DecodeImage(img_path) + orig_shape = img.shape + scale = 1. + data = [] + data_config = copy.deepcopy(config) + for data_aug_conf in data_config: + obj = data_aug_conf.pop('type') + preprocess = eval(obj)(**data_aug_conf) + if obj == 'Resize': + img, scale = preprocess(img) + else: + img = preprocess(img) + + img = img[np.newaxis, :] # N, C, H, W + data.append(img) + extra_info = self.get_extra_info(img, arch, orig_shape, scale) + data += extra_info + return data + + def expand_boxes(self, boxes, scale): + """ + Expand an array of boxes by a given scale. + """ + w_half = (boxes[:, 2] - boxes[:, 0]) * .5 + h_half = (boxes[:, 3] - boxes[:, 1]) * .5 + x_c = (boxes[:, 2] + boxes[:, 0]) * .5 + y_c = (boxes[:, 3] + boxes[:, 1]) * .5 + + w_half *= scale + h_half *= scale + + boxes_exp = np.zeros(boxes.shape) + boxes_exp[:, 0] = x_c - w_half + boxes_exp[:, 2] = x_c + w_half + boxes_exp[:, 1] = y_c - h_half + boxes_exp[:, 3] = y_c + h_half + + return boxes_exp + + def mask2out(self, results, clsid2catid, resolution, thresh_binarize=0.5): + import pycocotools.mask as mask_util + scale = (resolution + 2.0) / resolution + + segm_res = [] + + for t in results: + bboxes = t['bbox'][0] + lengths = t['bbox'][1][0] + if bboxes.shape == (1, 1) or bboxes is None: + continue + if len(bboxes.tolist()) == 0: + continue + masks = t['mask'][0] + + s = 0 + # for each sample + for i in range(len(lengths)): + num = lengths[i] + im_shape = t['im_shape'][i] + + bbox = bboxes[s:s + num][:, 2:] + clsid_scores = bboxes[s:s + num][:, 0:2] + mask = masks[s:s + num] + s += num + + im_h = int(im_shape[0]) + im_w = int(im_shape[1]) + + expand_bbox = expand_boxes(bbox, scale) + expand_bbox = expand_bbox.astype(np.int32) + + padded_mask = np.zeros( + (resolution + 2, resolution + 2), dtype=np.float32) + + for j in range(num): + xmin, ymin, xmax, ymax = expand_bbox[j].tolist() + clsid, score = clsid_scores[j].tolist() + clsid = int(clsid) + padded_mask[1:-1, 1:-1] = mask[j, clsid, :, :] + + catid = clsid2catid[clsid] + + w = xmax - xmin + 1 + h = ymax - ymin + 1 + w = np.maximum(w, 1) + h = np.maximum(h, 1) + + resized_mask = cv2.resize(padded_mask, (w, h)) + resized_mask = np.array( + resized_mask > thresh_binarize, dtype=np.uint8) + im_mask = np.zeros((im_h, im_w), dtype=np.uint8) + + x0 = min(max(xmin, 0), im_w) + x1 = min(max(xmax + 1, 0), im_w) + y0 = min(max(ymin, 0), im_h) + y1 = min(max(ymax + 1, 0), im_h) + + im_mask[y0:y1, x0:x1] = resized_mask[(y0 - ymin):( + y1 - ymin), (x0 - xmin):(x1 - xmin)] + segm = mask_util.encode( + np.array( + im_mask[:, :, np.newaxis], order='F'))[0] + catid = clsid2catid[clsid] + segm['counts'] = segm['counts'].decode('utf8') + coco_res = { + 'category_id': catid, + 'segmentation': segm, + 'score': score + } + segm_res.append(coco_res) + return segm_res + + def draw_bbox(self, image, catid2name, bboxes, threshold, color_list): + """ + draw bbox on image + """ + draw = ImageDraw.Draw(image) + + for dt in np.array(bboxes): + catid, bbox, score = dt['category_id'], dt['bbox'], dt['score'] + if score < threshold: + continue + + xmin, ymin, w, h = bbox + xmax = xmin + w + ymax = ymin + h + + color = tuple(color_list[catid]) + + # draw bbox + draw.line( + [(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin), + (xmin, ymin)], + width=2, + fill=color) + + # draw label + text = "{} {:.2f}".format(catid2name[catid], score) + tw, th = draw.textsize(text) + draw.rectangle( + [(xmin + 1, ymin - th), (xmin + tw + 1, ymin)], fill=color) + draw.text((xmin + 1, ymin - th), text, fill=(255, 255, 255)) + + return image + + def draw_mask(self, image, masks, threshold, color_list, alpha=0.7): + """ + Draw mask on image + """ + mask_color_id = 0 + w_ratio = .4 + img_array = np.array(image).astype('float32') + for dt in np.array(masks): + segm, score = dt['segmentation'], dt['score'] + if score < threshold: + continue + import pycocotools.mask as mask_util + mask = mask_util.decode(segm) * 255 + color_mask = color_list[mask_color_id % len(color_list), 0:3] + mask_color_id += 1 + for c in range(3): + color_mask[c] = color_mask[c] * (1 - w_ratio) + w_ratio * 255 + idx = np.nonzero(mask) + img_array[idx[0], idx[1], :] *= 1.0 - alpha + img_array[idx[0], idx[1], :] += alpha * color_mask + return Image.fromarray(img_array.astype('uint8')) + + def get_extra_info(self, im, arch, shape, scale): + info = [] + input_shape = [] + im_shape = [] + logger.info('The architecture is {}'.format(arch)) + if 'YOLO' in arch: + im_size = np.array([shape[:2]]).astype('int32') + logger.info('Extra info: im_size') + info.append(im_size) + elif 'SSD' in arch: + im_shape = np.array([shape[:2]]).astype('int32') + logger.info('Extra info: im_shape') + info.append([im_shape]) + elif 'RetinaNet' in arch: + input_shape.extend(im.shape[2:]) + im_info = np.array([input_shape + [scale]]).astype('float32') + logger.info('Extra info: im_info') + info.append(im_info) + elif 'RCNN' in arch: + input_shape.extend(im.shape[2:]) + im_shape.extend(shape[:2]) + im_info = np.array([input_shape + [scale]]).astype('float32') + im_shape = np.array([im_shape + [1.]]).astype('float32') + logger.info('Extra info: im_info, im_shape') + info.append(im_info) + info.append(im_shape) + else: + logger.error( + "Unsupported arch: {}, expect YOLO, SSD, RetinaNet and RCNN". + format(arch)) + return info + + def offset_to_lengths(self, lod): + offset = lod[0] + lengths = [offset[i + 1] - offset[i] for i in range(len(offset) - 1)] + return [lengths] + + def bbox2out(self, results, clsid2catid, is_bbox_normalized=False): + """ + Args: + results: request a dict, should include: `bbox`, `im_id`, + if is_bbox_normalized=True, also need `im_shape`. + clsid2catid: class id to category id map of COCO2017 dataset. + is_bbox_normalized: whether or not bbox is normalized. + """ + xywh_res = [] + for t in results: + bboxes = t['bbox'][0] + lengths = t['bbox'][1][0] + if bboxes.shape == (1, 1) or bboxes is None: + continue + + k = 0 + for i in range(len(lengths)): + num = lengths[i] + for j in range(num): + dt = bboxes[k] + clsid, score, xmin, ymin, xmax, ymax = dt.tolist() + catid = (clsid2catid[int(clsid)]) + + if is_bbox_normalized: + xmin, ymin, xmax, ymax = \ + self.clip_bbox([xmin, ymin, xmax, ymax]) + w = xmax - xmin + h = ymax - ymin + im_shape = t['im_shape'][0][i].tolist() + im_height, im_width = int(im_shape[0]), int(im_shape[1]) + xmin *= im_width + ymin *= im_height + w *= im_width + h *= im_height + else: + w = xmax - xmin + 1 + h = ymax - ymin + 1 + + bbox = [xmin, ymin, w, h] + coco_res = { + 'category_id': catid, + 'bbox': bbox, + 'score': score + } + xywh_res.append(coco_res) + k += 1 + return xywh_res + + def get_bbox_result(self, fetch_map, fetch_name, result, conf, clsid2catid): + is_bbox_normalized = True if 'SSD' in conf['arch'] else False + output = fetch_map[fetch_name] + lod = [fetch_map[fetch_name + '.lod']] + lengths = self.offset_to_lengths(lod) + np_data = np.array(output) + result['bbox'] = (np_data, lengths) + result['im_id'] = np.array([[0]]) + + bbox_results = self.bbox2out([result], clsid2catid, is_bbox_normalized) + return bbox_results + + def mask2out(self, results, clsid2catid, resolution, thresh_binarize=0.5): + import pycocotools.mask as mask_util + scale = (resolution + 2.0) / resolution + + segm_res = [] + + for t in results: + bboxes = t['bbox'][0] + lengths = t['bbox'][1][0] + if bboxes.shape == (1, 1) or bboxes is None: + continue + if len(bboxes.tolist()) == 0: + continue + masks = t['mask'][0] + + s = 0 + # for each sample + for i in range(len(lengths)): + num = lengths[i] + im_shape = t['im_shape'][i] + + bbox = bboxes[s:s + num][:, 2:] + clsid_scores = bboxes[s:s + num][:, 0:2] + mask = masks[s:s + num] + s += num + + im_h = int(im_shape[0]) + im_w = int(im_shape[1]) + + expand_bbox = expand_boxes(bbox, scale) + expand_bbox = expand_bbox.astype(np.int32) + + padded_mask = np.zeros( + (resolution + 2, resolution + 2), dtype=np.float32) + + for j in range(num): + xmin, ymin, xmax, ymax = expand_bbox[j].tolist() + clsid, score = clsid_scores[j].tolist() + clsid = int(clsid) + padded_mask[1:-1, 1:-1] = mask[j, clsid, :, :] + + catid = clsid2catid[clsid] + + w = xmax - xmin + 1 + h = ymax - ymin + 1 + w = np.maximum(w, 1) + h = np.maximum(h, 1) + + resized_mask = cv2.resize(padded_mask, (w, h)) + resized_mask = np.array( + resized_mask > thresh_binarize, dtype=np.uint8) + im_mask = np.zeros((im_h, im_w), dtype=np.uint8) + + x0 = min(max(xmin, 0), im_w) + x1 = min(max(xmax + 1, 0), im_w) + y0 = min(max(ymin, 0), im_h) + y1 = min(max(ymax + 1, 0), im_h) + + im_mask[y0:y1, x0:x1] = resized_mask[(y0 - ymin):( + y1 - ymin), (x0 - xmin):(x1 - xmin)] + segm = mask_util.encode( + np.array( + im_mask[:, :, np.newaxis], order='F'))[0] + catid = clsid2catid[clsid] + segm['counts'] = segm['counts'].decode('utf8') + coco_res = { + 'category_id': catid, + 'segmentation': segm, + 'score': score + } + segm_res.append(coco_res) + return segm_res + + def get_mask_result(self, fetch_map, fetch_var_names, result, conf, + clsid2catid): + resolution = conf['mask_resolution'] + bbox_out, mask_out = fetch_map[fetch_var_names] + lengths = self.offset_to_lengths(bbox_out.lod()) + bbox = np.array(bbox_out) + mask = np.array(mask_out) + result['bbox'] = (bbox, lengths) + result['mask'] = (mask, lengths) + mask_results = self.mask2out([result], clsid2catid, + conf['mask_resolution']) + return mask_results + + def get_category_info(self, with_background, label_list): + if label_list[0] != 'background' and with_background: + label_list.insert(0, 'background') + if label_list[0] == 'background' and not with_background: + label_list = label_list[1:] + clsid2catid = {i: i for i in range(len(label_list))} + catid2name = {i: name for i, name in enumerate(label_list)} + return clsid2catid, catid2name + + def color_map(self, num_classes): + color_map = num_classes * [0, 0, 0] + for i in range(0, num_classes): + j = 0 + lab = i + while lab: + color_map[i * 3] |= (((lab >> 0) & 1) << (7 - j)) + color_map[i * 3 + 1] |= (((lab >> 1) & 1) << (7 - j)) + color_map[i * 3 + 2] |= (((lab >> 2) & 1) << (7 - j)) + j += 1 + lab >>= 3 + color_map = np.array(color_map).reshape(-1, 3) + return color_map + + def visualize(self, + bbox_results, + catid2name, + num_classes, + mask_results=None): + image = Image.open(self.infer_img).convert('RGB') + color_list = self.color_map(num_classes) + image = self.draw_bbox(image, catid2name, bbox_results, 0.5, color_list) + if mask_results is not None: + image = self.draw_mask(image, mask_results, 0.5, color_list) + image_path = os.path.split(self.infer_img)[-1] + if not os.path.exists(self.output_dir): + os.makedirs(self.output_dir) + out_path = os.path.join(self.output_dir, image_path) + image.save(out_path, quality=95) + logger.info('Save visualize result to {}'.format(out_path)) + + def preprocess(self, feed_var_names, image_file): + self.infer_img = image_file + config_path = self.config_path + res = {} + assert config_path is not None, "Config path: {} des not exist!".format( + model_path) + with open(config_path) as f: + conf = yaml.safe_load(f) + + img_data = self.Preprocess(image_file, conf['arch'], conf['Preprocess']) + if 'SSD' in conf['arch']: + img_data, res['im_shape'] = img_data + img_data = [img_data] + if len(feed_var_names) != len(img_data): + raise ValueError( + 'the length of feed vars does not equals the length of preprocess of img data, please check your feed dict' + ) + + def processImg(v): + np_data = np.array(v[0]) + res = np_data + return res + + feed_dict = {k: processImg(v) for k, v in zip(feed_var_names, img_data)} + return feed_dict + + def postprocess(self, fetch_map, fetch_var_names): + config_path = self.config_path + res = {} + with open(config_path) as f: + conf = yaml.safe_load(f) + if 'SSD' in conf['arch']: + img_data, res['im_shape'] = img_data + img_data = [img_data] + clsid2catid, catid2name = self.get_category_info( + conf['with_background'], conf['label_list']) + bbox_result = self.get_bbox_result(fetch_map, fetch_var_names[0], res, + conf, clsid2catid) + mask_result = None + if 'mask_resolution' in conf: + res['im_shape'] = img_data[-1] + mask_result = self.get_mask_result(fetch_map, fetch_var_names, res, + conf, clsid2catid) + if self.if_visualize: + if os.path.isdir(self.output_dir) is False: + os.mkdir(self.output_dir) + self.visualize(bbox_result, catid2name, + len(conf['label_list']), mask_result) + if self.if_dump_result: + if os.path.isdir(self.output_dir) is False: + os.mkdir(self.output_dir) + bbox_file = os.path.join(self.output_dir, 'bbox.json') + logger.info('dump bbox to {}'.format(bbox_file)) + with open(bbox_file, 'w') as f: + json.dump(bbox_result, f, indent=4) + if mask_result is not None: + mask_file = os.path.join(flags.output_dir, 'mask.json') + logger.info('dump mask to {}'.format(mask_file)) + with open(mask_file, 'w') as f: + json.dump(mask_result, f, indent=4) diff --git a/python/paddle_serving_client/__init__.py b/python/paddle_serving_client/__init__.py index 98d233f059a1ad0b588bce5bf3ef831d783c3a44..053062ee508b33e7602dea5a53b4868a662452cd 100644 --- a/python/paddle_serving_client/__init__.py +++ b/python/paddle_serving_client/__init__.py @@ -264,28 +264,46 @@ class Client(object): if res == -1: return None - result_map_batch = [] - result_map = {} - # result map needs to be a numpy array - for i, name in enumerate(fetch_names): - if self.fetch_names_to_type_[name] == int_type: - result_map[name] = result_batch.get_int64_by_name(name) - shape = result_batch.get_shape(name) - result_map[name] = np.array(result_map[name]) - result_map[name].shape = shape - if name in self.lod_tensor_set: - result_map["{}.lod".format(name)] = result_batch.get_lod( - name) - elif self.fetch_names_to_type_[name] == float_type: - result_map[name] = result_batch.get_float_by_name(name) - shape = result_batch.get_shape(name) - result_map[name] = np.array(result_map[name]) - result_map[name].shape = shape - if name in self.lod_tensor_set: - result_map["{}.lod".format(name)] = result_batch.get_lod( - name) - - return result_map + multi_result_map = [] + model_engine_names = result_batch.get_engine_names() + for mi, engine_name in enumerate(model_engine_names): + result_map = {} + # result map needs to be a numpy array + for i, name in enumerate(fetch_names): + if self.fetch_names_to_type_[name] == int_type: + result_map[name] = result_batch.get_int64_by_name(mi, name) + shape = result_batch.get_shape(mi, name) + result_map[name] = np.array(result_map[name], dtype='int64') + result_map[name].shape = shape + if name in self.lod_tensor_set: + result_map["{}.lod".format( + name)] = result_batch.get_lod(mi, name) + elif self.fetch_names_to_type_[name] == float_type: + result_map[name] = result_batch.get_float_by_name(mi, name) + shape = result_batch.get_shape(mi, name) + result_map[name] = np.array( + result_map[name], dtype='float32') + result_map[name].shape = shape + if name in self.lod_tensor_set: + result_map["{}.lod".format( + name)] = result_batch.get_lod(mi, name) + multi_result_map.append(result_map) + + ret = None + if len(model_engine_names) == 1: + # If only one model result is returned, the format of ret is result_map + ret = multi_result_map[0] + else: + # If multiple model results are returned, the format of ret is {name: result_map} + ret = { + engine_name: multi_result_map[mi] + for mi, engine_name in enumerate(model_engine_names) + } + + # When using the A/B test, the tag of variant needs to be returned + return ret if not need_variant_tag else [ + ret, self.result_handle_.variant_tag() + ] def release(self): self.client_handle_.destroy_predictor() diff --git a/python/paddle_serving_client/io/__init__.py b/python/paddle_serving_client/io/__init__.py index d723795f214e22957bff49f0ddf8fd42086b8a7e..74a6ca871b5c1e32b3c1ecbc6656c95d7c78a399 100644 --- a/python/paddle_serving_client/io/__init__.py +++ b/python/paddle_serving_client/io/__init__.py @@ -20,6 +20,7 @@ from paddle.fluid.framework import default_main_program from paddle.fluid.framework import Program from paddle.fluid import CPUPlace from paddle.fluid.io import save_inference_model +import paddle.fluid as fluid from ..proto import general_model_config_pb2 as model_conf import os @@ -100,3 +101,20 @@ def save_model(server_model_folder, with open("{}/serving_server_conf.stream.prototxt".format( server_model_folder), "wb") as fout: fout.write(config.SerializeToString()) + + +def inference_model_to_serving(infer_model, serving_client, serving_server): + place = fluid.CPUPlace() + exe = fluid.Executor(place) + inference_program, feed_target_names, fetch_targets = \ + fluid.io.load_inference_model(dirname=infer_model, executor=exe) + feed_dict = { + x: inference_program.global_block().var(x) + for x in feed_target_names + } + fetch_dict = {x.name: x for x in fetch_targets} + save_model(serving_client, serving_server, feed_dict, fetch_dict, + inference_program) + feed_names = feed_dict.keys() + fetch_names = fetch_dict.keys() + return feed_names, fetch_names diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py index 68a3b424cd9bda861ddf0b54ac42bf356c3de310..a58fb11ac3ee1fbe5086ae4381f6d6208c0c73ec 100644 --- a/python/paddle_serving_server/__init__.py +++ b/python/paddle_serving_server/__init__.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=doc-string-missing import os from .proto import server_configure_pb2 as server_sdk @@ -21,6 +22,7 @@ import socket import paddle_serving_server as paddle_serving_server from .version import serving_server_version from contextlib import closing +import collections class OpMaker(object): @@ -36,17 +38,35 @@ class OpMaker(object): "general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp", "general_copy": "GeneralCopyOp" } + self.node_name_suffix_ = collections.defaultdict(int) - # currently, inputs and outputs are not used - # when we have OpGraphMaker, inputs and outputs are necessary - def create(self, name, inputs=[], outputs=[]): - if name not in self.op_dict: - raise Exception("Op name {} is not supported right now".format( - name)) + def create(self, node_type, engine_name=None, inputs=[], outputs=[]): + if node_type not in self.op_dict: + raise Exception("Op type {} is not supported right now".format( + node_type)) node = server_sdk.DAGNode() - node.name = "{}_op".format(name) - node.type = self.op_dict[name] - return node + # node.name will be used as the infer engine name + if engine_name: + node.name = engine_name + else: + node.name = '{}_{}'.format(node_type, + self.node_name_suffix_[node_type]) + self.node_name_suffix_[node_type] += 1 + + node.type = self.op_dict[node_type] + if inputs: + for dep_node_str in inputs: + dep_node = server_sdk.DAGNode() + google.protobuf.text_format.Parse(dep_node_str, dep_node) + dep = server_sdk.DAGNodeDependency() + dep.name = dep_node.name + dep.mode = "RO" + node.dependencies.extend([dep]) + # Because the return value will be used as the key value of the + # dict, and the proto object is variable which cannot be hashed, + # so it is processed into a string. This has little effect on + # overall efficiency. + return google.protobuf.text_format.MessageToString(node) class OpSeqMaker(object): @@ -55,12 +75,25 @@ class OpSeqMaker(object): self.workflow.name = "workflow1" self.workflow.workflow_type = "Sequence" - def add_op(self, node): + def add_op(self, node_str): + node = server_sdk.DAGNode() + google.protobuf.text_format.Parse(node_str, node) + if len(node.dependencies) > 1: + raise Exception( + 'Set more than one predecessor for op in OpSeqMaker is not allowed.' + ) if len(self.workflow.nodes) >= 1: - dep = server_sdk.DAGNodeDependency() - dep.name = self.workflow.nodes[-1].name - dep.mode = "RO" - node.dependencies.extend([dep]) + if len(node.dependencies) == 0: + dep = server_sdk.DAGNodeDependency() + dep.name = self.workflow.nodes[-1].name + dep.mode = "RO" + node.dependencies.extend([dep]) + elif len(node.dependencies) == 1: + if node.dependencies[0].name != self.workflow.nodes[-1].name: + raise Exception( + 'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'. + format(node.dependencies[0].name, self.workflow.nodes[ + -1].name)) self.workflow.nodes.extend([node]) def get_op_sequence(self): @@ -69,13 +102,30 @@ class OpSeqMaker(object): return workflow_conf +class OpGraphMaker(object): + def __init__(self): + self.workflow = server_sdk.Workflow() + self.workflow.name = "workflow1" + # Currently, SDK only supports "Sequence" + self.workflow.workflow_type = "Sequence" + + def add_op(self, node_str): + node = server_sdk.DAGNode() + google.protobuf.text_format.Parse(node_str, node) + self.workflow.nodes.extend([node]) + + def get_op_graph(self): + workflow_conf = server_sdk.WorkflowConf() + workflow_conf.workflows.extend([self.workflow]) + return workflow_conf + + class Server(object): def __init__(self): self.server_handle_ = None self.infer_service_conf = None self.model_toolkit_conf = None self.resource_conf = None - self.engine = None self.memory_optimization = False self.model_conf = None self.workflow_fn = "workflow.prototxt" @@ -94,6 +144,7 @@ class Server(object): self.cur_path = os.getcwd() self.use_local_bin = False self.mkl_flag = False + self.model_config_paths = None # for multi-model in a workflow def set_max_concurrency(self, concurrency): self.max_concurrency = concurrency @@ -118,6 +169,9 @@ class Server(object): def set_op_sequence(self, op_seq): self.workflow_conf = op_seq + def set_op_graph(self, op_graph): + self.workflow_conf = op_graph + def set_memory_optimize(self, flag=False): self.memory_optimization = flag @@ -126,32 +180,30 @@ class Server(object): self.use_local_bin = True self.bin_path = os.environ["SERVING_BIN"] - def _prepare_engine(self, model_config_path, device): + def _prepare_engine(self, model_config_paths, device): if self.model_toolkit_conf == None: self.model_toolkit_conf = server_sdk.ModelToolkitConf() - if self.engine == None: - self.engine = server_sdk.EngineDesc() - - self.model_config_path = model_config_path - self.engine.name = "general_model" - self.engine.reloadable_meta = model_config_path + "/fluid_time_file" - os.system("touch {}".format(self.engine.reloadable_meta)) - self.engine.reloadable_type = "timestamp_ne" - self.engine.runtime_thread_num = 0 - self.engine.batch_infer_size = 0 - self.engine.enable_batch_align = 0 - self.engine.model_data_path = model_config_path - self.engine.enable_memory_optimization = self.memory_optimization - self.engine.static_optimization = False - self.engine.force_update_static_cache = False - - if device == "cpu": - self.engine.type = "FLUID_CPU_ANALYSIS_DIR" - elif device == "gpu": - self.engine.type = "FLUID_GPU_ANALYSIS_DIR" - - self.model_toolkit_conf.engines.extend([self.engine]) + for engine_name, model_config_path in model_config_paths.items(): + engine = server_sdk.EngineDesc() + engine.name = engine_name + engine.reloadable_meta = model_config_path + "/fluid_time_file" + os.system("touch {}".format(engine.reloadable_meta)) + engine.reloadable_type = "timestamp_ne" + engine.runtime_thread_num = 0 + engine.batch_infer_size = 0 + engine.enable_batch_align = 0 + engine.model_data_path = model_config_path + engine.enable_memory_optimization = self.memory_optimization + engine.static_optimization = False + engine.force_update_static_cache = False + + if device == "cpu": + engine.type = "FLUID_CPU_ANALYSIS_DIR" + elif device == "gpu": + engine.type = "FLUID_GPU_ANALYSIS_DIR" + + self.model_toolkit_conf.engines.extend([engine]) def _prepare_infer_service(self, port): if self.infer_service_conf == None: @@ -184,10 +236,49 @@ class Server(object): with open(filepath, "w") as fout: fout.write(str(pb_obj)) - def load_model_config(self, path): - self.model_config_path = path + def load_model_config(self, model_config_paths): + # At present, Serving needs to configure the model path in + # the resource.prototxt file to determine the input and output + # format of the workflow. To ensure that the input and output + # of multiple models are the same. + workflow_oi_config_path = None + if isinstance(model_config_paths, str): + # If there is only one model path, use the default infer_op. + # Because there are several infer_op type, we need to find + # it from workflow_conf. + default_engine_names = [ + 'general_infer_0', 'general_dist_kv_infer_0', + 'general_dist_kv_quant_infer_0' + ] + engine_name = None + for node in self.workflow_conf.workflows[0].nodes: + if node.name in default_engine_names: + engine_name = node.name + break + if engine_name is None: + raise Exception( + "You have set the engine_name of Op. Please use the form {op: model_path} to configure model path" + ) + self.model_config_paths = {engine_name: model_config_paths} + workflow_oi_config_path = self.model_config_paths[engine_name] + elif isinstance(model_config_paths, dict): + self.model_config_paths = {} + for node_str, path in model_config_paths.items(): + node = server_sdk.DAGNode() + google.protobuf.text_format.Parse(node_str, node) + self.model_config_paths[node.name] = path + print("You have specified multiple model paths, please ensure " + "that the input and output of multiple models are the same.") + workflow_oi_config_path = self.model_config_paths.items()[0][1] + else: + raise Exception("The type of model_config_paths must be str or " + "dict({op: model_path}), not {}.".format( + type(model_config_paths))) + self.model_conf = m_config.GeneralModelConfig() - f = open("{}/serving_server_conf.prototxt".format(path), 'r') + f = open( + "{}/serving_server_conf.prototxt".format(workflow_oi_config_path), + 'r') self.model_conf = google.protobuf.text_format.Merge( str(f.read()), self.model_conf) # check config here @@ -258,7 +349,7 @@ class Server(object): if not self.port_is_available(port): raise SystemExit("Prot {} is already used".format(port)) self._prepare_resource(workdir) - self._prepare_engine(self.model_config_path, device) + self._prepare_engine(self.model_config_paths, device) self._prepare_infer_service(port) self.port = port self.workdir = workdir diff --git a/python/paddle_serving_server_gpu/__init__.py b/python/paddle_serving_server_gpu/__init__.py index 0e5a49c4870956557c99fdf8abf08edf47bc4aa6..45e71a383b4fe0e5ca3a5284985b702cd815f18c 100644 --- a/python/paddle_serving_server_gpu/__init__.py +++ b/python/paddle_serving_server_gpu/__init__.py @@ -24,6 +24,7 @@ import time from .version import serving_server_version from contextlib import closing import argparse +import collections def serve_args(): @@ -66,17 +67,35 @@ class OpMaker(object): "general_dist_kv_infer": "GeneralDistKVInferOp", "general_dist_kv": "GeneralDistKVOp" } + self.node_name_suffix_ = collections.defaultdict(int) - # currently, inputs and outputs are not used - # when we have OpGraphMaker, inputs and outputs are necessary - def create(self, name, inputs=[], outputs=[]): - if name not in self.op_dict: - raise Exception("Op name {} is not supported right now".format( - name)) + def create(self, node_type, engine_name=None, inputs=[], outputs=[]): + if node_type not in self.op_dict: + raise Exception("Op type {} is not supported right now".format( + node_type)) node = server_sdk.DAGNode() - node.name = "{}_op".format(name) - node.type = self.op_dict[name] - return node + # node.name will be used as the infer engine name + if engine_name: + node.name = engine_name + else: + node.name = '{}_{}'.format(node_type, + self.node_name_suffix_[node_type]) + self.node_name_suffix_[node_type] += 1 + + node.type = self.op_dict[node_type] + if inputs: + for dep_node_str in inputs: + dep_node = server_sdk.DAGNode() + google.protobuf.text_format.Parse(dep_node_str, dep_node) + dep = server_sdk.DAGNodeDependency() + dep.name = dep_node.name + dep.mode = "RO" + node.dependencies.extend([dep]) + # Because the return value will be used as the key value of the + # dict, and the proto object is variable which cannot be hashed, + # so it is processed into a string. This has little effect on + # overall efficiency. + return google.protobuf.text_format.MessageToString(node) class OpSeqMaker(object): @@ -85,12 +104,25 @@ class OpSeqMaker(object): self.workflow.name = "workflow1" self.workflow.workflow_type = "Sequence" - def add_op(self, node): + def add_op(self, node_str): + node = server_sdk.DAGNode() + google.protobuf.text_format.Parse(node_str, node) + if len(node.dependencies) > 1: + raise Exception( + 'Set more than one predecessor for op in OpSeqMaker is not allowed.' + ) if len(self.workflow.nodes) >= 1: - dep = server_sdk.DAGNodeDependency() - dep.name = self.workflow.nodes[-1].name - dep.mode = "RO" - node.dependencies.extend([dep]) + if len(node.dependencies) == 0: + dep = server_sdk.DAGNodeDependency() + dep.name = self.workflow.nodes[-1].name + dep.mode = "RO" + node.dependencies.extend([dep]) + elif len(node.dependencies) == 1: + if node.dependencies[0].name != self.workflow.nodes[-1].name: + raise Exception( + 'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'. + format(node.dependencies[0].name, self.workflow.nodes[ + -1].name)) self.workflow.nodes.extend([node]) def get_op_sequence(self): @@ -99,13 +131,30 @@ class OpSeqMaker(object): return workflow_conf +class OpGraphMaker(object): + def __init__(self): + self.workflow = server_sdk.Workflow() + self.workflow.name = "workflow1" + # Currently, SDK only supports "Sequence" + self.workflow.workflow_type = "Sequence" + + def add_op(self, node_str): + node = server_sdk.DAGNode() + google.protobuf.text_format.Parse(node_str, node) + self.workflow.nodes.extend([node]) + + def get_op_graph(self): + workflow_conf = server_sdk.WorkflowConf() + workflow_conf.workflows.extend([self.workflow]) + return workflow_conf + + class Server(object): def __init__(self): self.server_handle_ = None self.infer_service_conf = None self.model_toolkit_conf = None self.resource_conf = None - self.engine = None self.memory_optimization = False self.model_conf = None self.workflow_fn = "workflow.prototxt" @@ -125,6 +174,7 @@ class Server(object): self.check_cuda() self.use_local_bin = False self.gpuid = 0 + self.model_config_paths = None # for multi-model in a workflow def set_max_concurrency(self, concurrency): self.max_concurrency = concurrency @@ -149,6 +199,9 @@ class Server(object): def set_op_sequence(self, op_seq): self.workflow_conf = op_seq + def set_op_graph(self, op_graph): + self.workflow_conf = op_graph + def set_memory_optimize(self, flag=False): self.memory_optimization = flag @@ -167,33 +220,31 @@ class Server(object): def set_gpuid(self, gpuid=0): self.gpuid = gpuid - def _prepare_engine(self, model_config_path, device): + def _prepare_engine(self, model_config_paths, device): if self.model_toolkit_conf == None: self.model_toolkit_conf = server_sdk.ModelToolkitConf() - if self.engine == None: - self.engine = server_sdk.EngineDesc() - - self.model_config_path = model_config_path - self.engine.name = "general_model" - #self.engine.reloadable_meta = model_config_path + "/fluid_time_file" - self.engine.reloadable_meta = self.workdir + "/fluid_time_file" - os.system("touch {}".format(self.engine.reloadable_meta)) - self.engine.reloadable_type = "timestamp_ne" - self.engine.runtime_thread_num = 0 - self.engine.batch_infer_size = 0 - self.engine.enable_batch_align = 0 - self.engine.model_data_path = model_config_path - self.engine.enable_memory_optimization = self.memory_optimization - self.engine.static_optimization = False - self.engine.force_update_static_cache = False - - if device == "cpu": - self.engine.type = "FLUID_CPU_ANALYSIS_DIR" - elif device == "gpu": - self.engine.type = "FLUID_GPU_ANALYSIS_DIR" - - self.model_toolkit_conf.engines.extend([self.engine]) + for engine_name, model_config_path in model_config_paths.items(): + engine = server_sdk.EngineDesc() + engine.name = engine_name + # engine.reloadable_meta = model_config_path + "/fluid_time_file" + engine.reloadable_meta = self.workdir + "/fluid_time_file" + os.system("touch {}".format(engine.reloadable_meta)) + engine.reloadable_type = "timestamp_ne" + engine.runtime_thread_num = 0 + engine.batch_infer_size = 0 + engine.enable_batch_align = 0 + engine.model_data_path = model_config_path + engine.enable_memory_optimization = self.memory_optimization + engine.static_optimization = False + engine.force_update_static_cache = False + + if device == "cpu": + engine.type = "FLUID_CPU_ANALYSIS_DIR" + elif device == "gpu": + engine.type = "FLUID_GPU_ANALYSIS_DIR" + + self.model_toolkit_conf.engines.extend([engine]) def _prepare_infer_service(self, port): if self.infer_service_conf == None: @@ -225,10 +276,49 @@ class Server(object): with open(filepath, "w") as fout: fout.write(str(pb_obj)) - def load_model_config(self, path): - self.model_config_path = path + def load_model_config(self, model_config_paths): + # At present, Serving needs to configure the model path in + # the resource.prototxt file to determine the input and output + # format of the workflow. To ensure that the input and output + # of multiple models are the same. + workflow_oi_config_path = None + if isinstance(model_config_paths, str): + # If there is only one model path, use the default infer_op. + # Because there are several infer_op type, we need to find + # it from workflow_conf. + default_engine_names = [ + 'general_infer_0', 'general_dist_kv_infer_0', + 'general_dist_kv_quant_infer_0' + ] + engine_name = None + for node in self.workflow_conf.workflows[0].nodes: + if node.name in default_engine_names: + engine_name = node.name + break + if engine_name is None: + raise Exception( + "You have set the engine_name of Op. Please use the form {op: model_path} to configure model path" + ) + self.model_config_paths = {engine_name: model_config_paths} + workflow_oi_config_path = self.model_config_paths[engine_name] + elif isinstance(model_config_paths, dict): + self.model_config_paths = {} + for node_str, path in model_config_paths.items(): + node = server_sdk.DAGNode() + google.protobuf.text_format.Parse(node_str, node) + self.model_config_paths[node.name] = path + print("You have specified multiple model paths, please ensure " + "that the input and output of multiple models are the same.") + workflow_oi_config_path = self.model_config_paths.items()[0][1] + else: + raise Exception("The type of model_config_paths must be str or " + "dict({op: model_path}), not {}.".format( + type(model_config_paths))) + self.model_conf = m_config.GeneralModelConfig() - f = open("{}/serving_server_conf.prototxt".format(path), 'r') + f = open( + "{}/serving_server_conf.prototxt".format(workflow_oi_config_path), + 'r') self.model_conf = google.protobuf.text_format.Merge( str(f.read()), self.model_conf) # check config here @@ -291,7 +381,7 @@ class Server(object): self.set_port(port) self._prepare_resource(workdir) - self._prepare_engine(self.model_config_path, device) + self._prepare_engine(self.model_config_paths, device) self._prepare_infer_service(port) self.workdir = workdir diff --git a/python/paddle_serving_server_gpu/web_service.py b/python/paddle_serving_server_gpu/web_service.py index 6841220f9f4e52a23bc7b0a0176c58672fc4b675..eb1ecfd8faaf34a6bf2955af46d5a8cf09085ad7 100644 --- a/python/paddle_serving_server_gpu/web_service.py +++ b/python/paddle_serving_server_gpu/web_service.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=doc-string-missing from flask import Flask, request, abort from contextlib import closing diff --git a/python/setup.py.app.in b/python/setup.py.app.in index 13e71b22cdc5eb719c17af974dd2150710133491..3c0f8e065a5072919d808ba1da67f5c37eee0594 100644 --- a/python/setup.py.app.in +++ b/python/setup.py.app.in @@ -47,7 +47,8 @@ REQUIRED_PACKAGES = [ packages=['paddle_serving_app', 'paddle_serving_app.reader', - 'paddle_serving_app.utils'] + 'paddle_serving_app.utils', + 'paddle_serving_app.reader.pddet'] package_data={} package_dir={'paddle_serving_app': @@ -55,7 +56,9 @@ package_dir={'paddle_serving_app': 'paddle_serving_app.reader': '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/reader', 'paddle_serving_app.utils': - '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/utils',} + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/utils', + 'paddle_serving_app.reader.pddet': + '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/reader/pddet',} setup( name='paddle-serving-app', diff --git a/tools/serving_build.sh b/tools/serving_build.sh index e4bf6ece3a9df1808b9190e9e77d8d2e8aba62c0..1e47b8f4fe26c689b5d6680c1478740201b335b9 100644 --- a/tools/serving_build.sh +++ b/tools/serving_build.sh @@ -323,6 +323,9 @@ function python_test_bert() { echo "bert RPC inference pass" ;; *) + echo "error type" + exit 1 + ;; esac echo "test bert $TYPE finished as expected." unset SERVING_BIN @@ -357,6 +360,9 @@ function python_test_imdb() { echo "imdb ignore GPU test" ;; *) + echo "error type" + exit 1 + ;; esac echo "test imdb $TYPE finished as expected." unset SERVING_BIN @@ -389,6 +395,9 @@ function python_test_lac() { echo "lac ignore GPU test" ;; *) + echo "error type" + exit 1 + ;; esac echo "test lac $TYPE finished as expected." unset SERVING_BIN @@ -408,6 +417,248 @@ function python_run_test() { cd ../.. # pwd: /Serving } +function monitor_test() { + local TYPE=$1 # pwd: /Serving + mkdir _monitor_test && cd _monitor_test # pwd: /Serving/_monitor_test + case $TYPE in + CPU): + pip install pyftpdlib + mkdir remote_path + mkdir local_path + cd remote_path # pwd: /Serving/_monitor_test/remote_path + check_cmd "python -m pyftpdlib -p 8000 &>/dev/null &" + cd .. # pwd: /Serving/_monitor_test + + # type: ftp + # remote_path: / + # remote_model_name: uci_housing.tar.gz + # local_tmp_path: ___tmp + # local_path: local_path + cd remote_path # pwd: /Serving/_monitor_test/remote_path + wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz + touch donefile + cd .. # pwd: /Serving/_monitor_test + mkdir -p local_path/uci_housing_model + python -m paddle_serving_server.monitor \ + --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \ + --remote_path='/' --remote_model_name='uci_housing.tar.gz' \ + --remote_donefile_name='donefile' --local_path='local_path' \ + --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ + --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \ + --interval='1' >/dev/null & + sleep 10 + if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then + echo "local_path/uci_housing_model/fluid_time_file not exist." + exit 1 + fi + ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill + rm -rf remote_path/* + rm -rf local_path/* + + # type: ftp + # remote_path: /tmp_dir + # remote_model_name: uci_housing_model + # local_tmp_path: ___tmp + # local_path: local_path + mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir + wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz + tar -xzf uci_housing.tar.gz + touch donefile + cd ../.. # pwd: /Serving/_monitor_test + mkdir -p local_path/uci_housing_model + python -m paddle_serving_server.monitor \ + --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \ + --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \ + --remote_donefile_name='donefile' --local_path='local_path' \ + --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ + --local_tmp_path='___tmp' --interval='1' >/dev/null & + sleep 10 + if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then + echo "local_path/uci_housing_model/fluid_time_file not exist." + exit 1 + fi + ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill + rm -rf remote_path/* + rm -rf local_path/* + + # type: general + # remote_path: / + # remote_model_name: uci_housing.tar.gz + # local_tmp_path: ___tmp + # local_path: local_path + cd remote_path # pwd: /Serving/_monitor_test/remote_path + wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz + touch donefile + cd .. # pwd: /Serving/_monitor_test + mkdir -p local_path/uci_housing_model + python -m paddle_serving_server.monitor \ + --type='general' --general_host='ftp://127.0.0.1:8000' \ + --remote_path='/' --remote_model_name='uci_housing.tar.gz' \ + --remote_donefile_name='donefile' --local_path='local_path' \ + --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ + --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \ + --interval='1' >/dev/null & + sleep 10 + if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then + echo "local_path/uci_housing_model/fluid_time_file not exist." + exit 1 + fi + ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill + rm -rf remote_path/* + rm -rf local_path/* + + # type: general + # remote_path: /tmp_dir + # remote_model_name: uci_housing_model + # local_tmp_path: ___tmp + # local_path: local_path + mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir + wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz + tar -xzf uci_housing.tar.gz + touch donefile + cd ../.. # pwd: /Serving/_monitor_test + mkdir -p local_path/uci_housing_model + python -m paddle_serving_server.monitor \ + --type='general' --general_host='ftp://127.0.0.1:8000' \ + --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \ + --remote_donefile_name='donefile' --local_path='local_path' \ + --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ + --local_tmp_path='___tmp' --interval='1' >/dev/null & + sleep 10 + if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then + echo "local_path/uci_housing_model/fluid_time_file not exist." + exit 1 + fi + ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill + rm -rf remote_path/* + rm -rf local_path/* + + ps -ef | grep "pyftpdlib" | grep -v grep | awk '{print $2}' | xargs kill + ;; + GPU): + pip install pyftpdlib + mkdir remote_path + mkdir local_path + cd remote_path # pwd: /Serving/_monitor_test/remote_path + check_cmd "python -m pyftpdlib -p 8000 &>/dev/null &" + cd .. # pwd: /Serving/_monitor_test + + # type: ftp + # remote_path: / + # remote_model_name: uci_housing.tar.gz + # local_tmp_path: ___tmp + # local_path: local_path + cd remote_path # pwd: /Serving/_monitor_test/remote_path + wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz + touch donefile + cd .. # pwd: /Serving/_monitor_test + mkdir -p local_path/uci_housing_model + python -m paddle_serving_server_gpu.monitor \ + --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \ + --remote_path='/' --remote_model_name='uci_housing.tar.gz' \ + --remote_donefile_name='donefile' --local_path='local_path' \ + --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ + --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \ + --interval='1' >/dev/null & + sleep 10 + if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then + echo "local_path/uci_housing_model/fluid_time_file not exist." + exit 1 + fi + ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill + rm -rf remote_path/* + rm -rf local_path/* + + # type: ftp + # remote_path: /tmp_dir + # remote_model_name: uci_housing_model + # local_tmp_path: ___tmp + # local_path: local_path + mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir + wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz + tar -xzf uci_housing.tar.gz + touch donefile + cd ../.. # pwd: /Serving/_monitor_test + mkdir -p local_path/uci_housing_model + python -m paddle_serving_server_gpu.monitor \ + --type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \ + --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \ + --remote_donefile_name='donefile' --local_path='local_path' \ + --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ + --local_tmp_path='___tmp' --interval='1' >/dev/null & + sleep 10 + if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then + echo "local_path/uci_housing_model/fluid_time_file not exist." + exit 1 + fi + ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill + rm -rf remote_path/* + rm -rf local_path/* + + # type: general + # remote_path: / + # remote_model_name: uci_housing.tar.gz + # local_tmp_path: ___tmp + # local_path: local_path + cd remote_path # pwd: /Serving/_monitor_test/remote_path + wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz + touch donefile + cd .. # pwd: /Serving/_monitor_test + mkdir -p local_path/uci_housing_model + python -m paddle_serving_server_gpu.monitor \ + --type='general' --general_host='ftp://127.0.0.1:8000' \ + --remote_path='/' --remote_model_name='uci_housing.tar.gz' \ + --remote_donefile_name='donefile' --local_path='local_path' \ + --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ + --local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \ + --interval='1' >/dev/null & + sleep 10 + if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then + echo "local_path/uci_housing_model/fluid_time_file not exist." + exit 1 + fi + ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill + rm -rf remote_path/* + rm -rf local_path/* + + # type: general + # remote_path: /tmp_dir + # remote_model_name: uci_housing_model + # local_tmp_path: ___tmp + # local_path: local_path + mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir + wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz + tar -xzf uci_housing.tar.gz + touch donefile + cd ../.. # pwd: /Serving/_monitor_test + mkdir -p local_path/uci_housing_model + python -m paddle_serving_server_gpu.monitor \ + --type='general' --general_host='ftp://127.0.0.1:8000' \ + --remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \ + --remote_donefile_name='donefile' --local_path='local_path' \ + --local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \ + --local_tmp_path='___tmp' --interval='1' >/dev/null & + sleep 10 + if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then + echo "local_path/uci_housing_model/fluid_time_file not exist." + exit 1 + fi + ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill + rm -rf remote_path/* + rm -rf local_path/* + + ps -ef | grep "pyftpdlib" | grep -v grep | awk '{print $2}' | xargs kill + ;; + *) + echo "error type" + exit 1 + ;; + esac + cd .. # pwd: /Serving + rm -rf _monitor_test + echo "test monitor $TYPE finished as expected." +} + function main() { local TYPE=$1 # pwd: / init # pwd: /Serving @@ -415,6 +666,7 @@ function main() { build_server $TYPE # pwd: /Serving build_app $TYPE # pwd: /Serving python_run_test $TYPE # pwd: /Serving + monitor_test $TYPE # pwd: /Serving echo "serving $TYPE part finished as expected." }