Unverified · Commit 8b721192 · Authored by Dong Daxiang · Committed by GitHub

Merge branch 'develop' into web-service

@@ -256,6 +256,7 @@ curl -H "Content-Type:application/json" -X POST -d '{"url": "https://paddle-serv
 ### Developers
 - [How to config Serving native operators on server side?](doc/SERVER_DAG.md)
 - [How to develop a new Serving operator?](doc/NEW_OPERATOR.md)
+- [How to develop a new Web Service?](doc/NEW_WEB_SERVICE.md)
 - [Golang client](doc/IMDB_GO_CLIENT.md)
 - [Compile from source code](doc/COMPILE.md)
...
@@ -262,6 +262,7 @@ curl -H "Content-Type:application/json" -X POST -d '{"url": "https://paddle-serv
 ### Developer Tutorials
 - [How to configure the computation graph on the Server side?](doc/SERVER_DAG_CN.md)
 - [How to develop a new General Op?](doc/NEW_OPERATOR_CN.md)
+- [How to develop a new Web Service?](doc/NEW_WEB_SERVICE_CN.md)
 - [How to use the Go Client in Paddle Serving?](doc/IMDB_GO_CLIENT_CN.md)
 - [How to compile Paddle Serving?](doc/COMPILE_CN.md)
...
@@ -217,6 +217,7 @@ int run_m(int argc, char** argv) {
   LOG(INFO) << " total_request = " << std::to_string(request_num) << " speed = "
             << std::to_string(1000000 * thread_num / mean_time)  // mean_time us
             << " query per second";
+  return 0;
 }
 }  // namespace mcube
...
@@ -21,6 +21,7 @@
 #include <fstream>
 #include <map>
 #include <string>
+#include <utility>  // move
 #include <vector>
 #include "core/sdk-cpp/builtin_format.pb.h"
@@ -39,12 +40,32 @@ namespace baidu {
 namespace paddle_serving {
 namespace general_model {
-class PredictorRes {
- public:
-  PredictorRes() {}
-  ~PredictorRes() {}
+class ModelRes {
  public:
+  ModelRes() {}
+  ModelRes(const ModelRes& res) {
+    _engine_name = res._engine_name;
+    _int64_value_map.insert(res._int64_value_map.begin(),
+                            res._int64_value_map.end());
+    _float_value_map.insert(res._float_value_map.begin(),
+                            res._float_value_map.end());
+    _shape_map.insert(res._shape_map.begin(), res._shape_map.end());
+    _lod_map.insert(res._lod_map.begin(), res._lod_map.end());
+  }
+  ModelRes(ModelRes&& res) {
+    _engine_name = std::move(res._engine_name);
+    _int64_value_map.insert(
+        std::make_move_iterator(std::begin(res._int64_value_map)),
+        std::make_move_iterator(std::end(res._int64_value_map)));
+    _float_value_map.insert(
+        std::make_move_iterator(std::begin(res._float_value_map)),
+        std::make_move_iterator(std::end(res._float_value_map)));
+    _shape_map.insert(std::make_move_iterator(std::begin(res._shape_map)),
+                      std::make_move_iterator(std::end(res._shape_map)));
+    _lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)),
+                    std::make_move_iterator(std::end(res._lod_map)));
+  }
+  ~ModelRes() {}
   const std::vector<int64_t>& get_int64_by_name(const std::string& name) {
     return _int64_value_map[name];
   }
@@ -57,19 +78,75 @@ class PredictorRes {
   const std::vector<int>& get_lod(const std::string& name) {
     return _lod_map[name];
   }
-  void set_variant_tag(const std::string& variant_tag) {
-    _variant_tag = variant_tag;
+  void set_engine_name(const std::string& engine_name) {
+    _engine_name = engine_name;
+  }
+  const std::string& engine_name() { return _engine_name; }
+  ModelRes& operator=(ModelRes&& res) {
+    if (this != &res) {
+      _engine_name = std::move(res._engine_name);
+      _int64_value_map.insert(
+          std::make_move_iterator(std::begin(res._int64_value_map)),
+          std::make_move_iterator(std::end(res._int64_value_map)));
+      _float_value_map.insert(
+          std::make_move_iterator(std::begin(res._float_value_map)),
+          std::make_move_iterator(std::end(res._float_value_map)));
+      _shape_map.insert(std::make_move_iterator(std::begin(res._shape_map)),
+                        std::make_move_iterator(std::end(res._shape_map)));
+      _lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)),
+                      std::make_move_iterator(std::end(res._lod_map)));
+    }
+    return *this;
   }
-  const std::string& variant_tag() { return _variant_tag; }
  public:
+  std::string _engine_name;
   std::map<std::string, std::vector<int64_t>> _int64_value_map;
   std::map<std::string, std::vector<float>> _float_value_map;
   std::map<std::string, std::vector<int>> _shape_map;
   std::map<std::string, std::vector<int>> _lod_map;
+};
+class PredictorRes {
+ public:
+  PredictorRes() {}
+  ~PredictorRes() {}
+ public:
+  void clear() {
+    _models.clear();
+    _engine_names.clear();
+  }
+  const std::vector<int64_t>& get_int64_by_name(const int model_idx,
+                                                const std::string& name) {
+    return _models[model_idx].get_int64_by_name(name);
+  }
+  const std::vector<float>& get_float_by_name(const int model_idx,
+                                              const std::string& name) {
+    return _models[model_idx].get_float_by_name(name);
+  }
+  const std::vector<int>& get_shape(const int model_idx,
+                                    const std::string& name) {
+    return _models[model_idx].get_shape(name);
+  }
+  const std::vector<int>& get_lod(const int model_idx,
+                                  const std::string& name) {
+    return _models[model_idx].get_lod(name);
+  }
+  void add_model_res(ModelRes&& res) {
+    _engine_names.push_back(res.engine_name());
+    _models.emplace_back(std::move(res));
+  }
+  void set_variant_tag(const std::string& variant_tag) {
+    _variant_tag = variant_tag;
+  }
+  const std::string& variant_tag() { return _variant_tag; }
+  const std::vector<std::string>& get_engine_names() { return _engine_names; }
  private:
+  std::vector<ModelRes> _models;
   std::string _variant_tag;
+  std::vector<std::string> _engine_names;
 };
 class PredictorClient {
...
@@ -111,6 +111,7 @@ void PredictorClient::set_predictor_conf(const std::string &conf_path,
 int PredictorClient::destroy_predictor() {
   _api.thrd_finalize();
   _api.destroy();
+  return 0;
 }
 int PredictorClient::create_predictor_by_desc(const std::string &sdk_desc) {
@@ -119,6 +120,7 @@ int PredictorClient::create_predictor_by_desc(const std::string &sdk_desc) {
     return -1;
   }
   _api.thrd_initialize();
+  return 0;
 }
 int PredictorClient::create_predictor() {
@@ -129,6 +131,7 @@ int PredictorClient::create_predictor() {
     return -1;
   }
   _api.thrd_initialize();
+  return 0;
 }
 int PredictorClient::batch_predict(
@@ -143,10 +146,7 @@ int PredictorClient::batch_predict(
     const int &pid) {
   int batch_size = std::max(float_feed_batch.size(), int_feed_batch.size());
-  predict_res_batch._int64_value_map.clear();
-  predict_res_batch._float_value_map.clear();
-  predict_res_batch._shape_map.clear();
-  predict_res_batch._lod_map.clear();
+  predict_res_batch.clear();
   Timer timeline;
   int64_t preprocess_start = timeline.TimeStampUS();
@@ -189,11 +189,11 @@ int PredictorClient::batch_predict(
       Tensor *tensor = tensor_vec[idx];
       VLOG(2) << "prepare float feed " << name << " shape size "
               << float_shape[vec_idx].size();
-      for (int j = 0; j < float_shape[vec_idx].size(); ++j) {
+      for (uint32_t j = 0; j < float_shape[vec_idx].size(); ++j) {
         tensor->add_shape(float_shape[vec_idx][j]);
       }
       tensor->set_elem_type(1);
-      for (int j = 0; j < float_feed[vec_idx].size(); ++j) {
+      for (uint32_t j = 0; j < float_feed[vec_idx].size(); ++j) {
         tensor->add_float_data(float_feed[vec_idx][j]);
       }
       vec_idx++;
@@ -208,13 +208,13 @@ int PredictorClient::batch_predict(
       Tensor *tensor = tensor_vec[idx];
       VLOG(2) << "prepare int feed " << name << " shape size "
               << int_shape[vec_idx].size();
-      for (int j = 0; j < int_shape[vec_idx].size(); ++j) {
+      for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) {
         tensor->add_shape(int_shape[vec_idx][j]);
       }
       tensor->set_elem_type(0);
       VLOG(3) << "feed var name " << name << " index " << vec_idx
              << "first data " << int_feed[vec_idx][0];
-      for (int j = 0; j < int_feed[vec_idx].size(); ++j) {
+      for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) {
         tensor->add_int64_data(int_feed[vec_idx][j]);
       }
       vec_idx++;
@@ -248,23 +248,29 @@ int PredictorClient::batch_predict(
     client_infer_end = timeline.TimeStampUS();
     postprocess_start = client_infer_end;
+    uint32_t model_num = res.outputs_size();
+    for (uint32_t m_idx = 0; m_idx < model_num; ++m_idx) {
+      VLOG(2) << "process model output index: " << m_idx;
+      auto output = res.outputs(m_idx);
+      ModelRes model;
+      model.set_engine_name(output.engine_name());
     for (auto &name : fetch_name) {
       // int idx = _fetch_name_to_idx[name];
       int idx = 0;
-      int shape_size = res.insts(0).tensor_array(idx).shape_size();
+      int shape_size = output.insts(0).tensor_array(idx).shape_size();
       VLOG(2) << "fetch var " << name << " index " << idx << " shape size "
               << shape_size;
-      predict_res_batch._shape_map[name].resize(shape_size);
+      model._shape_map[name].resize(shape_size);
       for (int i = 0; i < shape_size; ++i) {
-        predict_res_batch._shape_map[name][i] =
-            res.insts(0).tensor_array(idx).shape(i);
+        model._shape_map[name][i] =
+            output.insts(0).tensor_array(idx).shape(i);
       }
-      int lod_size = res.insts(0).tensor_array(idx).lod_size();
+      int lod_size = output.insts(0).tensor_array(idx).lod_size();
       if (lod_size > 0) {
-        predict_res_batch._lod_map[name].resize(lod_size);
+        model._lod_map[name].resize(lod_size);
         for (int i = 0; i < lod_size; ++i) {
-          predict_res_batch._lod_map[name][i] =
-              res.insts(0).tensor_array(idx).lod(i);
+          model._lod_map[name][i] = output.insts(0).tensor_array(idx).lod(i);
         }
       }
       idx += 1;
@@ -275,25 +281,27 @@ int PredictorClient::batch_predict(
       int idx = 0;
       if (_fetch_name_to_type[name] == 0) {
         VLOG(2) << "fetch var " << name << "type int";
-        predict_res_batch._int64_value_map[name].resize(
-            res.insts(0).tensor_array(idx).int64_data_size());
-        int size = res.insts(0).tensor_array(idx).int64_data_size();
+        model._int64_value_map[name].resize(
+            output.insts(0).tensor_array(idx).int64_data_size());
+        int size = output.insts(0).tensor_array(idx).int64_data_size();
         for (int i = 0; i < size; ++i) {
-          predict_res_batch._int64_value_map[name][i] =
-              res.insts(0).tensor_array(idx).int64_data(i);
+          model._int64_value_map[name][i] =
+              output.insts(0).tensor_array(idx).int64_data(i);
         }
       } else {
         VLOG(2) << "fetch var " << name << "type float";
-        predict_res_batch._float_value_map[name].resize(
-            res.insts(0).tensor_array(idx).float_data_size());
-        int size = res.insts(0).tensor_array(idx).float_data_size();
+        model._float_value_map[name].resize(
+            output.insts(0).tensor_array(idx).float_data_size());
+        int size = output.insts(0).tensor_array(idx).float_data_size();
         for (int i = 0; i < size; ++i) {
-          predict_res_batch._float_value_map[name][i] =
-              res.insts(0).tensor_array(idx).float_data(i);
+          model._float_value_map[name][i] =
+              output.insts(0).tensor_array(idx).float_data(i);
         }
       }
       idx += 1;
     }
+      predict_res_batch.add_model_res(std::move(model));
+    }
     postprocess_end = timeline.TimeStampUS();
   }
@@ -305,7 +313,6 @@ int PredictorClient::batch_predict(
            << "prepro_1:" << preprocess_end << " "
            << "client_infer_0:" << client_infer_start << " "
            << "client_infer_1:" << client_infer_end << " ";
   if (FLAGS_profile_server) {
     int op_num = res.profile_time_size() / 2;
     for (int i = 0; i < op_num; ++i) {
...
@@ -31,27 +31,28 @@ PYBIND11_MODULE(serving_client, m) {
   py::class_<PredictorRes>(m, "PredictorRes", py::buffer_protocol())
       .def(py::init())
       .def("get_int64_by_name",
-           [](PredictorRes &self, std::string &name) {
-             return self.get_int64_by_name(name);
+           [](PredictorRes &self, int model_idx, std::string &name) {
+             return self.get_int64_by_name(model_idx, name);
           },
           py::return_value_policy::reference)
      .def("get_float_by_name",
-           [](PredictorRes &self, std::string &name) {
-             return self.get_float_by_name(name);
+           [](PredictorRes &self, int model_idx, std::string &name) {
+             return self.get_float_by_name(model_idx, name);
           },
           py::return_value_policy::reference)
      .def("get_shape",
-           [](PredictorRes &self, std::string &name) {
-             return self.get_shape(name);
+           [](PredictorRes &self, int model_idx, std::string &name) {
+             return self.get_shape(model_idx, name);
           },
           py::return_value_policy::reference)
      .def("get_lod",
-           [](PredictorRes &self, std::string &name) {
-             return self.get_lod(name);
+           [](PredictorRes &self, int model_idx, std::string &name) {
+             return self.get_lod(model_idx, name);
           },
           py::return_value_policy::reference)
-      .def("variant_tag",
-           [](PredictorRes &self) { return self.variant_tag(); });
+      .def("variant_tag", [](PredictorRes &self) { return self.variant_tag(); })
+      .def("get_engine_names",
+           [](PredictorRes &self) { return self.get_engine_names(); });
   py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
       .def(py::init())
@@ -77,7 +78,6 @@ PYBIND11_MODULE(serving_client, m) {
           [](PredictorClient &self) { self.create_predictor(); })
       .def("destroy_predictor",
           [](PredictorClient &self) { self.destroy_predictor(); })
      .def("batch_predict",
          [](PredictorClient &self,
            const std::vector<std::vector<std::vector<float>>>
...
@@ -35,8 +35,17 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 int GeneralCopyOp::inference() {
   // read request from client
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
-  VLOG(2) << "precedent name: " << pre_name();
+  const std::vector<std::string> pre_node_names = pre_names();
+  if (pre_node_names.size() != 1) {
+    LOG(ERROR) << "This op(" << op_name()
+               << ") can only have one predecessor op, but received "
+               << pre_node_names.size();
+    return -1;
+  }
+  const std::string pre_name = pre_node_names[0];
+  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
+  VLOG(2) << "precedent name: " << pre_name;
   const TensorVector *in = &input_blob->tensor_vector;
   VLOG(2) << "input size: " << in->size();
   int batch_size = input_blob->GetBatchSize();
...
@@ -40,12 +40,21 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 int GeneralDistKVInferOp::inference() {
   VLOG(2) << "Going to run inference";
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
-  VLOG(2) << "Get precedent op name: " << pre_name();
+  const std::vector<std::string> pre_node_names = pre_names();
+  if (pre_node_names.size() != 1) {
+    LOG(ERROR) << "This op(" << op_name()
+               << ") can only have one predecessor op, but received "
+               << pre_node_names.size();
+    return -1;
+  }
+  const std::string pre_name = pre_node_names[0];
+  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
+  VLOG(2) << "Get precedent op name: " << pre_name;
   GeneralBlob *output_blob = mutable_data<GeneralBlob>();
   if (!input_blob) {
-    LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name();
+    LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name;
     return -1;
   }
@@ -149,8 +158,8 @@ int GeneralDistKVInferOp::inference() {
   timeline.Start();
   if (InferManager::instance().infer(
-          GENERAL_MODEL_NAME, &infer_in, out, batch_size)) {
-    LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
+          engine_name().c_str(), &infer_in, out, batch_size)) {
+    LOG(ERROR) << "Failed do infer in fluid model: " << engine_name();
     return -1;
   }
...
@@ -41,12 +41,21 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 int GeneralDistKVQuantInferOp::inference() {
   VLOG(2) << "Going to run inference";
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
-  VLOG(2) << "Get precedent op name: " << pre_name();
+  const std::vector<std::string> pre_node_names = pre_names();
+  if (pre_node_names.size() != 1) {
+    LOG(ERROR) << "This op(" << op_name()
+               << ") can only have one predecessor op, but received "
+               << pre_node_names.size();
+    return -1;
+  }
+  const std::string pre_name = pre_node_names[0];
+  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
+  VLOG(2) << "Get precedent op name: " << pre_name;
   GeneralBlob *output_blob = mutable_data<GeneralBlob>();
   if (!input_blob) {
-    LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name();
+    LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name;
     return -1;
   }
@@ -180,8 +189,8 @@ int GeneralDistKVQuantInferOp::inference() {
   timeline.Start();
   if (InferManager::instance().infer(
-          GENERAL_MODEL_NAME, &infer_in, out, batch_size)) {
-    LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
+          engine_name().c_str(), &infer_in, out, batch_size)) {
+    LOG(ERROR) << "Failed do infer in fluid model: " << engine_name();
     return -1;
   }
...
@@ -31,8 +31,6 @@ namespace baidu {
 namespace paddle_serving {
 namespace serving {
-static const char* GENERAL_MODEL_NAME = "general_model";
 struct GeneralBlob {
   std::vector<paddle::PaddleTensor> tensor_vector;
   int64_t time_stamp[20];
...
@@ -37,12 +37,21 @@ using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 int GeneralInferOp::inference() {
   VLOG(2) << "Going to run inference";
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
-  VLOG(2) << "Get precedent op name: " << pre_name();
+  const std::vector<std::string> pre_node_names = pre_names();
+  if (pre_node_names.size() != 1) {
+    LOG(ERROR) << "This op(" << op_name()
+               << ") can only have one predecessor op, but received "
+               << pre_node_names.size();
+    return -1;
+  }
+  const std::string pre_name = pre_node_names[0];
+  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
+  VLOG(2) << "Get precedent op name: " << pre_name;
   GeneralBlob *output_blob = mutable_data<GeneralBlob>();
   if (!input_blob) {
-    LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name();
+    LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name;
     return -1;
   }
@@ -59,8 +68,9 @@ int GeneralInferOp::inference() {
   int64_t start = timeline.TimeStampUS();
   timeline.Start();
-  if (InferManager::instance().infer(GENERAL_MODEL_NAME, in, out, batch_size)) {
-    LOG(ERROR) << "Failed do infer in fluid model: " << GENERAL_MODEL_NAME;
+  if (InferManager::instance().infer(
+          engine_name().c_str(), in, out, batch_size)) {
+    LOG(ERROR) << "Failed do infer in fluid model: " << engine_name().c_str();
     return -1;
   }
...
@@ -33,23 +33,17 @@ using baidu::paddle_serving::predictor::general_model::Tensor;
 using baidu::paddle_serving::predictor::general_model::Response;
 using baidu::paddle_serving::predictor::general_model::Request;
 using baidu::paddle_serving::predictor::general_model::FetchInst;
+using baidu::paddle_serving::predictor::general_model::ModelOutput;
 using baidu::paddle_serving::predictor::InferManager;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 int GeneralResponseOp::inference() {
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
-  if (!input_blob) {
-    LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name();
-    return -1;
-  }
-  const TensorVector *in = &input_blob->tensor_vector;
-  int batch_size = input_blob->GetBatchSize();
-  VLOG(2) << "input batch size: " << batch_size;
+  const std::vector<std::string> pre_node_names = pre_names();
+  VLOG(2) << "pre node names size: " << pre_node_names.size();
   const Request *req = dynamic_cast<const Request *>(get_request_message());
+  // response inst with only fetch_var_names
+  Response *res = mutable_data<Response>();
   Timer timeline;
   // double response_time = 0.0;
@@ -73,9 +67,25 @@ int GeneralResponseOp::inference() {
         model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
   }
-  // response inst with only fetch_var_names
-  Response *res = mutable_data<Response>();
-  FetchInst *fetch_inst = res->add_insts();
+  const GeneralBlob *input_blob;
+  for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
+    const std::string &pre_name = pre_node_names[pi];
+    VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
+            << pre_node_names.size() << ")";
+    input_blob = get_depend_argument<GeneralBlob>(pre_name);
+    // fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(),
+    // input_blob);
+    if (!input_blob) {
+      LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name;
+      return -1;
+    }
+    const TensorVector *in = &input_blob->tensor_vector;
+    ModelOutput *output = res->add_outputs();
+    // To get the order of model return values
+    output->set_engine_name(pre_name);
+    FetchInst *fetch_inst = output->add_insts();
     for (auto &idx : fetch_index) {
       Tensor *tensor = fetch_inst->add_tensor_array();
       tensor->set_elem_type(1);
@@ -103,7 +113,7 @@ int GeneralResponseOp::inference() {
       if (in->at(idx).dtype == paddle::PaddleDType::INT64) {
         int64_t *data_ptr = static_cast<int64_t *>(in->at(idx).data.data());
         if (model_config->_is_lod_fetch[idx]) {
-          FetchInst *fetch_p = res->mutable_insts(0);
+          FetchInst *fetch_p = output->mutable_insts(0);
           for (int j = 0; j < in->at(idx).lod[0].size(); ++j) {
             fetch_p->mutable_tensor_array(var_idx)->add_lod(
                 in->at(idx).lod[0][j]);
@@ -112,7 +122,7 @@ int GeneralResponseOp::inference() {
             fetch_p->mutable_tensor_array(var_idx)->add_int64_data(data_ptr[j]);
           }
         } else {
-          FetchInst *fetch_p = res->mutable_insts(0);
+          FetchInst *fetch_p = output->mutable_insts(0);
           for (int j = 0; j < cap; ++j) {
             fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]);
           }
@@ -121,7 +131,7 @@ int GeneralResponseOp::inference() {
       } else if (in->at(idx).dtype == paddle::PaddleDType::FLOAT32) {
         float *data_ptr = static_cast<float *>(in->at(idx).data.data());
         if (model_config->_is_lod_fetch[idx]) {
-          FetchInst *fetch_p = res->mutable_insts(0);
+          FetchInst *fetch_p = output->mutable_insts(0);
           for (int j = 0; j < in->at(idx).lod[0].size(); ++j) {
             fetch_p->mutable_tensor_array(var_idx)->add_lod(
                 in->at(idx).lod[0][j]);
@@ -130,7 +140,7 @@ int GeneralResponseOp::inference() {
             fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]);
           }
         } else {
-          FetchInst *fetch_p = res->mutable_insts(0);
+          FetchInst *fetch_p = output->mutable_insts(0);
           for (int j = 0; j < cap; ++j) {
             fetch_p->mutable_tensor_array(var_idx)->add_float_data(data_ptr[j]);
           }
@@ -138,12 +148,26 @@ int GeneralResponseOp::inference() {
       var_idx++;
     }
   }
+  }
   if (req->profile_server()) {
     int64_t end = timeline.TimeStampUS();
+    // TODO(barriery): multi-model profile_time.
+    // At present, only the response_op is multi-input, so here we get
+    // the profile_time by hard coding. It needs to be replaced with
+    // a more elegant way.
+    for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
+      input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
       VLOG(2) << "p size for input blob: " << input_blob->p_size;
-    for (int i = 0; i < input_blob->p_size; ++i) {
-      res->add_profile_time(input_blob->time_stamp[i]);
+      int profile_time_idx = -1;
+      if (pi == 0) {
+        profile_time_idx = 0;
+      } else {
+        profile_time_idx = input_blob->p_size - 2;
+      }
+      for (; profile_time_idx < input_blob->p_size; ++profile_time_idx) {
+        res->add_profile_time(input_blob->time_stamp[profile_time_idx]);
+      }
     }
     // TODO(guru4elephant): find more elegant way to do this
     res->add_profile_time(start);
...
@@ -32,22 +32,18 @@ using baidu::paddle_serving::predictor::general_model::Tensor;
 using baidu::paddle_serving::predictor::general_model::Response;
 using baidu::paddle_serving::predictor::general_model::Request;
 using baidu::paddle_serving::predictor::general_model::FetchInst;
+using baidu::paddle_serving::predictor::general_model::ModelOutput;
 using baidu::paddle_serving::predictor::InferManager;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
 int GeneralTextResponseOp::inference() {
-  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name());
-  if (!input_blob) {
-    LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name();
-    return -1;
-  }
-  const TensorVector *in = &input_blob->tensor_vector;
-  int batch_size = input_blob->GetBatchSize();
-  VLOG(2) << "infer batch size: " << batch_size;
+  VLOG(2) << "Going to run inference";
+  const std::vector<std::string> pre_node_names = pre_names();
+  VLOG(2) << "pre node names size: " << pre_node_names.size();
   const Request *req = dynamic_cast<const Request *>(get_request_message());
+  // response inst with only fetch_var_names
+  Response *res = mutable_data<Response>();
   Timer timeline;
   int64_t start = timeline.TimeStampUS();
@@ -67,11 +63,26 @@ int GeneralTextResponseOp::inference() {
         model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
   }
-  // response inst with only fetch_var_names
-  Response *res = mutable_data<Response>();
+  const GeneralBlob *input_blob;
+  for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
+    const std::string &pre_name = pre_node_names[pi];
+    VLOG(2) << "pre names[" << pi << "]: " << pre_name << " ("
+            << pre_node_names.size() << ")";
+    input_blob = get_depend_argument<GeneralBlob>(pre_name);
+    if (!input_blob) {
+      LOG(ERROR) << "Failed mutable depended argument, op: " << pre_name;
+      return -1;
+    }
+    const TensorVector *in = &input_blob->tensor_vector;
+    int batch_size = input_blob->GetBatchSize();
+    VLOG(2) << "input batch size: " << batch_size;
+    ModelOutput *output = res->add_outputs();
+    output->set_engine_name(
+        pre_name);  // To get the order of model return values
     for (int i = 0; i < batch_size; ++i) {
-    FetchInst *fetch_inst = res->add_insts();
+      FetchInst *fetch_inst = output->add_insts();
       for (auto &idx : fetch_index) {
         Tensor *tensor = fetch_inst->add_tensor_array();
         // currently only response float tensor or lod_tensor
@@ -100,26 +111,42 @@ int GeneralTextResponseOp::inference() {
         for (int j = 0; j < batch_size; ++j) {
           for (int k = in->at(idx).lod[0][j]; k < in->at(idx).lod[0][j + 1];
                k++) {
-        res->mutable_insts(j)->mutable_tensor_array(var_idx)->add_float_data(
-            data_ptr[k]);
+            output->mutable_insts(j)
+                ->mutable_tensor_array(var_idx)
+                ->add_float_data(data_ptr[k]);
           }
         }
       } else {
         for (int j = 0; j < batch_size; ++j) {
           for (int k = j * cap; k < (j + 1) * cap; ++k) {
-        res->mutable_insts(j)->mutable_tensor_array(var_idx)->add_float_data(
-            data_ptr[k]);
+            output->mutable_insts(j)
+                ->mutable_tensor_array(var_idx)
+                ->add_float_data(data_ptr[k]);
           }
         }
       }
       var_idx++;
     }
+  }
   if (req->profile_server()) {
     int64_t end = timeline.TimeStampUS();
-    for (int i = 0; i < input_blob->p_size; ++i) {
-      res->add_profile_time(input_blob->time_stamp[i]);
+    // TODO(barriery): multi-model profile_time.
+    // At present, only the response_op is multi-input, so here we get
+    // the profile_time by hard coding. It needs to be replaced with
+    // a more elegant way.
+    for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
+      input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
+      VLOG(2) << "p size for input blob: " << input_blob->p_size;
+      int profile_time_idx = -1;
+      if (pi == 0) {
+        profile_time_idx = 0;
+      } else {
+        profile_time_idx = input_blob->p_size - 2;
+      }
+      for (; profile_time_idx < input_blob->p_size; ++profile_time_idx) {
+        res->add_profile_time(input_blob->time_stamp[profile_time_idx]);
+      }
     }
     // TODO(guru4elephant): find more elegant way to do this
     res->add_profile_time(start);
...
@@ -40,10 +40,15 @@ message Request {
 };
 message Response {
-  repeated FetchInst insts = 1;
+  repeated ModelOutput outputs = 1;
   repeated int64 profile_time = 2;
 };
+message ModelOutput {
+  repeated FetchInst insts = 1;
+  optional string engine_name = 2;
+}
 service GeneralModelService {
   rpc inference(Request) returns (Response);
   rpc debug(Request) returns (Response);
...
@@ -14,6 +14,7 @@
 #include "core/predictor/framework/dag.h"
 #include <string>
+#include <utility>  // make_pair
 #include <vector>
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/predictor_metric.h"  // PredictorMetric
@@ -199,25 +200,81 @@ const DagStage* Dag::stage_by_index(uint32_t index) { return _stages[index]; }
 int Dag::topo_sort() {
   std::stringstream ss;
-  for (uint32_t nid = 0; nid < _index_nodes.size(); nid++) {
+  uint32_t nodes_size = _index_nodes.size();
+  std::vector<uint32_t> in_degree(nodes_size, 0);
+  std::vector<std::vector<uint32_t>> in_edge(nodes_size);
+  for (uint32_t nid = 0; nid < nodes_size; nid++) {
+    in_degree[nid] += _index_nodes[nid]->depends.size();
+    for (auto it = _index_nodes[nid]->depends.begin();
+         it != _index_nodes[nid]->depends.end();
+         ++it) {
+      uint32_t pnid = Dag::node_by_name(it->first)->id -
+                      1;  // 0 is reserved for beginner-op
+      in_edge[pnid].push_back(nid);
+    }
+  }
+  for (int i = 0; i < in_degree.size(); ++i) {
+    LOG(INFO) << "(" << _index_nodes[i]->name << ") in_degree[" << i
+              << "]: " << in_degree[i];
+  }
+  int sorted_num = 0;
   DagStage* stage = new (std::nothrow) DagStage();
   if (stage == NULL) {
     LOG(ERROR) << "Invalid stage!";
     return ERR_MEM_ALLOC_FAILURE;
   }
-    stage->nodes.push_back(_index_nodes[nid]);
   ss.str("");
   ss << _stages.size();
   stage->name = ss.str();
   stage->full_name = full_name() + NAME_DELIMITER + stage->name;
+  for (uint32_t nid = 0; nid < nodes_size; ++nid) {
+    if (in_degree[nid] == 0) {
+      ++sorted_num;
+      stage->nodes.push_back(_index_nodes[nid]);
+      // assign stage number after stage created
+      _index_nodes[nid]->stage = _stages.size();
+      // assign dag node full name after stage created
+      _index_nodes[nid]->full_name =
+          stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name;
+    }
+  }
+  if (stage->nodes.size() == 0) {
+    LOG(ERROR) << "Invalid Dag!";
+    return ERR_INTERNAL_FAILURE;
+  }
   _stages.push_back(stage);
+  while (sorted_num < nodes_size) {
+    auto pre_nodes = _stages.back()->nodes;
+    DagStage* stage = new (std::nothrow) DagStage();
+    ss.str("");
+    ss << _stages.size();
+    stage->name = ss.str();
+    stage->full_name = full_name() + NAME_DELIMITER + stage->name;
+    for (uint32_t pi = 0; pi < pre_nodes.size(); ++pi) {
+      uint32_t pnid = pre_nodes[pi]->id - 1;
+      for (uint32_t ei = 0; ei < in_edge[pnid].size(); ++ei) {
+        uint32_t nid = in_edge[pnid][ei];
+        --in_degree[nid];
+        if (in_degree[nid] == 0) {
+          ++sorted_num;
+          stage->nodes.push_back(_index_nodes[nid]);
          // assign stage number after stage created
-    _index_nodes[nid]->stage = nid;
+          _index_nodes[nid]->stage = _stages.size();
          // assign dag node full name after stage created
          _index_nodes[nid]->full_name =
              stage->full_name + NAME_DELIMITER + _index_nodes[nid]->name;
        }
+      }
+    }
+    if (stage->nodes.size() == 0) {
+      LOG(ERROR) << "Invalid Dag!";
+      return ERR_INTERNAL_FAILURE;
+    }
+    _stages.push_back(stage);
+  }
   return ERR_OK;
 }
...
@@ -76,19 +76,34 @@ int DagView::init(Dag* dag, const std::string& service_name) {
       }
       op->set_full_name(service_name + NAME_DELIMITER + node->full_name);
+      // Set the name of the Op as the key of the matching engine.
+      VLOG(2) << "op->set_engine_name(" << node->name.c_str() << ")";
+      op->set_engine_name(node->name);
       vnode->conf = node;
       vnode->op = op;
+      // Add depends
+      for (auto it = vnode->conf->depends.begin();
+           it != vnode->conf->depends.end();
+           ++it) {
+        std::string pre_node_name = it->first;
+        VLOG(2) << "add op pre name: \n"
+                << "current op name: " << vnode->op->op_name()
+                << ", previous op name: " << pre_node_name;
+        vnode->op->add_pre_node_name(pre_node_name);
+      }
       vstage->nodes.push_back(vnode);
     }
     // TODO(guru4elephant): this seems buggy, please review later
-    if (si > 0) {
-      VLOG(2) << "set op pre name: \n"
-              << "current op name: " << vstage->nodes.back()->op->op_name()
-              << " previous op name: "
-              << _view[si - 1]->nodes.back()->op->op_name();
-      vstage->nodes.back()->op->set_pre_node_name(
-          _view[si - 1]->nodes.back()->op->op_name());
-    }
+    /*if (si > 0) {*/
+    // VLOG(2) << "set op pre name: \n"
+    //<< "current op name: " << vstage->nodes.back()->op->op_name()
+    //<< " previous op name: "
+    //<< _view[si - 1]->nodes.back()->op->op_name();
+    // vstage->nodes.back()->op->set_pre_node_name(
+    //_view[si - 1]->nodes.back()->op->op_name());
+    /*}*/
     _view.push_back(vstage);
   }
@@ -139,6 +154,7 @@ int DagView::execute_one_stage(ViewStage* vstage,
                                butil::IOBufBuilder* debug_os) {
   butil::Timer stage_time(butil::Timer::STARTED);
   uint32_t node_size = vstage->nodes.size();
+  VLOG(2) << "vstage->nodes.size(): " << node_size;
   for (uint32_t ni = 0; ni < node_size; ni++) {
     ViewNode* vnode = vstage->nodes[ni];
     DagNode* conf = vnode->conf;
...
@@ -765,6 +765,8 @@ class InferManager {
     }
     size_t engine_num = model_toolkit_conf.engines_size();
     for (size_t ei = 0; ei < engine_num; ++ei) {
+      LOG(INFO) << "model_toolkit_conf.engines(" << ei
+                << ").name: " << model_toolkit_conf.engines(ei).name();
       std::string engine_name = model_toolkit_conf.engines(ei).name();
       VersionedInferEngine* engine = new (std::nothrow) VersionedInferEngine();
       if (!engine) {
...
@@ -56,11 +56,11 @@ int MempoolWrapper::thread_initialize() {
   im::fugue::memory::Region* region = new im::fugue::memory::Region();
   region->init();
   im::Mempool* mempool = new (std::nothrow) im::Mempool(region);
-  MempoolRegion* mempool_region = new MempoolRegion(region, mempool);
   if (mempool == NULL) {
     LOG(ERROR) << "Failed create thread mempool";
     return -1;
   }
+  MempoolRegion* mempool_region = new MempoolRegion(region, mempool);
   if (THREAD_SETSPECIFIC(_bspec_key, mempool_region) != 0) {
     LOG(ERROR) << "unable to set the thrd_data";
...
@@ -60,6 +60,7 @@ int Op::init(Bus* bus,
     return -1;
   }
+  _pre_node_names.clear();
   return custom_init();
 }
...
@@ -14,7 +14,9 @@
 #pragma once
 #include <bvar/bvar.h>  // bvar::LatencyRecorder
+#include <cstdlib>
 #include <string>
+#include <vector>
 #include "core/predictor/common/inner_common.h"
 #include "core/predictor/framework/channel.h"
 #include "core/predictor/framework/op_repository.h"
@@ -132,18 +134,28 @@ class Op {
   const std::string& full_name() const { return _full_name; }
-  const std::string& pre_name() const { return _pre_node_name; }
+  const std::vector<std::string>& pre_names() const { return _pre_node_names; }
   void set_full_name(const std::string full_name) { _full_name = full_name; }
-  void set_pre_node_name(const std::string pre_name) {
-    _pre_node_name = pre_name;
+  void add_pre_node_name(const std::string pre_name) {
+    _pre_node_names.push_back(pre_name);
   }
   const std::string& type() const;
   uint32_t id() const;
+  // Set the name of the Op as the key of the matching engine.
+  // Note that this key is only used by infer_op (only the
+  // infer_op needs to find the corresponding engine).
+  // At present, there is only general_infer_op.
+  void set_engine_name(const std::string engine_name) {
+    _engine_name = engine_name;
+  }
+  const std::string& engine_name() const { return _engine_name; }
   // --------------- Default implements ----------------
   virtual int custom_init() { return 0; }
@@ -189,13 +201,14 @@ class Op {
   Bus* _bus;
   Dag* _dag;
   uint32_t _id;
-  std::string _pre_node_name;  // only for sequential execution
+  std::vector<std::string> _pre_node_names;  // for DAG execution
   std::string _name;
   std::string _full_name;  // service_workflow_stageindex_opname
   std::string _type;
   bool _has_calc;
   bool _has_init;
   TimerFlow* _timer;
+  std::string _engine_name;  // only for infer_op
 };
 template <typename T>
@@ -215,7 +228,10 @@ class OpWithChannel : public Op {
       return _channel;
     }
-    _channel = butil::get_object<ChannelType>();
+    // TODO(barriery): There are some problems in using butil::get_object
+    // _channel = butil::get_object<ChannelType>();
+    _channel = new ChannelType();
     if (!_channel) {
       LOG(ERROR) << "Failed mutable channel of type:" << typeid(T).name();
       return NULL;
@@ -229,8 +245,14 @@ class OpWithChannel : public Op {
   int release_channel() {
     if (_channel) {
       _channel->deinit();
-      butil::return_object<ChannelType>(_channel);
+      delete _channel;
     }
+    // TODO(barriery): There are some problems in using butil::get_object
+    /*
+    if (_channel) {
+      _channel->deinit();
+      butil::return_object<ChannelType>(_channel);
+    } */
     _channel = NULL;
     return 0;
...
@@ -40,10 +40,15 @@ message Request {
 };
 message Response {
-  repeated FetchInst insts = 1;
+  repeated ModelOutput outputs = 1;
   repeated int64 profile_time = 2;
 };
+message ModelOutput {
+  repeated FetchInst insts = 1;
+  optional string engine_name = 2;
+}
 service GeneralModelService {
   rpc inference(Request) returns (Response);
   rpc debug(Request) returns (Response);
...
# How to Convert Paddle Inference Model To Paddle Serving Format
([简体中文](./INFERENCE_TO_SERVING_CN.md)|English)
## Example
``` python
from paddle_serving_client.io import inference_model_to_serving
inference_model_dir = "your_inference_model"
serving_client_dir = "serving_client_dir"
serving_server_dir = "serving_server_dir"
feed_var_names, fetch_var_names = inference_model_to_serving(
inference_model_dir, serving_client_dir, serving_server_dir)
```
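The converted `serving_client_dir` is what a client loads when it talks to a server that serves the model saved in `serving_server_dir`. Below is a minimal sketch of that client side; the server address is an illustrative assumption, and `serving_client_conf.prototxt` is the configuration file name used by the client examples elsewhere in these docs.

``` python
from paddle_serving_client import Client

serving_client_dir = "serving_client_dir"

client = Client()
client.load_client_config(serving_client_dir + "/serving_client_conf.prototxt")
client.connect(["127.0.0.1:9393"])  # assumed address of a running serving server
# The feed_var_names / fetch_var_names returned by inference_model_to_serving
# tell you which keys to pass to client.predict(feed=..., fetch=...).
```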
# How to Convert a Saved Paddle Inference Model into a Deployable Paddle Serving Format
([English](./INFERENCE_TO_SERVING.md)|Simplified Chinese)
## Example
``` python
from paddle_serving_client.io import inference_model_to_serving
inference_model_dir = "your_inference_model"
serving_client_dir = "serving_client_dir"
serving_server_dir = "serving_server_dir"
feed_var_names, fetch_var_names = inference_model_to_serving(
inference_model_dir, serving_client_dir, serving_server_dir)
```
# Model Ensemble in Paddle Serving
([简体中文](MODEL_ENSEMBLE_IN_PADDLE_SERVING_CN.md)|English)
In some scenarios, multiple models that take the same input may be run in parallel and their predictions combined to achieve a better overall result. Paddle Serving also supports this feature.
Next, we take a text classification task as an example to show model ensemble in Paddle Serving (for the time being the models are still predicted serially; parallel prediction will be supported as soon as possible).
## Simple example
In this example (see the figure below), the server side runs the BOW and CNN models in parallel on the same input within a single service. The client side fetches the prediction results of both models and post-processes them to obtain the final prediction.
![simple example](model_ensemble_example.png)
Note that, at present, only multiple models with the same input and output format are supported within the same service. In this example, the input and output formats of the CNN and BOW models are the same.
The code used in the example is saved in the `python/examples/imdb` path:
```shell
.
├── get_data.sh
├── imdb_reader.py
├── test_ensemble_client.py
└── test_ensemble_server.py
```
### Prepare data
Get the pre-trained CNN and BOW models with the following commands (you can also run the `get_data.sh` script):
```shell
wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz
tar -zxvf text_classification_data.tar.gz
tar -zxvf imdb_model.tar.gz
```
### Start server
Start the server with the following Python code (you can also run the `test_ensemble_server.py` script):
```python
from paddle_serving_server import OpMaker
from paddle_serving_server import OpGraphMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
'general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create(
'general_response', inputs=[cnn_infer_op, bow_infer_op])
op_graph_maker = OpGraphMaker()
op_graph_maker.add_op(read_op)
op_graph_maker.add_op(cnn_infer_op)
op_graph_maker.add_op(bow_infer_op)
op_graph_maker.add_op(response_op)
server = Server()
server.set_op_graph(op_graph_maker.get_op_graph())
model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'}
server.load_model_config(model_config)
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
```
Different from a normal prediction service, here we need to use a DAG to describe the logic of the server side.
When creating an Op, you need to specify its predecessors (in this example, the predecessor of `cnn_infer_op` and `bow_infer_op` is `read_op`, and the predecessors of `response_op` are `cnn_infer_op` and `bow_infer_op`). For an infer Op, you also need to define the prediction engine name `engine_name` (you may keep the default value, but setting it explicitly is recommended so that the client side can tell the prediction results of the models apart).
At the same time, when configuring the model paths, you need to create a model configuration dictionary with each infer Op as the key and the corresponding model path as the value, to tell Serving which model each infer Op uses.
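The same pattern scales to more models: create another infer Op with its own `engine_name`, wire it into the DAG, list it as an input of the response Op, and register its model path. The sketch below is hypothetical; the `lstm` engine name and the `imdb_lstm_model` path do not exist in this example and are only for illustration.

```python
from paddle_serving_server import OpMaker, OpGraphMaker, Server

op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
    'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
    'general_infer', engine_name='bow', inputs=[read_op])
# Hypothetical third model: same input, its own engine name and model path.
lstm_infer_op = op_maker.create(
    'general_infer', engine_name='lstm', inputs=[read_op])
response_op = op_maker.create(
    'general_response', inputs=[cnn_infer_op, bow_infer_op, lstm_infer_op])

op_graph_maker = OpGraphMaker()
for op in [read_op, cnn_infer_op, bow_infer_op, lstm_infer_op, response_op]:
    op_graph_maker.add_op(op)

server = Server()
server.set_op_graph(op_graph_maker.get_op_graph())
server.load_model_config({
    cnn_infer_op: 'imdb_cnn_model',
    bow_infer_op: 'imdb_bow_model',
    lstm_infer_op: 'imdb_lstm_model',  # assumed model directory for this sketch
})
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
```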
### Start client
Start the client with the following Python code (you can also run the `test_ensemble_client.py` script):
```python
from paddle_serving_client import Client
from imdb_reader import IMDBDataset
client = Client()
# If you have more than one model, make sure that the input
# and output of more than one model are the same.
client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
client.connect(["127.0.0.1:9393"])
# you can define any english sentence or dataset here
# This example reuses imdb reader in training, you
# can define your own data preprocessing easily.
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource('imdb.vocab')
for i in range(3):
    line = 'i am very sad | 0'
    word_ids, label = imdb_dataset.get_words_and_label(line)
    feed = {"words": word_ids}
    fetch = ["acc", "cost", "prediction"]
    fetch_maps = client.predict(feed=feed, fetch=fetch)
    if len(fetch_maps) == 1:
        print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1]))
    else:
        for model, fetch_map in fetch_maps.items():
            print("step: {}, model: {}, res: {}".format(i, model, fetch_map[
                'prediction'][0][1]))
```
Compared with a normal prediction service, the client side does not change much. When multiple models are used for prediction, the prediction service returns a dictionary whose keys are the engine names (`engine_name`, defined on the server side) and whose values are the corresponding models' prediction results.
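If a single ensembled score is wanted, the returned dictionary can be post-processed on the client side. The sketch below simply averages the positive-class probability over all models; it assumes the multi-model case and the `prediction` fetch variable used in the example above.

```python
# fetch_maps is the dictionary returned by client.predict() above,
# keyed by the engine_name defined on the server side.
def ensemble_positive_score(fetch_maps):
    scores = [m['prediction'][0][1] for m in fetch_maps.values()]
    return sum(scores) / len(scores)
```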
### Expected result
```shell
step: 0, model: cnn, res: 0.560272455215
step: 0, model: bow, res: 0.633530199528
step: 1, model: cnn, res: 0.560272455215
step: 1, model: bow, res: 0.633530199528
step: 2, model: cnn, res: 0.560272455215
step: 2, model: bow, res: 0.633530199528
```
# Model Ensemble Prediction in Paddle Serving
(Simplified Chinese|[English](MODEL_ENSEMBLE_IN_PADDLE_SERVING.md))
In some scenarios, multiple models that take the same input may be run in parallel and their predictions combined to achieve a better overall result. Paddle Serving provides this feature.
Below we take a text classification task as an example to show the model ensemble feature of Paddle Serving (for the time being the models are still predicted serially; parallel prediction will be supported as soon as possible).
## Ensemble prediction example
In this example (see the figure below), the server side runs the BOW and CNN models in parallel on the same input within a single service. The client side fetches the prediction results of both models and post-processes them to obtain the final prediction.
![simple example](model_ensemble_example.png)
Note that, at present, only multiple models with the same input and output format are supported within the same service. In this example, the input and output formats of the CNN and BOW models are the same.
The code used in the example is saved under the `python/examples/imdb` path:
```shell
.
├── get_data.sh
├── imdb_reader.py
├── test_ensemble_client.py
└── test_ensemble_server.py
```
### Prepare data
Get the pre-trained CNN and BOW models with the following commands (you can also run the `get_data.sh` script):
```shell
wget --no-check-certificate https://fleet.bj.bcebos.com/text_classification_data.tar.gz
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/imdb-demo/imdb_model.tar.gz
tar -zxvf text_classification_data.tar.gz
tar -zxvf imdb_model.tar.gz
```
### Start server
Start the server with the following Python code (you can also run the `test_ensemble_server.py` script):
```python
from paddle_serving_server import OpMaker
from paddle_serving_server import OpGraphMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
'general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create(
'general_response', inputs=[cnn_infer_op, bow_infer_op])
op_graph_maker = OpGraphMaker()
op_graph_maker.add_op(read_op)
op_graph_maker.add_op(cnn_infer_op)
op_graph_maker.add_op(bow_infer_op)
op_graph_maker.add_op(response_op)
server = Server()
server.set_op_graph(op_graph_maker.get_op_graph())
model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'}
server.load_model_config(model_config)
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
```
Unlike a normal prediction service, here we need a DAG to describe the server-side logic.
When creating an Op, you need to specify its predecessors (in this example, the predecessor of `cnn_infer_op` and `bow_infer_op` is `read_op`, and the predecessors of `response_op` are `cnn_infer_op` and `bow_infer_op`). For an infer Op you also need to set the prediction engine name `engine_name` (you may keep the default value, but setting it explicitly is recommended so that the client side can tell the models' prediction results apart).
At the same time, when configuring the model paths, you need to build a model configuration dictionary with each infer Op as the key and the corresponding model path as the value, to tell Serving which model each infer Op uses.
### Start client
Start the client with the following Python code (you can also run the `test_ensemble_client.py` script):
```python
from paddle_serving_client import Client
from imdb_reader import IMDBDataset
client = Client()
# If you have more than one model, make sure that the input
# and output of more than one model are the same.
client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
client.connect(["127.0.0.1:9393"])
# you can define any english sentence or dataset here
# This example reuses imdb reader in training, you
# can define your own data preprocessing easily.
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource('imdb.vocab')
for i in range(3):
line = 'i am very sad | 0'
word_ids, label = imdb_dataset.get_words_and_label(line)
feed = {"words": word_ids}
fetch = ["acc", "cost", "prediction"]
fetch_maps = client.predict(feed=feed, fetch=fetch)
if len(fetch_maps) == 1:
print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1]))
else:
for model, fetch_map in fetch_maps.items():
print("step: {}, model: {}, res: {}".format(i, model, fetch_map[
'prediction'][0][1]))
```
Client端与普通预测服务没有发生太大的变化。当使用多个模型预测时,预测服务将返回一个key为Server端定义的引擎名称`engine_name`,value为对应的模型预测结果的字典。
### 预期结果
```txt
step: 0, model: cnn, res: 0.560272455215
step: 0, model: bow, res: 0.633530199528
step: 1, model: cnn, res: 0.560272455215
step: 1, model: bow, res: 0.633530199528
step: 2, model: cnn, res: 0.560272455215
step: 2, model: bow, res: 0.633530199528
```
# How to develop a new Web service?
([简体中文](NEW_WEB_SERVICE_CN.md)|English)
This document takes an image classification service based on the ImageNet dataset as an example to introduce how to develop a new web service. The complete code can be found [here](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/imagenet/image_classification_service.py).
## WebService base class
Paddle Serving implements the [WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23) base class. You need to override its `preprocess` and `postprocess` methods. The default implementations are as follows:
```python
class WebService(object):
def preprocess(self, feed={}, fetch=[]):
return feed, fetch
def postprocess(self, feed={}, fetch=[], fetch_map=None):
return fetch_map
```
### preprocess
The preprocess method has two input parameters, `feed` and `fetch`. For an HTTP request `request`:
- The value of `feed` is request data `request.json`
- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data
The return values are the feed and fetch values used in the prediction.
### postprocess
The postprocess method has three input parameters, `feed`, `fetch` and `fetch_map`:
- The value of `feed` is request data `request.json`
- The value of `fetch` is the fetch part `request.json["fetch"]` in the request data
- The value of `fetch_map` is the model output value.
The return value will be wrapped as `{"result": fetch_map}` and used as the response to the HTTP request.
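As an illustration (the request body and the `score` fetch name below are hypothetical, chosen only for this sketch), the data seen by the two hooks would be:
```python
# Hypothetical request body posted to the web service (values are made up):
#   {"image": "<base64-encoded image>", "fetch": ["score"]}

feed = {"image": "<base64-encoded image>", "fetch": ["score"]}  # preprocess: request.json
fetch = ["score"]                                               # preprocess: request.json["fetch"]

# postprocess additionally receives the model output, e.g.:
fetch_map = {"score": [[0.1, 0.7, 0.2]]}

# Whatever postprocess returns is wrapped as {"result": fetch_map}
# and sent back as the HTTP response body.
```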
## Develop ImageService class
```python
class ImageService(WebService):
def preprocess(self, feed={}, fetch=[]):
reader = ImageReader()
if "image" not in feed:
raise ("feed data error!")
if isinstance(feed["image"], list):
feed_batch = []
for image in feed["image"]:
sample = base64.b64decode(image)
img = reader.process_image(sample)
res_feed = {}
res_feed["image"] = img.reshape(-1)
feed_batch.append(res_feed)
return feed_batch, fetch
else:
sample = base64.b64decode(feed["image"])
img = reader.process_image(sample)
res_feed = {}
res_feed["image"] = img.reshape(-1)
return res_feed, fetch
```
For the above `ImageService`, only the `preprocess` method is overridden, to decode the Base64-encoded image data into the format required for prediction.
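If the raw model output is not convenient for HTTP clients, `postprocess` can be overridden in the same way. The sketch below is hypothetical: the `score` fetch variable and the `imagenet.label` file are assumptions made for illustration and are not part of the original example.
```python
class LabeledImageService(ImageService):
    def postprocess(self, feed={}, fetch=[], fetch_map=None):
        # Map the raw score vector to a human-readable top-1 label.
        with open("imagenet.label") as f:  # assumed label file, one class name per line
            labels = [line.strip() for line in f]
        scores = fetch_map["score"][0]     # assumed fetch variable name
        top1 = max(range(len(scores)), key=lambda i: scores[i])
        return {"label": labels[top1], "score": float(scores[top1])}
```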
# 如何开发一个新的Web Service?
(简体中文|[English](NEW_WEB_SERVICE.md))
本文档将以Imagenet图像分类服务为例,来介绍如何开发一个新的Web Service。您可以在[这里](https://github.com/PaddlePaddle/Serving/blob/develop/python/examples/imagenet/image_classification_service.py)查阅完整的代码。
## WebService基类
Paddle Serving实现了[WebService](https://github.com/PaddlePaddle/Serving/blob/develop/python/paddle_serving_server/web_service.py#L23)基类,您需要重写它的`preprocess`方法和`postprocess`方法,默认实现如下:
```python
class WebService(object):
def preprocess(self, feed={}, fetch=[]):
return feed, fetch
def postprocess(self, feed={}, fetch=[], fetch_map=None):
return fetch_map
```
### preprocess方法
preprocess方法有两个输入参数,`feed`和`fetch`。对于一个HTTP请求`request`:
- `feed`的值为请求数据`request.json`
- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]`
返回值分别是预测过程中用到的feed和fetch值。
### postprocess方法
postprocess方法有三个输入参数,`feed`、`fetch`和`fetch_map`:
- `feed`的值为请求数据`request.json`
- `fetch`的值为请求数据中的fetch部分`request.json["fetch"]`
- `fetch_map`的值为fetch到的模型输出值
返回值将会被处理成`{"result": fetch_map}`作为HTTP请求的返回。
## 开发ImageService类
```python
class ImageService(WebService):
def preprocess(self, feed={}, fetch=[]):
reader = ImageReader()
if "image" not in feed:
raise ("feed data error!")
if isinstance(feed["image"], list):
feed_batch = []
for image in feed["image"]:
sample = base64.b64decode(image)
img = reader.process_image(sample)
res_feed = {}
res_feed["image"] = img.reshape(-1)
feed_batch.append(res_feed)
return feed_batch, fetch
else:
sample = base64.b64decode(feed["image"])
img = reader.process_image(sample)
res_feed = {}
res_feed["image"] = img.reshape(-1)
return res_feed, fetch
```
对于上述的`ImageService`,只重写了前处理方法,将base64格式的图片数据处理成模型预测需要的数据格式。
...@@ -14,13 +14,19 @@ Deep neural nets often have some preprocessing steps on input data, and postproc ...@@ -14,13 +14,19 @@ Deep neural nets often have some preprocessing steps on input data, and postproc
## How to define Node ## How to define Node
### Simple series structure
PaddleServing has some predefined Computation Node in the framework. A very commonly used Computation Graph is the simple reader-inference-response mode that can cover most of the single model inference scenarios. A example graph and the corresponding DAG definition code is as follows. PaddleServing has some predefined Computation Node in the framework. A very commonly used Computation Graph is the simple reader-inference-response mode that can cover most of the single model inference scenarios. A example graph and the corresponding DAG definition code is as follows.
<center> <center>
<img src='simple_dag.png' width = "260" height = "370" align="middle"/> <img src='simple_dag.png' width = "260" height = "370" align="middle"/>
</center> </center>
``` python ``` python
import paddle_serving_server as serving import paddle_serving_server as serving
from paddle_serving_server import OpMaker
from paddle_serving_server import OpSeqMaker
op_maker = serving.OpMaker() op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader') read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer') general_infer_op = op_maker.create('general_infer')
...@@ -32,18 +38,54 @@ op_seq_maker.add_op(general_infer_op) ...@@ -32,18 +38,54 @@ op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op) op_seq_maker.add_op(general_response_op)
``` ```
For simple series logic, we simplify it into a `Sequence` and build it with `OpSeqMaker`. You do not have to specify the predecessor of each node; by default the predecessors are determined by the order in which nodes are added to `OpSeqMaker`.
Since the code will be commonly used and users do not have to change the code, PaddleServing releases a easy-to-use launching command for service startup. An example is as follows: Since the code will be commonly used and users do not have to change the code, PaddleServing releases a easy-to-use launching command for service startup. An example is as follows:
``` python ``` python
python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9292 python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9292
``` ```
### Nodes with multiple inputs
An example containing nodes with multiple inputs is given in [MODEL_ENSEMBLE_IN_PADDLE_SERVING](MODEL_ENSEMBLE_IN_PADDLE_SERVING.md). An example graph and the corresponding DAG definition code are as follows.
<center>
<img src='complex_dag.png' width = "480" height = "400" align="middle"/>
</center>
```python
from paddle_serving_server import OpMaker
from paddle_serving_server import OpGraphMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
'general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create(
'general_response', inputs=[cnn_infer_op, bow_infer_op])
op_graph_maker = OpGraphMaker()
op_graph_maker.add_op(read_op)
op_graph_maker.add_op(cnn_infer_op)
op_graph_maker.add_op(bow_infer_op)
op_graph_maker.add_op(response_op)
```
For a graph containing nodes with multiple inputs, you need to build it with `OpGraphMaker`, and you must specify the predecessors of each node.
## More Examples ## More Examples
If a user has sparse features as inputs, and the model will do embedding lookup for each feature, we can do distributed embedding lookup operation which is not in the Paddle training computation graph. An example is as follows: If a user has sparse features as inputs, and the model will do embedding lookup for each feature, we can do distributed embedding lookup operation which is not in the Paddle training computation graph. An example is as follows:
``` python ``` python
import paddle_serving_server as serving import paddle_serving_server as serving
from paddle_serving_server import OpMaker
from paddle_serving_server import OpSeqMaker
op_maker = serving.OpMaker() op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader') read_op = op_maker.create('general_reader')
dist_kv_op = op_maker.create('general_dist_kv') dist_kv_op = op_maker.create('general_dist_kv')
......
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
## 如何定义节点 ## 如何定义节点
### 简单的串联结构
PaddleServing在框架中具有一些预定义的计算节点。 一种非常常用的计算图是简单的reader-infer-response模式,可以涵盖大多数单一模型推理方案。 示例图和相应的DAG定义代码如下。 PaddleServing在框架中具有一些预定义的计算节点。 一种非常常用的计算图是简单的reader-infer-response模式,可以涵盖大多数单一模型推理方案。 示例图和相应的DAG定义代码如下。
<center> <center>
<img src='simple_dag.png' width = "260" height = "370" align="middle"/> <img src='simple_dag.png' width = "260" height = "370" align="middle"/>
...@@ -21,6 +23,9 @@ PaddleServing在框架中具有一些预定义的计算节点。 一种非常常 ...@@ -21,6 +23,9 @@ PaddleServing在框架中具有一些预定义的计算节点。 一种非常常
``` python ``` python
import paddle_serving_server as serving import paddle_serving_server as serving
from paddle_serving_server import OpMaker
from paddle_serving_server import OpSeqMaker
op_maker = serving.OpMaker() op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader') read_op = op_maker.create('general_reader')
general_infer_op = op_maker.create('general_infer') general_infer_op = op_maker.create('general_infer')
...@@ -32,18 +37,54 @@ op_seq_maker.add_op(general_infer_op) ...@@ -32,18 +37,54 @@ op_seq_maker.add_op(general_infer_op)
op_seq_maker.add_op(general_response_op) op_seq_maker.add_op(general_response_op)
``` ```
对于简单的串联逻辑,我们将其简化为`Sequence`,使用`OpSeqMaker`进行构建。用户可以不指定每个节点的前继,默认按加入`OpSeqMaker`的顺序来确定前继。
由于该代码在大多数情况下都会被使用,并且用户不必更改代码,因此PaddleServing会发布一个易于使用的启动命令来启动服务。 示例如下: 由于该代码在大多数情况下都会被使用,并且用户不必更改代码,因此PaddleServing会发布一个易于使用的启动命令来启动服务。 示例如下:
``` python ``` python
python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9292 python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9292
``` ```
### 包含多个输入的节点
[Paddle Serving中的集成预测](MODEL_ENSEMBLE_IN_PADDLE_SERVING_CN.md)文档中给出了一个包含多个输入节点的样例,示意图和代码如下。
<center>
<img src='complex_dag.png' width = "480" height = "400" align="middle"/>
</center>
```python
from paddle_serving_server import OpMaker
from paddle_serving_server import OpGraphMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
'general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create(
'general_response', inputs=[cnn_infer_op, bow_infer_op])
op_graph_maker = OpGraphMaker()
op_graph_maker.add_op(read_op)
op_graph_maker.add_op(cnn_infer_op)
op_graph_maker.add_op(bow_infer_op)
op_graph_maker.add_op(response_op)
```
对于含有多输入节点的计算图,需要使用`OpGraphMaker`来构建,同时必须给出每个节点的前继。
## 更多示例 ## 更多示例
如果用户将稀疏特征作为输入,并且模型将对每个特征进行嵌入查找,则我们可以进行分布式嵌入查找操作,该操作不在Paddle训练计算图中。 示例如下: 如果用户将稀疏特征作为输入,并且模型将对每个特征进行嵌入查找,则我们可以进行分布式嵌入查找操作,该操作不在Paddle训练计算图中。 示例如下:
``` python ``` python
import paddle_serving_server as serving import paddle_serving_server as serving
from paddle_serving_server import OpMaker
from paddle_serving_server import OpSeqMaker
op_maker = serving.OpMaker() op_maker = serving.OpMaker()
read_op = op_maker.create('general_reader') read_op = op_maker.create('general_reader')
dist_kv_op = op_maker.create('general_dist_kv') dist_kv_op = op_maker.create('general_dist_kv')
......
# Faster RCNN model on Paddle Serving
([简体中文](./README_CN.md)|English)
This article requires models and configuration files trained with [Paddle Detection](https://github.com/PaddlePaddle/PaddleDetection). If you want to deploy on Paddle Serving quickly, please read Chapter 2 directly.
## 1. Train an object detection model
Users can read [Paddle Detection Getting Started](https://github.com/PaddlePaddle/PaddleDetection/blob/release/0.2/docs/tutorials/GETTING_STARTED_cn.md) to understand the background of Paddle Detection. PaddleDetection aims to provide rich, easy-to-use object detection models for industry and academia. It is not only superior in performance and easy to deploy, but also flexible enough to meet the needs of algorithm research.
### Environmental requirements
CPU version: No special requirements
GPU version: CUDA 9.0 and above
```
git clone https://github.com/PaddlePaddle/PaddleDetection
cd PaddleDetection
```
Next, you can train the faster rcnn model
```
python tools/train.py -c configs/faster_rcnn_r50_1x.yml
```
The training time varies; it depends on the computing power of the training device and the number of iterations.
During training, `faster_rcnn_r50_1x.yml` defines the `snapshot` for saving models. After training finishes, the best-performing model is saved as `best_model.pdmodel`, a compressed, PaddleDetection-specific model file.
**If we want the model to be usable by Paddle Serving, we must run export_model.**
Export the model
```
python export_model.py
```
## 2. Start the model and predict
If you did not train a model with the Paddle Detection project, we also provide a sample model for download here. If you trained the model with Paddle Detection, you can skip the **Download model** section.
### Download model
```
wget https://paddle-serving.bj.bcebos.com/pddet_demo/faster_rcnn_model.tar.gz
wget https://paddle-serving.bj.bcebos.com/pddet_demo/paddle_serving_app-0.0.1-py2-none-any.whl
wget https://paddle-serving.bj.bcebos.com/pddet_demo/infer_cfg.yml
tar xf faster_rcnn_model.tar.gz
mv faster_rcnn_model/pddet* .
```
### Start the service
```
GLOG_v=2 python -m paddle_serving_server_gpu.serve --model pddet_serving_model --port 9494 --gpu_id 0
```
### Perform prediction
```
python test_client.py --config_path=infer_cfg.yml --infer_img=000000570688.jpg --dump_result --visualize
```
## 3. Result analysis
<p align = "center">
    <br>
<img src = '000000570688.jpg'>
    <br>
<p>
This is the input picture
  
<p align = "center">
    <br>
<img src = '000000570688_bbox.jpg'>
    <br>
<p>
This is the picture with the bounding boxes drawn. You can see that the client has done the post-processing for the picture. In addition, `output/bbox.json` contains the id and coordinate information of each box.
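As a small follow-up, the dumped results can be inspected with a few lines of Python. This sketch only assumes the `output/bbox.json` file written by `test_client.py` above, whose entries follow the COCO-style `category_id`/`bbox`/`score` layout produced by `bbox2out`:
```python
import json

with open("output/bbox.json") as f:
    detections = json.load(f)

for det in detections:
    # bbox is [xmin, ymin, w, h] in pixel coordinates
    print(det["category_id"], det["bbox"], det["score"])
```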
# Faster RCNN模型
(简体中文|[English](./README.md))
本文需要[Paddle Detection](https://github.com/PaddlePaddle/PaddleDetection)训练的模型和配置文件。如果用户想要快速部署在Paddle Serving上,请直接阅读第二章节。
## 1. 训练物体检测模型
用户可以阅读 [Paddle Detection入门使用](https://github.com/PaddlePaddle/PaddleDetection/blob/release/0.2/docs/tutorials/GETTING_STARTED_cn.md)来了解Paddle Detection的背景。PaddleDetection的目的是为工业界和学术界提供丰富、易用的目标检测模型。不仅性能优越、易于部署,而且能够灵活的满足算法研究的需求。
### 环境要求
CPU版: 没有特别要求
GPU版: CUDA 9.0及以上
```
git clone https://github.com/PaddlePaddle/PaddleDetection
cd PaddleDetection
```
接下来可以训练faster rcnn模型
```
python tools/train.py -c configs/faster_rcnn_r50_1x.yml
```
训练模型的时间视情况而定,与训练的设备算力和迭代轮数相关。
在训练的过程中,`faster_rcnn_r50_1x.yml`当中定义了保存模型的`snapshot`,在最终训练完成后,效果最好的模型,会被保存为`best_model.pdmodel`,这是一个经过压缩的PaddleDetection的专属模型文件。
**如果我们要让模型可被Paddle Serving所使用,必须做export_model。**
输出模型
```
python export_model.py
```
## 2. 启动模型并预测
如果用户没有用Paddle Detection项目训练模型,我们也在此为您提供示例模型下载。如果您用Paddle Detection训练了模型,可以跳过 **下载模型** 部分。
### 下载模型
```
wget https://paddle-serving.bj.bcebos.com/pddet_demo/faster_rcnn_model.tar.gz
wget https://paddle-serving.bj.bcebos.com/pddet_demo/paddle_serving_app-0.0.1-py2-none-any.whl
wget https://paddle-serving.bj.bcebos.com/pddet_demo/infer_cfg.yml
tar xf faster_rcnn_model.tar.gz
mv faster_rcnn_model/pddet* .
```
### 启动服务
```
GLOG_v=2 python -m paddle_serving_server_gpu.serve --model pddet_serving_model --port 9494 --gpu_id 0
```
### 执行预测
```
python test_client.py --config_path=infer_cfg.yml --infer_img=000000570688.jpg --dump_result --visualize
```
## 3. 结果分析
<p align="center">
<br>
<img src='000000570688.jpg' >
<br>
<p>
这是输入图片
<p align="center">
<br>
<img src='000000570688_bbox.jpg' >
<br>
<p>
这是实现添加了bbox之后的图片,可以看到客户端已经为图片做好了后处理,此外在output/bbox.json也有各个框的编号和坐标信息。
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from paddle_serving_client import Client
import sys
import os
import time
from paddle_serving_app.reader.pddet import Detection
import numpy as np
py_version = sys.version_info[0]
# Usage: python test_client.py <serving_client_conf.prototxt> <infer_cfg.yml> <image_path>
# Feed/fetch variable names of the served Faster RCNN model.
feed_var_names = ['image', 'im_shape', 'im_info']
fetch_var_names = ['multiclass_nms']
# Preprocess the input image according to the PaddleDetection config file.
pddet = Detection(config_path=sys.argv[2], output_dir="./output")
feed_dict = pddet.preprocess(feed_var_names, sys.argv[3])
# Connect to the serving instance and run prediction.
client = Client()
client.load_client_config(sys.argv[1])
client.connect(['127.0.0.1:9494'])
fetch_map = client.predict(feed=feed_dict, fetch=fetch_var_names)
outs = fetch_map.values()
# Postprocess: draw the bounding boxes on the image and dump output/bbox.json.
pddet.postprocess(fetch_map, fetch_var_names)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_client import Client
from imdb_reader import IMDBDataset
client = Client()
# If you have more than one model, make sure that the input
# and output of more than one model are the same.
client.load_client_config('imdb_bow_client_conf/serving_client_conf.prototxt')
client.connect(["127.0.0.1:9393"])
# you can define any english sentence or dataset here
# This example reuses imdb reader in training, you
# can define your own data preprocessing easily.
imdb_dataset = IMDBDataset()
imdb_dataset.load_resource('imdb.vocab')
for i in range(3):
line = 'i am very sad | 0'
word_ids, label = imdb_dataset.get_words_and_label(line)
feed = {"words": word_ids}
fetch = ["acc", "cost", "prediction"]
fetch_maps = client.predict(feed=feed, fetch=fetch)
if len(fetch_maps) == 1:
print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1]))
else:
for model, fetch_map in fetch_maps.items():
print("step: {}, model: {}, res: {}".format(i, model, fetch_map[
'prediction'][0][1]))
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
from paddle_serving_server import OpMaker
from paddle_serving_server import OpGraphMaker
from paddle_serving_server import Server
op_maker = OpMaker()
read_op = op_maker.create('general_reader')
cnn_infer_op = op_maker.create(
'general_infer', engine_name='cnn', inputs=[read_op])
bow_infer_op = op_maker.create(
'general_infer', engine_name='bow', inputs=[read_op])
response_op = op_maker.create(
'general_response', inputs=[cnn_infer_op, bow_infer_op])
op_graph_maker = OpGraphMaker()
op_graph_maker.add_op(read_op)
op_graph_maker.add_op(cnn_infer_op)
op_graph_maker.add_op(bow_infer_op)
op_graph_maker.add_op(response_op)
server = Server()
server.set_op_graph(op_graph_maker.get_op_graph())
model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'}
server.load_model_config(model_config)
server.prepare_server(workdir="work_dir1", port=9393, device="cpu")
server.run_server()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import argparse
from .image_tool import Resize, Detection
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import numpy as np
from PIL import Image, ImageDraw
import cv2
import yaml
import copy
import argparse
import logging
import paddle.fluid as fluid
import json
FORMAT = '%(asctime)s-%(levelname)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)
precision_map = {
'trt_int8': fluid.core.AnalysisConfig.Precision.Int8,
'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32,
'trt_fp16': fluid.core.AnalysisConfig.Precision.Half
}
class Resize(object):
def __init__(self,
target_size,
max_size=0,
interp=cv2.INTER_LINEAR,
use_cv2=True,
image_shape=None):
super(Resize, self).__init__()
self.target_size = target_size
self.max_size = max_size
self.interp = interp
self.use_cv2 = use_cv2
self.image_shape = image_shape
def __call__(self, im):
origin_shape = im.shape[:2]
im_c = im.shape[2]
if self.max_size != 0:
im_size_min = np.min(origin_shape[0:2])
im_size_max = np.max(origin_shape[0:2])
im_scale = float(self.target_size) / float(im_size_min)
if np.round(im_scale * im_size_max) > self.max_size:
im_scale = float(self.max_size) / float(im_size_max)
im_scale_x = im_scale
im_scale_y = im_scale
resize_w = int(im_scale_x * float(origin_shape[1]))
resize_h = int(im_scale_y * float(origin_shape[0]))
else:
im_scale_x = float(self.target_size) / float(origin_shape[1])
im_scale_y = float(self.target_size) / float(origin_shape[0])
resize_w = self.target_size
resize_h = self.target_size
if self.use_cv2:
im = cv2.resize(
im,
None,
None,
fx=im_scale_x,
fy=im_scale_y,
interpolation=self.interp)
else:
if self.max_size != 0:
raise TypeError(
                    'If you set max_size to cap the maximum size of image, '
'please set use_cv2 to True to resize the image.')
im = im.astype('uint8')
im = Image.fromarray(im)
im = im.resize((int(resize_w), int(resize_h)), self.interp)
im = np.array(im)
# padding im
if self.max_size != 0 and self.image_shape is not None:
padding_im = np.zeros(
(self.max_size, self.max_size, im_c), dtype=np.float32)
im_h, im_w = im.shape[:2]
padding_im[:im_h, :im_w, :] = im
im = padding_im
return im, im_scale_x
class Normalize(object):
def __init__(self, mean, std, is_scale=True, is_channel_first=False):
super(Normalize, self).__init__()
self.mean = mean
self.std = std
self.is_scale = is_scale
self.is_channel_first = is_channel_first
def __call__(self, im):
im = im.astype(np.float32, copy=False)
if self.is_channel_first:
mean = np.array(self.mean)[:, np.newaxis, np.newaxis]
std = np.array(self.std)[:, np.newaxis, np.newaxis]
else:
mean = np.array(self.mean)[np.newaxis, np.newaxis, :]
std = np.array(self.std)[np.newaxis, np.newaxis, :]
if self.is_scale:
im = im / 255.0
im -= mean
im /= std
return im
class Permute(object):
def __init__(self, to_bgr=False, channel_first=True):
self.to_bgr = to_bgr
self.channel_first = channel_first
def __call__(self, im):
if self.channel_first:
im = im.transpose((2, 0, 1))
if self.to_bgr:
im = im[[2, 1, 0], :, :]
return im.copy()
class PadStride(object):
def __init__(self, stride=0):
        assert stride >= 0, (
            "Unsupported stride: {}, the stride in PadStride must be "
            "greater than or equal to 0".format(stride))
self.coarsest_stride = stride
def __call__(self, im):
coarsest_stride = self.coarsest_stride
if coarsest_stride == 0:
return im
im_c, im_h, im_w = im.shape
pad_h = int(np.ceil(float(im_h) / coarsest_stride) * coarsest_stride)
pad_w = int(np.ceil(float(im_w) / coarsest_stride) * coarsest_stride)
padding_im = np.zeros((im_c, pad_h, pad_w), dtype=np.float32)
padding_im[:, :im_h, :im_w] = im
return padding_im
class Detection():
def __init__(self, config_path, output_dir):
self.config_path = config_path
self.if_visualize = True
self.if_dump_result = True
self.output_dir = output_dir
def DecodeImage(self, im_path):
assert os.path.exists(im_path), "Image path {} can not be found".format(
im_path)
with open(im_path, 'rb') as f:
im = f.read()
data = np.frombuffer(im, dtype='uint8')
im = cv2.imdecode(data, 1) # BGR mode, but need RGB mode
im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
return im
def Preprocess(self, img_path, arch, config):
img = self.DecodeImage(img_path)
orig_shape = img.shape
scale = 1.
data = []
data_config = copy.deepcopy(config)
for data_aug_conf in data_config:
obj = data_aug_conf.pop('type')
preprocess = eval(obj)(**data_aug_conf)
if obj == 'Resize':
img, scale = preprocess(img)
else:
img = preprocess(img)
img = img[np.newaxis, :] # N, C, H, W
data.append(img)
extra_info = self.get_extra_info(img, arch, orig_shape, scale)
data += extra_info
return data
def expand_boxes(self, boxes, scale):
"""
Expand an array of boxes by a given scale.
"""
w_half = (boxes[:, 2] - boxes[:, 0]) * .5
h_half = (boxes[:, 3] - boxes[:, 1]) * .5
x_c = (boxes[:, 2] + boxes[:, 0]) * .5
y_c = (boxes[:, 3] + boxes[:, 1]) * .5
w_half *= scale
h_half *= scale
boxes_exp = np.zeros(boxes.shape)
boxes_exp[:, 0] = x_c - w_half
boxes_exp[:, 2] = x_c + w_half
boxes_exp[:, 1] = y_c - h_half
boxes_exp[:, 3] = y_c + h_half
return boxes_exp
def mask2out(self, results, clsid2catid, resolution, thresh_binarize=0.5):
import pycocotools.mask as mask_util
scale = (resolution + 2.0) / resolution
segm_res = []
for t in results:
bboxes = t['bbox'][0]
lengths = t['bbox'][1][0]
            if bboxes is None or bboxes.shape == (1, 1):
continue
if len(bboxes.tolist()) == 0:
continue
masks = t['mask'][0]
s = 0
# for each sample
for i in range(len(lengths)):
num = lengths[i]
im_shape = t['im_shape'][i]
bbox = bboxes[s:s + num][:, 2:]
clsid_scores = bboxes[s:s + num][:, 0:2]
mask = masks[s:s + num]
s += num
im_h = int(im_shape[0])
im_w = int(im_shape[1])
                expand_bbox = self.expand_boxes(bbox, scale)
expand_bbox = expand_bbox.astype(np.int32)
padded_mask = np.zeros(
(resolution + 2, resolution + 2), dtype=np.float32)
for j in range(num):
xmin, ymin, xmax, ymax = expand_bbox[j].tolist()
clsid, score = clsid_scores[j].tolist()
clsid = int(clsid)
padded_mask[1:-1, 1:-1] = mask[j, clsid, :, :]
catid = clsid2catid[clsid]
w = xmax - xmin + 1
h = ymax - ymin + 1
w = np.maximum(w, 1)
h = np.maximum(h, 1)
resized_mask = cv2.resize(padded_mask, (w, h))
resized_mask = np.array(
resized_mask > thresh_binarize, dtype=np.uint8)
im_mask = np.zeros((im_h, im_w), dtype=np.uint8)
x0 = min(max(xmin, 0), im_w)
x1 = min(max(xmax + 1, 0), im_w)
y0 = min(max(ymin, 0), im_h)
y1 = min(max(ymax + 1, 0), im_h)
im_mask[y0:y1, x0:x1] = resized_mask[(y0 - ymin):(
y1 - ymin), (x0 - xmin):(x1 - xmin)]
segm = mask_util.encode(
np.array(
im_mask[:, :, np.newaxis], order='F'))[0]
catid = clsid2catid[clsid]
segm['counts'] = segm['counts'].decode('utf8')
coco_res = {
'category_id': catid,
'segmentation': segm,
'score': score
}
segm_res.append(coco_res)
return segm_res
def draw_bbox(self, image, catid2name, bboxes, threshold, color_list):
"""
draw bbox on image
"""
draw = ImageDraw.Draw(image)
for dt in np.array(bboxes):
catid, bbox, score = dt['category_id'], dt['bbox'], dt['score']
if score < threshold:
continue
xmin, ymin, w, h = bbox
xmax = xmin + w
ymax = ymin + h
color = tuple(color_list[catid])
# draw bbox
draw.line(
[(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin),
(xmin, ymin)],
width=2,
fill=color)
# draw label
text = "{} {:.2f}".format(catid2name[catid], score)
tw, th = draw.textsize(text)
draw.rectangle(
[(xmin + 1, ymin - th), (xmin + tw + 1, ymin)], fill=color)
draw.text((xmin + 1, ymin - th), text, fill=(255, 255, 255))
return image
def draw_mask(self, image, masks, threshold, color_list, alpha=0.7):
"""
Draw mask on image
"""
mask_color_id = 0
w_ratio = .4
img_array = np.array(image).astype('float32')
for dt in np.array(masks):
segm, score = dt['segmentation'], dt['score']
if score < threshold:
continue
import pycocotools.mask as mask_util
mask = mask_util.decode(segm) * 255
color_mask = color_list[mask_color_id % len(color_list), 0:3]
mask_color_id += 1
for c in range(3):
color_mask[c] = color_mask[c] * (1 - w_ratio) + w_ratio * 255
idx = np.nonzero(mask)
img_array[idx[0], idx[1], :] *= 1.0 - alpha
img_array[idx[0], idx[1], :] += alpha * color_mask
return Image.fromarray(img_array.astype('uint8'))
def get_extra_info(self, im, arch, shape, scale):
info = []
input_shape = []
im_shape = []
logger.info('The architecture is {}'.format(arch))
if 'YOLO' in arch:
im_size = np.array([shape[:2]]).astype('int32')
logger.info('Extra info: im_size')
info.append(im_size)
elif 'SSD' in arch:
im_shape = np.array([shape[:2]]).astype('int32')
logger.info('Extra info: im_shape')
info.append([im_shape])
elif 'RetinaNet' in arch:
input_shape.extend(im.shape[2:])
im_info = np.array([input_shape + [scale]]).astype('float32')
logger.info('Extra info: im_info')
info.append(im_info)
elif 'RCNN' in arch:
input_shape.extend(im.shape[2:])
im_shape.extend(shape[:2])
im_info = np.array([input_shape + [scale]]).astype('float32')
im_shape = np.array([im_shape + [1.]]).astype('float32')
logger.info('Extra info: im_info, im_shape')
info.append(im_info)
info.append(im_shape)
else:
logger.error(
"Unsupported arch: {}, expect YOLO, SSD, RetinaNet and RCNN".
format(arch))
return info
def offset_to_lengths(self, lod):
offset = lod[0]
lengths = [offset[i + 1] - offset[i] for i in range(len(offset) - 1)]
return [lengths]
def bbox2out(self, results, clsid2catid, is_bbox_normalized=False):
"""
Args:
results: request a dict, should include: `bbox`, `im_id`,
if is_bbox_normalized=True, also need `im_shape`.
clsid2catid: class id to category id map of COCO2017 dataset.
is_bbox_normalized: whether or not bbox is normalized.
"""
xywh_res = []
for t in results:
bboxes = t['bbox'][0]
lengths = t['bbox'][1][0]
            if bboxes is None or bboxes.shape == (1, 1):
continue
k = 0
for i in range(len(lengths)):
num = lengths[i]
for j in range(num):
dt = bboxes[k]
clsid, score, xmin, ymin, xmax, ymax = dt.tolist()
catid = (clsid2catid[int(clsid)])
if is_bbox_normalized:
xmin, ymin, xmax, ymax = \
self.clip_bbox([xmin, ymin, xmax, ymax])
w = xmax - xmin
h = ymax - ymin
im_shape = t['im_shape'][0][i].tolist()
im_height, im_width = int(im_shape[0]), int(im_shape[1])
xmin *= im_width
ymin *= im_height
w *= im_width
h *= im_height
else:
w = xmax - xmin + 1
h = ymax - ymin + 1
bbox = [xmin, ymin, w, h]
coco_res = {
'category_id': catid,
'bbox': bbox,
'score': score
}
xywh_res.append(coco_res)
k += 1
return xywh_res
def get_bbox_result(self, fetch_map, fetch_name, result, conf, clsid2catid):
is_bbox_normalized = True if 'SSD' in conf['arch'] else False
output = fetch_map[fetch_name]
lod = [fetch_map[fetch_name + '.lod']]
lengths = self.offset_to_lengths(lod)
np_data = np.array(output)
result['bbox'] = (np_data, lengths)
result['im_id'] = np.array([[0]])
bbox_results = self.bbox2out([result], clsid2catid, is_bbox_normalized)
return bbox_results
def mask2out(self, results, clsid2catid, resolution, thresh_binarize=0.5):
import pycocotools.mask as mask_util
scale = (resolution + 2.0) / resolution
segm_res = []
for t in results:
bboxes = t['bbox'][0]
lengths = t['bbox'][1][0]
            if bboxes is None or bboxes.shape == (1, 1):
continue
if len(bboxes.tolist()) == 0:
continue
masks = t['mask'][0]
s = 0
# for each sample
for i in range(len(lengths)):
num = lengths[i]
im_shape = t['im_shape'][i]
bbox = bboxes[s:s + num][:, 2:]
clsid_scores = bboxes[s:s + num][:, 0:2]
mask = masks[s:s + num]
s += num
im_h = int(im_shape[0])
im_w = int(im_shape[1])
                expand_bbox = self.expand_boxes(bbox, scale)
expand_bbox = expand_bbox.astype(np.int32)
padded_mask = np.zeros(
(resolution + 2, resolution + 2), dtype=np.float32)
for j in range(num):
xmin, ymin, xmax, ymax = expand_bbox[j].tolist()
clsid, score = clsid_scores[j].tolist()
clsid = int(clsid)
padded_mask[1:-1, 1:-1] = mask[j, clsid, :, :]
catid = clsid2catid[clsid]
w = xmax - xmin + 1
h = ymax - ymin + 1
w = np.maximum(w, 1)
h = np.maximum(h, 1)
resized_mask = cv2.resize(padded_mask, (w, h))
resized_mask = np.array(
resized_mask > thresh_binarize, dtype=np.uint8)
im_mask = np.zeros((im_h, im_w), dtype=np.uint8)
x0 = min(max(xmin, 0), im_w)
x1 = min(max(xmax + 1, 0), im_w)
y0 = min(max(ymin, 0), im_h)
y1 = min(max(ymax + 1, 0), im_h)
im_mask[y0:y1, x0:x1] = resized_mask[(y0 - ymin):(
y1 - ymin), (x0 - xmin):(x1 - xmin)]
segm = mask_util.encode(
np.array(
im_mask[:, :, np.newaxis], order='F'))[0]
catid = clsid2catid[clsid]
segm['counts'] = segm['counts'].decode('utf8')
coco_res = {
'category_id': catid,
'segmentation': segm,
'score': score
}
segm_res.append(coco_res)
return segm_res
def get_mask_result(self, fetch_map, fetch_var_names, result, conf,
clsid2catid):
resolution = conf['mask_resolution']
bbox_out, mask_out = fetch_map[fetch_var_names]
lengths = self.offset_to_lengths(bbox_out.lod())
bbox = np.array(bbox_out)
mask = np.array(mask_out)
result['bbox'] = (bbox, lengths)
result['mask'] = (mask, lengths)
mask_results = self.mask2out([result], clsid2catid,
conf['mask_resolution'])
return mask_results
def get_category_info(self, with_background, label_list):
if label_list[0] != 'background' and with_background:
label_list.insert(0, 'background')
if label_list[0] == 'background' and not with_background:
label_list = label_list[1:]
clsid2catid = {i: i for i in range(len(label_list))}
catid2name = {i: name for i, name in enumerate(label_list)}
return clsid2catid, catid2name
def color_map(self, num_classes):
color_map = num_classes * [0, 0, 0]
for i in range(0, num_classes):
j = 0
lab = i
while lab:
color_map[i * 3] |= (((lab >> 0) & 1) << (7 - j))
color_map[i * 3 + 1] |= (((lab >> 1) & 1) << (7 - j))
color_map[i * 3 + 2] |= (((lab >> 2) & 1) << (7 - j))
j += 1
lab >>= 3
color_map = np.array(color_map).reshape(-1, 3)
return color_map
def visualize(self,
bbox_results,
catid2name,
num_classes,
mask_results=None):
image = Image.open(self.infer_img).convert('RGB')
color_list = self.color_map(num_classes)
image = self.draw_bbox(image, catid2name, bbox_results, 0.5, color_list)
if mask_results is not None:
image = self.draw_mask(image, mask_results, 0.5, color_list)
image_path = os.path.split(self.infer_img)[-1]
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
out_path = os.path.join(self.output_dir, image_path)
image.save(out_path, quality=95)
logger.info('Save visualize result to {}'.format(out_path))
def preprocess(self, feed_var_names, image_file):
self.infer_img = image_file
config_path = self.config_path
res = {}
        assert config_path is not None, "Config path: {} does not exist!".format(
            config_path)
with open(config_path) as f:
conf = yaml.safe_load(f)
img_data = self.Preprocess(image_file, conf['arch'], conf['Preprocess'])
if 'SSD' in conf['arch']:
img_data, res['im_shape'] = img_data
img_data = [img_data]
if len(feed_var_names) != len(img_data):
raise ValueError(
'the length of feed vars does not equals the length of preprocess of img data, please check your feed dict'
)
def processImg(v):
np_data = np.array(v[0])
res = np_data
return res
feed_dict = {k: processImg(v) for k, v in zip(feed_var_names, img_data)}
return feed_dict
def postprocess(self, fetch_map, fetch_var_names):
config_path = self.config_path
res = {}
with open(config_path) as f:
conf = yaml.safe_load(f)
if 'SSD' in conf['arch']:
img_data, res['im_shape'] = img_data
img_data = [img_data]
clsid2catid, catid2name = self.get_category_info(
conf['with_background'], conf['label_list'])
bbox_result = self.get_bbox_result(fetch_map, fetch_var_names[0], res,
conf, clsid2catid)
mask_result = None
if 'mask_resolution' in conf:
res['im_shape'] = img_data[-1]
mask_result = self.get_mask_result(fetch_map, fetch_var_names, res,
conf, clsid2catid)
if self.if_visualize:
if os.path.isdir(self.output_dir) is False:
os.mkdir(self.output_dir)
self.visualize(bbox_result, catid2name,
len(conf['label_list']), mask_result)
if self.if_dump_result:
if os.path.isdir(self.output_dir) is False:
os.mkdir(self.output_dir)
bbox_file = os.path.join(self.output_dir, 'bbox.json')
logger.info('dump bbox to {}'.format(bbox_file))
with open(bbox_file, 'w') as f:
json.dump(bbox_result, f, indent=4)
if mask_result is not None:
                mask_file = os.path.join(self.output_dir, 'mask.json')
logger.info('dump mask to {}'.format(mask_file))
with open(mask_file, 'w') as f:
json.dump(mask_result, f, indent=4)
...@@ -264,28 +264,46 @@ class Client(object): ...@@ -264,28 +264,46 @@ class Client(object):
if res == -1: if res == -1:
return None return None
result_map_batch = [] multi_result_map = []
model_engine_names = result_batch.get_engine_names()
for mi, engine_name in enumerate(model_engine_names):
result_map = {} result_map = {}
# result map needs to be a numpy array # result map needs to be a numpy array
for i, name in enumerate(fetch_names): for i, name in enumerate(fetch_names):
if self.fetch_names_to_type_[name] == int_type: if self.fetch_names_to_type_[name] == int_type:
result_map[name] = result_batch.get_int64_by_name(name) result_map[name] = result_batch.get_int64_by_name(mi, name)
shape = result_batch.get_shape(name) shape = result_batch.get_shape(mi, name)
result_map[name] = np.array(result_map[name]) result_map[name] = np.array(result_map[name], dtype='int64')
result_map[name].shape = shape result_map[name].shape = shape
if name in self.lod_tensor_set: if name in self.lod_tensor_set:
result_map["{}.lod".format(name)] = result_batch.get_lod( result_map["{}.lod".format(
name) name)] = result_batch.get_lod(mi, name)
elif self.fetch_names_to_type_[name] == float_type: elif self.fetch_names_to_type_[name] == float_type:
result_map[name] = result_batch.get_float_by_name(name) result_map[name] = result_batch.get_float_by_name(mi, name)
shape = result_batch.get_shape(name) shape = result_batch.get_shape(mi, name)
result_map[name] = np.array(result_map[name]) result_map[name] = np.array(
result_map[name], dtype='float32')
result_map[name].shape = shape result_map[name].shape = shape
if name in self.lod_tensor_set: if name in self.lod_tensor_set:
result_map["{}.lod".format(name)] = result_batch.get_lod( result_map["{}.lod".format(
name) name)] = result_batch.get_lod(mi, name)
multi_result_map.append(result_map)
return result_map
ret = None
if len(model_engine_names) == 1:
# If only one model result is returned, the format of ret is result_map
ret = multi_result_map[0]
else:
# If multiple model results are returned, the format of ret is {name: result_map}
ret = {
engine_name: multi_result_map[mi]
for mi, engine_name in enumerate(model_engine_names)
}
# When using the A/B test, the tag of variant needs to be returned
return ret if not need_variant_tag else [
ret, self.result_handle_.variant_tag()
]
def release(self): def release(self):
self.client_handle_.destroy_predictor() self.client_handle_.destroy_predictor()
......
...@@ -20,6 +20,7 @@ from paddle.fluid.framework import default_main_program ...@@ -20,6 +20,7 @@ from paddle.fluid.framework import default_main_program
from paddle.fluid.framework import Program from paddle.fluid.framework import Program
from paddle.fluid import CPUPlace from paddle.fluid import CPUPlace
from paddle.fluid.io import save_inference_model from paddle.fluid.io import save_inference_model
import paddle.fluid as fluid
from ..proto import general_model_config_pb2 as model_conf from ..proto import general_model_config_pb2 as model_conf
import os import os
...@@ -100,3 +101,20 @@ def save_model(server_model_folder, ...@@ -100,3 +101,20 @@ def save_model(server_model_folder,
with open("{}/serving_server_conf.stream.prototxt".format( with open("{}/serving_server_conf.stream.prototxt".format(
server_model_folder), "wb") as fout: server_model_folder), "wb") as fout:
fout.write(config.SerializeToString()) fout.write(config.SerializeToString())
def inference_model_to_serving(infer_model, serving_client, serving_server):
place = fluid.CPUPlace()
exe = fluid.Executor(place)
inference_program, feed_target_names, fetch_targets = \
fluid.io.load_inference_model(dirname=infer_model, executor=exe)
feed_dict = {
x: inference_program.global_block().var(x)
for x in feed_target_names
}
fetch_dict = {x.name: x for x in fetch_targets}
save_model(serving_client, serving_server, feed_dict, fetch_dict,
inference_program)
feed_names = feed_dict.keys()
fetch_names = fetch_dict.keys()
return feed_names, fetch_names
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing
import os import os
from .proto import server_configure_pb2 as server_sdk from .proto import server_configure_pb2 as server_sdk
...@@ -21,6 +22,7 @@ import socket ...@@ -21,6 +22,7 @@ import socket
import paddle_serving_server as paddle_serving_server import paddle_serving_server as paddle_serving_server
from .version import serving_server_version from .version import serving_server_version
from contextlib import closing from contextlib import closing
import collections
class OpMaker(object): class OpMaker(object):
...@@ -36,17 +38,35 @@ class OpMaker(object): ...@@ -36,17 +38,35 @@ class OpMaker(object):
"general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp", "general_dist_kv_quant_infer": "GeneralDistKVQuantInferOp",
"general_copy": "GeneralCopyOp" "general_copy": "GeneralCopyOp"
} }
self.node_name_suffix_ = collections.defaultdict(int)
# currently, inputs and outputs are not used def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
# when we have OpGraphMaker, inputs and outputs are necessary if node_type not in self.op_dict:
def create(self, name, inputs=[], outputs=[]): raise Exception("Op type {} is not supported right now".format(
if name not in self.op_dict: node_type))
raise Exception("Op name {} is not supported right now".format(
name))
node = server_sdk.DAGNode() node = server_sdk.DAGNode()
node.name = "{}_op".format(name) # node.name will be used as the infer engine name
node.type = self.op_dict[name] if engine_name:
return node node.name = engine_name
else:
node.name = '{}_{}'.format(node_type,
self.node_name_suffix_[node_type])
self.node_name_suffix_[node_type] += 1
node.type = self.op_dict[node_type]
if inputs:
for dep_node_str in inputs:
dep_node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(dep_node_str, dep_node)
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
        # Because the return value will be used as a dict key and the proto
        # object is mutable (and therefore unhashable), it is serialized to
        # a string here. This has little effect on overall efficiency.
return google.protobuf.text_format.MessageToString(node)
class OpSeqMaker(object): class OpSeqMaker(object):
...@@ -55,12 +75,25 @@ class OpSeqMaker(object): ...@@ -55,12 +75,25 @@ class OpSeqMaker(object):
self.workflow.name = "workflow1" self.workflow.name = "workflow1"
self.workflow.workflow_type = "Sequence" self.workflow.workflow_type = "Sequence"
def add_op(self, node): def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
if len(node.dependencies) > 1:
raise Exception(
'Set more than one predecessor for op in OpSeqMaker is not allowed.'
)
if len(self.workflow.nodes) >= 1: if len(self.workflow.nodes) >= 1:
if len(node.dependencies) == 0:
dep = server_sdk.DAGNodeDependency() dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name dep.name = self.workflow.nodes[-1].name
dep.mode = "RO" dep.mode = "RO"
node.dependencies.extend([dep]) node.dependencies.extend([dep])
elif len(node.dependencies) == 1:
if node.dependencies[0].name != self.workflow.nodes[-1].name:
raise Exception(
'You must add op in order in OpSeqMaker. The previous op is {}, but the current op is followed by {}.'.
format(node.dependencies[0].name, self.workflow.nodes[
-1].name))
self.workflow.nodes.extend([node]) self.workflow.nodes.extend([node])
def get_op_sequence(self): def get_op_sequence(self):
...@@ -69,13 +102,30 @@ class OpSeqMaker(object): ...@@ -69,13 +102,30 @@ class OpSeqMaker(object):
return workflow_conf return workflow_conf
class OpGraphMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
# Currently, SDK only supports "Sequence"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.workflow.nodes.extend([node])
def get_op_graph(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
class Server(object): class Server(object):
def __init__(self): def __init__(self):
self.server_handle_ = None self.server_handle_ = None
self.infer_service_conf = None self.infer_service_conf = None
self.model_toolkit_conf = None self.model_toolkit_conf = None
self.resource_conf = None self.resource_conf = None
self.engine = None
self.memory_optimization = False self.memory_optimization = False
self.model_conf = None self.model_conf = None
self.workflow_fn = "workflow.prototxt" self.workflow_fn = "workflow.prototxt"
...@@ -94,6 +144,7 @@ class Server(object): ...@@ -94,6 +144,7 @@ class Server(object):
self.cur_path = os.getcwd() self.cur_path = os.getcwd()
self.use_local_bin = False self.use_local_bin = False
self.mkl_flag = False self.mkl_flag = False
self.model_config_paths = None # for multi-model in a workflow
def set_max_concurrency(self, concurrency): def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency self.max_concurrency = concurrency
...@@ -118,6 +169,9 @@ class Server(object): ...@@ -118,6 +169,9 @@ class Server(object):
def set_op_sequence(self, op_seq): def set_op_sequence(self, op_seq):
self.workflow_conf = op_seq self.workflow_conf = op_seq
def set_op_graph(self, op_graph):
self.workflow_conf = op_graph
def set_memory_optimize(self, flag=False): def set_memory_optimize(self, flag=False):
self.memory_optimization = flag self.memory_optimization = flag
...@@ -126,32 +180,30 @@ class Server(object): ...@@ -126,32 +180,30 @@ class Server(object):
self.use_local_bin = True self.use_local_bin = True
self.bin_path = os.environ["SERVING_BIN"] self.bin_path = os.environ["SERVING_BIN"]
def _prepare_engine(self, model_config_path, device): def _prepare_engine(self, model_config_paths, device):
if self.model_toolkit_conf == None: if self.model_toolkit_conf == None:
self.model_toolkit_conf = server_sdk.ModelToolkitConf() self.model_toolkit_conf = server_sdk.ModelToolkitConf()
if self.engine == None: for engine_name, model_config_path in model_config_paths.items():
self.engine = server_sdk.EngineDesc() engine = server_sdk.EngineDesc()
engine.name = engine_name
self.model_config_path = model_config_path engine.reloadable_meta = model_config_path + "/fluid_time_file"
self.engine.name = "general_model" os.system("touch {}".format(engine.reloadable_meta))
self.engine.reloadable_meta = model_config_path + "/fluid_time_file" engine.reloadable_type = "timestamp_ne"
os.system("touch {}".format(self.engine.reloadable_meta)) engine.runtime_thread_num = 0
self.engine.reloadable_type = "timestamp_ne" engine.batch_infer_size = 0
self.engine.runtime_thread_num = 0 engine.enable_batch_align = 0
self.engine.batch_infer_size = 0 engine.model_data_path = model_config_path
self.engine.enable_batch_align = 0 engine.enable_memory_optimization = self.memory_optimization
self.engine.model_data_path = model_config_path engine.static_optimization = False
self.engine.enable_memory_optimization = self.memory_optimization engine.force_update_static_cache = False
self.engine.static_optimization = False
self.engine.force_update_static_cache = False
if device == "cpu": if device == "cpu":
self.engine.type = "FLUID_CPU_ANALYSIS_DIR" engine.type = "FLUID_CPU_ANALYSIS_DIR"
elif device == "gpu": elif device == "gpu":
self.engine.type = "FLUID_GPU_ANALYSIS_DIR" engine.type = "FLUID_GPU_ANALYSIS_DIR"
self.model_toolkit_conf.engines.extend([self.engine]) self.model_toolkit_conf.engines.extend([engine])
def _prepare_infer_service(self, port): def _prepare_infer_service(self, port):
if self.infer_service_conf == None: if self.infer_service_conf == None:
...@@ -184,10 +236,49 @@ class Server(object): ...@@ -184,10 +236,49 @@ class Server(object):
with open(filepath, "w") as fout: with open(filepath, "w") as fout:
fout.write(str(pb_obj)) fout.write(str(pb_obj))
def load_model_config(self, path): def load_model_config(self, model_config_paths):
self.model_config_path = path # At present, Serving needs to configure the model path in
# the resource.prototxt file to determine the input and output
# format of the workflow. To ensure that the input and output
# of multiple models are the same.
workflow_oi_config_path = None
if isinstance(model_config_paths, str):
# If there is only one model path, use the default infer_op.
# Because there are several infer_op type, we need to find
# it from workflow_conf.
default_engine_names = [
'general_infer_0', 'general_dist_kv_infer_0',
'general_dist_kv_quant_infer_0'
]
engine_name = None
for node in self.workflow_conf.workflows[0].nodes:
if node.name in default_engine_names:
engine_name = node.name
break
if engine_name is None:
raise Exception(
"You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
)
self.model_config_paths = {engine_name: model_config_paths}
workflow_oi_config_path = self.model_config_paths[engine_name]
elif isinstance(model_config_paths, dict):
self.model_config_paths = {}
for node_str, path in model_config_paths.items():
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.model_config_paths[node.name] = path
print("You have specified multiple model paths, please ensure "
"that the input and output of multiple models are the same.")
workflow_oi_config_path = self.model_config_paths.items()[0][1]
else:
raise Exception("The type of model_config_paths must be str or "
"dict({op: model_path}), not {}.".format(
type(model_config_paths)))
self.model_conf = m_config.GeneralModelConfig() self.model_conf = m_config.GeneralModelConfig()
f = open("{}/serving_server_conf.prototxt".format(path), 'r') f = open(
"{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
'r')
self.model_conf = google.protobuf.text_format.Merge( self.model_conf = google.protobuf.text_format.Merge(
str(f.read()), self.model_conf) str(f.read()), self.model_conf)
# check config here # check config here
...@@ -258,7 +349,7 @@ class Server(object): ...@@ -258,7 +349,7 @@ class Server(object):
if not self.port_is_available(port): if not self.port_is_available(port):
raise SystemExit("Prot {} is already used".format(port)) raise SystemExit("Prot {} is already used".format(port))
self._prepare_resource(workdir) self._prepare_resource(workdir)
self._prepare_engine(self.model_config_path, device) self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port) self._prepare_infer_service(port)
self.port = port self.port = port
self.workdir = workdir self.workdir = workdir
......
...@@ -24,6 +24,7 @@ import time ...@@ -24,6 +24,7 @@ import time
from .version import serving_server_version from .version import serving_server_version
from contextlib import closing from contextlib import closing
import argparse import argparse
import collections
def serve_args(): def serve_args():
...@@ -66,17 +67,35 @@ class OpMaker(object): ...@@ -66,17 +67,35 @@ class OpMaker(object):
"general_dist_kv_infer": "GeneralDistKVInferOp", "general_dist_kv_infer": "GeneralDistKVInferOp",
"general_dist_kv": "GeneralDistKVOp" "general_dist_kv": "GeneralDistKVOp"
} }
self.node_name_suffix_ = collections.defaultdict(int)
# currently, inputs and outputs are not used def create(self, node_type, engine_name=None, inputs=[], outputs=[]):
# when we have OpGraphMaker, inputs and outputs are necessary if node_type not in self.op_dict:
def create(self, name, inputs=[], outputs=[]): raise Exception("Op type {} is not supported right now".format(
if name not in self.op_dict: node_type))
raise Exception("Op name {} is not supported right now".format(
name))
node = server_sdk.DAGNode() node = server_sdk.DAGNode()
node.name = "{}_op".format(name) # node.name will be used as the infer engine name
node.type = self.op_dict[name] if engine_name:
return node node.name = engine_name
else:
node.name = '{}_{}'.format(node_type,
self.node_name_suffix_[node_type])
self.node_name_suffix_[node_type] += 1
node.type = self.op_dict[node_type]
if inputs:
for dep_node_str in inputs:
dep_node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(dep_node_str, dep_node)
dep = server_sdk.DAGNodeDependency()
dep.name = dep_node.name
dep.mode = "RO"
node.dependencies.extend([dep])
        # Because the return value will be used as a dict key and the proto
        # object is mutable (and therefore unhashable), it is serialized to
        # a string here. This has little effect on overall efficiency.
return google.protobuf.text_format.MessageToString(node)
class OpSeqMaker(object): class OpSeqMaker(object):
...@@ -85,12 +104,25 @@ class OpSeqMaker(object): ...@@ -85,12 +104,25 @@ class OpSeqMaker(object):
self.workflow.name = "workflow1" self.workflow.name = "workflow1"
self.workflow.workflow_type = "Sequence" self.workflow.workflow_type = "Sequence"
def add_op(self, node): def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
if len(node.dependencies) > 1:
raise Exception(
'Set more than one predecessor for op in OpSeqMaker is not allowed.'
)
if len(self.workflow.nodes) >= 1: if len(self.workflow.nodes) >= 1:
if len(node.dependencies) == 0:
dep = server_sdk.DAGNodeDependency() dep = server_sdk.DAGNodeDependency()
dep.name = self.workflow.nodes[-1].name dep.name = self.workflow.nodes[-1].name
dep.mode = "RO" dep.mode = "RO"
node.dependencies.extend([dep]) node.dependencies.extend([dep])
elif len(node.dependencies) == 1:
if node.dependencies[0].name != self.workflow.nodes[-1].name:
raise Exception(
'You must add ops in order in OpSeqMaker. The current op depends on {}, but the previous op is {}.'.
format(node.dependencies[0].name,
self.workflow.nodes[-1].name))
self.workflow.nodes.extend([node]) self.workflow.nodes.extend([node])
def get_op_sequence(self): def get_op_sequence(self):
...@@ -99,13 +131,30 @@ class OpSeqMaker(object): ...@@ -99,13 +131,30 @@ class OpSeqMaker(object):
return workflow_conf return workflow_conf
class OpGraphMaker(object):
def __init__(self):
self.workflow = server_sdk.Workflow()
self.workflow.name = "workflow1"
# Currently, SDK only supports "Sequence"
self.workflow.workflow_type = "Sequence"
def add_op(self, node_str):
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.workflow.nodes.extend([node])
def get_op_graph(self):
workflow_conf = server_sdk.WorkflowConf()
workflow_conf.workflows.extend([self.workflow])
return workflow_conf
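A hedged sketch of how OpGraphMaker could be combined with the new {op: model_path} form of load_model_config, assuming OpGraphMaker is exported from the package alongside OpMaker; the engine names ("det_infer", "rec_infer") and model directories ("det_model", "rec_model") are placeholders, not models shipped with Serving.

from paddle_serving_server import OpMaker, OpGraphMaker, Server

op_maker = OpMaker()
read_op = op_maker.create('general_reader')
# Two infer engines fed by the same reader node.
det_op = op_maker.create('general_infer', engine_name='det_infer', inputs=[read_op])
rec_op = op_maker.create('general_infer', engine_name='rec_infer', inputs=[read_op])
response_op = op_maker.create('general_response', inputs=[det_op, rec_op])

op_graph_maker = OpGraphMaker()
for op in [read_op, det_op, rec_op, response_op]:
    op_graph_maker.add_op(op)

server = Server()
server.set_op_graph(op_graph_maker.get_op_graph())
# The serialized node strings returned by create() are the dict keys.
server.load_model_config({det_op: 'det_model', rec_op: 'rec_model'})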
class Server(object): class Server(object):
def __init__(self): def __init__(self):
self.server_handle_ = None self.server_handle_ = None
self.infer_service_conf = None self.infer_service_conf = None
self.model_toolkit_conf = None self.model_toolkit_conf = None
self.resource_conf = None self.resource_conf = None
self.engine = None
self.memory_optimization = False self.memory_optimization = False
self.model_conf = None self.model_conf = None
self.workflow_fn = "workflow.prototxt" self.workflow_fn = "workflow.prototxt"
...@@ -125,6 +174,7 @@ class Server(object): ...@@ -125,6 +174,7 @@ class Server(object):
self.check_cuda() self.check_cuda()
self.use_local_bin = False self.use_local_bin = False
self.gpuid = 0 self.gpuid = 0
self.model_config_paths = None # for multi-model in a workflow
def set_max_concurrency(self, concurrency): def set_max_concurrency(self, concurrency):
self.max_concurrency = concurrency self.max_concurrency = concurrency
...@@ -149,6 +199,9 @@ class Server(object): ...@@ -149,6 +199,9 @@ class Server(object):
def set_op_sequence(self, op_seq): def set_op_sequence(self, op_seq):
self.workflow_conf = op_seq self.workflow_conf = op_seq
def set_op_graph(self, op_graph):
self.workflow_conf = op_graph
def set_memory_optimize(self, flag=False): def set_memory_optimize(self, flag=False):
self.memory_optimization = flag self.memory_optimization = flag
...@@ -167,33 +220,31 @@ class Server(object): ...@@ -167,33 +220,31 @@ class Server(object):
def set_gpuid(self, gpuid=0): def set_gpuid(self, gpuid=0):
self.gpuid = gpuid self.gpuid = gpuid
def _prepare_engine(self, model_config_path, device): def _prepare_engine(self, model_config_paths, device):
if self.model_toolkit_conf == None: if self.model_toolkit_conf == None:
self.model_toolkit_conf = server_sdk.ModelToolkitConf() self.model_toolkit_conf = server_sdk.ModelToolkitConf()
if self.engine == None: for engine_name, model_config_path in model_config_paths.items():
self.engine = server_sdk.EngineDesc() engine = server_sdk.EngineDesc()
engine.name = engine_name
self.model_config_path = model_config_path # engine.reloadable_meta = model_config_path + "/fluid_time_file"
self.engine.name = "general_model" engine.reloadable_meta = self.workdir + "/fluid_time_file"
#self.engine.reloadable_meta = model_config_path + "/fluid_time_file" os.system("touch {}".format(engine.reloadable_meta))
self.engine.reloadable_meta = self.workdir + "/fluid_time_file" engine.reloadable_type = "timestamp_ne"
os.system("touch {}".format(self.engine.reloadable_meta)) engine.runtime_thread_num = 0
self.engine.reloadable_type = "timestamp_ne" engine.batch_infer_size = 0
self.engine.runtime_thread_num = 0 engine.enable_batch_align = 0
self.engine.batch_infer_size = 0 engine.model_data_path = model_config_path
self.engine.enable_batch_align = 0 engine.enable_memory_optimization = self.memory_optimization
self.engine.model_data_path = model_config_path engine.static_optimization = False
self.engine.enable_memory_optimization = self.memory_optimization engine.force_update_static_cache = False
self.engine.static_optimization = False
self.engine.force_update_static_cache = False
if device == "cpu": if device == "cpu":
self.engine.type = "FLUID_CPU_ANALYSIS_DIR" engine.type = "FLUID_CPU_ANALYSIS_DIR"
elif device == "gpu": elif device == "gpu":
self.engine.type = "FLUID_GPU_ANALYSIS_DIR" engine.type = "FLUID_GPU_ANALYSIS_DIR"
self.model_toolkit_conf.engines.extend([self.engine]) self.model_toolkit_conf.engines.extend([engine])
def _prepare_infer_service(self, port): def _prepare_infer_service(self, port):
if self.infer_service_conf == None: if self.infer_service_conf == None:
...@@ -225,10 +276,49 @@ class Server(object): ...@@ -225,10 +276,49 @@ class Server(object):
with open(filepath, "w") as fout: with open(filepath, "w") as fout:
fout.write(str(pb_obj)) fout.write(str(pb_obj))
def load_model_config(self, path): def load_model_config(self, model_config_paths):
self.model_config_path = path # At present, Serving needs to configure the model path in
# the resource.prototxt file to determine the input and output
# format of the workflow, so the input and output of all models
# in the workflow must be the same.
workflow_oi_config_path = None
if isinstance(model_config_paths, str):
# If there is only one model path, use the default infer_op.
# Because there are several infer_op types, we need to find
# it from workflow_conf.
default_engine_names = [
'general_infer_0', 'general_dist_kv_infer_0',
'general_dist_kv_quant_infer_0'
]
engine_name = None
for node in self.workflow_conf.workflows[0].nodes:
if node.name in default_engine_names:
engine_name = node.name
break
if engine_name is None:
raise Exception(
"You have set the engine_name of Op. Please use the form {op: model_path} to configure model path"
)
self.model_config_paths = {engine_name: model_config_paths}
workflow_oi_config_path = self.model_config_paths[engine_name]
elif isinstance(model_config_paths, dict):
self.model_config_paths = {}
for node_str, path in model_config_paths.items():
node = server_sdk.DAGNode()
google.protobuf.text_format.Parse(node_str, node)
self.model_config_paths[node.name] = path
print("You have specified multiple model paths, please ensure "
"that the input and output of multiple models are the same.")
workflow_oi_config_path = list(self.model_config_paths.items())[0][1]
else:
raise Exception("The type of model_config_paths must be str or "
"dict({op: model_path}), not {}.".format(
type(model_config_paths)))
self.model_conf = m_config.GeneralModelConfig() self.model_conf = m_config.GeneralModelConfig()
f = open("{}/serving_server_conf.prototxt".format(path), 'r') f = open(
"{}/serving_server_conf.prototxt".format(workflow_oi_config_path),
'r')
self.model_conf = google.protobuf.text_format.Merge( self.model_conf = google.protobuf.text_format.Merge(
str(f.read()), self.model_conf) str(f.read()), self.model_conf)
# check config here # check config here
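For the common single-model case the existing calling convention still works; a minimal end-to-end sketch (uci_housing_model is the demo model used by the CI script below, the workdir and port are arbitrary):

from paddle_serving_server import OpMaker, OpSeqMaker, Server

op_maker = OpMaker()
op_seq_maker = OpSeqMaker()
op_seq_maker.add_op(op_maker.create('general_reader'))
op_seq_maker.add_op(op_maker.create('general_infer'))     # gets the default name "general_infer_0"
op_seq_maker.add_op(op_maker.create('general_response'))

server = Server()
server.set_op_sequence(op_seq_maker.get_op_sequence())
# A plain string is accepted because the infer op kept a default name
# ("general_infer_0") that load_model_config can look up.
server.load_model_config('uci_housing_model')
server.prepare_server(workdir='workdir', port=9393, device='cpu')
server.run_server()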
...@@ -291,7 +381,7 @@ class Server(object): ...@@ -291,7 +381,7 @@ class Server(object):
self.set_port(port) self.set_port(port)
self._prepare_resource(workdir) self._prepare_resource(workdir)
self._prepare_engine(self.model_config_path, device) self._prepare_engine(self.model_config_paths, device)
self._prepare_infer_service(port) self._prepare_infer_service(port)
self.workdir = workdir self.workdir = workdir
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# pylint: disable=doc-string-missing
from flask import Flask, request, abort from flask import Flask, request, abort
from contextlib import closing from contextlib import closing
......
...@@ -47,7 +47,8 @@ REQUIRED_PACKAGES = [ ...@@ -47,7 +47,8 @@ REQUIRED_PACKAGES = [
packages=['paddle_serving_app', packages=['paddle_serving_app',
'paddle_serving_app.reader', 'paddle_serving_app.reader',
'paddle_serving_app.utils'] 'paddle_serving_app.utils',
'paddle_serving_app.reader.pddet']
package_data={} package_data={}
package_dir={'paddle_serving_app': package_dir={'paddle_serving_app':
...@@ -55,7 +56,9 @@ package_dir={'paddle_serving_app': ...@@ -55,7 +56,9 @@ package_dir={'paddle_serving_app':
'paddle_serving_app.reader': 'paddle_serving_app.reader':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/reader', '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/reader',
'paddle_serving_app.utils': 'paddle_serving_app.utils':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/utils',} '${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/utils',
'paddle_serving_app.reader.pddet':
'${PADDLE_SERVING_BINARY_DIR}/python/paddle_serving_app/reader/pddet',}
setup( setup(
name='paddle-serving-app', name='paddle-serving-app',
......
...@@ -323,6 +323,9 @@ function python_test_bert() { ...@@ -323,6 +323,9 @@ function python_test_bert() {
echo "bert RPC inference pass" echo "bert RPC inference pass"
;; ;;
*) *)
echo "error type"
exit 1
;;
esac esac
echo "test bert $TYPE finished as expected." echo "test bert $TYPE finished as expected."
unset SERVING_BIN unset SERVING_BIN
...@@ -357,6 +360,9 @@ function python_test_imdb() { ...@@ -357,6 +360,9 @@ function python_test_imdb() {
echo "imdb ignore GPU test" echo "imdb ignore GPU test"
;; ;;
*) *)
echo "error type"
exit 1
;;
esac esac
echo "test imdb $TYPE finished as expected." echo "test imdb $TYPE finished as expected."
unset SERVING_BIN unset SERVING_BIN
...@@ -389,6 +395,9 @@ function python_test_lac() { ...@@ -389,6 +395,9 @@ function python_test_lac() {
echo "lac ignore GPU test" echo "lac ignore GPU test"
;; ;;
*) *)
echo "error type"
exit 1
;;
esac esac
echo "test lac $TYPE finished as expected." echo "test lac $TYPE finished as expected."
unset SERVING_BIN unset SERVING_BIN
...@@ -408,6 +417,248 @@ function python_run_test() { ...@@ -408,6 +417,248 @@ function python_run_test() {
cd ../.. # pwd: /Serving cd ../.. # pwd: /Serving
} }
function monitor_test() {
local TYPE=$1 # pwd: /Serving
mkdir _monitor_test && cd _monitor_test # pwd: /Serving/_monitor_test
case $TYPE in
CPU):
pip install pyftpdlib
mkdir remote_path
mkdir local_path
cd remote_path # pwd: /Serving/_monitor_test/remote_path
check_cmd "python -m pyftpdlib -p 8000 &>/dev/null &"
cd .. # pwd: /Serving/_monitor_test
# type: ftp
# remote_path: /
# remote_model_name: uci_housing.tar.gz
# local_tmp_path: ___tmp
# local_path: local_path
cd remote_path # pwd: /Serving/_monitor_test/remote_path
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
touch donefile
cd .. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server.monitor \
--type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
--remote_path='/' --remote_model_name='uci_housing.tar.gz' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
--interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
# type: ftp
# remote_path: /tmp_dir
# remote_model_name: uci_housing_model
# local_tmp_path: ___tmp
# local_path: local_path
mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
touch donefile
cd ../.. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server.monitor \
--type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
--remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
# type: general
# remote_path: /
# remote_model_name: uci_housing.tar.gz
# local_tmp_path: ___tmp
# local_path: local_path
cd remote_path # pwd: /Serving/_monitor_test/remote_path
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
touch donefile
cd .. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server.monitor \
--type='general' --general_host='ftp://127.0.0.1:8000' \
--remote_path='/' --remote_model_name='uci_housing.tar.gz' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
--interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
# type: general
# remote_path: /tmp_dir
# remote_model_name: uci_housing_model
# local_tmp_path: ___tmp
# local_path: local_path
mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
touch donefile
cd ../.. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server.monitor \
--type='general' --general_host='ftp://127.0.0.1:8000' \
--remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
ps -ef | grep "pyftpdlib" | grep -v grep | awk '{print $2}' | xargs kill
;;
GPU):
pip install pyftpdlib
mkdir remote_path
mkdir local_path
cd remote_path # pwd: /Serving/_monitor_test/remote_path
check_cmd "python -m pyftpdlib -p 8000 &>/dev/null &"
cd .. # pwd: /Serving/_monitor_test
# type: ftp
# remote_path: /
# remote_model_name: uci_housing.tar.gz
# local_tmp_path: ___tmp
# local_path: local_path
cd remote_path # pwd: /Serving/_monitor_test/remote_path
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
touch donefile
cd .. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server_gpu.monitor \
--type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
--remote_path='/' --remote_model_name='uci_housing.tar.gz' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
--interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
# type: ftp
# remote_path: /tmp_dir
# remote_model_name: uci_housing_model
# local_tmp_path: ___tmp
# local_path: local_path
mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
touch donefile
cd ../.. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server_gpu.monitor \
--type='ftp' --ftp_host='127.0.0.1' --ftp_port='8000' \
--remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
# type: general
# remote_path: /
# remote_model_name: uci_housing.tar.gz
# local_tmp_path: ___tmp
# local_path: local_path
cd remote_path # pwd: /Serving/_monitor_test/remote_path
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
touch donefile
cd .. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server_gpu.monitor \
--type='general' --general_host='ftp://127.0.0.1:8000' \
--remote_path='/' --remote_model_name='uci_housing.tar.gz' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --unpacked_filename='uci_housing_model' \
--interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
# type: general
# remote_path: /tmp_dir
# remote_model_name: uci_housing_model
# local_tmp_path: ___tmp
# local_path: local_path
mkdir -p remote_path/tmp_dir && cd remote_path/tmp_dir # pwd: /Serving/_monitor_test/remote_path/tmp_dir
wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz
tar -xzf uci_housing.tar.gz
touch donefile
cd ../.. # pwd: /Serving/_monitor_test
mkdir -p local_path/uci_housing_model
python -m paddle_serving_server_gpu.monitor \
--type='general' --general_host='ftp://127.0.0.1:8000' \
--remote_path='/tmp_dir' --remote_model_name='uci_housing_model' \
--remote_donefile_name='donefile' --local_path='local_path' \
--local_model_name='uci_housing_model' --local_timestamp_file='fluid_time_file' \
--local_tmp_path='___tmp' --interval='1' >/dev/null &
sleep 10
if [ ! -f "local_path/uci_housing_model/fluid_time_file" ]; then
echo "local_path/uci_housing_model/fluid_time_file not exist."
exit 1
fi
ps -ef | grep "monitor" | grep -v grep | awk '{print $2}' | xargs kill
rm -rf remote_path/*
rm -rf local_path/*
ps -ef | grep "pyftpdlib" | grep -v grep | awk '{print $2}' | xargs kill
;;
*)
echo "error type"
exit 1
;;
esac
cd .. # pwd: /Serving
rm -rf _monitor_test
echo "test monitor $TYPE finished as expected."
}
function main() { function main() {
local TYPE=$1 # pwd: / local TYPE=$1 # pwd: /
init # pwd: /Serving init # pwd: /Serving
...@@ -415,6 +666,7 @@ function main() { ...@@ -415,6 +666,7 @@ function main() {
build_server $TYPE # pwd: /Serving build_server $TYPE # pwd: /Serving
build_app $TYPE # pwd: /Serving build_app $TYPE # pwd: /Serving
python_run_test $TYPE # pwd: /Serving python_run_test $TYPE # pwd: /Serving
monitor_test $TYPE # pwd: /Serving
echo "serving $TYPE part finished as expected." echo "serving $TYPE part finished as expected."
} }
......