diff --git a/core/configure/proto/multi_lang_general_model_service.proto b/core/configure/proto/multi_lang_general_model_service.proto index 6e1764b23b3e6f7d9eb9a33925bcd83cfb1810bb..2a8a8bc1532c19aa02a1998aa751aa7ba9d41570 100644 --- a/core/configure/proto/multi_lang_general_model_service.proto +++ b/core/configure/proto/multi_lang_general_model_service.proto @@ -28,16 +28,17 @@ message FeedInst { repeated Tensor tensor_array = 1; }; message FetchInst { repeated Tensor tensor_array = 1; }; -message Request { +message InferenceRequest { repeated FeedInst insts = 1; repeated string feed_var_names = 2; repeated string fetch_var_names = 3; required bool is_python = 4 [ default = false ]; }; -message Response { +message InferenceResponse { repeated ModelOutput outputs = 1; optional string tag = 2; + required int32 err_code = 3; }; message ModelOutput { @@ -45,6 +46,17 @@ message ModelOutput { optional string engine_name = 2; } +message SetTimeoutRequest { required int32 timeout_ms = 1; } + +message SimpleResponse { required int32 err_code = 1; } + +message GetClientConfigRequest {} + +message GetClientConfigResponse { required string client_config_str = 1; } + service MultiLangGeneralModelService { - rpc inference(Request) returns (Response) {} + rpc Inference(InferenceRequest) returns (InferenceResponse) {} + rpc SetTimeout(SetTimeoutRequest) returns (SimpleResponse) {} + rpc GetClientConfig(GetClientConfigRequest) + returns (GetClientConfigResponse) {} }; diff --git a/core/general-client/include/general_model.h b/core/general-client/include/general_model.h index b379188854c30587d24962bc827aa099c3a39183..b5d27df5edbaf9278ecb8614e282d104347206f8 100644 --- a/core/general-client/include/general_model.h +++ b/core/general-client/include/general_model.h @@ -49,6 +49,8 @@ class ModelRes { res._int64_value_map.end()); _float_value_map.insert(res._float_value_map.begin(), res._float_value_map.end()); + _int32_value_map.insert(res._int32_value_map.begin(), + res._int32_value_map.end()); _shape_map.insert(res._shape_map.begin(), res._shape_map.end()); _lod_map.insert(res._lod_map.begin(), res._lod_map.end()); } @@ -60,6 +62,9 @@ class ModelRes { _float_value_map.insert( std::make_move_iterator(std::begin(res._float_value_map)), std::make_move_iterator(std::end(res._float_value_map))); + _int32_value_map.insert( + std::make_move_iterator(std::begin(res._int32_value_map)), + std::make_move_iterator(std::end(res._int32_value_map))); _shape_map.insert(std::make_move_iterator(std::begin(res._shape_map)), std::make_move_iterator(std::end(res._shape_map))); _lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)), @@ -78,6 +83,12 @@ class ModelRes { std::vector&& get_float_by_name_with_rv(const std::string& name) { return std::move(_float_value_map[name]); } + const std::vector& get_int32_by_name(const std::string& name) { + return _int32_value_map[name]; + } + std::vector&& get_int32_by_name_with_rv(const std::string& name) { + return std::move(_int32_value_map[name]); + } const std::vector& get_shape_by_name(const std::string& name) { return _shape_map[name]; } @@ -103,6 +114,9 @@ class ModelRes { _float_value_map.insert( std::make_move_iterator(std::begin(res._float_value_map)), std::make_move_iterator(std::end(res._float_value_map))); + _int32_value_map.insert( + std::make_move_iterator(std::begin(res._int32_value_map)), + std::make_move_iterator(std::end(res._int32_value_map))); _shape_map.insert(std::make_move_iterator(std::begin(res._shape_map)), 
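Note on the renamed service: besides `Inference`, the proto now exposes `SetTimeout` and `GetClientConfig`. A minimal sketch of driving these two RPCs with raw protoc-generated stubs (the `*_pb2` module names below follow protoc naming conventions and are assumptions; the packaged `MultiLangClient` wraps these calls):

```python
# Sketch only: assumes protoc-generated Python modules named after the .proto file.
import grpc
import multi_lang_general_model_service_pb2 as pb2            # assumed module name
import multi_lang_general_model_service_pb2_grpc as pb2_grpc  # assumed module name

channel = grpc.insecure_channel("127.0.0.1:9393")
stub = pb2_grpc.MultiLangGeneralModelServiceStub(channel)

# Ask the server for the client-side prototxt it now loads on the client's behalf.
conf = stub.GetClientConfig(pb2.GetClientConfigRequest())
print(conf.client_config_str)

# Change the server-side bRPC timeout; SimpleResponse carries only err_code.
resp = stub.SetTimeout(pb2.SetTimeoutRequest(timeout_ms=2000))
assert resp.err_code == 0
```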
std::make_move_iterator(std::end(res._shape_map))); _lod_map.insert(std::make_move_iterator(std::begin(res._lod_map)), @@ -115,6 +129,7 @@ class ModelRes { std::string _engine_name; std::map> _int64_value_map; std::map> _float_value_map; + std::map> _int32_value_map; std::map> _shape_map; std::map> _lod_map; }; @@ -145,6 +160,14 @@ class PredictorRes { const std::string& name) { return std::move(_models[model_idx].get_float_by_name_with_rv(name)); } + const std::vector& get_int32_by_name(const int model_idx, + const std::string& name) { + return _models[model_idx].get_int32_by_name(name); + } + std::vector&& get_int32_by_name_with_rv(const int model_idx, + const std::string& name) { + return std::move(_models[model_idx].get_int32_by_name_with_rv(name)); + } const std::vector& get_shape_by_name(const int model_idx, const std::string& name) { return _models[model_idx].get_shape_by_name(name); diff --git a/core/general-client/src/general_model.cpp b/core/general-client/src/general_model.cpp index 613abf9233610d170bce4386798662f78887edf7..9f709c71045577f7b043777a7ad1528a0e2ccc28 100644 --- a/core/general-client/src/general_model.cpp +++ b/core/general-client/src/general_model.cpp @@ -207,17 +207,28 @@ int PredictorClient::batch_predict( for (auto &name : int_feed_name) { int idx = _feed_name_to_idx[name]; Tensor *tensor = tensor_vec[idx]; - VLOG(2) << "prepare int feed " << name << " shape size " - << int_shape[vec_idx].size(); + if (_type[idx] == 0) { + VLOG(2) << "prepare int64 feed " << name << " shape size " + << int_shape[vec_idx].size(); + VLOG(3) << "feed var name " << name << " index " << vec_idx + << "first data " << int_feed[vec_idx][0]; + for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) { + tensor->add_int64_data(int_feed[vec_idx][j]); + } + } else if (_type[idx] == 2) { + VLOG(2) << "prepare int32 feed " << name << " shape size " + << int_shape[vec_idx].size(); + VLOG(3) << "feed var name " << name << " index " << vec_idx + << "first data " << int32_t(int_feed[vec_idx][0]); + for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) { + tensor->add_int_data(int32_t(int_feed[vec_idx][j])); + } + } + for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) { tensor->add_shape(int_shape[vec_idx][j]); } - tensor->set_elem_type(0); - VLOG(3) << "feed var name " << name << " index " << vec_idx - << "first data " << int_feed[vec_idx][0]; - for (uint32_t j = 0; j < int_feed[vec_idx].size(); ++j) { - tensor->add_int64_data(int_feed[vec_idx][j]); - } + tensor->set_elem_type(_type[idx]); vec_idx++; } @@ -284,18 +295,25 @@ int PredictorClient::batch_predict( for (auto &name : fetch_name) { // int idx = _fetch_name_to_idx[name]; if (_fetch_name_to_type[name] == 0) { - VLOG(2) << "ferch var " << name << "type int"; + VLOG(2) << "ferch var " << name << "type int64"; int size = output.insts(0).tensor_array(idx).int64_data_size(); model._int64_value_map[name] = std::vector( output.insts(0).tensor_array(idx).int64_data().begin(), output.insts(0).tensor_array(idx).int64_data().begin() + size); - } else { + } else if (_fetch_name_to_type[name] == 1) { VLOG(2) << "fetch var " << name << "type float"; int size = output.insts(0).tensor_array(idx).float_data_size(); model._float_value_map[name] = std::vector( output.insts(0).tensor_array(idx).float_data().begin(), output.insts(0).tensor_array(idx).float_data().begin() + size); + } else if (_fetch_name_to_type[name] == 2) { + VLOG(2) << "fetch var " << name << "type int32"; + int size = output.insts(0).tensor_array(idx).int_data_size(); + 
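For orientation, the `elem_type` codes that `batch_predict` now dispatches on correspond to fixed payload fields of the `Tensor` message; a small reference sketch (illustration only, not library code):

```python
# elem_type code -> (C++ element type, repeated Tensor field holding the data)
ELEM_TYPE = {
    0: ("int64_t", "int64_data"),
    1: ("float",   "float_data"),
    2: ("int32_t", "int_data"),   # int32 support added by this change
}

def payload_field(elem_type):
    """Return the Tensor field a feed/fetch value is packed into for a given code."""
    return ELEM_TYPE[elem_type][1]
```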
model._int32_value_map[name] = std::vector( + output.insts(0).tensor_array(idx).int_data().begin(), + output.insts(0).tensor_array(idx).int_data().begin() + size); } + idx += 1; } predict_res_batch.add_model_res(std::move(model)); @@ -442,12 +460,19 @@ int PredictorClient::numpy_predict( for (auto &name : int_feed_name) { int idx = _feed_name_to_idx[name]; Tensor *tensor = tensor_vec[idx]; - VLOG(2) << "prepare int feed " << name << " shape size " - << int_shape[vec_idx].size(); + for (uint32_t j = 0; j < int_shape[vec_idx].size(); ++j) { tensor->add_shape(int_shape[vec_idx][j]); } - tensor->set_elem_type(0); + tensor->set_elem_type(_type[idx]); + + if (_type[idx] == 0) { + VLOG(2) << "prepare int feed " << name << " shape size " + << int_shape[vec_idx].size(); + } else { + VLOG(2) << "prepare int32 feed " << name << " shape size " + << int_shape[vec_idx].size(); + } const int int_shape_size = int_shape[vec_idx].size(); switch (int_shape_size) { @@ -457,7 +482,11 @@ int PredictorClient::numpy_predict( for (ssize_t j = 0; j < int_array.shape(1); j++) { for (ssize_t k = 0; k < int_array.shape(2); k++) { for (ssize_t l = 0; k < int_array.shape(3); l++) { - tensor->add_int64_data(int_array(i, j, k, l)); + if (_type[idx] == 0) { + tensor->add_int64_data(int_array(i, j, k, l)); + } else { + tensor->add_int_data(int_array(i, j, k, l)); + } } } } @@ -469,7 +498,11 @@ int PredictorClient::numpy_predict( for (ssize_t i = 0; i < int_array.shape(0); i++) { for (ssize_t j = 0; j < int_array.shape(1); j++) { for (ssize_t k = 0; k < int_array.shape(2); k++) { - tensor->add_int64_data(int_array(i, j, k)); + if (_type[idx] == 0) { + tensor->add_int64_data(int_array(i, j, k)); + } else { + tensor->add_int_data(int_array(i, j, k)); + } } } } @@ -479,7 +512,11 @@ int PredictorClient::numpy_predict( auto int_array = int_feed[vec_idx].unchecked<2>(); for (ssize_t i = 0; i < int_array.shape(0); i++) { for (ssize_t j = 0; j < int_array.shape(1); j++) { - tensor->add_int64_data(int_array(i, j)); + if (_type[idx] == 0) { + tensor->add_int64_data(int_array(i, j)); + } else { + tensor->add_int_data(int_array(i, j)); + } } } break; @@ -487,7 +524,11 @@ int PredictorClient::numpy_predict( case 1: { auto int_array = int_feed[vec_idx].unchecked<1>(); for (ssize_t i = 0; i < int_array.shape(0); i++) { - tensor->add_int64_data(int_array(i)); + if (_type[idx] == 0) { + tensor->add_int64_data(int_array(i)); + } else { + tensor->add_int_data(int_array(i)); + } } break; } @@ -557,17 +598,23 @@ int PredictorClient::numpy_predict( for (auto &name : fetch_name) { // int idx = _fetch_name_to_idx[name]; if (_fetch_name_to_type[name] == 0) { - VLOG(2) << "ferch var " << name << "type int"; + VLOG(2) << "ferch var " << name << "type int64"; int size = output.insts(0).tensor_array(idx).int64_data_size(); model._int64_value_map[name] = std::vector( output.insts(0).tensor_array(idx).int64_data().begin(), output.insts(0).tensor_array(idx).int64_data().begin() + size); - } else { + } else if (_fetch_name_to_type[name] == 1) { VLOG(2) << "fetch var " << name << "type float"; int size = output.insts(0).tensor_array(idx).float_data_size(); model._float_value_map[name] = std::vector( output.insts(0).tensor_array(idx).float_data().begin(), output.insts(0).tensor_array(idx).float_data().begin() + size); + } else if (_fetch_name_to_type[name] == 2) { + VLOG(2) << "fetch var " << name << "type int32"; + int size = output.insts(0).tensor_array(idx).int_data_size(); + model._int32_value_map[name] = std::vector( + 
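With `numpy_predict` honoring `_type[idx]`, a feed variable whose client config declares `feed_type` 2 is sent as `int_data` instead of being widened to int64. A hedged usage sketch from the Python side (the variable names `ids` and `score` are hypothetical placeholders, and the config path is illustrative):

```python
import numpy as np
from paddle_serving_client import Client  # bRPC client built on numpy_predict above

client = Client()
client.load_client_config("serving_client_conf.prototxt")  # illustrative path
client.connect(["127.0.0.1:9292"])

# "ids" stands for a feed var declared with feed_type 2 (int32) in the config;
# its values now travel in Tensor.int_data rather than int64_data.
fetch_map = client.predict(
    feed={"ids": np.array([1, 2, 3], dtype=np.int32)}, fetch=["score"])
print(fetch_map)
```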
output.insts(0).tensor_array(idx).int_data().begin(), + output.insts(0).tensor_array(idx).int_data().begin() + size); } idx += 1; } @@ -601,7 +648,6 @@ int PredictorClient::numpy_predict( _api.thrd_clear(); return 0; } - } // namespace general_model } // namespace paddle_serving } // namespace baidu diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp index 7d48949b22d0ace289ab3b9214f092819f5476e0..380f861606a7719a33407dd946c5ac476629fdb7 100644 --- a/core/general-server/op/general_reader_op.cpp +++ b/core/general-server/op/general_reader_op.cpp @@ -126,9 +126,12 @@ int GeneralReaderOp::inference() { if (elem_type[i] == 0) { // int64 elem_size[i] = sizeof(int64_t); lod_tensor.dtype = paddle::PaddleDType::INT64; - } else { + } else if (elem_type[i] == 1) { elem_size[i] = sizeof(float); lod_tensor.dtype = paddle::PaddleDType::FLOAT32; + } else if (elem_type[i] == 2) { + elem_size[i] = sizeof(int32_t); + lod_tensor.dtype = paddle::PaddleDType::INT32; } if (model_config->_is_lod_feed[i]) { @@ -159,8 +162,10 @@ int GeneralReaderOp::inference() { int data_len = 0; if (tensor.int64_data_size() > 0) { data_len = tensor.int64_data_size(); - } else { + } else if (tensor.float_data_size() > 0) { data_len = tensor.float_data_size(); + } else if (tensor.int_data_size() > 0) { + data_len = tensor.int_data_size(); } VLOG(2) << "tensor size for var[" << i << "]: " << data_len; tensor_size += data_len; @@ -198,6 +203,8 @@ int GeneralReaderOp::inference() { for (int i = 0; i < var_num; ++i) { if (elem_type[i] == 0) { int64_t *dst_ptr = static_cast(out->at(i).data.data()); + VLOG(2) << "first element data in var[" << i << "] is " + << req->insts(0).tensor_array(i).int64_data(0); int offset = 0; for (int j = 0; j < batch_size; ++j) { int elem_num = req->insts(j).tensor_array(i).int64_data_size(); @@ -210,8 +217,10 @@ int GeneralReaderOp::inference() { offset += capacity[i]; } } - } else { + } else if (elem_type[i] == 1) { float *dst_ptr = static_cast(out->at(i).data.data()); + VLOG(2) << "first element data in var[" << i << "] is " + << req->insts(0).tensor_array(i).float_data(0); int offset = 0; for (int j = 0; j < batch_size; ++j) { int elem_num = req->insts(j).tensor_array(i).float_data_size(); @@ -224,6 +233,22 @@ int GeneralReaderOp::inference() { offset += capacity[i]; } } + } else if (elem_type[i] == 2) { + int32_t *dst_ptr = static_cast(out->at(i).data.data()); + VLOG(2) << "first element data in var[" << i << "] is " + << req->insts(0).tensor_array(i).int_data(0); + int offset = 0; + for (int j = 0; j < batch_size; ++j) { + int elem_num = req->insts(j).tensor_array(i).int_data_size(); + for (int k = 0; k < elem_num; ++k) { + dst_ptr[offset + k] = req->insts(j).tensor_array(i).int_data(k); + } + if (out->at(i).lod.size() == 1) { + offset = out->at(i).lod[0][j + 1]; + } else { + offset += capacity[i]; + } + } } } diff --git a/core/general-server/op/general_response_op.cpp b/core/general-server/op/general_response_op.cpp index 5667a174d9bb6e134e58de72524c60839dc82356..935ef85d847cc17c2d5b76255de0936f0e08a890 100644 --- a/core/general-server/op/general_response_op.cpp +++ b/core/general-server/op/general_response_op.cpp @@ -91,7 +91,6 @@ int GeneralResponseOp::inference() { for (auto &idx : fetch_index) { Tensor *tensor = fetch_inst->add_tensor_array(); - tensor->set_elem_type(1); if (model_config->_is_lod_fetch[idx]) { VLOG(2) << "out[" << idx << "] " << model_config->_fetch_name[idx] << " is lod_tensor"; @@ -116,7 +115,7 @@ int 
GeneralResponseOp::inference() { cap *= in->at(idx).shape[j]; } if (in->at(idx).dtype == paddle::PaddleDType::INT64) { - VLOG(2) << "Prepare float var [" << model_config->_fetch_name[idx] + VLOG(2) << "Prepare int64 var [" << model_config->_fetch_name[idx] << "]."; int64_t *data_ptr = static_cast(in->at(idx).data.data()); if (model_config->_is_lod_fetch[idx]) { @@ -157,6 +156,27 @@ int GeneralResponseOp::inference() { } VLOG(2) << "fetch var [" << model_config->_fetch_name[idx] << "] ready"; var_idx++; + } else if (in->at(idx).dtype == paddle::PaddleDType::INT32) { + VLOG(2) << "Prepare int32 var [" << model_config->_fetch_name[idx] + << "]."; + int32_t *data_ptr = static_cast(in->at(idx).data.data()); + if (model_config->_is_lod_fetch[idx]) { + FetchInst *fetch_p = output->mutable_insts(0); + for (int j = 0; j < in->at(idx).lod[0].size(); ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_lod( + in->at(idx).lod[0][j]); + } + for (int j = 0; j < cap; ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_int_data(data_ptr[j]); + } + } else { + FetchInst *fetch_p = output->mutable_insts(0); + for (int j = 0; j < cap; ++j) { + fetch_p->mutable_tensor_array(var_idx)->add_int_data(data_ptr[j]); + } + } + VLOG(2) << "fetch var [" << model_config->_fetch_name[idx] << "] ready"; + var_idx++; } } } diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h index e8c0ff47d86f081516a35576655f843a28b0591b..51cfb95a8d56d4261b9dab99df5216c5e6c79733 100644 --- a/core/predictor/framework/infer.h +++ b/core/predictor/framework/infer.h @@ -603,13 +603,13 @@ class VersionedInferEngine : public InferEngine { LOG(ERROR) << "Failed generate engine with type:" << engine_type; return -1; } - VLOG(2) << "FLGS_logtostderr " << FLAGS_logtostderr; + VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr; int tmp = FLAGS_logtostderr; if (engine->proc_initialize(conf, version) != 0) { LOG(ERROR) << "Failed initialize engine, type:" << engine_type; return -1; } - VLOG(2) << "FLGS_logtostderr " << FLAGS_logtostderr; + VLOG(2) << "FLAGS_logtostderr " << FLAGS_logtostderr; FLAGS_logtostderr = tmp; auto r = _versions.insert(std::make_pair(engine->version(), engine)); if (!r.second) { diff --git a/core/predictor/tools/seq_generator.cpp b/core/predictor/tools/seq_generator.cpp index 135e25d6dd7ce44fa04f510f7d521b42998bc955..eb7e7ed7f9a609e0c21be9a2c3d686dd7d9a1abd 100644 --- a/core/predictor/tools/seq_generator.cpp +++ b/core/predictor/tools/seq_generator.cpp @@ -233,7 +233,7 @@ int compress_parameter_parallel(const char *file1, greedy_search( emb_table + k * emb_size, xmin, xmax, loss, emb_size, bits); // 得出 loss 最小的时候的 scale - float scale = (xmax - xmin) * (pow2bits - 1); + float scale = (xmax - xmin) / (pow2bits - 1); char *min_ptr = tensor_temp; char *max_ptr = tensor_temp + sizeof(float); memcpy(min_ptr, &xmin, sizeof(float)); diff --git a/doc/GRPC_IMPL_CN.md b/doc/GRPC_IMPL_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..7b10907caec98ae5754126a7ec54096cc4cd48af --- /dev/null +++ b/doc/GRPC_IMPL_CN.md @@ -0,0 +1,52 @@ +# gRPC接口 + +gRPC 接口实现形式类似 Web Service: + +![](grpc_impl.png) + +## 与bRPC接口对比 + +1. 
gRPC Server 端 `load_model_config` 函数添加 `client_config_path` 参数: + + ```python + def load_model_config(self, server_config_paths, client_config_path=None) + ``` + + 在一些例子中 bRPC Server 端与 bRPC Client 端的配置文件可能是不同的(如 cube local 例子中,Client 端的数据先交给 cube,经过 cube 处理后再交给预测库),所以 gRPC Server 端需要获取 gRPC Client 端的配置;同时为了取消 gRPC Client 端手动加载配置文件的过程,所以设计 gRPC Server 端同时加载两个配置文件。`client_config_path` 默认为 `/serving_server_conf.prototxt`。 + +2. gRPC Client 端取消 `load_client_config` 步骤: + + 在 `connect` 步骤通过 RPC 获取相应的 prototxt(从任意一个 endpoint 获取即可)。 + +3. gRPC Client 需要通过 RPC 方式设置 timeout 时间(调用形式与 bRPC Client保持一致) + + 因为 bRPC Client 在 `connect` 后无法更改 timeout 时间,所以当 gRPC Server 收到变更 timeout 的调用请求时会重新创建 bRPC Client 实例以变更 bRPC Client timeout时间,同时 gRPC Client 会设置 gRPC 的 deadline 时间。 + + **注意,设置 timeout 接口和 Inference 接口不能同时调用(非线程安全),出于性能考虑暂时不加锁。** + +4. gRPC Client 端 `predict` 函数添加 `asyn` 和 `is_python` 参数: + + ```python + def predict(self, feed, fetch, need_variant_tag=False, asyn=False, is_python=True) + ``` + + 其中,`asyn` 为异步调用选项。当 `asyn=True` 时为异步调用,返回 `MultiLangPredictFuture` 对象,通过 `MultiLangPredictFuture.result()` 阻塞获取预测值;当 `asyn=Fasle` 为同步调用。 + + `is_python` 为 proto 格式选项。当 `is_python=True` 时,基于 numpy bytes 格式进行数据传输,目前只适用于 Python;当 `is_python=False` 时,以普通数据格式传输,更加通用。使用 numpy bytes 格式传输耗时比普通数据格式小很多(详见 [#654](https://github.com/PaddlePaddle/Serving/pull/654))。 + +5. 异常处理:当 gRPC Server 端的 bRPC Client 预测失败(返回 `None`)时,gRPC Client 端同样返回None。其他 gRPC 异常会在 Client 内部捕获,并在返回的 fetch_map 中添加一个 "status_code" 字段来区分是否预测正常(参考 timeout 样例)。 + +6. 由于 gRPC 只支持 pick_first 和 round_robin 负载均衡策略,ABTEST 特性还未打齐。 + +7. 经测试,gRPC 版本可以在 Windows、macOS 平台使用。 + +8. 计划支持的客户端语言: + + - [x] Python + - [ ] Java + - [ ] Go + - [ ] JavaScript + +## Python 端的一些例子 + +详见 `python/examples/grpc_impl_example` 下的示例文件。 diff --git a/doc/grpc_impl.png b/doc/grpc_impl.png new file mode 100644 index 0000000000000000000000000000000000000000..05b1a67e815efae5f4b7b81758444bff48cfe59d Binary files /dev/null and b/doc/grpc_impl.png differ diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/README_CN.md b/python/examples/grpc_impl_example/criteo_ctr_with_cube/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..07fc1acc18903256c49d77e2af8e9c2d74b21c16 --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/README_CN.md @@ -0,0 +1,40 @@ +## 带稀疏参数索引服务的CTR预测服务 + +该样例是为了展示gRPC Server 端 `load_model_config` 函数,在这个例子中,bRPC Server 端与 bRPC Client 端的配置文件是不同的(bPRC Client 端的数据先交给 cube,经过 cube 处理后再交给预测库) + +### 获取样例数据 +``` +sh get_data.sh +``` + +### 下载模型和稀疏参数序列文件 +``` +wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz +tar xf ctr_cube_unittest.tar.gz +mv models/ctr_client_conf ./ +mv models/ctr_serving_model_kv ./ +mv models/data ./cube/ +``` +执行脚本后会在当前目录有ctr_server_model_kv和ctr_client_config文件夹。 + +### 启动稀疏参数索引服务 +``` +wget https://paddle-serving.bj.bcebos.com/others/cube_app.tar.gz +tar xf cube_app.tar.gz +mv cube_app/cube* ./cube/ +sh cube_prepare.sh & +``` + +此处,模型当中的稀疏参数会被存放在稀疏参数索引服务Cube当中,关于稀疏参数索引服务Cube的介绍,请阅读[稀疏参数索引服务Cube单机版使用指南](../../../doc/CUBE_LOCAL_CN.md) + +### 启动RPC预测服务,服务端线程数为4(可在test_server.py配置) + +``` +python test_server.py ctr_serving_model_kv ctr_client_conf/serving_client_conf.prototxt +``` + +### 执行预测 + +``` +python test_client.py ./raw_data +``` diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/args.py b/python/examples/grpc_impl_example/criteo_ctr_with_cube/args.py new file mode 100755 index 
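A minimal sketch of points 1 and 2 of the gRPC notes above, using the `MultiLangServer`/`MultiLangClient` classes from these examples (paths are illustrative; op-sequence, `prepare_server`, and `run_server` steps are omitted, see `test_server.py`):

```python
# Server side (point 1): pass both the serving config and the client config.
from paddle_serving_server import MultiLangServer
server = MultiLangServer()
# When client_config_path is omitted it defaults to the
# serving_server_conf.prototxt under the server config path.
server.load_model_config("serving_server_model", "serving_client_conf.prototxt")

# Client side (point 2): no load_client_config step; the prototxt is fetched
# over RPC during connect().
from paddle_serving_client import MultiLangClient
client = MultiLangClient()
client.connect(["127.0.0.1:9393"])
```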
0000000000000000000000000000000000000000..30124d4ebd9cd27cdb4580e654a8a47c55b178bf --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/args.py @@ -0,0 +1,105 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +import argparse + + +def parse_args(): + parser = argparse.ArgumentParser(description="PaddlePaddle CTR example") + parser.add_argument( + '--train_data_path', + type=str, + default='./data/raw/train.txt', + help="The path of training dataset") + parser.add_argument( + '--sparse_only', + type=bool, + default=False, + help="Whether we use sparse features only") + parser.add_argument( + '--test_data_path', + type=str, + default='./data/raw/valid.txt', + help="The path of testing dataset") + parser.add_argument( + '--batch_size', + type=int, + default=1000, + help="The size of mini-batch (default:1000)") + parser.add_argument( + '--embedding_size', + type=int, + default=10, + help="The size for embedding layer (default:10)") + parser.add_argument( + '--num_passes', + type=int, + default=10, + help="The number of passes to train (default: 10)") + parser.add_argument( + '--model_output_dir', + type=str, + default='models', + help='The path for model to store (default: models)') + parser.add_argument( + '--sparse_feature_dim', + type=int, + default=1000001, + help='sparse feature hashing space for index processing') + parser.add_argument( + '--is_local', + type=int, + default=1, + help='Local train or distributed train (default: 1)') + parser.add_argument( + '--cloud_train', + type=int, + default=0, + help='Local train or distributed train on paddlecloud (default: 0)') + parser.add_argument( + '--async_mode', + action='store_true', + default=False, + help='Whether start pserver in async mode to support ASGD') + parser.add_argument( + '--no_split_var', + action='store_true', + default=False, + help='Whether split variables into blocks when update_method is pserver') + parser.add_argument( + '--role', + type=str, + default='pserver', # trainer or pserver + help='The path for model to store (default: models)') + parser.add_argument( + '--endpoints', + type=str, + default='127.0.0.1:6000', + help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001') + parser.add_argument( + '--current_endpoint', + type=str, + default='127.0.0.1:6000', + help='The path for model to store (default: 127.0.0.1:6000)') + parser.add_argument( + '--trainer_id', + type=int, + default=0, + help='The path for model to store (default: models)') + parser.add_argument( + '--trainers', + type=int, + default=1, + help='The num of trianers, (default: 1)') + return parser.parse_args() diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/clean.sh b/python/examples/grpc_impl_example/criteo_ctr_with_cube/clean.sh new file mode 100755 index 0000000000000000000000000000000000000000..99a4819802178f1910c5fced7d4c5a39c3037e4a --- /dev/null +++ 
b/python/examples/grpc_impl_example/criteo_ctr_with_cube/clean.sh @@ -0,0 +1,4 @@ +ps -ef | grep cube | awk {'print $2'} | xargs kill -9 +rm -rf cube/cube_data cube/data cube/log* cube/nohup* cube/output/ cube/donefile cube/input cube/monitor cube/cube-builder.INFO +ps -ef | grep test | awk {'print $2'} | xargs kill -9 +ps -ef | grep serving | awk {'print $2'} | xargs kill -9 diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/criteo.py b/python/examples/grpc_impl_example/criteo_ctr_with_cube/criteo.py new file mode 100755 index 0000000000000000000000000000000000000000..f37eb1d2c1d8df6975ec0c28923c6e17c0272290 --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/criteo.py @@ -0,0 +1,81 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + + +class CriteoDataset(object): + def setup(self, sparse_feature_dim): + self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + self.cont_max_ = [ + 20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 + ] + self.cont_diff_ = [ + 20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 + ] + self.hash_dim_ = sparse_feature_dim + # here, training data are lines with line_index < train_idx_ + self.train_idx_ = 41256555 + self.continuous_range_ = range(1, 14) + self.categorical_range_ = range(14, 40) + + def _process_line(self, line): + features = line.rstrip('\n').split('\t') + dense_feature = [] + sparse_feature = [] + for idx in self.continuous_range_: + if features[idx] == '': + dense_feature.append(0.0) + else: + dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / \ + self.cont_diff_[idx - 1]) + for idx in self.categorical_range_: + sparse_feature.append( + [hash(str(idx) + features[idx]) % self.hash_dim_]) + + return dense_feature, sparse_feature, [int(features[0])] + + def infer_reader(self, filelist, batch, buf_size): + def local_iter(): + for fname in filelist: + with open(fname.strip(), "r") as fin: + for line in fin: + dense_feature, sparse_feature, label = self._process_line( + line) + #yield dense_feature, sparse_feature, label + yield [dense_feature] + sparse_feature + [label] + + import paddle + batch_iter = paddle.batch( + paddle.reader.shuffle( + local_iter, buf_size=buf_size), + batch_size=batch) + return batch_iter + + def generate_sample(self, line): + def data_iter(): + dense_feature, sparse_feature, label = self._process_line(line) + feature_name = ["dense_input"] + for idx in self.categorical_range_: + feature_name.append("C" + str(idx - 13)) + feature_name.append("label") + yield zip(feature_name, [dense_feature] + sparse_feature + [label]) + + return data_iter + + +if __name__ == "__main__": + criteo_dataset = CriteoDataset() + criteo_dataset.setup(int(sys.argv[1])) + criteo_dataset.run_from_stdin() diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/criteo_reader.py b/python/examples/grpc_impl_example/criteo_ctr_with_cube/criteo_reader.py new file mode 100755 index 
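`criteo.py` above is consumed directly by the gRPC test client later in this change; a condensed usage sketch of its reader interface (file paths are illustrative, batch/buffer sizes mirror `test_client.py`):

```python
import criteo

dataset = criteo.CriteoDataset()
dataset.setup(1000001)  # sparse feature hashing space, as in test_client.py
reader = dataset.infer_reader(["raw_data/part-0"], 1, 100)  # filelist, batch, buf_size
data = reader().next()  # one batch: [dense_input] + 26 sparse slots + [label]
```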
0000000000000000000000000000000000000000..2a80af78a9c2033bf246f703ca70a817ab786af3 --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/criteo_reader.py @@ -0,0 +1,83 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +import sys +import paddle.fluid.incubate.data_generator as dg + + +class CriteoDataset(dg.MultiSlotDataGenerator): + def setup(self, sparse_feature_dim): + self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + self.cont_max_ = [ + 20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 + ] + self.cont_diff_ = [ + 20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 + ] + self.hash_dim_ = sparse_feature_dim + # here, training data are lines with line_index < train_idx_ + self.train_idx_ = 41256555 + self.continuous_range_ = range(1, 14) + self.categorical_range_ = range(14, 40) + + def _process_line(self, line): + features = line.rstrip('\n').split('\t') + dense_feature = [] + sparse_feature = [] + for idx in self.continuous_range_: + if features[idx] == '': + dense_feature.append(0.0) + else: + dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / \ + self.cont_diff_[idx - 1]) + for idx in self.categorical_range_: + sparse_feature.append( + [hash(str(idx) + features[idx]) % self.hash_dim_]) + + return dense_feature, sparse_feature, [int(features[0])] + + def infer_reader(self, filelist, batch, buf_size): + def local_iter(): + for fname in filelist: + with open(fname.strip(), "r") as fin: + for line in fin: + dense_feature, sparse_feature, label = self._process_line( + line) + #yield dense_feature, sparse_feature, label + yield [dense_feature] + sparse_feature + [label] + + import paddle + batch_iter = paddle.batch( + paddle.reader.shuffle( + local_iter, buf_size=buf_size), + batch_size=batch) + return batch_iter + + def generate_sample(self, line): + def data_iter(): + dense_feature, sparse_feature, label = self._process_line(line) + feature_name = ["dense_input"] + for idx in self.categorical_range_: + feature_name.append("C" + str(idx - 13)) + feature_name.append("label") + yield zip(feature_name, [dense_feature] + sparse_feature + [label]) + + return data_iter + + +if __name__ == "__main__": + criteo_dataset = CriteoDataset() + criteo_dataset.setup(int(sys.argv[1])) + criteo_dataset.run_from_stdin() diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube/conf/cube.conf b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube/conf/cube.conf new file mode 100755 index 0000000000000000000000000000000000000000..b70f6e34247e410f9b80054010338d3c8f452ec6 --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube/conf/cube.conf @@ -0,0 +1,13 @@ +[{ + "dict_name": "test_dict", + "shard": 1, + "dup": 1, + "timeout": 200, + "retry": 3, + "backup_request": 100, + "type": "ipport_list", + "load_balancer": "rr", + "nodes": [{ + "ipport_list": "list://127.0.0.1:8027" + }] +}] diff --git 
a/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube/conf/gflags.conf b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube/conf/gflags.conf new file mode 100755 index 0000000000000000000000000000000000000000..21c7bddebd8f22b91d0ba26a6121007f96a4380b --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube/conf/gflags.conf @@ -0,0 +1,4 @@ +--port=8027 +--dict_split=1 +--in_mem=true +--log_dir=./log/ diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube/keys b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube/keys new file mode 100755 index 0000000000000000000000000000000000000000..f00c965d8307308469e537302baa73048488f162 --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube/keys @@ -0,0 +1,10 @@ +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube_prepare.sh b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube_prepare.sh new file mode 100755 index 0000000000000000000000000000000000000000..1417254a54e2194ab3a0194f2ec970f480787acd --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube_prepare.sh @@ -0,0 +1,22 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +#! /bin/bash + +mkdir -p cube_model +mkdir -p cube/data +./seq_generator ctr_serving_model/SparseFeatFactors ./cube_model/feature +./cube/cube-builder -dict_name=test_dict -job_mode=base -last_version=0 -cur_version=0 -depend_version=0 -input_path=./cube_model -output_path=${PWD}/cube/data -shard_num=1 -only_build=false +mv ./cube/data/0_0/test_dict_part0/* ./cube/data/ +cd cube && ./cube diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube_quant_prepare.sh b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube_quant_prepare.sh new file mode 100755 index 0000000000000000000000000000000000000000..0db6575ab307fb81cdd0336a20bb9a8ec30d446d --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/cube_quant_prepare.sh @@ -0,0 +1,22 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +#! 
/bin/bash + +mkdir -p cube_model +mkdir -p cube/data +./seq_generator ctr_serving_model/SparseFeatFactors ./cube_model/feature 8 +./cube/cube-builder -dict_name=test_dict -job_mode=base -last_version=0 -cur_version=0 -depend_version=0 -input_path=./cube_model -output_path=${PWD}/cube/data -shard_num=1 -only_build=false +mv ./cube/data/0_0/test_dict_part0/* ./cube/data/ +cd cube && ./cube diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/get_data.sh b/python/examples/grpc_impl_example/criteo_ctr_with_cube/get_data.sh new file mode 100755 index 0000000000000000000000000000000000000000..1f244b3a4aa81488bb493825576ba30c4b3bba22 --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/get_data.sh @@ -0,0 +1,2 @@ +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/data/ctr_prediction/ctr_data.tar.gz +tar -zxvf ctr_data.tar.gz diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/local_train.py b/python/examples/grpc_impl_example/criteo_ctr_with_cube/local_train.py new file mode 100755 index 0000000000000000000000000000000000000000..d4a1bc930924e348048f7ac3e5c46381d9b6441b --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/local_train.py @@ -0,0 +1,100 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# pylint: disable=doc-string-missing + +from __future__ import print_function + +from args import parse_args +import os +import paddle.fluid as fluid +import sys +from network_conf import dnn_model + +dense_feature_dim = 13 + + +def train(): + args = parse_args() + sparse_only = args.sparse_only + if not os.path.isdir(args.model_output_dir): + os.mkdir(args.model_output_dir) + dense_input = fluid.layers.data( + name="dense_input", shape=[dense_feature_dim], dtype='float32') + sparse_input_ids = [ + fluid.layers.data( + name="C" + str(i), shape=[1], lod_level=1, dtype="int64") + for i in range(1, 27) + ] + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + + #nn_input = None if sparse_only else dense_input + nn_input = dense_input + predict_y, loss, auc_var, batch_auc_var, infer_vars = dnn_model( + nn_input, sparse_input_ids, label, args.embedding_size, + args.sparse_feature_dim) + + optimizer = fluid.optimizer.SGD(learning_rate=1e-4) + optimizer.minimize(loss) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset") + dataset.set_use_var([dense_input] + sparse_input_ids + [label]) + + python_executable = "python" + pipe_command = "{} criteo_reader.py {}".format(python_executable, + args.sparse_feature_dim) + + dataset.set_pipe_command(pipe_command) + dataset.set_batch_size(128) + thread_num = 10 + dataset.set_thread(thread_num) + + whole_filelist = [ + "raw_data/part-%d" % x for x in range(len(os.listdir("raw_data"))) + ] + + print(whole_filelist) + dataset.set_filelist(whole_filelist[:100]) + dataset.load_into_memory() + fluid.layers.Print(auc_var) + epochs = 1 + for i in range(epochs): + exe.train_from_dataset( + program=fluid.default_main_program(), dataset=dataset, debug=True) + print("epoch {} finished".format(i)) + + import paddle_serving_client.io as server_io + feed_var_dict = {} + feed_var_dict['dense_input'] = dense_input + for i, sparse in enumerate(sparse_input_ids): + feed_var_dict["embedding_{}.tmp_0".format(i)] = sparse + fetch_var_dict = {"prob": predict_y} + + feed_kv_dict = {} + feed_kv_dict['dense_input'] = dense_input + for i, emb in enumerate(infer_vars): + feed_kv_dict["embedding_{}.tmp_0".format(i)] = emb + fetch_var_dict = {"prob": predict_y} + + server_io.save_model("ctr_serving_model", "ctr_client_conf", feed_var_dict, + fetch_var_dict, fluid.default_main_program()) + + server_io.save_model("ctr_serving_model_kv", "ctr_client_conf_kv", + feed_kv_dict, fetch_var_dict, + fluid.default_main_program()) + + +if __name__ == '__main__': + train() diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/network_conf.py b/python/examples/grpc_impl_example/criteo_ctr_with_cube/network_conf.py new file mode 100755 index 0000000000000000000000000000000000000000..2975533a72ad21d6dd5896446fd06c1f9bdfe8b4 --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/network_conf.py @@ -0,0 +1,77 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +import paddle.fluid as fluid +import math + + +def dnn_model(dense_input, sparse_inputs, label, embedding_size, + sparse_feature_dim): + def embedding_layer(input): + emb = fluid.layers.embedding( + input=input, + is_sparse=True, + is_distributed=False, + size=[sparse_feature_dim, embedding_size], + param_attr=fluid.ParamAttr( + name="SparseFeatFactors", + initializer=fluid.initializer.Uniform())) + x = fluid.layers.sequence_pool(input=emb, pool_type='sum') + return emb, x + + def mlp_input_tensor(emb_sums, dense_tensor): + #if isinstance(dense_tensor, fluid.Variable): + # return fluid.layers.concat(emb_sums, axis=1) + #else: + return fluid.layers.concat(emb_sums + [dense_tensor], axis=1) + + def mlp(mlp_input): + fc1 = fluid.layers.fc(input=mlp_input, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(mlp_input.shape[1])))) + fc2 = fluid.layers.fc(input=fc1, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc1.shape[1])))) + fc3 = fluid.layers.fc(input=fc2, + size=400, + act='relu', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc2.shape[1])))) + pre = fluid.layers.fc(input=fc3, + size=2, + act='softmax', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Normal( + scale=1 / math.sqrt(fc3.shape[1])))) + return pre + + emb_pair_sums = list(map(embedding_layer, sparse_inputs)) + emb_sums = [x[1] for x in emb_pair_sums] + infer_vars = [x[0] for x in emb_pair_sums] + mlp_in = mlp_input_tensor(emb_sums, dense_input) + predict = mlp(mlp_in) + cost = fluid.layers.cross_entropy(input=predict, label=label) + avg_cost = fluid.layers.reduce_sum(cost) + accuracy = fluid.layers.accuracy(input=predict, label=label) + auc_var, batch_auc_var, auc_states = \ + fluid.layers.auc(input=predict, label=label, num_thresholds=2 ** 12, slide_steps=20) + return predict, avg_cost, auc_var, batch_auc_var, infer_vars diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_client.py b/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_client.py new file mode 100755 index 0000000000000000000000000000000000000000..f82c1a21c153594e0be192506af5318c24a4e99a --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_client.py @@ -0,0 +1,49 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# pylint: disable=doc-string-missing + +from paddle_serving_client import MultiLangClient as Client +import sys +import os +import criteo as criteo +import time +from paddle_serving_client.metric import auc +import grpc + +client = Client() +client.connect(["127.0.0.1:9292"]) + +batch = 1 +buf_size = 100 +dataset = criteo.CriteoDataset() +dataset.setup(1000001) +test_filelists = ["{}/part-0".format(sys.argv[1])] +reader = dataset.infer_reader(test_filelists, batch, buf_size) +label_list = [] +prob_list = [] +start = time.time() +for ei in range(10000): + data = reader().next() + feed_dict = {} + feed_dict['dense_input'] = data[0][0] + for i in range(1, 27): + feed_dict["embedding_{}.tmp_0".format(i - 1)] = data[0][i] + fetch_map = client.predict(feed=feed_dict, fetch=["prob"]) + if fetch_map["serving_status_code"] == 0: + prob_list.append(fetch_map['prob'][0][1]) + label_list.append(data[0][-1][0]) + +print(auc(label_list, prob_list)) +end = time.time() +print(end - start) diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_server.py b/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_server.py new file mode 100755 index 0000000000000000000000000000000000000000..361d5a59becb7c110907f66d8b651e05e7eb418e --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_server.py @@ -0,0 +1,37 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +import os +import sys +from paddle_serving_server import OpMaker +from paddle_serving_server import OpSeqMaker +from paddle_serving_server import MultiLangServer as Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +general_dist_kv_infer_op = op_maker.create('general_dist_kv_infer') +response_op = op_maker.create('general_response') + +op_seq_maker = OpSeqMaker() +op_seq_maker.add_op(read_op) +op_seq_maker.add_op(general_dist_kv_infer_op) +op_seq_maker.add_op(response_op) + +server = Server() +server.set_op_sequence(op_seq_maker.get_op_sequence()) +server.set_num_threads(4) +server.load_model_config(sys.argv[1], sys.argv[2]) +server.prepare_server(workdir="work_dir1", port=9292, device="cpu") +server.run_server() diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_server_gpu.py b/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_server_gpu.py new file mode 100755 index 0000000000000000000000000000000000000000..38e1bf82118f6af7cfe7b467003332a5328b2979 --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_server_gpu.py @@ -0,0 +1,37 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +import os +import sys +from paddle_serving_server_gpu import OpMaker +from paddle_serving_server_gpu import OpSeqMaker +from paddle_serving_server_gpu import MultiLangServer as Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +general_dist_kv_infer_op = op_maker.create('general_dist_kv_infer') +response_op = op_maker.create('general_response') + +op_seq_maker = OpSeqMaker() +op_seq_maker.add_op(read_op) +op_seq_maker.add_op(general_dist_kv_infer_op) +op_seq_maker.add_op(response_op) + +server = Server() +server.set_op_sequence(op_seq_maker.get_op_sequence()) +server.set_num_threads(4) +server.load_model_config(sys.argv[1], sys.argv[2]) +server.prepare_server(workdir="work_dir1", port=9292, device="cpu") +server.run_server() diff --git a/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_server_quant.py b/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_server_quant.py new file mode 100755 index 0000000000000000000000000000000000000000..feca75b077d737a614bdfd955b7bf0d82ed08529 --- /dev/null +++ b/python/examples/grpc_impl_example/criteo_ctr_with_cube/test_server_quant.py @@ -0,0 +1,37 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# pylint: disable=doc-string-missing + +import os +import sys +from paddle_serving_server import OpMaker +from paddle_serving_server import OpSeqMaker +from paddle_serving_server import MultiLangServer as Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +general_dist_kv_infer_op = op_maker.create('general_dist_kv_quant_infer') +response_op = op_maker.create('general_response') + +op_seq_maker = OpSeqMaker() +op_seq_maker.add_op(read_op) +op_seq_maker.add_op(general_dist_kv_infer_op) +op_seq_maker.add_op(response_op) + +server = Server() +server.set_op_sequence(op_seq_maker.get_op_sequence()) +server.set_num_threads(4) +server.load_model_config(sys.argv[1], sys.argv[2]) +server.prepare_server(workdir="work_dir1", port=9292, device="cpu") +server.run_server() diff --git a/python/examples/grpc_impl_example/fit_a_line/README_CN.md b/python/examples/grpc_impl_example/fit_a_line/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..93e0d1cf7262d620df18570401ed39db67f839ef --- /dev/null +++ b/python/examples/grpc_impl_example/fit_a_line/README_CN.md @@ -0,0 +1,57 @@ +# 线性回归预测服务示例 + +## 获取数据 + +```shell +sh get_data.sh +``` + +## 开启 gRPC 服务端 + +``` shell +python test_server.py uci_housing_model/ +``` + +也可以通过下面的一行代码开启默认 gRPC 服务: + +```shell +python -m paddle_serving_server.serve --model uci_housing_model --thread 10 --port 9393 --use_multilang +``` + +## 客户端预测 + +### 同步预测 + +``` shell +python test_sync_client.py +``` + +### 异步预测 + +``` shell +python test_asyn_client.py +``` + +### Batch 预测 + +``` shell +python test_batch_client.py +``` + +### 通用 pb 预测 + +``` shell +python test_general_pb_client.py +``` + +### 预测超时 + +``` shell +python test_timeout_client.py +``` + +### List 输入 + +``` shell +python test_list_input_client.py +``` diff --git a/python/examples/grpc_impl_example/fit_a_line/get_data.sh b/python/examples/grpc_impl_example/fit_a_line/get_data.sh new file mode 100644 index 0000000000000000000000000000000000000000..84a3966a0ef323cef4b146d8e9489c70a7a8ae35 --- /dev/null +++ b/python/examples/grpc_impl_example/fit_a_line/get_data.sh @@ -0,0 +1,2 @@ +wget --no-check-certificate https://paddle-serving.bj.bcebos.com/uci_housing.tar.gz +tar -xzf uci_housing.tar.gz diff --git a/python/examples/fit_a_line/test_multilang_client.py b/python/examples/grpc_impl_example/fit_a_line/test_asyn_client.py similarity index 58% rename from python/examples/fit_a_line/test_multilang_client.py rename to python/examples/grpc_impl_example/fit_a_line/test_asyn_client.py index f85814a4b24693c269c192b23f0f5ab1c7d566a6..b01a9372585bae42abca213fe8fb8a55505dfe57 100644 --- a/python/examples/fit_a_line/test_multilang_client.py +++ b/python/examples/grpc_impl_example/fit_a_line/test_asyn_client.py @@ -13,38 +13,39 @@ # limitations under the License. 
# pylint: disable=doc-string-missing -from paddle_serving_client import MultiLangClient +from paddle_serving_client import MultiLangClient as Client import functools -import sys import time import threading +import grpc -client = MultiLangClient() -client.load_client_config(sys.argv[1]) +client = Client() client.connect(["127.0.0.1:9393"]) -import paddle -test_reader = paddle.batch( - paddle.reader.shuffle( - paddle.dataset.uci_housing.test(), buf_size=500), - batch_size=1) - complete_task_count = [0] lock = threading.Lock() -def call_back(call_future, data): - fetch_map = call_future.result() - print("{} {}".format(fetch_map["price"][0], data[0][1][0])) - with lock: - complete_task_count[0] += 1 +def call_back(call_future): + try: + fetch_map = call_future.result() + print(fetch_map) + except grpc.RpcError as e: + print(e.code()) + finally: + with lock: + complete_task_count[0] += 1 +x = [ + 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, + 0.4919, 0.1856, 0.0795, -0.0332 +] task_count = 0 -for data in test_reader(): - future = client.predict(feed={"x": data[0][0]}, fetch=["price"], asyn=True) +for i in range(3): + future = client.predict(feed={"x": x}, fetch=["price"], asyn=True) task_count += 1 - future.add_done_callback(functools.partial(call_back, data=data)) + future.add_done_callback(functools.partial(call_back)) while complete_task_count[0] != task_count: time.sleep(0.1) diff --git a/python/examples/grpc_impl_example/fit_a_line/test_batch_client.py b/python/examples/grpc_impl_example/fit_a_line/test_batch_client.py new file mode 100644 index 0000000000000000000000000000000000000000..0630a0a960e5e40a7507454feb57418c8cfbdc68 --- /dev/null +++ b/python/examples/grpc_impl_example/fit_a_line/test_batch_client.py @@ -0,0 +1,32 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing +from paddle_serving_client import MultiLangClient as Client + +client = Client() +client.connect(["127.0.0.1:9393"]) + +batch_size = 2 +x = [ + 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, + 0.4919, 0.1856, 0.0795, -0.0332 +] + +for i in range(3): + batch_feed = [{"x": x} for j in range(batch_size)] + fetch_map = client.predict(feed=batch_feed, fetch=["price"]) + if fetch_map["serving_status_code"] == 0: + print(fetch_map) + else: + print(fetch_map["serving_status_code"]) diff --git a/python/examples/grpc_impl_example/fit_a_line/test_general_pb_client.py b/python/examples/grpc_impl_example/fit_a_line/test_general_pb_client.py new file mode 100644 index 0000000000000000000000000000000000000000..b2744906b0dcd321f86a1b8117a78307e24578e5 --- /dev/null +++ b/python/examples/grpc_impl_example/fit_a_line/test_general_pb_client.py @@ -0,0 +1,30 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +from paddle_serving_client import MultiLangClient as Client + +client = Client() +client.connect(["127.0.0.1:9393"]) + +x = [ + 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, + 0.4919, 0.1856, 0.0795, -0.0332 +] +for i in range(3): + fetch_map = client.predict(feed={"x": x}, fetch=["price"], is_python=False) + if fetch_map["serving_status_code"] == 0: + print(fetch_map) + else: + print(fetch_map["serving_status_code"]) diff --git a/python/examples/grpc_impl_example/fit_a_line/test_numpy_input_client.py b/python/examples/grpc_impl_example/fit_a_line/test_numpy_input_client.py new file mode 100644 index 0000000000000000000000000000000000000000..e98c1e87bb48613e4226cf5378063aec7c5b4093 --- /dev/null +++ b/python/examples/grpc_impl_example/fit_a_line/test_numpy_input_client.py @@ -0,0 +1,31 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# pylint: disable=doc-string-missing + +from paddle_serving_client import MultiLangClient as Client +import numpy as np + +client = Client() +client.connect(["127.0.0.1:9393"]) + +x = [ + 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, + 0.4919, 0.1856, 0.0795, -0.0332 +] +for i in range(3): + fetch_map = client.predict(feed={"x": np.array(x)}, fetch=["price"]) + if fetch_map["serving_status_code"] == 0: + print(fetch_map) + else: + print(fetch_map["serving_status_code"]) diff --git a/python/examples/fit_a_line/test_multilang_server.py b/python/examples/grpc_impl_example/fit_a_line/test_server.py similarity index 94% rename from python/examples/fit_a_line/test_multilang_server.py rename to python/examples/grpc_impl_example/fit_a_line/test_server.py index 23eb938f0ee1bf6b195509816dea5221bbfa9218..6acc7bfe2e6d00621f32f1f7f437691fc15d20fc 100644 --- a/python/examples/fit_a_line/test_multilang_server.py +++ b/python/examples/grpc_impl_example/fit_a_line/test_server.py @@ -17,7 +17,7 @@ import os import sys from paddle_serving_server import OpMaker from paddle_serving_server import OpSeqMaker -from paddle_serving_server import MultiLangServer +from paddle_serving_server import MultiLangServer as Server op_maker = OpMaker() read_op = op_maker.create('general_reader') @@ -29,7 +29,7 @@ op_seq_maker.add_op(read_op) op_seq_maker.add_op(general_infer_op) op_seq_maker.add_op(response_op) -server = MultiLangServer() +server = Server() server.set_op_sequence(op_seq_maker.get_op_sequence()) server.load_model_config(sys.argv[1]) server.prepare_server(workdir="work_dir1", port=9393, device="cpu") diff --git a/python/examples/grpc_impl_example/fit_a_line/test_server_gpu.py b/python/examples/grpc_impl_example/fit_a_line/test_server_gpu.py new file mode 100644 index 0000000000000000000000000000000000000000..1547ee445f4f8ceebe58e6f9e4f05b92520911eb --- /dev/null +++ b/python/examples/grpc_impl_example/fit_a_line/test_server_gpu.py @@ -0,0 +1,37 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# pylint: disable=doc-string-missing + +import os +import sys +from paddle_serving_server_gpu import OpMaker +from paddle_serving_server_gpu import OpSeqMaker +from paddle_serving_server_gpu import MultiLangServer as Server + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +general_infer_op = op_maker.create('general_infer') +response_op = op_maker.create('general_response') + +op_seq_maker = OpSeqMaker() +op_seq_maker.add_op(read_op) +op_seq_maker.add_op(general_infer_op) +op_seq_maker.add_op(response_op) + +server = Server() +server.set_op_sequence(op_seq_maker.get_op_sequence()) +server.load_model_config(sys.argv[1]) +server.set_gpuid(0) +server.prepare_server(workdir="work_dir1", port=9393, device="cpu") +server.run_server() diff --git a/python/examples/grpc_impl_example/fit_a_line/test_sync_client.py b/python/examples/grpc_impl_example/fit_a_line/test_sync_client.py new file mode 100644 index 0000000000000000000000000000000000000000..89530dc2f2a33ef44b2dbde52975634f4b4d8295 --- /dev/null +++ b/python/examples/grpc_impl_example/fit_a_line/test_sync_client.py @@ -0,0 +1,30 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +from paddle_serving_client import MultiLangClient as Client + +client = Client() +client.connect(["127.0.0.1:9393"]) + +x = [ + 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, + 0.4919, 0.1856, 0.0795, -0.0332 +] +for i in range(3): + fetch_map = client.predict(feed={"x": x}, fetch=["price"]) + if fetch_map["serving_status_code"] == 0: + print(fetch_map) + else: + print(fetch_map["serving_status_code"]) diff --git a/python/examples/grpc_impl_example/fit_a_line/test_timeout_client.py b/python/examples/grpc_impl_example/fit_a_line/test_timeout_client.py new file mode 100644 index 0000000000000000000000000000000000000000..f90fab38533aabf3daa7627ee0b79c56892444dd --- /dev/null +++ b/python/examples/grpc_impl_example/fit_a_line/test_timeout_client.py @@ -0,0 +1,34 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# pylint: disable=doc-string-missing + +from paddle_serving_client import MultiLangClient as Client +import grpc + +client = Client() +client.connect(["127.0.0.1:9393"]) +client.set_rpc_timeout_ms(1) + +x = [ + 0.0137, -0.1136, 0.2553, -0.0692, 0.0582, -0.0727, -0.1583, -0.0584, 0.6283, + 0.4919, 0.1856, 0.0795, -0.0332 +] +for i in range(3): + fetch_map = client.predict(feed={"x": x}, fetch=["price"]) + if fetch_map["serving_status_code"] == 0: + print(fetch_map) + elif fetch_map["serving_status_code"] == grpc.StatusCode.DEADLINE_EXCEEDED: + print('timeout') + else: + print(fetch_map["serving_status_code"]) diff --git a/python/examples/imdb/test_ensemble_client.py b/python/examples/imdb/test_ensemble_client.py index 6cafb3389fff5a25103bcb2b3a867b73b35b9e8e..eb1e29ddd6d5a02854e4859a35474306c1c4d073 100644 --- a/python/examples/imdb/test_ensemble_client.py +++ b/python/examples/imdb/test_ensemble_client.py @@ -32,11 +32,7 @@ for i in range(3): line = 'i am very sad | 0' word_ids, label = imdb_dataset.get_words_and_label(line) feed = {"words": word_ids} - fetch = ["acc", "cost", "prediction"] + fetch = ["prediction"] fetch_maps = client.predict(feed=feed, fetch=fetch) - if len(fetch_maps) == 1: - print("step: {}, res: {}".format(i, fetch_maps['prediction'][0][1])) - else: - for model, fetch_map in fetch_maps.items(): - print("step: {}, model: {}, res: {}".format(i, model, fetch_map[ - 'prediction'][0][1])) + for model, fetch_map in fetch_maps.items(): + print("step: {}, model: {}, res: {}".format(i, model, fetch_map)) diff --git a/python/examples/imdb/test_multilang_ensemble_client.py b/python/examples/imdb/test_multilang_ensemble_client.py new file mode 100644 index 0000000000000000000000000000000000000000..6686d4c8c38d6a17cb9c5701abf7d76773031772 --- /dev/null +++ b/python/examples/imdb/test_multilang_ensemble_client.py @@ -0,0 +1,37 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +from paddle_serving_client import MultiLangClient +from imdb_reader import IMDBDataset + +client = MultiLangClient() +# If you have more than one model, make sure that the input +# and output of more than one model are the same. +client.connect(["127.0.0.1:9393"]) + +# you can define any english sentence or dataset here +# This example reuses imdb reader in training, you +# can define your own data preprocessing easily. 
+imdb_dataset = IMDBDataset() +imdb_dataset.load_resource('imdb.vocab') + +for i in range(3): + line = 'i am very sad | 0' + word_ids, label = imdb_dataset.get_words_and_label(line) + feed = {"words": word_ids} + fetch = ["prediction"] + fetch_maps = client.predict(feed=feed, fetch=fetch) + for model, fetch_map in fetch_maps.items(): + print("step: {}, model: {}, res: {}".format(i, model, fetch_map)) diff --git a/python/examples/imdb/test_multilang_ensemble_server.py b/python/examples/imdb/test_multilang_ensemble_server.py new file mode 100644 index 0000000000000000000000000000000000000000..053aa06f0219de231415ba178135782334e56c1f --- /dev/null +++ b/python/examples/imdb/test_multilang_ensemble_server.py @@ -0,0 +1,40 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=doc-string-missing + +from paddle_serving_server import OpMaker +from paddle_serving_server import OpGraphMaker +from paddle_serving_server import MultiLangServer + +op_maker = OpMaker() +read_op = op_maker.create('general_reader') +cnn_infer_op = op_maker.create( + 'general_infer', engine_name='cnn', inputs=[read_op]) +bow_infer_op = op_maker.create( + 'general_infer', engine_name='bow', inputs=[read_op]) +response_op = op_maker.create( + 'general_response', inputs=[cnn_infer_op, bow_infer_op]) + +op_graph_maker = OpGraphMaker() +op_graph_maker.add_op(read_op) +op_graph_maker.add_op(cnn_infer_op) +op_graph_maker.add_op(bow_infer_op) +op_graph_maker.add_op(response_op) + +server = MultiLangServer() +server.set_op_graph(op_graph_maker.get_op_graph()) +model_config = {cnn_infer_op: 'imdb_cnn_model', bow_infer_op: 'imdb_bow_model'} +server.load_model_config(model_config) +server.prepare_server(workdir="work_dir1", port=9393, device="cpu") +server.run_server() diff --git a/python/examples/yolov4/000000570688.jpg b/python/examples/yolov4/000000570688.jpg new file mode 100644 index 0000000000000000000000000000000000000000..cb304bd56c4010c08611a30dcca58ea9140cea54 Binary files /dev/null and b/python/examples/yolov4/000000570688.jpg differ diff --git a/python/examples/yolov4/README.md b/python/examples/yolov4/README.md new file mode 100644 index 0000000000000000000000000000000000000000..08e16026d79ef7e93df732359f2c17609d4a2d0d --- /dev/null +++ b/python/examples/yolov4/README.md @@ -0,0 +1,23 @@ +# Yolov4 Detection Service + +([简体中文](README_CN.md)|English) + +## Get Model + +``` +python -m paddle_serving_app.package --get_model yolov4 +tar -xzvf yolov4.tar.gz +``` + +## Start RPC Service + +``` +python -m paddle_serving_server_gpu.serve --model yolov4_model --port 9393 --gpu_ids 0 +``` + +## Prediction + +``` +python test_client.py 000000570688.jpg +``` +After the prediction is completed, a JSON file containing the prediction results and an image annotated with the detected bounding boxes will be generated in the `./output` folder.
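For reference, the prediction step above boils down to a short client script; the sketch below is a condensed form of the `test_client.py` shipped with this example. It uses explicit imports instead of the original `from paddle_serving_app.reader import *`, and it assumes the extracted package provides `yolov4_client/serving_client_conf.prototxt` and that `label_list.txt` sits in the working directory.

```
# Condensed sketch of the yolov4 prediction client (assumptions noted above).
import sys
import numpy as np
import cv2
from paddle_serving_client import Client
from paddle_serving_app.reader import (Sequential, File2Image, BGR2RGB, Resize,
                                        Div, Transpose, RCNNPostprocess)

# Resize the input image to the 608x608 shape expected by the yolov4 model.
preprocess = Sequential([
    File2Image(), BGR2RGB(),
    Resize((608, 608), interpolation=cv2.INTER_LINEAR),
    Div(255.0), Transpose((2, 0, 1))
])
# The third argument is the network input shape; the postprocess uses it to map
# the predicted boxes back to the original image size before drawing them.
postprocess = RCNNPostprocess("label_list.txt", "output", [608, 608])

client = Client()
client.load_client_config("yolov4_client/serving_client_conf.prototxt")
client.connect(["127.0.0.1:9393"])

im = preprocess(sys.argv[1])  # e.g. 000000570688.jpg
fetch_map = client.predict(
    feed={"image": im,
          "im_size": np.array(list(im.shape[1:]))},
    fetch=["save_infer_model/scale_0.tmp_0"])
fetch_map["image"] = sys.argv[1]
postprocess(fetch_map)  # writes the JSON result and annotated image to ./output
```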
diff --git a/python/examples/yolov4/README_CN.md b/python/examples/yolov4/README_CN.md new file mode 100644 index 0000000000000000000000000000000000000000..a4eed96b08619d4602cbd012a676a9adb6e08a63 --- /dev/null +++ b/python/examples/yolov4/README_CN.md @@ -0,0 +1,24 @@ +# Yolov4 检测服务 + +(简体中文|[English](README.md)) + +## 获取模型 + +``` +python -m paddle_serving_app.package --get_model yolov4 +tar -xzvf yolov4.tar.gz +``` + +## 启动RPC服务 + +``` +python -m paddle_serving_server_gpu.serve --model yolov4_model --port 9393 --gpu_ids 0 +``` + +## 预测 + +``` +python test_client.py 000000570688.jpg +``` + +预测完成会在`./output`文件夹下生成保存预测结果的json文件以及标出检测结果框的图片。 diff --git a/python/examples/yolov4/label_list.txt b/python/examples/yolov4/label_list.txt new file mode 100644 index 0000000000000000000000000000000000000000..941cb4e1392266f6a6c09b1fdc5f79503b2e5df6 --- /dev/null +++ b/python/examples/yolov4/label_list.txt @@ -0,0 +1,80 @@ +person +bicycle +car +motorcycle +airplane +bus +train +truck +boat +traffic light +fire hydrant +stop sign +parking meter +bench +bird +cat +dog +horse +sheep +cow +elephant +bear +zebra +giraffe +backpack +umbrella +handbag +tie +suitcase +frisbee +skis +snowboard +sports ball +kite +baseball bat +baseball glove +skateboard +surfboard +tennis racket +bottle +wine glass +cup +fork +knife +spoon +bowl +banana +apple +sandwich +orange +broccoli +carrot +hot dog +pizza +donut +cake +chair +couch +potted plant +bed +dining table +toilet +tv +laptop +mouse +remote +keyboard +cell phone +microwave +oven +toaster +sink +refrigerator +book +clock +vase +scissors +teddy bear +hair drier +toothbrush diff --git a/python/examples/yolov4/test_client.py b/python/examples/yolov4/test_client.py new file mode 100644 index 0000000000000000000000000000000000000000..92dcd06552ca1fdd3f2d54060e9de501f052e349 --- /dev/null +++ b/python/examples/yolov4/test_client.py @@ -0,0 +1,41 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import sys +import numpy as np +from paddle_serving_client import Client +from paddle_serving_app.reader import * +import cv2 +preprocess = Sequential([ + File2Image(), BGR2RGB(), Resize( + (608, 608), interpolation=cv2.INTER_LINEAR), Div(255.0), Transpose( + (2, 0, 1)) +]) + +postprocess = RCNNPostprocess("label_list.txt", "output", [608, 608]) +client = Client() + +client.load_client_config("yolov4_client/serving_client_conf.prototxt") +client.connect(['127.0.0.1:9393']) + +im = preprocess(sys.argv[1]) +print(im.shape) +fetch_map = client.predict( + feed={ + "image": im, + "im_size": np.array(list(im.shape[1:])), + }, + fetch=["save_infer_model/scale_0.tmp_0"]) +fetch_map["image"] = sys.argv[1] +postprocess(fetch_map) diff --git a/python/paddle_serving_app/models/model_list.py b/python/paddle_serving_app/models/model_list.py index 0c26a59f6f0537b9c910f21062938d4720d4f9f4..79b3f91bd6584d17ddbc4124584cf40bd586b965 100644 --- a/python/paddle_serving_app/models/model_list.py +++ b/python/paddle_serving_app/models/model_list.py @@ -24,7 +24,7 @@ class ServingModels(object): "SentimentAnalysis"] = ["senta_bilstm", "senta_bow", "senta_cnn"] self.model_dict["SemanticRepresentation"] = ["ernie"] self.model_dict["ChineseWordSegmentation"] = ["lac"] - self.model_dict["ObjectDetection"] = ["faster_rcnn"] + self.model_dict["ObjectDetection"] = ["faster_rcnn", "yolov4"] self.model_dict["ImageSegmentation"] = [ "unet", "deeplabv3", "deeplabv3+cityscapes" ] diff --git a/python/paddle_serving_app/reader/image_reader.py b/python/paddle_serving_app/reader/image_reader.py index dc029bf0409179f1d392ce05d007565cd3007085..a44ca5de84da2bafce9b4cea37fb88095debabc6 100644 --- a/python/paddle_serving_app/reader/image_reader.py +++ b/python/paddle_serving_app/reader/image_reader.py @@ -280,10 +280,11 @@ class SegPostprocess(object): class RCNNPostprocess(object): - def __init__(self, label_file, output_dir): + def __init__(self, label_file, output_dir, resize_shape=None): self.output_dir = output_dir self.label_file = label_file self.label_list = [] + self.resize_shape = resize_shape with open(label_file) as fin: for line in fin: self.label_list.append(line.strip()) @@ -378,6 +379,13 @@ class RCNNPostprocess(object): xmax = xmin + w ymax = ymin + h + img_w, img_h = image.size + if self.resize_shape is not None: + xmin = xmin * img_w / self.resize_shape[0] + xmax = xmax * img_w / self.resize_shape[0] + ymin = ymin * img_h / self.resize_shape[1] + ymax = ymax * img_h / self.resize_shape[1] + color = tuple(color_list[catid]) # draw bbox diff --git a/python/paddle_serving_client/__init__.py b/python/paddle_serving_client/__init__.py index 63f827167de6417a15097d0ea2c7834e7fbf2d20..455bcf62cd039dde69736ec514892856eabd3088 100644 --- a/python/paddle_serving_client/__init__.py +++ b/python/paddle_serving_client/__init__.py @@ -28,8 +28,11 @@ sys.path.append( os.path.join(os.path.abspath(os.path.dirname(__file__)), 'proto')) from .proto import multi_lang_general_model_service_pb2_grpc -int_type = 0 -float_type = 1 +int64_type = 0 +float32_type = 1 +int32_type = 2 +int_type = set([int64_type, int32_type]) +float_type = set([float32_type]) class _NOPProfiler(object): @@ -279,7 +282,7 @@ class Client(object): raise ValueError("Wrong feed name: {}.".format(key)) #if not isinstance(feed_i[key], np.ndarray): self.shape_check(feed_i, key) - if self.feed_types_[key] == int_type: + if self.feed_types_[key] in int_type: if i == 0: int_feed_names.append(key) if isinstance(feed_i[key], np.ndarray): @@ -292,7 +295,7 @@ class 
Client(object): else: int_slot.append(feed_i[key]) self.all_numpy_input = False - elif self.feed_types_[key] == float_type: + elif self.feed_types_[key] in float_type: if i == 0: float_feed_names.append(key) if isinstance(feed_i[key], np.ndarray): @@ -339,7 +342,7 @@ class Client(object): result_map = {} # result map needs to be a numpy array for i, name in enumerate(fetch_names): - if self.fetch_names_to_type_[name] == int_type: + if self.fetch_names_to_type_[name] == int64_type: # result_map[name] will be py::array(numpy array) result_map[name] = result_batch_handle.get_int64_by_name( mi, name) @@ -348,7 +351,7 @@ class Client(object): if name in self.lod_tensor_set: result_map["{}.lod".format( name)] = result_batch_handle.get_lod(mi, name) - elif self.fetch_names_to_type_[name] == float_type: + elif self.fetch_names_to_type_[name] == float32_type: result_map[name] = result_batch_handle.get_float_by_name( mi, name) shape = result_batch_handle.get_shape(mi, name) @@ -356,6 +359,16 @@ class Client(object): if name in self.lod_tensor_set: result_map["{}.lod".format( name)] = result_batch_handle.get_lod(mi, name) + + elif self.fetch_names_to_type_[name] == int32_type: + # result_map[name] will be py::array(numpy array) + result_map[name] = result_batch_handle.get_int32_by_name( + mi, name) + shape = result_batch_handle.get_shape(mi, name) + result_map[name].shape = shape + if name in self.lod_tensor_set: + result_map["{}.lod".format( + name)] = result_batch_handle.get_lod(mi, name) multi_result_map.append(result_map) ret = None if len(model_engine_names) == 1: @@ -384,22 +397,41 @@ class Client(object): class MultiLangClient(object): def __init__(self): self.channel_ = None + self.stub_ = None + self.rpc_timeout_s_ = 2 - def load_client_config(self, path): - if not isinstance(path, str): - raise Exception("GClient only supports multi-model temporarily") - self._parse_model_config(path) + def add_variant(self, tag, cluster, variant_weight): + # TODO + raise Exception("cannot support ABtest yet") + + def set_rpc_timeout_ms(self, rpc_timeout): + if self.stub_ is None: + raise Exception("set timeout must be set after connect.") + if not isinstance(rpc_timeout, int): + # for bclient + raise ValueError("rpc_timeout must be int type.") + self.rpc_timeout_s_ = rpc_timeout / 1000.0 + timeout_req = multi_lang_general_model_service_pb2.SetTimeoutRequest() + timeout_req.timeout_ms = rpc_timeout + resp = self.stub_.SetTimeout(timeout_req) + return resp.err_code == 0 - def connect(self, endpoint): + def connect(self, endpoints): # https://github.com/tensorflow/serving/issues/1382 options = [('grpc.max_receive_message_length', 512 * 1024 * 1024), ('grpc.max_send_message_length', 512 * 1024 * 1024), - ('grpc.max_receive_message_length', 512 * 1024 * 1024)] - - self.channel_ = grpc.insecure_channel( - endpoint[0], options=options) #TODO + ('grpc.lb_policy_name', 'round_robin')] + # TODO: weight round robin + g_endpoint = 'ipv4:{}'.format(','.join(endpoints)) + self.channel_ = grpc.insecure_channel(g_endpoint, options=options) self.stub_ = multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelServiceStub( self.channel_) + # get client model config + get_client_config_req = multi_lang_general_model_service_pb2.GetClientConfigRequest( + ) + resp = self.stub_.GetClientConfig(get_client_config_req) + model_config_str = resp.client_config_str + self._parse_model_config(model_config_str) def _flatten_list(self, nested_list): for item in nested_list: @@ -409,11 +441,10 @@ class MultiLangClient(object): 
else: yield item - def _parse_model_config(self, model_config_path): + def _parse_model_config(self, model_config_str): model_conf = m_config.GeneralModelConfig() - f = open(model_config_path, 'r') - model_conf = google.protobuf.text_format.Merge( - str(f.read()), model_conf) + model_conf = google.protobuf.text_format.Merge(model_config_str, + model_conf) self.feed_names_ = [var.alias_name for var in model_conf.feed_var] self.feed_types_ = {} self.feed_shapes_ = {} @@ -434,8 +465,8 @@ class MultiLangClient(object): if var.is_lod_tensor: self.lod_tensor_set_.add(var.alias_name) - def _pack_feed_data(self, feed, fetch, is_python): - req = multi_lang_general_model_service_pb2.Request() + def _pack_inference_request(self, feed, fetch, is_python): + req = multi_lang_general_model_service_pb2.InferenceRequest() req.fetch_var_names.extend(fetch) req.is_python = is_python feed_batch = None @@ -460,26 +491,50 @@ class MultiLangClient(object): data = np.array(var, dtype="int64") elif v_type == 1: # float32 data = np.array(var, dtype="float32") + elif v_type == 2: # int32 + data = np.array(var, dtype="int32") else: - raise Exception("error type.") - else: + raise Exception("error tensor value type.") + elif isinstance(var, np.ndarray): data = var - if var.dtype == "float64": - data = data.astype("float32") + if v_type == 0: + if data.dtype != 'int64': + data = data.astype("int64") + elif v_type == 1: + if data.dtype != 'float32': + data = data.astype("float32") + elif v_type == 2: + if data.dtype != 'int32': + data = data.astype("int32") + else: + raise Exception("error tensor value type.") + else: + raise Exception("var must be list or ndarray.") tensor.data = data.tobytes() else: - if v_type == 0: # int64 - if isinstance(var, np.ndarray): - tensor.int64_data.extend(var.reshape(-1).tolist()) + if isinstance(var, np.ndarray): + if v_type == 0: # int64 + tensor.int64_data.extend( + var.reshape(-1).astype("int64").tolist()) + elif v_type == 1: + tensor.float_data.extend( + var.reshape(-1).astype('float32').tolist()) + elif v_type == 2: + tensor.int32_data.extend( + var.reshape(-1).astype('int32').tolist()) else: + raise Exception("error tensor value type.") + elif isinstance(var, list): + if v_type == 0: tensor.int64_data.extend(self._flatten_list(var)) - elif v_type == 1: # float32 - if isinstance(var, np.ndarray): - tensor.float_data.extend(var.reshape(-1).tolist()) - else: + elif v_type == 1: tensor.float_data.extend(self._flatten_list(var)) + elif v_type == 2: + tensor.int32_data.extend(self._flatten_list(var)) + else: + raise Exception("error tensor value type.") else: - raise Exception("error type.") + raise Exception("var must be list or ndarray.") if isinstance(var, np.ndarray): tensor.shape.extend(list(var.shape)) else: @@ -488,37 +543,52 @@ class MultiLangClient(object): req.insts.append(inst) return req - def _unpack_resp(self, resp, fetch, is_python, need_variant_tag): - result_map = {} - inst = resp.outputs[0].insts[0] + def _unpack_inference_response(self, resp, fetch, is_python, + need_variant_tag): + if resp.err_code != 0: + return None tag = resp.tag - for i, name in enumerate(fetch): - var = inst.tensor_array[i] - v_type = self.fetch_types_[name] - if is_python: - if v_type == 0: # int64 - result_map[name] = np.frombuffer(var.data, dtype="int64") - elif v_type == 1: # float32 - result_map[name] = np.frombuffer(var.data, dtype="float32") - else: - raise Exception("error type.") - else: - if v_type == 0: # int64 - result_map[name] = np.array( - list(var.int64_data), dtype="int64") - 
elif v_type == 1: # float32 - result_map[name] = np.array( - list(var.float_data), dtype="float32") + multi_result_map = {} + for model_result in resp.outputs: + inst = model_result.insts[0] + result_map = {} + for i, name in enumerate(fetch): + var = inst.tensor_array[i] + v_type = self.fetch_types_[name] + if is_python: + if v_type == 0: # int64 + result_map[name] = np.frombuffer( + var.data, dtype="int64") + elif v_type == 1: # float32 + result_map[name] = np.frombuffer( + var.data, dtype="float32") + else: + raise Exception("error type.") else: - raise Exception("error type.") - result_map[name].shape = list(var.shape) - if name in self.lod_tensor_set_: - result_map["{}.lod".format(name)] = np.array(list(var.lod)) - return result_map if not need_variant_tag else [result_map, tag] + if v_type == 0: # int64 + result_map[name] = np.array( + list(var.int64_data), dtype="int64") + elif v_type == 1: # float32 + result_map[name] = np.array( + list(var.float_data), dtype="float32") + else: + raise Exception("error type.") + result_map[name].shape = list(var.shape) + if name in self.lod_tensor_set_: + result_map["{}.lod".format(name)] = np.array(list(var.lod)) + multi_result_map[model_result.engine_name] = result_map + ret = None + if len(resp.outputs) == 1: + ret = list(multi_result_map.values())[0] + else: + ret = multi_result_map + ret["serving_status_code"] = 0 + return ret if not need_variant_tag else [ret, tag] def _done_callback_func(self, fetch, is_python, need_variant_tag): def unpack_resp(resp): - return self._unpack_resp(resp, fetch, is_python, need_variant_tag) + return self._unpack_inference_response(resp, fetch, is_python, + need_variant_tag) return unpack_resp @@ -531,16 +601,20 @@ class MultiLangClient(object): need_variant_tag=False, asyn=False, is_python=True): - req = self._pack_feed_data(feed, fetch, is_python=is_python) + req = self._pack_inference_request(feed, fetch, is_python=is_python) if not asyn: - resp = self.stub_.inference(req) - return self._unpack_resp( - resp, - fetch, - is_python=is_python, - need_variant_tag=need_variant_tag) + try: + resp = self.stub_.Inference(req, timeout=self.rpc_timeout_s_) + return self._unpack_inference_response( + resp, + fetch, + is_python=is_python, + need_variant_tag=need_variant_tag) + except grpc.RpcError as e: + return {"serving_status_code": e.code()} else: - call_future = self.stub_.inference.future(req) + call_future = self.stub_.Inference.future( + req, timeout=self.rpc_timeout_s_) return MultiLangPredictFuture( call_future, self._done_callback_func( @@ -555,7 +629,10 @@ class MultiLangPredictFuture(object): self.callback_func_ = callback_func def result(self): - resp = self.call_future_.result() + try: + resp = self.call_future_.result() + except grpc.RpcError as e: + return {"serving_status_code": e.code()} return self.callback_func_(resp) def add_done_callback(self, fn): diff --git a/python/paddle_serving_client/io/__init__.py b/python/paddle_serving_client/io/__init__.py index 20d29e2bdfe0d2753d2f23cda028d76a3b13c699..69e185be3d2e4d1a579a29d30b59341bfb8666ed 100644 --- a/python/paddle_serving_client/io/__init__.py +++ b/python/paddle_serving_client/io/__init__.py @@ -48,16 +48,18 @@ def save_model(server_model_folder, config = model_conf.GeneralModelConfig() + #int64 = 0; float32 = 1; int32 = 2; for key in feed_var_dict: feed_var = model_conf.FeedVar() feed_var.alias_name = key feed_var.name = feed_var_dict[key].name feed_var.is_lod_tensor = feed_var_dict[key].lod_level >= 1 - if feed_var_dict[key].dtype == 
core.VarDesc.VarType.INT32 or \ - feed_var_dict[key].dtype == core.VarDesc.VarType.INT64: + if feed_var_dict[key].dtype == core.VarDesc.VarType.INT64: feed_var.feed_type = 0 if feed_var_dict[key].dtype == core.VarDesc.VarType.FP32: feed_var.feed_type = 1 + if feed_var_dict[key].dtype == core.VarDesc.VarType.INT32: + feed_var.feed_type = 2 if feed_var.is_lod_tensor: feed_var.shape.extend([-1]) else: @@ -73,13 +75,12 @@ def save_model(server_model_folder, fetch_var.alias_name = key fetch_var.name = fetch_var_dict[key].name fetch_var.is_lod_tensor = fetch_var_dict[key].lod_level >= 1 - if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT32 or \ - fetch_var_dict[key].dtype == core.VarDesc.VarType.INT64: + if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT64: fetch_var.fetch_type = 0 - if fetch_var_dict[key].dtype == core.VarDesc.VarType.FP32: fetch_var.fetch_type = 1 - + if fetch_var_dict[key].dtype == core.VarDesc.VarType.INT32: + fetch_var.fetch_type = 2 if fetch_var.is_lod_tensor: fetch_var.shape.extend([-1]) else: diff --git a/python/paddle_serving_server/__init__.py b/python/paddle_serving_server/__init__.py index 3a5c07011ace961fdfb61ebf3217ab1aab375e82..875e275c759d9fb1a9ccb6632816418a75a93aec 100644 --- a/python/paddle_serving_server/__init__.py +++ b/python/paddle_serving_server/__init__.py @@ -231,6 +231,7 @@ class Server(object): self.infer_service_conf.services.extend([infer_service]) def _prepare_resource(self, workdir): + self.workdir = workdir if self.resource_conf == None: with open("{}/{}".format(workdir, self.general_model_config_fn), "w") as fout: @@ -328,10 +329,10 @@ class Server(object): os.chdir(self.module_path) need_download = False device_version = self.get_device_version() - floder_name = device_version + serving_server_version - tar_name = floder_name + ".tar.gz" + folder_name = device_version + serving_server_version + tar_name = folder_name + ".tar.gz" bin_url = "https://paddle-serving.bj.bcebos.com/bin/" + tar_name - self.server_path = os.path.join(self.module_path, floder_name) + self.server_path = os.path.join(self.module_path, folder_name) #acquire lock version_file = open("{}/version.py".format(self.module_path), "r") @@ -357,7 +358,7 @@ class Server(object): os.remove(exe_path) raise SystemExit( 'Decompressing failed, please check your permission of {} or disk space left.'. - foemat(self.module_path)) + format(self.module_path)) finally: os.remove(tar_name) #release lock @@ -374,11 +375,11 @@ class Server(object): os.system("touch {}/fluid_time_file".format(workdir)) if not self.port_is_available(port): - raise SystemExit("Prot {} is already used".format(port)) + raise SystemExit("Port {} is already used".format(port)) + self.set_port(port) self._prepare_resource(workdir) self._prepare_engine(self.model_config_paths, device) self._prepare_infer_service(port) - self.port = port self.workdir = workdir infer_service_fn = "{}/{}".format(workdir, self.infer_service_fn) @@ -440,22 +441,29 @@ class Server(object): os.system(command) -class MultiLangServerService( - multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelService): - def __init__(self, model_config_path, endpoints): +class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc. 
+ MultiLangGeneralModelServiceServicer): + def __init__(self, model_config_path, is_multi_model, endpoints): + self.is_multi_model_ = is_multi_model + self.model_config_path_ = model_config_path + self.endpoints_ = endpoints + with open(self.model_config_path_) as f: + self.model_config_str_ = str(f.read()) + self._parse_model_config(self.model_config_str_) + self._init_bclient(self.model_config_path_, self.endpoints_) + + def _init_bclient(self, model_config_path, endpoints, timeout_ms=None): from paddle_serving_client import Client - self._parse_model_config(model_config_path) self.bclient_ = Client() - self.bclient_.load_client_config( - "{}/serving_server_conf.prototxt".format(model_config_path)) + if timeout_ms is not None: + self.bclient_.set_rpc_timeout_ms(timeout_ms) + self.bclient_.load_client_config(model_config_path) self.bclient_.connect(endpoints) - def _parse_model_config(self, model_config_path): + def _parse_model_config(self, model_config_str): model_conf = m_config.GeneralModelConfig() - f = open("{}/serving_server_conf.prototxt".format(model_config_path), - 'r') - model_conf = google.protobuf.text_format.Merge( - str(f.read()), model_conf) + model_conf = google.protobuf.text_format.Merge(model_config_str, + model_conf) self.feed_names_ = [var.alias_name for var in model_conf.feed_var] self.feed_types_ = {} self.feed_shapes_ = {} @@ -480,7 +488,7 @@ class MultiLangServerService( else: yield item - def _unpack_request(self, request): + def _unpack_inference_request(self, request): feed_names = list(request.feed_var_names) fetch_names = list(request.fetch_var_names) is_python = request.is_python @@ -492,10 +500,12 @@ class MultiLangServerService( v_type = self.feed_types_[name] data = None if is_python: - if v_type == 0: + if v_type == 0: # int64 data = np.frombuffer(var.data, dtype="int64") - elif v_type == 1: + elif v_type == 1: # float32 data = np.frombuffer(var.data, dtype="float32") + elif v_type == 2: # int32 + data = np.frombuffer(var.data, dtype="int32") else: raise Exception("error type.") else: @@ -503,6 +513,8 @@ class MultiLangServerService( data = np.array(list(var.int64_data), dtype="int64") elif v_type == 1: # float32 data = np.array(list(var.float_data), dtype="float32") + elif v_type == 2: # int32 + data = np.array(list(var.int32_data), dtype="int32") else: raise Exception("error type.") data.shape = list(feed_inst.tensor_array[idx].shape) @@ -510,55 +522,132 @@ class MultiLangServerService( feed_batch.append(feed_dict) return feed_batch, fetch_names, is_python - def _pack_resp_package(self, result, fetch_names, is_python, tag): - resp = multi_lang_general_model_service_pb2.Response() - # Only one model is supported temporarily - model_output = multi_lang_general_model_service_pb2.ModelOutput() - inst = multi_lang_general_model_service_pb2.FetchInst() - for idx, name in enumerate(fetch_names): - tensor = multi_lang_general_model_service_pb2.Tensor() - v_type = self.fetch_types_[name] - if is_python: - tensor.data = result[name].tobytes() - else: - if v_type == 0: # int64 - tensor.int64_data.extend(result[name].reshape(-1).tolist()) - elif v_type == 1: # float32 - tensor.float_data.extend(result[name].reshape(-1).tolist()) - else: - raise Exception("error type.") - tensor.shape.extend(list(result[name].shape)) - if name in self.lod_tensor_set_: - tensor.lod.extend(result["{}.lod".format(name)].tolist()) - inst.tensor_array.append(tensor) - model_output.insts.append(inst) - resp.outputs.append(model_output) + def _pack_inference_response(self, ret, 
fetch_names, is_python): + resp = multi_lang_general_model_service_pb2.InferenceResponse() + if ret is None: + resp.err_code = 1 + return resp + results, tag = ret resp.tag = tag + resp.err_code = 0 + if not self.is_multi_model_: + results = {'general_infer_0': results} + for model_name, model_result in results.items(): + model_output = multi_lang_general_model_service_pb2.ModelOutput() + inst = multi_lang_general_model_service_pb2.FetchInst() + for idx, name in enumerate(fetch_names): + tensor = multi_lang_general_model_service_pb2.Tensor() + v_type = self.fetch_types_[name] + if is_python: + tensor.data = model_result[name].tobytes() + else: + if v_type == 0: # int64 + tensor.int64_data.extend(model_result[name].reshape(-1) + .tolist()) + elif v_type == 1: # float32 + tensor.float_data.extend(model_result[name].reshape(-1) + .tolist()) + elif v_type == 2: # int32 + tensor.int32_data.extend(model_result[name].reshape(-1) + .tolist()) + else: + raise Exception("error type.") + tensor.shape.extend(list(model_result[name].shape)) + if name in self.lod_tensor_set_: + tensor.lod.extend(model_result["{}.lod".format(name)] + .tolist()) + inst.tensor_array.append(tensor) + model_output.insts.append(inst) + model_output.engine_name = model_name + resp.outputs.append(model_output) + return resp + + def SetTimeout(self, request, context): + # This porcess and Inference process cannot be operate at the same time. + # For performance reasons, do not add thread lock temporarily. + timeout_ms = request.timeout_ms + self._init_bclient(self.model_config_path_, self.endpoints_, timeout_ms) + resp = multi_lang_general_model_service_pb2.SimpleResponse() + resp.err_code = 0 return resp - def inference(self, request, context): - feed_dict, fetch_names, is_python = self._unpack_request(request) - data, tag = self.bclient_.predict( + def Inference(self, request, context): + feed_dict, fetch_names, is_python = self._unpack_inference_request( + request) + ret = self.bclient_.predict( feed=feed_dict, fetch=fetch_names, need_variant_tag=True) - return self._pack_resp_package(data, fetch_names, is_python, tag) + return self._pack_inference_response(ret, fetch_names, is_python) + + def GetClientConfig(self, request, context): + resp = multi_lang_general_model_service_pb2.GetClientConfigResponse() + resp.client_config_str = self.model_config_str_ + return resp class MultiLangServer(object): - def __init__(self, worker_num=2): + def __init__(self): self.bserver_ = Server() - self.worker_num_ = worker_num + self.worker_num_ = 4 + self.body_size_ = 64 * 1024 * 1024 + self.concurrency_ = 100000 + self.is_multi_model_ = False # for model ensemble + + def set_max_concurrency(self, concurrency): + self.concurrency_ = concurrency + self.bserver_.set_max_concurrency(concurrency) + + def set_num_threads(self, threads): + self.worker_num_ = threads + self.bserver_.set_num_threads(threads) + + def set_max_body_size(self, body_size): + self.bserver_.set_max_body_size(body_size) + if body_size >= self.body_size_: + self.body_size_ = body_size + else: + print( + "max_body_size is less than default value, will use default value in service." 
+ ) + + def set_port(self, port): + self.gport_ = port + + def set_reload_interval(self, interval): + self.bserver_.set_reload_interval(interval) def set_op_sequence(self, op_seq): self.bserver_.set_op_sequence(op_seq) - def load_model_config(self, model_config_path): - if not isinstance(model_config_path, str): - raise Exception( - "MultiLangServer only supports multi-model temporarily") - self.bserver_.load_model_config(model_config_path) - self.model_config_path_ = model_config_path + def set_op_graph(self, op_graph): + self.bserver_.set_op_graph(op_graph) + + def set_memory_optimize(self, flag=False): + self.bserver_.set_memory_optimize(flag) + + def set_ir_optimize(self, flag=False): + self.bserver_.set_ir_optimize(flag) + + def set_op_sequence(self, op_seq): + self.bserver_.set_op_sequence(op_seq) + + def use_mkl(self, flag): + self.bserver_.use_mkl(flag) + + def load_model_config(self, server_config_paths, client_config_path=None): + self.bserver_.load_model_config(server_config_paths) + if client_config_path is None: + if isinstance(server_config_paths, dict): + self.is_multi_model_ = True + client_config_path = '{}/serving_server_conf.prototxt'.format( + list(server_config_paths.items())[0][1]) + else: + client_config_path = '{}/serving_server_conf.prototxt'.format( + server_config_paths) + self.bclient_config_path_ = client_config_path def prepare_server(self, workdir=None, port=9292, device="cpu"): + if not self._port_is_available(port): + raise SystemExit("Prot {} is already used".format(port)) default_port = 12000 self.port_list_ = [] for i in range(1000): @@ -568,7 +657,7 @@ class MultiLangServer(object): break self.bserver_.prepare_server( workdir=workdir, port=self.port_list_[0], device=device) - self.gport_ = port + self.set_port(port) def _launch_brpc_service(self, bserver): bserver.run_server() @@ -583,12 +672,16 @@ class MultiLangServer(object): p_bserver = Process( target=self._launch_brpc_service, args=(self.bserver_, )) p_bserver.start() + options = [('grpc.max_send_message_length', self.body_size_), + ('grpc.max_receive_message_length', self.body_size_)] server = grpc.server( - futures.ThreadPoolExecutor(max_workers=self.worker_num_)) + futures.ThreadPoolExecutor(max_workers=self.worker_num_), + options=options, + maximum_concurrent_rpcs=self.concurrency_) multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server( - MultiLangServerService(self.model_config_path_, - ["0.0.0.0:{}".format(self.port_list_[0])]), - server) + MultiLangServerServiceServicer( + self.bclient_config_path_, self.is_multi_model_, + ["0.0.0.0:{}".format(self.port_list_[0])]), server) server.add_insecure_port('[::]:{}'.format(self.gport_)) server.start() p_bserver.join() diff --git a/python/paddle_serving_server/serve.py b/python/paddle_serving_server/serve.py index e67cba7cd2bb89a8126c0a74393bdcec648eee17..009a6ce00af2290b64716e211429385d09189831 100644 --- a/python/paddle_serving_server/serve.py +++ b/python/paddle_serving_server/serve.py @@ -55,7 +55,8 @@ def parse_args(): # pylint: disable=doc-string-missing help="Limit sizes of messages") parser.add_argument( "--use_multilang", - action='store_true', + default=False, + action="store_true", help="Use Multi-language-service") return parser.parse_args() @@ -91,16 +92,15 @@ def start_standard_model(): # pylint: disable=doc-string-missing server = None if use_multilang: server = serving.MultiLangServer() - server.set_op_sequence(op_seq_maker.get_op_sequence()) else: server = serving.Server() - 
server.set_op_sequence(op_seq_maker.get_op_sequence()) - server.set_num_threads(thread_num) - server.set_memory_optimize(mem_optim) - server.set_ir_optimize(ir_optim) - server.use_mkl(use_mkl) - server.set_max_body_size(max_body_size) - server.set_port(port) + server.set_op_sequence(op_seq_maker.get_op_sequence()) + server.set_num_threads(thread_num) + server.set_memory_optimize(mem_optim) + server.set_ir_optimize(ir_optim) + server.use_mkl(use_mkl) + server.set_max_body_size(max_body_size) + server.set_port(port) server.load_model_config(model) server.prepare_server(workdir=workdir, port=port, device=device) diff --git a/python/paddle_serving_server/web_service.py b/python/paddle_serving_server/web_service.py index b3fcc1b880fcbffa1da884e4b68350c1870997c1..ea43c8cb18731f60d905ccbe4bada605709d9c11 100755 --- a/python/paddle_serving_server/web_service.py +++ b/python/paddle_serving_server/web_service.py @@ -85,9 +85,9 @@ class WebService(object): fetch_map = self.client.predict(feed=feed, fetch=fetch) for key in fetch_map: fetch_map[key] = fetch_map[key].tolist() - fetch_map = self.postprocess( + result = self.postprocess( feed=request.json["feed"], fetch=fetch, fetch_map=fetch_map) - result = {"result": fetch_map} + result = {"result": result} except ValueError: result = {"result": "Request Value Error"} return result diff --git a/python/paddle_serving_server_gpu/__init__.py b/python/paddle_serving_server_gpu/__init__.py index cc736a52e82637509d7d2a49efe9685f47c99e16..26288f6ae65ce823a57ee201130d40ff6510c4a5 100644 --- a/python/paddle_serving_server_gpu/__init__.py +++ b/python/paddle_serving_server_gpu/__init__.py @@ -70,7 +70,8 @@ def serve_args(): help="Limit sizes of messages") parser.add_argument( "--use_multilang", - action='store_true', + default=False, + action="store_true", help="Use Multi-language-service") return parser.parse_args() @@ -414,7 +415,7 @@ class Server(object): os.system("touch {}/fluid_time_file".format(workdir)) if not self.port_is_available(port): - raise SystemExit("Prot {} is already used".format(port)) + raise SystemExit("Port {} is already used".format(port)) self.set_port(port) self._prepare_resource(workdir) @@ -488,22 +489,29 @@ class Server(object): os.system(command) -class MultiLangServerService( - multi_lang_general_model_service_pb2_grpc.MultiLangGeneralModelService): - def __init__(self, model_config_path, endpoints): +class MultiLangServerServiceServicer(multi_lang_general_model_service_pb2_grpc. 
+ MultiLangGeneralModelServiceServicer): + def __init__(self, model_config_path, is_multi_model, endpoints): + self.is_multi_model_ = is_multi_model + self.model_config_path_ = model_config_path + self.endpoints_ = endpoints + with open(self.model_config_path_) as f: + self.model_config_str_ = str(f.read()) + self._parse_model_config(self.model_config_str_) + self._init_bclient(self.model_config_path_, self.endpoints_) + + def _init_bclient(self, model_config_path, endpoints, timeout_ms=None): from paddle_serving_client import Client - self._parse_model_config(model_config_path) self.bclient_ = Client() - self.bclient_.load_client_config( - "{}/serving_server_conf.prototxt".format(model_config_path)) + if timeout_ms is not None: + self.bclient_.set_rpc_timeout_ms(timeout_ms) + self.bclient_.load_client_config(model_config_path) self.bclient_.connect(endpoints) - def _parse_model_config(self, model_config_path): + def _parse_model_config(self, model_config_str): model_conf = m_config.GeneralModelConfig() - f = open("{}/serving_server_conf.prototxt".format(model_config_path), - 'r') - model_conf = google.protobuf.text_format.Merge( - str(f.read()), model_conf) + model_conf = google.protobuf.text_format.Merge(model_config_str, + model_conf) self.feed_names_ = [var.alias_name for var in model_conf.feed_var] self.feed_types_ = {} self.feed_shapes_ = {} @@ -528,7 +536,7 @@ class MultiLangServerService( else: yield item - def _unpack_request(self, request): + def _unpack_inference_request(self, request): feed_names = list(request.feed_var_names) fetch_names = list(request.fetch_var_names) is_python = request.is_python @@ -544,6 +552,8 @@ class MultiLangServerService( data = np.frombuffer(var.data, dtype="int64") elif v_type == 1: data = np.frombuffer(var.data, dtype="float32") + elif v_type == 2: + data = np.frombuffer(var.data, dtype="int32") else: raise Exception("error type.") else: @@ -551,6 +561,8 @@ class MultiLangServerService( data = np.array(list(var.int64_data), dtype="int64") elif v_type == 1: # float32 data = np.array(list(var.float_data), dtype="float32") + elif v_type == 2: + data = np.array(list(var.int32_data), dtype="int32") else: raise Exception("error type.") data.shape = list(feed_inst.tensor_array[idx].shape) @@ -558,55 +570,129 @@ class MultiLangServerService( feed_batch.append(feed_dict) return feed_batch, fetch_names, is_python - def _pack_resp_package(self, result, fetch_names, is_python, tag): - resp = multi_lang_general_model_service_pb2.Response() - # Only one model is supported temporarily - model_output = multi_lang_general_model_service_pb2.ModelOutput() - inst = multi_lang_general_model_service_pb2.FetchInst() - for idx, name in enumerate(fetch_names): - tensor = multi_lang_general_model_service_pb2.Tensor() - v_type = self.fetch_types_[name] - if is_python: - tensor.data = result[name].tobytes() - else: - if v_type == 0: # int64 - tensor.int64_data.extend(result[name].reshape(-1).tolist()) - elif v_type == 1: # float32 - tensor.float_data.extend(result[name].reshape(-1).tolist()) - else: - raise Exception("error type.") - tensor.shape.extend(list(result[name].shape)) - if name in self.lod_tensor_set_: - tensor.lod.extend(result["{}.lod".format(name)].tolist()) - inst.tensor_array.append(tensor) - model_output.insts.append(inst) - resp.outputs.append(model_output) + def _pack_inference_response(self, ret, fetch_names, is_python): + resp = multi_lang_general_model_service_pb2.InferenceResponse() + if ret is None: + resp.err_code = 1 + return resp + results, tag = 
ret resp.tag = tag + resp.err_code = 0 + if not self.is_multi_model_: + results = {'general_infer_0': results} + for model_name, model_result in results.items(): + model_output = multi_lang_general_model_service_pb2.ModelOutput() + inst = multi_lang_general_model_service_pb2.FetchInst() + for idx, name in enumerate(fetch_names): + tensor = multi_lang_general_model_service_pb2.Tensor() + v_type = self.fetch_types_[name] + if is_python: + tensor.data = model_result[name].tobytes() + else: + if v_type == 0: # int64 + tensor.int64_data.extend(model_result[name].reshape(-1) + .tolist()) + elif v_type == 1: # float32 + tensor.float_data.extend(model_result[name].reshape(-1) + .tolist()) + elif v_type == 2: # int32 + tensor.int32_data.extend(model_result[name].reshape(-1) + .tolist()) + else: + raise Exception("error type.") + tensor.shape.extend(list(model_result[name].shape)) + if name in self.lod_tensor_set_: + tensor.lod.extend(model_result["{}.lod".format(name)] + .tolist()) + inst.tensor_array.append(tensor) + model_output.insts.append(inst) + model_output.engine_name = model_name + resp.outputs.append(model_output) + return resp + + def SetTimeout(self, request, context): + # This porcess and Inference process cannot be operate at the same time. + # For performance reasons, do not add thread lock temporarily. + timeout_ms = request.timeout_ms + self._init_bclient(self.model_config_path_, self.endpoints_, timeout_ms) + resp = multi_lang_general_model_service_pb2.SimpleResponse() + resp.err_code = 0 return resp - def inference(self, request, context): - feed_dict, fetch_names, is_python = self._unpack_request(request) - data, tag = self.bclient_.predict( + def Inference(self, request, context): + feed_dict, fetch_names, is_python = self._unpack_inference_request( + request) + ret = self.bclient_.predict( feed=feed_dict, fetch=fetch_names, need_variant_tag=True) - return self._pack_resp_package(data, fetch_names, is_python, tag) + return self._pack_inference_response(ret, fetch_names, is_python) + + def GetClientConfig(self, request, context): + resp = multi_lang_general_model_service_pb2.GetClientConfigResponse() + resp.client_config_str = self.model_config_str_ + return resp class MultiLangServer(object): - def __init__(self, worker_num=2): + def __init__(self): self.bserver_ = Server() - self.worker_num_ = worker_num + self.worker_num_ = 4 + self.body_size_ = 64 * 1024 * 1024 + self.concurrency_ = 100000 + self.is_multi_model_ = False # for model ensemble + + def set_max_concurrency(self, concurrency): + self.concurrency_ = concurrency + self.bserver_.set_max_concurrency(concurrency) + + def set_num_threads(self, threads): + self.worker_num_ = threads + self.bserver_.set_num_threads(threads) + + def set_max_body_size(self, body_size): + self.bserver_.set_max_body_size(body_size) + if body_size >= self.body_size_: + self.body_size_ = body_size + else: + print( + "max_body_size is less than default value, will use default value in service." 
+ ) + + def set_port(self, port): + self.gport_ = port + + def set_reload_interval(self, interval): + self.bserver_.set_reload_interval(interval) def set_op_sequence(self, op_seq): self.bserver_.set_op_sequence(op_seq) - def load_model_config(self, model_config_path): - if not isinstance(model_config_path, str): - raise Exception( - "MultiLangServer only supports multi-model temporarily") - self.bserver_.load_model_config(model_config_path) - self.model_config_path_ = model_config_path + def set_op_graph(self, op_graph): + self.bserver_.set_op_graph(op_graph) + + def set_memory_optimize(self, flag=False): + self.bserver_.set_memory_optimize(flag) + + def set_ir_optimize(self, flag=False): + self.bserver_.set_ir_optimize(flag) + + def set_gpuid(self, gpuid=0): + self.bserver_.set_gpuid(gpuid) + + def load_model_config(self, server_config_paths, client_config_path=None): + self.bserver_.load_model_config(server_config_paths) + if client_config_path is None: + if isinstance(server_config_paths, dict): + self.is_multi_model_ = True + client_config_path = '{}/serving_server_conf.prototxt'.format( + list(server_config_paths.items())[0][1]) + else: + client_config_path = '{}/serving_server_conf.prototxt'.format( + server_config_paths) + self.bclient_config_path_ = client_config_path def prepare_server(self, workdir=None, port=9292, device="cpu"): + if not self._port_is_available(port): + raise SystemExit("Prot {} is already used".format(port)) default_port = 12000 self.port_list_ = [] for i in range(1000): @@ -616,7 +702,7 @@ class MultiLangServer(object): break self.bserver_.prepare_server( workdir=workdir, port=self.port_list_[0], device=device) - self.gport_ = port + self.set_port(port) def _launch_brpc_service(self, bserver): bserver.run_server() @@ -631,12 +717,16 @@ class MultiLangServer(object): p_bserver = Process( target=self._launch_brpc_service, args=(self.bserver_, )) p_bserver.start() + options = [('grpc.max_send_message_length', self.body_size_), + ('grpc.max_receive_message_length', self.body_size_)] server = grpc.server( - futures.ThreadPoolExecutor(max_workers=self.worker_num_)) + futures.ThreadPoolExecutor(max_workers=self.worker_num_), + options=options, + maximum_concurrent_rpcs=self.concurrency_) multi_lang_general_model_service_pb2_grpc.add_MultiLangGeneralModelServiceServicer_to_server( - MultiLangServerService(self.model_config_path_, - ["0.0.0.0:{}".format(self.port_list_[0])]), - server) + MultiLangServerServiceServicer( + self.bclient_config_path_, self.is_multi_model_, + ["0.0.0.0:{}".format(self.port_list_[0])]), server) server.add_insecure_port('[::]:{}'.format(self.gport_)) server.start() p_bserver.join() diff --git a/python/paddle_serving_server_gpu/serve.py b/python/paddle_serving_server_gpu/serve.py index 0769039ef9955aea69af7ab84142a1f735ce4697..e26b32c2699d09b714b2658cafad0ae8c5138071 100644 --- a/python/paddle_serving_server_gpu/serve.py +++ b/python/paddle_serving_server_gpu/serve.py @@ -37,6 +37,7 @@ def start_gpu_card_model(index, gpuid, args): # pylint: disable=doc-string-miss mem_optim = args.mem_optim ir_optim = args.ir_optim max_body_size = args.max_body_size + use_multilang = args.use_multilang workdir = "{}_{}".format(args.workdir, gpuid) if model == "": @@ -54,26 +55,20 @@ def start_gpu_card_model(index, gpuid, args): # pylint: disable=doc-string-miss op_seq_maker.add_op(general_infer_op) op_seq_maker.add_op(general_response_op) - use_multilang = args.use_multilang if use_multilang: server = serving.MultiLangServer() - 
server.set_op_sequence(op_seq_maker.get_op_sequence()) - server.load_model_config(model) - server.prepare_server(workdir=workdir, port=port, device=device) - if gpuid >= 0: - raise ValueError("gpuid can not >= 0 in MultiLangServer") else: server = serving.Server() - server.set_op_sequence(op_seq_maker.get_op_sequence()) - server.set_num_threads(thread_num) - server.set_memory_optimize(mem_optim) - server.set_ir_optimize(ir_optim) - server.set_max_body_size(max_body_size) - - server.load_model_config(model) - server.prepare_server(workdir=workdir, port=port, device=device) - if gpuid >= 0: - server.set_gpuid(gpuid) + server.set_op_sequence(op_seq_maker.get_op_sequence()) + server.set_num_threads(thread_num) + server.set_memory_optimize(mem_optim) + server.set_ir_optimize(ir_optim) + server.set_max_body_size(max_body_size) + + server.load_model_config(model) + server.prepare_server(workdir=workdir, port=port, device=device) + if gpuid >= 0: + server.set_gpuid(gpuid) server.run_server() diff --git a/python/paddle_serving_server_gpu/web_service.py b/python/paddle_serving_server_gpu/web_service.py index 76721de8a005dfb23fbe2427671446889aa72af1..0eff9c72df84b30ded7dbc7c2e82a96bbd591162 100644 --- a/python/paddle_serving_server_gpu/web_service.py +++ b/python/paddle_serving_server_gpu/web_service.py @@ -50,12 +50,12 @@ class WebService(object): general_infer_op = op_maker.create('general_infer') general_response_op = op_maker.create('general_response') - op_seq_maker = serving.OpSeqMaker() + op_seq_maker = OpSeqMaker() op_seq_maker.add_op(read_op) op_seq_maker.add_op(general_infer_op) op_seq_maker.add_op(general_response_op) - server = serving.Server() + server = Server() server.set_op_sequence(op_seq_maker.get_op_sequence()) server.set_num_threads(thread_num) @@ -171,7 +171,7 @@ class WebService(object): processes=1) def get_app_instance(self): - return app_instance + return self.app_instance def preprocess(self, feed=[], fetch=[]): return feed, fetch diff --git a/tools/Dockerfile b/tools/Dockerfile index 3c701725400350247153f828410d06cec69856f5..dd18a773562bd078771d7df44123ac530764af93 100644 --- a/tools/Dockerfile +++ b/tools/Dockerfile @@ -7,8 +7,9 @@ RUN yum -y install wget && \ yum -y install libXrender-0.9.10-1.el7.x86_64 --setopt=protected_multilib=false && \ yum -y install libXext-1.3.3-3.el7.x86_64 --setopt=protected_multilib=false && \ yum -y install python3 python3-devel && \ - yum clean all && \ - curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py && \ + yum clean all + +RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py && \ python get-pip.py && rm get-pip.py && \ localedef -c -i en_US -f UTF-8 en_US.UTF-8 && \ echo "export LANG=en_US.utf8" >> /root/.bashrc diff --git a/tools/Dockerfile.devel b/tools/Dockerfile.devel index e4bcd33534cb9e887f49fcba5029619aaa1dea4c..dc00384e39bb742400fee74663a551cf44019d61 100644 --- a/tools/Dockerfile.devel +++ b/tools/Dockerfile.devel @@ -1,26 +1,31 @@ FROM centos:7.3.1611 -RUN yum -y install wget >/dev/null \ - && yum -y install gcc gcc-c++ make glibc-static which >/dev/null \ - && yum -y install git openssl-devel curl-devel bzip2-devel python-devel >/dev/null \ - && wget https://cmake.org/files/v3.2/cmake-3.2.0-Linux-x86_64.tar.gz >/dev/null \ +RUN yum -y install wget \ + && yum -y install gcc gcc-c++ make glibc-static which \ + && yum -y install git openssl-devel curl-devel bzip2-devel python-devel + +RUN wget https://cmake.org/files/v3.2/cmake-3.2.0-Linux-x86_64.tar.gz >/dev/null \ && tar xzf cmake-3.2.0-Linux-x86_64.tar.gz \ && mv 
cmake-3.2.0-Linux-x86_64 /usr/local/cmake3.2.0 \ && echo 'export PATH=/usr/local/cmake3.2.0/bin:$PATH' >> /root/.bashrc \ - && rm cmake-3.2.0-Linux-x86_64.tar.gz \ - && wget https://dl.google.com/go/go1.14.linux-amd64.tar.gz >/dev/null \ + && rm cmake-3.2.0-Linux-x86_64.tar.gz + +RUN wget https://dl.google.com/go/go1.14.linux-amd64.tar.gz >/dev/null \ && tar xzf go1.14.linux-amd64.tar.gz \ && mv go /usr/local/go \ && echo 'export GOROOT=/usr/local/go' >> /root/.bashrc \ && echo 'export PATH=/usr/local/go/bin:$PATH' >> /root/.bashrc \ - && rm go1.14.linux-amd64.tar.gz \ - && yum -y install python-devel sqlite-devel >/dev/null \ + && rm go1.14.linux-amd64.tar.gz + +RUN yum -y install python-devel sqlite-devel \ && curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py >/dev/null \ && python get-pip.py >/dev/null \ && pip install google protobuf setuptools wheel flask >/dev/null \ - && rm get-pip.py \ - && yum install -y python3 python3-devel \ + && rm get-pip.py + +RUN yum install -y python3 python3-devel \ && pip3 install google protobuf setuptools wheel flask \ && yum -y install epel-release && yum -y install patchelf libXext libSM libXrender\ - && yum clean all \ - && localedef -c -i en_US -f UTF-8 en_US.UTF-8 \ + && yum clean all + +RUN localedef -c -i en_US -f UTF-8 en_US.UTF-8 \ && echo "export LANG=en_US.utf8" >> /root/.bashrc diff --git a/tools/Dockerfile.gpu b/tools/Dockerfile.gpu index 2f38a3a3cd1c8987d34a81259ec9ad6ba67156a7..adb8e73f86a8fa436de3844a60f08ab22df0177e 100644 --- a/tools/Dockerfile.gpu +++ b/tools/Dockerfile.gpu @@ -8,10 +8,12 @@ RUN yum -y install wget && \ yum -y install libXrender-0.9.10-1.el7.x86_64 --setopt=protected_multilib=false && \ yum -y install libXext-1.3.3-3.el7.x86_64 --setopt=protected_multilib=false && \ yum -y install python3 python3-devel && \ - yum clean all && \ - curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py && \ - python get-pip.py && rm get-pip.py && \ - ln -s /usr/local/cuda-9.0/lib64/libcublas.so.9.0 /usr/local/cuda-9.0/lib64/libcublas.so && \ + yum clean all + +RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py && \ + python get-pip.py && rm get-pip.py + +RUN ln -s /usr/local/cuda-9.0/lib64/libcublas.so.9.0 /usr/local/cuda-9.0/lib64/libcublas.so && \ echo 'export LD_LIBRARY_PATH=/usr/local/cuda/lib64:$LD_LIBRARY_PATH' >> /root/.bashrc && \ ln -s /usr/local/cuda-9.0/targets/x86_64-linux/lib/libcudnn.so.7 /usr/local/cuda-9.0/targets/x86_64-linux/lib/libcudnn.so && \ echo 'export LD_LIBRARY_PATH=/usr/local/cuda-9.0/targets/x86_64-linux/lib:$LD_LIBRARY_PATH' >> /root/.bashrc && \ diff --git a/tools/Dockerfile.gpu.devel b/tools/Dockerfile.gpu.devel index 057201cefa1f8de7a105ea9b7f93e7ca9e342777..583b0566edf85f56c5fcc6f9f36dce6430ba7941 100644 --- a/tools/Dockerfile.gpu.devel +++ b/tools/Dockerfile.gpu.devel @@ -1,26 +1,31 @@ FROM nvidia/cuda:9.0-cudnn7-devel-centos7 - RUN yum -y install wget >/dev/null \ - && yum -y install gcc gcc-c++ make glibc-static which >/dev/null \ - && yum -y install git openssl-devel curl-devel bzip2-devel python-devel >/dev/null \ - && wget https://cmake.org/files/v3.2/cmake-3.2.0-Linux-x86_64.tar.gz >/dev/null \ + && yum -y install gcc gcc-c++ make glibc-static which \ + && yum -y install git openssl-devel curl-devel bzip2-devel python-devel + +RUN wget https://cmake.org/files/v3.2/cmake-3.2.0-Linux-x86_64.tar.gz >/dev/null \ && tar xzf cmake-3.2.0-Linux-x86_64.tar.gz \ && mv cmake-3.2.0-Linux-x86_64 /usr/local/cmake3.2.0 \ && echo 'export PATH=/usr/local/cmake3.2.0/bin:$PATH' >> /root/.bashrc \ - 
&& rm cmake-3.2.0-Linux-x86_64.tar.gz \ - && wget https://dl.google.com/go/go1.14.linux-amd64.tar.gz >/dev/null \ + && rm cmake-3.2.0-Linux-x86_64.tar.gz + +RUN wget https://dl.google.com/go/go1.14.linux-amd64.tar.gz >/dev/null \ && tar xzf go1.14.linux-amd64.tar.gz \ && mv go /usr/local/go \ && echo 'export GOROOT=/usr/local/go' >> /root/.bashrc \ && echo 'export PATH=/usr/local/go/bin:$PATH' >> /root/.bashrc \ - && rm go1.14.linux-amd64.tar.gz \ - && yum -y install python-devel sqlite-devel >/dev/null \ + && rm go1.14.linux-amd64.tar.gz + +RUN yum -y install python-devel sqlite-devel \ && curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py >/dev/null \ && python get-pip.py >/dev/null \ && pip install google protobuf setuptools wheel flask >/dev/null \ - && rm get-pip.py \ - && yum install -y python3 python3-devel \ + && rm get-pip.py + +RUN yum install -y python3 python3-devel \ && pip3 install google protobuf setuptools wheel flask \ && yum -y install epel-release && yum -y install patchelf libXext libSM libXrender\ - && yum clean all \ + && yum clean all + +RUN localedef -c -i en_US -f UTF-8 en_US.UTF-8 \ && echo "export LANG=en_US.utf8" >> /root/.bashrc diff --git a/tools/serving_build.sh b/tools/serving_build.sh index 989e48ead9864e717e573f7f0800a1afba2e934a..097123165988fb266f7c4a3a0da603ade6d98be1 100644 --- a/tools/serving_build.sh +++ b/tools/serving_build.sh @@ -134,6 +134,7 @@ function build_server() { function kill_server_process() { ps -ef | grep "serving" | grep -v serving_build | grep -v grep | awk '{print $2}' | xargs kill + sleep 1 } function python_test_fit_a_line() { @@ -246,6 +247,7 @@ function python_run_criteo_ctr_with_cube() { echo "criteo_ctr_with_cube inference auc test success" kill_server_process ps -ef | grep "cube" | grep -v grep | awk '{print $2}' | xargs kill + sleep 1 ;; GPU) check_cmd "wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz" @@ -261,6 +263,8 @@ function python_run_criteo_ctr_with_cube() { check_cmd "mkdir work_dir1 && cp cube/conf/cube.conf ./work_dir1/" python test_server_gpu.py ctr_serving_model_kv & sleep 5 + # for warm up + python test_client.py ctr_client_conf/serving_client_conf.prototxt ./ut_data > /dev/null || true check_cmd "python test_client.py ctr_client_conf/serving_client_conf.prototxt ./ut_data >score" tail -n 2 score | awk 'NR==1' AUC=$(tail -n 2 score | awk 'NR==1') @@ -273,6 +277,7 @@ function python_run_criteo_ctr_with_cube() { echo "criteo_ctr_with_cube inference auc test success" kill_server_process ps -ef | grep "cube" | grep -v grep | awk '{print $2}' | xargs kill + sleep 1 ;; *) echo "error type" @@ -484,6 +489,7 @@ function python_test_lac() { setproxy # recover proxy state kill_server_process ps -ef | grep "lac_web_service" | grep -v grep | awk '{print $2}' | xargs kill + sleep 1 echo "lac CPU HTTP inference pass" ;; GPU) @@ -499,6 +505,178 @@ function python_test_lac() { cd .. 
} +function python_test_grpc_impl() { + # pwd: /Serving/python/examples + cd grpc_impl_example # pwd: /Serving/python/examples/grpc_impl_example + local TYPE=$1 + export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving + unsetproxy + case $TYPE in + CPU) + # test general case + cd fit_a_line # pwd: /Serving/python/examples/grpc_impl_example/fit_a_line + sh get_data.sh + + # one line command start + check_cmd "python -m paddle_serving_server.serve --model uci_housing_model --port 9393 --thread 4 --use_multilang > /dev/null &" + sleep 5 # wait for the server to start + check_cmd "python test_sync_client.py > /dev/null" + check_cmd "python test_asyn_client.py > /dev/null" + check_cmd "python test_general_pb_client.py > /dev/null" + check_cmd "python test_numpy_input_client.py > /dev/null" + check_cmd "python test_batch_client.py > /dev/null" + check_cmd "python test_timeout_client.py > /dev/null" + kill_server_process + + check_cmd "python test_server.py uci_housing_model > /dev/null &" + sleep 5 # wait for the server to start + check_cmd "python test_sync_client.py > /dev/null" + check_cmd "python test_asyn_client.py > /dev/null" + check_cmd "python test_general_pb_client.py > /dev/null" + check_cmd "python test_numpy_input_client.py > /dev/null" + check_cmd "python test_batch_client.py > /dev/null" + check_cmd "python test_timeout_client.py > /dev/null" + kill_server_process + + cd .. # pwd: /Serving/python/examples/grpc_impl_example + + # test load server config and client config in Server side + cd criteo_ctr_with_cube # pwd: /Serving/python/examples/grpc_impl_example/criteo_ctr_with_cube + + check_cmd "wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz" + check_cmd "tar xf ctr_cube_unittest.tar.gz" + check_cmd "mv models/ctr_client_conf ./" + check_cmd "mv models/ctr_serving_model_kv ./" + check_cmd "mv models/data ./cube/" + check_cmd "mv models/ut_data ./" + cp ../../../../build-server-$TYPE/output/bin/cube* ./cube/ + sh cube_prepare.sh & + check_cmd "mkdir work_dir1 && cp cube/conf/cube.conf ./work_dir1/" + python test_server.py ctr_serving_model_kv ctr_client_conf/serving_client_conf.prototxt & + sleep 5 + check_cmd "python test_client.py ./ut_data >score" + tail -n 2 score | awk 'NR==1' + AUC=$(tail -n 2 score | awk 'NR==1') + VAR2="0.67" #TODO: temporarily relax the threshold to 0.67 + RES=$( echo "$AUC>$VAR2" | bc ) + if [[ $RES -eq 0 ]]; then + echo "error with criteo_ctr_with_cube inference auc test, auc should > 0.67" + exit 1 + fi + echo "grpc impl test success" + kill_server_process + ps -ef | grep "cube" | grep -v grep | awk '{print $2}' | xargs kill + + cd .. 
# pwd: /Serving/python/examples/grpc_impl_example + ;; + GPU) + export CUDA_VISIBLE_DEVICES=0 + # test general case + cd fit_a_line # pwd: /Serving/python/examples/grpc_impl_example/fit_a_line + sh get_data.sh + + # one line command start + check_cmd "python -m paddle_serving_server_gpu.serve --model uci_housing_model --port 9393 --thread 4 --gpu_ids 0 --use_multilang > /dev/null &" + sleep 5 # wait for the server to start + check_cmd "python test_sync_client.py > /dev/null" + check_cmd "python test_asyn_client.py > /dev/null" + check_cmd "python test_general_pb_client.py > /dev/null" + check_cmd "python test_numpy_input_client.py > /dev/null" + check_cmd "python test_batch_client.py > /dev/null" + check_cmd "python test_timeout_client.py > /dev/null" + kill_server_process + + check_cmd "python test_server_gpu.py uci_housing_model > /dev/null &" + sleep 5 # wait for the server to start + check_cmd "python test_sync_client.py > /dev/null" + check_cmd "python test_asyn_client.py > /dev/null" + check_cmd "python test_general_pb_client.py > /dev/null" + check_cmd "python test_numpy_input_client.py > /dev/null" + check_cmd "python test_batch_client.py > /dev/null" + check_cmd "python test_timeout_client.py > /dev/null" + kill_server_process + ps -ef | grep "test_server_gpu" | grep -v serving_build | grep -v grep | awk '{print $2}' | xargs kill + + cd .. # pwd: /Serving/python/examples/grpc_impl_example + + # test load server config and client config in Server side + cd criteo_ctr_with_cube # pwd: /Serving/python/examples/grpc_impl_example/criteo_ctr_with_cube + + check_cmd "wget https://paddle-serving.bj.bcebos.com/unittest/ctr_cube_unittest.tar.gz" + check_cmd "tar xf ctr_cube_unittest.tar.gz" + check_cmd "mv models/ctr_client_conf ./" + check_cmd "mv models/ctr_serving_model_kv ./" + check_cmd "mv models/data ./cube/" + check_cmd "mv models/ut_data ./" + cp ../../../../build-server-$TYPE/output/bin/cube* ./cube/ + sh cube_prepare.sh & + check_cmd "mkdir work_dir1 && cp cube/conf/cube.conf ./work_dir1/" + python test_server_gpu.py ctr_serving_model_kv ctr_client_conf/serving_client_conf.prototxt & + sleep 5 + # for warm up + python test_client.py ./ut_data &> /dev/null || true + check_cmd "python test_client.py ./ut_data >score" + tail -n 2 score | awk 'NR==1' + AUC=$(tail -n 2 score | awk 'NR==1') + VAR2="0.67" #TODO: temporarily relax the threshold to 0.67 + RES=$( echo "$AUC>$VAR2" | bc ) + if [[ $RES -eq 0 ]]; then + echo "error with criteo_ctr_with_cube inference auc test, auc should > 0.67" + exit 1 + fi + echo "grpc impl test success" + kill_server_process + ps -ef | grep "test_server_gpu" | grep -v serving_build | grep -v grep | awk '{print $2}' | xargs kill + ps -ef | grep "cube" | grep -v grep | awk '{print $2}' | xargs kill + cd .. # pwd: /Serving/python/examples/grpc_impl_example + ;; + *) + echo "error type" + exit 1 + ;; + esac + echo "test grpc impl $TYPE part finished as expected." + setproxy + unset SERVING_BIN + cd .. 
# pwd: /Serving/python/examples +} + + +function python_test_yolov4(){ + #pwd:/ Serving/python/examples + local TYPE=$1 + export SERVING_BIN=${SERVING_WORKDIR}/build-server-${TYPE}/core/general-server/serving + cd yolov4 + case $TYPE in + CPU) + python -m paddle_serving_app.package --get_model yolov4 + tar -xzvf yolov4.tar.gz + check_cmd "python -m paddle_serving_server.serve --model yolov4_model/ --port 9393 &" + sleep 5 + check_cmd "python test_client.py 000000570688.jpg" + echo "yolov4 CPU RPC inference pass" + kill_server_process + ;; + GPU) + python -m paddle_serving_app.package --get_model yolov4 + tar -xzvf yolov4.tar.gz + check_cmd "python -m paddle_serving_server_gpu.serve --model yolov4_model/ --port 9393 --gpu_ids 0 &" + sleep 5 + check_cmd "python test_client.py 000000570688.jpg" + echo "yolov4 GPU RPC inference pass" + kill_server_process + ;; + *) + echo "error type" + exit 1 + ;; + esac + echo "test yolov4 $TYPE finished as expected." + unset SERVING_BIN + cd .. +} + + function python_run_test() { # Using the compiled binary local TYPE=$1 # pwd: /Serving @@ -510,6 +688,8 @@ function python_run_test() { python_test_lac $TYPE # pwd: /Serving/python/examples python_test_multi_process $TYPE # pwd: /Serving/python/examples python_test_multi_fetch $TYPE # pwd: /Serving/python/examples + python_test_yolov4 $TYPE # pwd: /Serving/python/examples + python_test_grpc_impl $TYPE # pwd: /Serving/python/examples echo "test python $TYPE part finished as expected." cd ../.. # pwd: /Serving } @@ -768,3 +948,4 @@ function main() { } main $@ +exit 0
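
Note on the new multi-lang (gRPC) test coverage: the sketch below shows how the fit_a_line portion of python_test_grpc_impl can be exercised by hand outside serving_build.sh. It is a minimal repro sketch, not part of the patch; it assumes a built Serving tree, that get_data.sh fetches uci_housing_model into the example directory, and it reuses the exact commands, port, and kill pattern from the test function above.

# Sketch: manually exercise the new gRPC (--use_multilang) path for fit_a_line
cd python/examples/grpc_impl_example/fit_a_line
sh get_data.sh
# start the server with the multi-lang gRPC endpoint enabled
python -m paddle_serving_server.serve --model uci_housing_model --port 9393 --thread 4 --use_multilang > /dev/null &
sleep 5  # wait for the server to start, as the test script does
# run the bundled example clients against the running server
python test_sync_client.py
python test_asyn_client.py
python test_batch_client.py
python test_timeout_client.py
# stop the server, mirroring kill_server_process in serving_build.sh
ps -ef | grep "serving" | grep -v grep | awk '{print $2}' | xargs kill
sleep 1

The GPU variant follows the same shape with paddle_serving_server_gpu.serve and --gpu_ids 0, as shown in the GPU branch of python_test_grpc_impl.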