Commit 06f46538 authored by MRXLT

add batch predict

Parent cdc4df3c
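This commit adds a batch path next to the existing single-sample predict: a predict_for_batch method on the C++ PredictorClient, a pybind11 binding for it, and a matching predict_for_batch on the Python Client. A minimal sketch of the intended high-level usage, assuming an already configured and connected client (setup steps omitted; the feed name "words" and fetch name "prob" are illustrative, not from a real model config):

from paddle_serving import Client

client = Client()
# client configuration / predictor setup omitted; see the Client changes below

# one dict per sample; keys must match the feed names in the model config
feed_batch = [
    {"words": [1, 2, 3]},
    {"words": [4, 5, 6]},
]
result_map_batch = client.predict_for_batch(feed_batch=feed_batch, fetch=["prob"])
for result_map in result_map_batch:
    print(result_map["prob"])  # one {fetch_name: values} dict per sample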
......@@ -18,9 +18,9 @@
#include <unistd.h>
#include <fstream>
#include <map>
#include <string>
#include <vector>
#include "core/sdk-cpp/builtin_format.pb.h"
#include "core/sdk-cpp/general_model_service.pb.h"
......@@ -37,46 +37,52 @@ namespace general_model {
typedef std::map<std::string, std::vector<float>> FetchedMap;
typedef std::map<std::string, std::vector<std::vector<float>>> BatchFetchedMap;
class PredictorClient {
public:
PredictorClient() {}
~PredictorClient() {}
void init(const std::string& client_conf);
void set_predictor_conf(const std::string& conf_path,
const std::string& conf_file);
int create_predictor();
std::vector<std::vector<float>> predict(
const std::vector<std::vector<float>>& float_feed,
const std::vector<std::string>& float_feed_name,
const std::vector<std::vector<int64_t>>& int_feed,
const std::vector<std::string>& int_feed_name,
const std::vector<std::string>& fetch_name);
std::vector<std::vector<std::vector<float>>> predict_for_batch(
const std::vector<std::vector<std::vector<float>>>& float_feed_batch,
const std::vector<std::string>& float_feed_name,
const std::vector<std::vector<std::vector<int64_t>>>& int_feed_batch,
const std::vector<std::string>& int_feed_name,
const std::vector<std::string>& fetch_name,
const int64_t& batch_size);
std::vector<std::vector<float>> predict_with_profile(
const std::vector<std::vector<float>>& float_feed,
const std::vector<std::string>& float_feed_name,
const std::vector<std::vector<int64_t>>& int_feed,
const std::vector<std::string>& int_feed_name,
const std::vector<std::string>& fetch_name);
private:
PredictorApi _api;
Predictor* _predictor;
std::string _predictor_conf;
std::string _predictor_path;
std::string _conf_file;
std::map<std::string, int> _feed_name_to_idx;
std::map<std::string, int> _fetch_name_to_idx;
std::map<std::string, std::string> _fetch_name_to_var_name;
std::vector<std::vector<int>> _shape;
std::vector<int> _type;
};
......
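For reference, predict_for_batch adds one level of nesting on top of predict: the outer index selects the sample in the batch, the middle index selects the feed slot (ordered like the corresponding *_feed_name vector), and the innermost vector holds that slot's flattened data. A small Python illustration of the expected shapes (slot names and values are made up):

# float_feed_batch[sample][slot] -> flattened float data for one feed slot
float_feed_batch = [
    [[0.1, 0.2, 0.3, 0.4]],  # sample 0: one float slot with four values
    [[0.5, 0.6, 0.7, 0.8]],  # sample 1
]
float_feed_name = ["image"]            # illustrative slot name
int_feed_batch = [[[1, 2]], [[3, 4]]]  # same layout for int64 slots
int_feed_name = ["ids"]                # illustrative
fetch_name = ["prob"]                  # illustrative
batch_size = len(float_feed_batch)     # passed explicitly to the C++ side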
......@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include <fstream>
#include "core/general-client/include/general_model.h"
#include "core/sdk-cpp/builtin_format.pb.h"
#include "core/sdk-cpp/include/common.h"
#include "core/sdk-cpp/include/predictor_sdk.h"
......@@ -28,7 +28,7 @@ namespace baidu {
namespace paddle_serving {
namespace general_model {
void PredictorClient::init(const std::string &conf_file) {
_conf_file = conf_file;
std::ifstream fin(conf_file);
if (!fin) {
......@@ -68,9 +68,8 @@ void PredictorClient::init(const std::string & conf_file) {
}
}
void PredictorClient::set_predictor_conf(const std::string &conf_path,
const std::string &conf_file) {
_predictor_path = conf_path;
_predictor_conf = conf_file;
}
......@@ -83,14 +82,13 @@ int PredictorClient::create_predictor() {
_api.thrd_initialize();
}
std::vector<std::vector<float>> PredictorClient::predict(
const std::vector<std::vector<float>> &float_feed,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int64_t>> &int_feed,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name) {
std::vector<std::vector<float>> fetch_result;
if (fetch_name.size() == 0) {
return fetch_result;
}
......@@ -100,41 +98,43 @@ std::vector<std::vector<float> > PredictorClient::predict(
_predictor = _api.fetch_predictor("general_model");
Request req;
std::vector<Tensor *> tensor_vec;
FeedInst *inst = req.add_insts();
for (auto &name : float_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
for (auto &name : int_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
int vec_idx = 0;
for (auto &name : float_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
for (int j = 0; j < _shape[idx].size(); ++j) {
tensor->add_shape(_shape[idx][j]);
}
tensor->set_elem_type(1);
for (int j = 0; j < float_feed[vec_idx].size(); ++j) {
tensor->add_data(const_cast<char *>(reinterpret_cast<const char *>(
&(float_feed[vec_idx][j]))),
sizeof(float));
}
vec_idx++;
}
vec_idx = 0;
for (auto &name : int_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
for (int j = 0; j < _shape[idx].size(); ++j) {
tensor->add_shape(_shape[idx][j]);
}
tensor->set_elem_type(0);
for (int j = 0; j < int_feed[vec_idx].size(); ++j) {
tensor->add_data(const_cast<char *>(reinterpret_cast<const char *>(
&(int_feed[vec_idx][j]))),
sizeof(int64_t));
}
vec_idx++;
}
......@@ -147,7 +147,7 @@ std::vector<std::vector<float> > PredictorClient::predict(
LOG(ERROR) << "failed call predictor with req: " << req.ShortDebugString();
exit(-1);
} else {
for (auto &name : fetch_name) {
int idx = _fetch_name_to_idx[name];
int len = res.insts(0).tensor_array(idx).data_size();
VLOG(3) << "fetch name: " << name;
......@@ -162,8 +162,8 @@ std::vector<std::vector<float> > PredictorClient::predict(
fetch_result[name][i] = *(const float *)
res.insts(0).tensor_array(idx).data(i).c_str();
*/
fetch_result[idx][i] =
*(const float *)res.insts(0).tensor_array(idx).data(i).c_str();
}
}
}
......@@ -171,13 +171,105 @@ std::vector<std::vector<float> > PredictorClient::predict(
return fetch_result;
}
std::vector<std::vector<std::vector<float>>> PredictorClient::predict_for_batch(
const std::vector<std::vector<std::vector<float>>> &float_feed_batch,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<std::vector<int64_t>>> &int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name,
const int64_t &batch_size) {
std::vector<std::vector<std::vector<float>>> fetch_result_batch;
if (fetch_name.size() == 0) {
return fetch_result_batch;
}
fetch_result_batch.resize(batch_size);
int fetch_name_num = fetch_name.size();
for (int bi = 0; bi < batch_size; bi++) {
fetch_result_batch[bi].resize(fetch_name_num);
}
_api.thrd_clear();
_predictor = _api.fetch_predictor("general_model");
Request req;
// build one FeedInst per sample in the batch
for (int bi = 0; bi < batch_size; bi++) {
std::vector<Tensor *> tensor_vec;
FeedInst *inst = req.add_insts();
std::vector<std::vector<float>> float_feed = float_feed_batch[bi];
std::vector<std::vector<int64_t>> int_feed = int_feed_batch[bi];
for (auto &name : float_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
for (auto &name : int_feed_name) {
tensor_vec.push_back(inst->add_tensor_array());
}
int vec_idx = 0;
for (auto &name : float_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
for (int j = 0; j < _shape[idx].size(); ++j) {
tensor->add_shape(_shape[idx][j]);
}
tensor->set_elem_type(1);
for (int j = 0; j < float_feed[vec_idx].size(); ++j) {
tensor->add_data(const_cast<char *>(reinterpret_cast<const char *>(
&(float_feed[vec_idx][j]))),
sizeof(float));
}
vec_idx++;
}
vec_idx = 0;
for (auto &name : int_feed_name) {
int idx = _feed_name_to_idx[name];
Tensor *tensor = tensor_vec[idx];
for (int j = 0; j < _shape[idx].size(); ++j) {
tensor->add_shape(_shape[idx][j]);
}
tensor->set_elem_type(0);
for (int j = 0; j < int_feed[vec_idx].size(); ++j) {
tensor->add_data(const_cast<char *>(reinterpret_cast<const char *>(
&(int_feed[vec_idx][j]))),
sizeof(int64_t));
}
vec_idx++;
}
}
Response res;
res.Clear();
if (_predictor->inference(&req, &res) != 0) {
LOG(ERROR) << "failed call predictor with req: " << req.ShortDebugString();
exit(-1);
} else {
for (int bi = 0; bi < batch_size; bi++) {
for (auto &name : fetch_name) {
int idx = _fetch_name_to_idx[name];
// read the bi-th instance of the response: one FetchInst per sample
int len = res.insts(bi).tensor_array(idx).data_size();
VLOG(3) << "fetch name: " << name;
VLOG(3) << "tensor data size: " << len;
fetch_result_batch[bi][idx].resize(len);
for (int i = 0; i < len; ++i) {
fetch_result_batch[bi][idx][i] =
*(const float *)res.insts(bi).tensor_array(idx).data(i).c_str();
}
}
}
}
return fetch_result_batch;
}
std::vector<std::vector<float>> PredictorClient::predict_with_profile(
const std::vector<std::vector<float>> &float_feed,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int64_t>> &int_feed,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name) {
std::vector<std::vector<float>> res;
return res;
}
......
// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <Python.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <unordered_map>
#include "core/general-client/include/general_model.h"
namespace py = pybind11;
using baidu::paddle_serving::general_model::FetchedMap;
......@@ -19,28 +32,47 @@ PYBIND11_MODULE(serving_client, m) {
py::class_<PredictorClient>(m, "PredictorClient", py::buffer_protocol())
.def(py::init())
.def("init",
[](PredictorClient &self, const std::string &conf) {
self.init(conf);
})
.def("set_predictor_conf",
[](PredictorClient &self,
const std::string &conf_path,
const std::string &conf_file) {
self.set_predictor_conf(conf_path, conf_file);
})
.def("create_predictor",
[](PredictorClient &self) { self.create_predictor(); })
.def("predict",
[](PredictorClient &self,
const std::vector<std::vector<float>> &float_feed,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<int64_t>> &int_feed,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name) {
return self.predict(float_feed,
float_feed_name,
int_feed,
int_feed_name,
fetch_name);
})
.def("predict_for_batch",
[](PredictorClient &self,
const std::vector<std::vector<std::vector<float>>>
&float_feed_batch,
const std::vector<std::string> &float_feed_name,
const std::vector<std::vector<std::vector<int64_t>>>
&int_feed_batch,
const std::vector<std::string> &int_feed_name,
const std::vector<std::string> &fetch_name,
const int64_t &batch_size) {
return self.predict_for_batch(float_feed_batch,
float_feed_name,
int_feed_batch,
int_feed_name,
fetch_name,
batch_size);
});
}
......
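With the binding above, the batch interface can also be driven through the serving_client extension module directly; pybind11's stl.h converts the nested Python lists into the nested std::vector parameters. A rough sketch, assuming the client and predictor config files already exist (all paths and names are illustrative):

from serving_client import PredictorClient  # module name from PYBIND11_MODULE

client = PredictorClient()
client.init("general_model.conf")                       # illustrative conf file
client.set_predictor_conf("./conf/", "predictor.conf")  # illustrative paths
client.create_predictor()

float_feed_batch = [[[0.1, 0.2]], [[0.3, 0.4]]]  # two samples, one float slot
fetch_batch = client.predict_for_batch(
    float_feed_batch, ["x"],  # float slots and their (illustrative) names
    [[], []], [],             # no int64 slots in this sketch
    ["prob"],                 # illustrative fetch name
    len(float_feed_batch))
# fetch_batch[sample][fetch_slot] is a flat list of floats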
......@@ -15,7 +15,7 @@
from paddle_serving import Client
import sys
import subprocess
from multiprocessing import Pool
import time
......
......@@ -19,6 +19,7 @@ import time
int_type = 0
float_type = 1
class SDKConfig(object):
    def __init__(self):
        self.sdk_desc = sdk.SDKConf()
......@@ -37,7 +38,8 @@ class SDKConfig(object):
        variant_desc = sdk.VariantConf()
        variant_desc.tag = "var1"
        variant_desc.naming_conf.cluster = "list://{}".format(":".join(
            self.endpoints))
        predictor_desc.variants.extend([variant_desc])
......@@ -50,7 +52,7 @@ class SDKConfig(object):
        self.sdk_desc.default_variant_conf.connection_conf.hedge_request_timeout_ms = -1
        self.sdk_desc.default_variant_conf.connection_conf.hedge_fetch_retry_count = 2
        self.sdk_desc.default_variant_conf.connection_conf.connection_type = "pooled"
        self.sdk_desc.default_variant_conf.naming_conf.load_balance_strategy = "la"
......@@ -114,8 +116,7 @@ class Client(object):
        predictor_file = "%s_predictor.conf" % timestamp
        with open(predictor_path + predictor_file, "w") as fout:
            fout.write(sdk_desc)
        self.client_handle_.set_predictor_conf(predictor_path, predictor_file)
        self.client_handle_.create_predictor()
    def get_feed_names(self):
......@@ -145,13 +146,49 @@ class Client(object):
                fetch_names.append(key)

        result = self.client_handle_.predict(
            float_slot, float_feed_names, int_slot, int_feed_names, fetch_names)

        result_map = {}
        for i, name in enumerate(fetch_names):
            result_map[name] = result[i]

        return result_map
    def predict_for_batch(self, feed_batch=[], fetch=[]):
        batch_size = len(feed_batch)
        int_slot_batch = []
        float_slot_batch = []
        int_feed_names = []
        float_feed_names = []
        fetch_names = []
        for i, feed in enumerate(feed_batch):
            int_slot = []
            float_slot = []
            for key in feed:
                if key not in self.feed_names_:
                    continue
                if self.feed_types_[key] == int_type:
                    # record each feed name once; the names are shared
                    # by every sample in the batch
                    if i == 0:
                        int_feed_names.append(key)
                    int_slot.append(feed[key])
                elif self.feed_types_[key] == float_type:
                    if i == 0:
                        float_feed_names.append(key)
                    float_slot.append(feed[key])
            int_slot_batch.append(int_slot)
            float_slot_batch.append(float_slot)

        for key in fetch:
            if key in self.fetch_names_:
                fetch_names.append(key)

        result_batch = self.client_handle_.predict_for_batch(
            float_slot_batch, float_feed_names, int_slot_batch, int_feed_names,
            fetch_names, batch_size)

        result_map_batch = []
        for result in result_batch:
            result_map = {}
            for i, name in enumerate(fetch_names):
                result_map[name] = result[i]
            result_map_batch.append(result_map)
        return result_map_batch
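A short usage sketch for the method above; the feed name "words" and fetch name "prob" are again illustrative and must match the client configuration. The return value pairs each input sample with its own fetch dict:

feed_batch = [{"words": [8, 9]}, {"words": [10, 11]}]
result_map_batch = client.predict_for_batch(feed_batch=feed_batch, fetch=["prob"])
assert len(result_map_batch) == len(feed_batch)
first_prob = result_map_batch[0]["prob"]  # fetch results of sample 0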