diff --git a/core/general-client/include/general_model.h b/core/general-client/include/general_model.h
index cec57bdd9e5ae955586693a2ad5a5143e5e4b74b..f7363d46afc4031acff437425181cdb7d3b61e55 100644
--- a/core/general-client/include/general_model.h
+++ b/core/general-client/include/general_model.h
@@ -18,9 +18,9 @@
 
 #include <fstream>
 #include <map>
+#include <memory>
 #include <string>
 #include <vector>
-#include <memory>
 
 #include "core/sdk-cpp/builtin_format.pb.h"
 #include "core/sdk-cpp/general_model_service.pb.h"
@@ -37,46 +37,52 @@ namespace general_model {
 
 typedef std::map<std::string, std::vector<float>> FetchedMap;
 
-typedef std::map<std::string, std::vector<std::vector<float> > >
-    BatchFetchedMap;
+typedef std::map<std::string, std::vector<std::vector<float>>> BatchFetchedMap;
 
 class PredictorClient {
  public:
   PredictorClient() {}
   ~PredictorClient() {}
 
-  void init(const std::string & client_conf);
+  void init(const std::string& client_conf);
 
-  void set_predictor_conf(
-      const std::string& conf_path,
-      const std::string& conf_file);
+  void set_predictor_conf(const std::string& conf_path,
+                          const std::string& conf_file);
 
   int create_predictor();
 
-  std::vector<std::vector<float> > predict(
-      const std::vector<std::vector<float> > & float_feed,
-      const std::vector<std::string> & float_feed_name,
-      const std::vector<std::vector<int64_t> > & int_feed,
-      const std::vector<std::string> & int_feed_name,
-      const std::vector<std::string> & fetch_name);
-
-  std::vector<std::vector<float> > predict_with_profile(
-      const std::vector<std::vector<float> > & float_feed,
-      const std::vector<std::string> & float_feed_name,
-      const std::vector<std::vector<int64_t> > & int_feed,
-      const std::vector<std::string> & int_feed_name,
-      const std::vector<std::string> & fetch_name);
+  std::vector<std::vector<float>> predict(
+      const std::vector<std::vector<float>>& float_feed,
+      const std::vector<std::string>& float_feed_name,
+      const std::vector<std::vector<int64_t>>& int_feed,
+      const std::vector<std::string>& int_feed_name,
+      const std::vector<std::string>& fetch_name);
+
+  std::vector<std::vector<std::vector<float>>> predict_for_batch(
+      const std::vector<std::vector<std::vector<float>>>& float_feed_batch,
+      const std::vector<std::string>& float_feed_name,
+      const std::vector<std::vector<std::vector<int64_t>>>& int_feed_batch,
+      const std::vector<std::string>& int_feed_name,
+      const std::vector<std::string>& fetch_name,
+      const int64_t& batch_size);
+
+  std::vector<std::vector<float>> predict_with_profile(
+      const std::vector<std::vector<float>>& float_feed,
+      const std::vector<std::string>& float_feed_name,
+      const std::vector<std::vector<int64_t>>& int_feed,
+      const std::vector<std::string>& int_feed_name,
+      const std::vector<std::string>& fetch_name);
 
  private:
   PredictorApi _api;
-  Predictor * _predictor;
+  Predictor* _predictor;
   std::string _predictor_conf;
   std::string _predictor_path;
   std::string _conf_file;
   std::map<std::string, int> _feed_name_to_idx;
   std::map<std::string, int> _fetch_name_to_idx;
   std::map<std::string, std::string> _fetch_name_to_var_name;
-  std::vector<std::vector<int> > _shape;
+  std::vector<std::vector<int>> _shape;
   std::vector<int> _type;
 };
diff --git a/core/general-client/src/general_model.cpp b/core/general-client/src/general_model.cpp
index 2b18543c8d278eae96adf144d4b0d108e542c296..42ac29ee9b7cc84c41ff7185334dfd262e6ddce1 100644
--- a/core/general-client/src/general_model.cpp
+++ b/core/general-client/src/general_model.cpp
@@ -12,8 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include #include "core/general-client/include/general_model.h" +#include #include "core/sdk-cpp/builtin_format.pb.h" #include "core/sdk-cpp/include/common.h" #include "core/sdk-cpp/include/predictor_sdk.h" @@ -28,7 +28,7 @@ namespace baidu { namespace paddle_serving { namespace general_model { -void PredictorClient::init(const std::string & conf_file) { +void PredictorClient::init(const std::string &conf_file) { _conf_file = conf_file; std::ifstream fin(conf_file); if (!fin) { @@ -68,9 +68,8 @@ void PredictorClient::init(const std::string & conf_file) { } } -void PredictorClient::set_predictor_conf( - const std::string & conf_path, - const std::string & conf_file) { +void PredictorClient::set_predictor_conf(const std::string &conf_path, + const std::string &conf_file) { _predictor_path = conf_path; _predictor_conf = conf_file; } @@ -83,14 +82,13 @@ int PredictorClient::create_predictor() { _api.thrd_initialize(); } -std::vector > PredictorClient::predict( - const std::vector > & float_feed, - const std::vector & float_feed_name, - const std::vector > & int_feed, - const std::vector & int_feed_name, - const std::vector & fetch_name) { - - std::vector > fetch_result; +std::vector> PredictorClient::predict( + const std::vector> &float_feed, + const std::vector &float_feed_name, + const std::vector> &int_feed, + const std::vector &int_feed_name, + const std::vector &fetch_name) { + std::vector> fetch_result; if (fetch_name.size() == 0) { return fetch_result; } @@ -100,41 +98,43 @@ std::vector > PredictorClient::predict( _predictor = _api.fetch_predictor("general_model"); Request req; std::vector tensor_vec; - FeedInst * inst = req.add_insts(); - for (auto & name : float_feed_name) { + FeedInst *inst = req.add_insts(); + for (auto &name : float_feed_name) { tensor_vec.push_back(inst->add_tensor_array()); } - for (auto & name : int_feed_name) { + for (auto &name : int_feed_name) { tensor_vec.push_back(inst->add_tensor_array()); } int vec_idx = 0; - for (auto & name : float_feed_name) { + for (auto &name : float_feed_name) { int idx = _feed_name_to_idx[name]; - Tensor * tensor = tensor_vec[idx]; + Tensor *tensor = tensor_vec[idx]; for (int j = 0; j < _shape[idx].size(); ++j) { tensor->add_shape(_shape[idx][j]); } tensor->set_elem_type(1); for (int j = 0; j < float_feed[vec_idx].size(); ++j) { - tensor->add_data( - (char *)(&(float_feed[vec_idx][j])), sizeof(float)); + tensor->add_data(const_cast(reinterpret_cast( + &(float_feed[vec_idx][j]))), + sizeof(float)); } vec_idx++; } vec_idx = 0; - for (auto & name : int_feed_name) { + for (auto &name : int_feed_name) { int idx = _feed_name_to_idx[name]; - Tensor * tensor = tensor_vec[idx]; + Tensor *tensor = tensor_vec[idx]; for (int j = 0; j < _shape[idx].size(); ++j) { tensor->add_shape(_shape[idx][j]); } tensor->set_elem_type(0); for (int j = 0; j < int_feed[vec_idx].size(); ++j) { - tensor->add_data( - (char *)(&(int_feed[vec_idx][j])), sizeof(int64_t)); + tensor->add_data(const_cast(reinterpret_cast( + &(int_feed[vec_idx][j]))), + sizeof(int64_t)); } vec_idx++; } @@ -147,7 +147,7 @@ std::vector > PredictorClient::predict( LOG(ERROR) << "failed call predictor with req: " << req.ShortDebugString(); exit(-1); } else { - for (auto & name : fetch_name) { + for (auto &name : fetch_name) { int idx = _fetch_name_to_idx[name]; int len = res.insts(0).tensor_array(idx).data_size(); VLOG(3) << "fetch name: " << name; @@ -162,8 +162,8 @@ std::vector > PredictorClient::predict( fetch_result[name][i] = *(const float *) 
             res.insts(0).tensor_array(idx).data(i).c_str();
         */
-        fetch_result[idx][i] = *(const float *)
-            res.insts(0).tensor_array(idx).data(i).c_str();
+        fetch_result[idx][i] =
+            *(const float *)res.insts(0).tensor_array(idx).data(i).c_str();
       }
     }
   }
@@ -171,13 +171,105 @@ std::vector<std::vector<float> > PredictorClient::predict(
   return fetch_result;
 }
 
-std::vector<std::vector<float> > PredictorClient::predict_with_profile(
-    const std::vector<std::vector<float> > & float_feed,
-    const std::vector<std::string> & float_feed_name,
-    const std::vector<std::vector<int64_t> > & int_feed,
-    const std::vector<std::string> & int_feed_name,
-    const std::vector<std::string> & fetch_name) {
-  std::vector<std::vector<float> > res;
+std::vector<std::vector<std::vector<float>>> PredictorClient::predict_for_batch(
+    const std::vector<std::vector<std::vector<float>>> &float_feed_batch,
+    const std::vector<std::string> &float_feed_name,
+    const std::vector<std::vector<std::vector<int64_t>>> &int_feed_batch,
+    const std::vector<std::string> &int_feed_name,
+    const std::vector<std::string> &fetch_name,
+    const int64_t &batch_size) {
+  std::vector<std::vector<std::vector<float>>> fetch_result_batch;
+  if (fetch_name.size() == 0) {
+    return fetch_result_batch;
+  }
+  fetch_result_batch.resize(batch_size);
+  int fetch_name_num = fetch_name.size();
+  for (int bi = 0; bi < batch_size; bi++) {
+    fetch_result_batch[bi].resize(fetch_name_num);
+  }
+
+  _api.thrd_clear();
+  _predictor = _api.fetch_predictor("general_model");
+  Request req;
+  //
+  for (int bi = 0; bi < batch_size; bi++) {
+    std::vector<Tensor *> tensor_vec;
+    FeedInst *inst = req.add_insts();
+    std::vector<std::vector<float>> float_feed = float_feed_batch[bi];
+    std::vector<std::vector<int64_t>> int_feed = int_feed_batch[bi];
+    for (auto &name : float_feed_name) {
+      tensor_vec.push_back(inst->add_tensor_array());
+    }
+
+    for (auto &name : int_feed_name) {
+      tensor_vec.push_back(inst->add_tensor_array());
+    }
+
+    int vec_idx = 0;
+    for (auto &name : float_feed_name) {
+      int idx = _feed_name_to_idx[name];
+      Tensor *tensor = tensor_vec[idx];
+      for (int j = 0; j < _shape[idx].size(); ++j) {
+        tensor->add_shape(_shape[idx][j]);
+      }
+      tensor->set_elem_type(1);
+      for (int j = 0; j < float_feed[vec_idx].size(); ++j) {
+        tensor->add_data(const_cast<char *>(reinterpret_cast<const char *>(
+                             &(float_feed[vec_idx][j]))),
+                         sizeof(float));
+      }
+      vec_idx++;
+    }
+
+    vec_idx = 0;
+    for (auto &name : int_feed_name) {
+      int idx = _feed_name_to_idx[name];
+      Tensor *tensor = tensor_vec[idx];
+      for (int j = 0; j < _shape[idx].size(); ++j) {
+        tensor->add_shape(_shape[idx][j]);
+      }
+      tensor->set_elem_type(0);
+      for (int j = 0; j < int_feed[vec_idx].size(); ++j) {
+        tensor->add_data(const_cast<char *>(reinterpret_cast<const char *>(
+                             &(int_feed[vec_idx][j]))),
+                         sizeof(int64_t));
+      }
+      vec_idx++;
+    }
+  }
+
+  Response res;
+
+  res.Clear();
+  if (_predictor->inference(&req, &res) != 0) {
+    LOG(ERROR) << "failed call predictor with req: " << req.ShortDebugString();
+    exit(-1);
+  } else {
+    for (int bi = 0; bi < batch_size; bi++) {
+      for (auto &name : fetch_name) {
+        int idx = _fetch_name_to_idx[name];
+        int len = res.insts(0).tensor_array(idx).data_size();
+        VLOG(3) << "fetch name: " << name;
+        VLOG(3) << "tensor data size: " << len;
+        fetch_result_batch[bi][idx].resize(len);
+        for (int i = 0; i < len; ++i) {
+          fetch_result_batch[bi][idx][i] =
+              *(const float *)res.insts(0).tensor_array(idx).data(i).c_str();
+        }
+      }
+    }
+  }
+
+  return fetch_result_batch;
+}
+
+std::vector<std::vector<float>> PredictorClient::predict_with_profile(
+    const std::vector<std::vector<float>> &float_feed,
+    const std::vector<std::string> &float_feed_name,
+    const std::vector<std::vector<int64_t>> &int_feed,
+    const std::vector<std::string> &int_feed_name,
+    const std::vector<std::string> &fetch_name) {
+  std::vector<std::vector<float>> res;
   return res;
 }
diff --git a/core/general-client/src/pybind_general_model.cpp b/core/general-client/src/pybind_general_model.cpp
index 287b7e337d78f2f4ac0a11fc0334a79c53680eee..ac82d91984027d912774244358105dafec30301c 100644
--- a/core/general-client/src/pybind_general_model.cpp
+++ b/core/general-client/src/pybind_general_model.cpp
@@ -1,10 +1,23 @@
+// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 #include <pybind11/pybind11.h>
 #include <pybind11/stl.h>
+#include <string>
 #include <vector>
 
 #include "core/general-client/include/general_model.h"
 
-#include <string>
-
 namespace py = pybind11;
 
 using baidu::paddle_serving::general_model::FetchedMap;
@@ -19,28 +32,47 @@ PYBIND11_MODULE(serving_client, m) {
   py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
       .def(py::init())
       .def("init",
-           [](PredictorClient &self, const std::string & conf) {
+           [](PredictorClient &self, const std::string &conf) {
             self.init(conf);
           })
       .def("set_predictor_conf",
-           [](PredictorClient &self, const std::string & conf_path,
-              const std::string & conf_file) {
+           [](PredictorClient &self,
+              const std::string &conf_path,
+              const std::string &conf_file) {
             self.set_predictor_conf(conf_path, conf_file);
           })
       .def("create_predictor",
-           [](PredictorClient & self) {
-             self.create_predictor();
-           })
+           [](PredictorClient &self) { self.create_predictor(); })
       .def("predict",
           [](PredictorClient &self,
-             const std::vector<std::vector<float> > & float_feed,
-             const std::vector<std::string> & float_feed_name,
-             const std::vector<std::vector<int64_t> > & int_feed,
-             const std::vector<std::string> & int_feed_name,
-             const std::vector<std::string> & fetch_name) {
+             const std::vector<std::vector<float>> &float_feed,
+             const std::vector<std::string> &float_feed_name,
+             const std::vector<std::vector<int64_t>> &int_feed,
+             const std::vector<std::string> &int_feed_name,
+             const std::vector<std::string> &fetch_name) {
+            return self.predict(float_feed,
+                                float_feed_name,
+                                int_feed,
+                                int_feed_name,
+                                fetch_name);
+          })
 
-            return self.predict(float_feed, float_feed_name,
-                int_feed, int_feed_name, fetch_name);
+      .def("predict_for_batch",
+           [](PredictorClient &self,
+              const std::vector<std::vector<std::vector<float>>>
+                  &float_feed_batch,
+              const std::vector<std::string> &float_feed_name,
+              const std::vector<std::vector<std::vector<int64_t>>>
+                  &int_feed_batch,
+              const std::vector<std::string> &int_feed_name,
+              const std::vector<std::string> &fetch_name,
+              const int64_t &batch_size) {
+             return self.predict_for_batch(float_feed_batch,
+                                           float_feed_name,
+                                           int_feed_batch,
+                                           int_feed_name,
+                                           fetch_name,
+                                           batch_size);
           });
 }
diff --git a/python/examples/imdb/test_client_multithread.py b/python/examples/imdb/test_client_multithread.py
index c5122c31a0fb859cfc81a392228f25422287a140..770d14665cf9c3287b8274ef11ae8945f5759b6d 100644
--- a/python/examples/imdb/test_client_multithread.py
+++ b/python/examples/imdb/test_client_multithread.py
@@ -15,7 +15,7 @@
 from paddle_serving import Client
 import sys
 import subprocess
-from multiprocessing import Pool, Queue
+from multiprocessing import Pool
 import time
 
diff --git a/python/paddle_serving/serving_client/__init__.py b/python/paddle_serving/serving_client/__init__.py
index d12dc2b4f2604f8a0f9e02adc74e0af298f999e3..8bf66904edeec142a99634ae93d73a4380d5b0de 100644
--- a/python/paddle_serving/serving_client/__init__.py
+++ b/python/paddle_serving/serving_client/__init__.py
@@ -19,6 +19,7 @@ import time
 int_type = 0
 float_type = 1
+
 
 class SDKConfig(object):
     def __init__(self):
         self.sdk_desc = sdk.SDKConf()
@@ -37,7 +38,8 @@
 
         variant_desc = sdk.VariantConf()
         variant_desc.tag = "var1"
-        variant_desc.naming_conf.cluster = "list://{}".format(":".join(self.endpoints))
+        variant_desc.naming_conf.cluster = "list://{}".format(":".join(
+            self.endpoints))
 
         predictor_desc.variants.extend([variant_desc])
@@ -50,7 +52,7 @@
         self.sdk_desc.default_variant_conf.connection_conf.hedge_request_timeout_ms = -1
         self.sdk_desc.default_variant_conf.connection_conf.hedge_fetch_retry_count = 2
         self.sdk_desc.default_variant_conf.connection_conf.connection_type = "pooled"
-        
+
         self.sdk_desc.default_variant_conf.naming_conf.cluster_filter_strategy = "Default"
         self.sdk_desc.default_variant_conf.naming_conf.load_balance_strategy = "la"
@@ -114,8 +116,7 @@ class Client(object):
             predictor_file = "%s_predictor.conf" % timestamp
             with open(predictor_path + predictor_file, "w") as fout:
                 fout.write(sdk_desc)
-            self.client_handle_.set_predictor_conf(
-                predictor_path, predictor_file)
+            self.client_handle_.set_predictor_conf(predictor_path, predictor_file)
             self.client_handle_.create_predictor()
 
     def get_feed_names(self):
@@ -145,13 +146,49 @@
             fetch_names.append(key)
 
         result = self.client_handle_.predict(
-            float_slot, float_feed_names,
-            int_slot, int_feed_names,
-            fetch_names)
-
+            float_slot, float_feed_names, int_slot, int_feed_names, fetch_names)
+
         result_map = {}
         for i, name in enumerate(fetch_names):
             result_map[name] = result[i]
-
+
         return result_map
 
+    def predict_for_batch(self, feed_batch=[], fetch=[]):
+        batch_size = len(feed_batch)
+        int_slot_batch = []
+        float_slot_batch = []
+        int_feed_names = []
+        float_feed_names = []
+        fetch_names = []
+        for feed in feed_batch:
+            int_slot = []
+            float_slot = []
+            for key in feed:
+                if key not in self.feed_names_:
+                    continue
+                if self.feed_types_[key] == int_type:
+                    int_feed_names.append(key)
+                    int_slot.append(feed[key])
+                elif self.feed_types_[key] == float_type:
+                    float_feed_names.append(key)
+                    float_slot.append(feed[key])
+            int_slot_batch.append(int_slot)
+            float_slot_batch.append(float_slot)
+
+        for key in fetch:
+            if key in self.fetch_names_:
+                fetch_names.append(key)
+
+        result_batch = self.client_handle_.predict_for_batch(
+            float_slot_batch, float_feed_names, int_slot_batch, int_feed_names,
+            fetch_names, batch_size)
+
+        result_map_batch = []
+        for result in result_batch:
+            result_map = {}
+            for i, name in enumerate(fetch_names):
+                result_map[name] = result[i]
+            result_map_batch.append(result_map)
+
+        return result_map_batch
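
Usage sketch (not part of the patch): predict() packs a single FeedInst into each Request, while the new predict_for_batch() packs one FeedInst per batch element into a single Request, so a whole batch costs one RPC round trip instead of len(feed_batch) round trips, and returns one result_map per instance, index-aligned with feed_batch. The snippet below is a minimal sketch of how the Python API is meant to be driven; the config path, endpoint, the "words" feed slot, and the "prediction" fetch variable follow the imdb example and are assumptions, as are the load_client_config()/connect() bootstrap calls taken from this era's example clients rather than from this diff.

    # Hypothetical driver for the new batch API, assuming an imdb-style model
    # served on 127.0.0.1:9292 with an int64 feed "words" and fetch "prediction".
    from paddle_serving import Client

    client = Client()
    client.load_client_config("imdb_client_conf/serving_client_conf.prototxt")
    client.connect(["127.0.0.1:9292"])

    # Single instance: one feed dict in, one result_map out.
    result = client.predict(feed={"words": [8, 233, 52]}, fetch=["prediction"])

    # Batch: a list of feed dicts in, a list of result_maps out,
    # index-aligned with feed_batch; batch_size is len(feed_batch).
    feed_batch = [{"words": [8, 233, 52]}, {"words": [7, 85, 171, 4]}]
    results = client.predict_for_batch(feed_batch=feed_batch,
                                       fetch=["prediction"])
    for i, result_map in enumerate(results):
        print(i, result_map["prediction"])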