Commit cd0aca04 authored by HexToString

try to fix 2.0API bug

Parent 304b4e05
......@@ -44,9 +44,45 @@ int GeneralInferOp::inference() {
<< pre_node_names.size();
return -1;
}
if (InferManager::instance().infer(engine_name().c_str())) {
const std::string pre_name = pre_node_names[0];
const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
if (!input_blob) {
LOG(ERROR) << "Failed mutable depended argument, op:" << pre_name;
return -1;
}
uint64_t log_id = input_blob->GetLogId();
VLOG(2) << "(logid=" << log_id << ") Get precedent op name: " << pre_name;
GeneralBlob *output_blob = mutable_data<GeneralBlob>();
output_blob->SetLogId(log_id);
const TensorVector *in = &input_blob->tensor_vector;
TensorVector *out = &output_blob->tensor_vector;
int batch_size = input_blob->_batch_size;
VLOG(2) << "(logid=" << log_id << ") input batch size: " << batch_size;
output_blob->_batch_size = batch_size;
VLOG(2) << "(logid=" << log_id << ") infer batch size: " << batch_size;
Timer timeline;
int64_t start = timeline.TimeStampUS();
timeline.Start();
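// The reworked call hands the engine the input/output TensorVectors and the
// batch size explicitly, instead of only the engine name (see the removed
// single-argument call above).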
if (InferManager::instance().infer(
engine_name().c_str(), in, out, batch_size)) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed do infer in fluid model: " << engine_name().c_str();
return -1;
}
int64_t end = timeline.TimeStampUS();
CopyBlobInfo(input_blob, output_blob);
AddBlobInfo(output_blob, start);
AddBlobInfo(output_blob, end);
return 0;
}
DEFINE_OP(GeneralInferOp);
......
......@@ -20,7 +20,6 @@
#include "core/general-server/op/general_infer_helper.h"
#include "core/predictor/framework/infer.h"
#include "core/predictor/framework/memory.h"
#include "core/predictor/framework/resource.h"
#include "core/util/include/timer.h"
namespace baidu {
......@@ -33,7 +32,6 @@ using baidu::paddle_serving::predictor::general_model::Tensor;
using baidu::paddle_serving::predictor::general_model::Request;
using baidu::paddle_serving::predictor::general_model::FeedInst;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
using baidu::paddle_serving::predictor::InferManager;
int conf_check(const Request *req,
const std::shared_ptr<PaddleGeneralModelConfig> &model_config) {
......@@ -73,34 +71,75 @@ int conf_check(const Request *req,
int GeneralReaderOp::inference() {
// read request from client
// TODO: only support one engine here
std::string engine_name = "general_infer_0";
const Request *req = dynamic_cast<const Request *>(get_request_message());
uint64_t log_id = req->log_id();
int input_var_num = 0;
std::vector<int64_t> elem_type;
std::vector<int64_t> elem_size;
std::vector<int64_t> capacity;
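// Per-variable metadata parsed from the request: proto elem_type code,
// element byte width, and total element count (capacity).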
GeneralBlob *res = mutable_data<GeneralBlob>();
if (!res) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed get op tls reader object output";
return -1;
}
TensorVector *out = &res->tensor_vector;
res->SetLogId(log_id);
Timer timeline;
int64_t start = timeline.TimeStampUS();
int var_num = req->insts(0).tensor_array_size();
VLOG(2) << "(logid=" << log_id << ") var num: " << var_num;
VLOG(2) << "(logid=" << log_id
<< ") start to call load general model_conf op";
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
VLOG(2) << "(logid=" << log_id << ") print general model config done.";
// TODO(guru4elephant): how to do conditional check?
/*
int ret = conf_check(req, model_config);
if (ret != 0) {
LOG(ERROR) << "model conf of server:";
resource.print_general_model_config(model_config);
return 0;
}
*/
// package tensor
elem_type.resize(var_num);
elem_size.resize(var_num);
capacity.resize(var_num);
// prepare basic information for input
for (int i = 0; i < var_num; ++i) {
std::string tensor_name = model_config->_feed_name[i];
VLOG(2) << "(logid=" << log_id << ") get tensor name: " << tensor_name;
auto lod_tensor = InferManager::instance().GetInputHandle(
engine_name.c_str(), tensor_name.c_str());
std::vector<std::vector<size_t>> lod;
std::vector<int> shape;
// get lod info here
paddle::PaddleTensor lod_tensor;
elem_type[i] = req->insts(0).tensor_array(i).elem_type();
VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
if (elem_type[i] == 0) { // int64
elem_size[i] = sizeof(int64_t);
lod_tensor.dtype = paddle::PaddleDType::INT64;
} else if (elem_type[i] == 1) {
elem_size[i] = sizeof(float);
lod_tensor.dtype = paddle::PaddleDType::FLOAT32;
} else if (elem_type[i] == 2) {
elem_size[i] = sizeof(int32_t);
lod_tensor.dtype = paddle::PaddleDType::INT32;
}
// implement lod tensor here
if (req->insts(0).tensor_array(i).lod_size() > 0) {
lod.resize(1);
VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
lod_tensor.lod.resize(1);
for (int k = 0; k < req->insts(0).tensor_array(i).lod_size(); ++k) {
lod[0].push_back(req->insts(0).tensor_array(i).lod(k));
lod_tensor.lod[0].push_back(req->insts(0).tensor_array(i).lod(k));
}
capacity[i] = 1;
for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
......@@ -108,7 +147,7 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
<< "]: " << dim;
capacity[i] *= dim;
shape.push_back(dim);
lod_tensor.shape.push_back(dim);
}
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor, capacity: " << capacity[i];
......@@ -119,41 +158,92 @@ int GeneralReaderOp::inference() {
VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
<< "]: " << dim;
capacity[i] *= dim;
shape.push_back(dim);
lod_tensor.shape.push_back(dim);
}
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor, capacity: " << capacity[i];
}
lod_tensor->SetLoD(lod);
lod_tensor->Reshape(shape);
// insert data here
if (req->insts(0).tensor_array(i).elem_type() == 0) {
// TODO: Copy twice here, can optimize
lod_tensor.name = model_config->_feed_name[i];
out->push_back(lod_tensor);
}
// specify the memory needed for output tensor_vector
for (int i = 0; i < var_num; ++i) {
if (out->at(i).lod.size() == 1) {
int tensor_size = 0;
const Tensor &tensor = req->insts(0).tensor_array(i);
int data_len = 0;
if (tensor.int64_data_size() > 0) {
data_len = tensor.int64_data_size();
} else if (tensor.float_data_size() > 0) {
data_len = tensor.float_data_size();
} else if (tensor.int_data_size() > 0) {
data_len = tensor.int_data_size();
}
VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
<< "]: " << data_len;
tensor_size += data_len;
int cur_len = out->at(i).lod[0].back();
VLOG(2) << "(logid=" << log_id << ") current len: " << cur_len;
int sample_len = 0;
if (tensor.shape_size() == 1) {
sample_len = data_len;
} else {
sample_len = tensor.shape(0);
}
VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
out->at(i).data.Resize(tensor_size * elem_size[i]);
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is lod_tensor and len=" << out->at(i).lod[0].back();
} else {
out->at(i).data.Resize(capacity[i] * elem_size[i]);
VLOG(2) << "(logid=" << log_id << ") var[" << i
<< "] is tensor and capacity=" << capacity[i];
}
}
// fill the data into output general_blob
for (int i = 0; i < var_num; ++i) {
if (elem_type[i] == 0) {
int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).int64_data(0);
int offset = 0;
int elem_num = req->insts(0).tensor_array(i).int64_data_size();
std::vector<int64_t> data(elem_num);
int64_t *dst_ptr = data.data();
for (int k = 0; k < elem_num; ++k) {
dst_ptr[k] = req->insts(0).tensor_array(i).int64_data(k);
dst_ptr[offset + k] = req->insts(0).tensor_array(i).int64_data(k);
}
lod_tensor->CopyFromCpu(dst_ptr);
} else if (req->insts(0).tensor_array(i).elem_type() == 1) {
} else if (elem_type[i] == 1) {
float *dst_ptr = static_cast<float *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).float_data(0);
int offset = 0;
int elem_num = req->insts(0).tensor_array(i).float_data_size();
std::vector<float> data(elem_num);
float *dst_ptr = data.data();
for (int k = 0; k < elem_num; ++k) {
dst_ptr[k] = req->insts(0).tensor_array(i).float_data(k);
dst_ptr[offset + k] = req->insts(0).tensor_array(i).float_data(k);
}
lod_tensor->CopyFromCpu(dst_ptr);
} else if (req->insts(0).tensor_array(i).elem_type() == 2) {
} else if (elem_type[i] == 2) {
int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
<< "] is " << req->insts(0).tensor_array(i).int_data(0);
int offset = 0;
int elem_num = req->insts(0).tensor_array(i).int_data_size();
std::vector<int32_t> data(elem_num);
int32_t *dst_ptr = data.data();
for (int k = 0; k < elem_num; ++k) {
dst_ptr[k] = req->insts(0).tensor_array(i).int_data(k);
dst_ptr[offset + k] = req->insts(0).tensor_array(i).int_data(k);
}
lod_tensor->CopyFromCpu(dst_ptr);
}
}
VLOG(2) << "(logid=" << log_id << ") output size: " << out->size();
timeline.Pause();
int64_t end = timeline.TimeStampUS();
res->p_size = 0;
res->_batch_size = 1;
AddBlobInfo(res, start);
AddBlobInfo(res, end);
VLOG(2) << "(logid=" << log_id << ") read data from client success";
return 0;
}
DEFINE_OP(GeneralReaderOp);
......
......@@ -40,60 +40,160 @@ using baidu::paddle_serving::predictor::InferManager;
using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
int GeneralResponseOp::inference() {
const std::vector<std::string> pre_node_names = pre_names();
VLOG(2) << "pre node names size: " << pre_node_names.size();
const GeneralBlob *input_blob;
uint64_t log_id =
get_depend_argument<GeneralBlob>(pre_node_names[0])->GetLogId();
const Request *req = dynamic_cast<const Request *>(get_request_message());
// response inst with only fetch_var_names
Response *res = mutable_data<Response>();
Timer timeline;
// double response_time = 0.0;
// timeline.Start();
int64_t start = timeline.TimeStampUS();
VLOG(2) << "(logid=" << log_id
<< ") start to call load general model_conf op";
baidu::paddle_serving::predictor::Resource &resource =
baidu::paddle_serving::predictor::Resource::instance();
VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
std::shared_ptr<PaddleGeneralModelConfig> model_config =
resource.get_general_model_config();
std::vector<int> capacity(req->fetch_var_names_size(), 1);
std::string engine_name = "general_infer_0";
VLOG(2) << "(logid=" << log_id
<< ") max body size : " << brpc::fLU64::FLAGS_max_body_size;
std::vector<int> fetch_index;
fetch_index.resize(req->fetch_var_names_size());
for (int i = 0; i < req->fetch_var_names_size(); ++i) {
fetch_index[i] =
model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
}
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
const std::string &pre_name = pre_node_names[pi];
VLOG(2) << "(logid=" << log_id << ") pre names[" << pi << "]: " << pre_name
<< " (" << pre_node_names.size() << ")";
input_blob = get_depend_argument<GeneralBlob>(pre_name);
// fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(),
// input_blob);
if (!input_blob) {
LOG(ERROR) << "(logid=" << log_id
<< ") Failed mutable depended argument, op: " << pre_name;
return -1;
}
const TensorVector *in = &input_blob->tensor_vector;
ModelOutput *output = res->add_outputs();
// To get the order of model return values
output->set_engine_name(pre_name);
FetchInst *fetch_inst = output->add_insts();
FetchInst *fetch_p = output->mutable_insts(0);
std::vector<std::string> outs =
InferManager::instance().GetOutputNames(engine_name.c_str());
for (int i = 0; i < req->fetch_var_names_size(); ++i) {
for (auto &idx : fetch_index) {
Tensor *tensor = fetch_inst->add_tensor_array();
std::string tensor_name = outs[i];
auto lod_tensor = InferManager::instance().GetOutputHandle(
engine_name.c_str(), tensor_name.c_str());
std::vector<int> shape = lod_tensor->shape();
for (int k = 0; k < shape.size(); ++k) {
capacity[i] *= shape[k];
tensor->add_shape(shape[k]);
}
auto dtype = lod_tensor->type();
if (model_config->_is_lod_fetch[idx]) {
VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] "
<< model_config->_fetch_name[idx] << " is lod_tensor";
for (int k = 0; k < in->at(idx).shape.size(); ++k) {
VLOG(2) << "(logid=" << log_id << ") shape[" << k
<< "]: " << in->at(idx).shape[k];
tensor->add_shape(in->at(idx).shape[k]);
}
} else {
VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] "
<< model_config->_fetch_name[idx] << " is tensor";
for (int k = 0; k < in->at(idx).shape.size(); ++k) {
VLOG(2) << "(logid=" << log_id << ") shape[" << k
<< "]: " << in->at(idx).shape[k];
tensor->add_shape(in->at(idx).shape[k]);
}
}
}
int var_idx = 0;
for (auto &idx : fetch_index) {
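// cap: total number of elements in this output tensor (product of its dims).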
int cap = 1;
for (int j = 0; j < in->at(idx).shape.size(); ++j) {
cap *= in->at(idx).shape[j];
}
FetchInst *fetch_p = output->mutable_insts(0);
auto dtype = in->at(idx).dtype;
if (dtype == paddle::PaddleDType::INT64) {
std::vector<int64_t> datas(capacity[i]);
int64_t *data_ptr = datas.data();
lod_tensor->CopyToCpu(data_ptr);
VLOG(2) << "(logid=" << log_id << ") Prepare int64 var ["
<< model_config->_fetch_name[idx] << "].";
int64_t *data_ptr = static_cast<int64_t *>(in->at(idx).data.data());
// from
// https://stackoverflow.com/questions/15499641/copy-a-stdvector-to-a-repeated-field-from-protobuf-with-memcpy
// `Swap` method is faster than `{}` method.
google::protobuf::RepeatedField<int64_t> tmp_data(data_ptr,
data_ptr + capacity[i]);
tensor->mutable_int64_data()->Swap(&tmp_data);
data_ptr + cap);
fetch_p->mutable_tensor_array(var_idx)->mutable_int64_data()->Swap(
&tmp_data);
} else if (dtype == paddle::PaddleDType::FLOAT32) {
std::vector<float> datas(capacity[i]);
float *data_ptr = datas.data();
lod_tensor->CopyToCpu(data_ptr);
VLOG(2) << "(logid=" << log_id << ") Prepare float var ["
<< model_config->_fetch_name[idx] << "].";
float *data_ptr = static_cast<float *>(in->at(idx).data.data());
google::protobuf::RepeatedField<float> tmp_data(data_ptr,
data_ptr + capacity[i]);
tensor->mutable_float_data()->Swap(&tmp_data);
data_ptr + cap);
fetch_p->mutable_tensor_array(var_idx)->mutable_float_data()->Swap(
&tmp_data);
} else if (dtype == paddle::PaddleDType::INT32) {
std::vector<int32_t> datas(capacity[i]);
int32_t *data_ptr = datas.data();
lod_tensor->CopyToCpu(data_ptr);
VLOG(2) << "(logid=" << log_id << ")Prepare int32 var ["
<< model_config->_fetch_name[idx] << "].";
int32_t *data_ptr = static_cast<int32_t *>(in->at(idx).data.data());
google::protobuf::RepeatedField<int32_t> tmp_data(data_ptr,
data_ptr + capacity[i]);
tensor->mutable_int_data()->Swap(&tmp_data);
data_ptr + cap);
fetch_p->mutable_tensor_array(var_idx)->mutable_int_data()->Swap(
&tmp_data);
}
std::vector<std::vector<size_t>> lod = lod_tensor->lod();
if (lod.size() > 0) {
for (int j = 0; j < lod[0].size(); ++j) {
tensor->add_lod(lod[0][j]);
if (model_config->_is_lod_fetch[idx]) {
if (in->at(idx).lod.size() > 0) {
for (int j = 0; j < in->at(idx).lod[0].size(); ++j) {
fetch_p->mutable_tensor_array(var_idx)->add_lod(
in->at(idx).lod[0][j]);
}
}
}
VLOG(2) << "(logid=" << log_id << ") fetch var ["
<< model_config->_fetch_name[idx] << "] ready";
var_idx++;
}
}
if (req->profile_server()) {
int64_t end = timeline.TimeStampUS();
// TODO(barriery): multi-model profile_time.
// At present, only the response_op is multi-input, so here we get
// the profile_time by hard coding. It needs to be replaced with
// a more elegant way.
for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
VLOG(2) << "(logid=" << log_id
<< ") p size for input blob: " << input_blob->p_size;
int profile_time_idx = -1;
if (pi == 0) {
profile_time_idx = 0;
} else {
profile_time_idx = input_blob->p_size - 2;
}
for (; profile_time_idx < input_blob->p_size; ++profile_time_idx) {
res->add_profile_time(input_blob->time_stamp[profile_time_idx]);
}
}
// TODO(guru4elephant): find more elegant way to do this
res->add_profile_time(start);
res->add_profile_time(end);
}
return 0;
}
......
......@@ -20,9 +20,10 @@
#include <numeric>
#include <utility>
#include <vector>
#include "core/predictor/common/inner_common.h"
#include "core/predictor/framework/bsf.h"
#include "core/predictor/framework/factory.h"
#include "core/predictor/framework/infer_data.h"
#include "paddle_inference_api.h" // NOLINT
//#include "paddle_inference_api.h" // NOLINT
namespace baidu {
namespace paddle_serving {
namespace predictor {
......@@ -119,7 +120,7 @@ class InferEngine {
virtual int thrd_initialize() { return thrd_initialize_impl(); }
virtual int thrd_clear() { return thrd_clear_impl(); }
virtual int thrd_finalize() { return thrd_finalize_impl(); }
virtual int infer() { return infer_impl(); }
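// The inference entry point now carries type-erased input/output buffers and
// an optional batch size, forwarding to infer_impl1.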
virtual int infer(const void* in, void* out, uint32_t batch_size = -1) {
return infer_impl1(in, out, batch_size);
}
virtual int reload() = 0;
......@@ -132,13 +133,12 @@ class InferEngine {
virtual int thrd_finalize_impl() = 0;
virtual int thrd_clear_impl() = 0;
virtual int proc_finalize_impl() = 0;
virtual std::vector<std::string> GetInputNames() = 0;
virtual std::vector<std::string> GetOutputNames() = 0;
virtual std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
const std::string& name) = 0;
virtual std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
const std::string& name) = 0;
virtual int infer_impl() = 0;
virtual int infer_impl1(const void* in,
void* out,
uint32_t batch_size = -1) = 0;
virtual int infer_impl2(const BatchTensor& in,
BatchTensor& out) = 0; // NOLINT
// end: framework inner call
};
......@@ -152,6 +152,8 @@ class ReloadableInferEngine : public InferEngine {
uint64_t last_revision;
};
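// Task type consumed by the bsf (batch scheduling framework) executor that is
// set up below.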
typedef im::bsf::Task<Tensor, Tensor> TaskT;
virtual int load(const InferEngineCreationParams& params) = 0;
int proc_initialize_impl(const configure::EngineDesc& conf, bool version) {
......@@ -221,10 +223,45 @@ class ReloadableInferEngine : public InferEngine {
LOG(ERROR) << "Failed proc initialize impl";
return -1;
}
// init bsf framework
if (_infer_thread_num <= 0) {
return 0;
}
// init bsf framework
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_init_fn(
boost::bind(&InferEngine::thrd_initialize_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_reset_fn(
boost::bind(&InferEngine::thrd_clear_impl, this));
im::bsf::TaskExecutor<TaskT>::instance()->set_thread_callback_fn(
boost::bind(&InferEngine::infer_impl2, this, _1, _2));
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_size(_infer_batch_size);
im::bsf::TaskExecutor<TaskT>::instance()->set_batch_align(
_infer_batch_align);
if (im::bsf::TaskExecutor<TaskT>::instance()->start(_infer_thread_num) !=
0) {
LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
return -1;
}
LOG(WARNING) << "Enable batch schedule framework, thread_num:"
<< _infer_thread_num << ", batch_size:" << _infer_batch_size
<< ", enable_batch_align:" << _infer_batch_align;
return 0;
}
int infer() { return infer_impl(); }
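// Run directly in the calling thread when no bsf worker threads are
// configured; otherwise hand the BatchTensor pair to the batch scheduling
// framework and wait for the result.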
int infer(const void* in, void* out, uint32_t batch_size = -1) {
if (_infer_thread_num <= 0) {
return infer_impl1(in, out, batch_size);
}
im::bsf::TaskManager<Tensor, Tensor> task_manager;
task_manager.schedule(*(reinterpret_cast<const BatchTensor*>(in)),
*(reinterpret_cast<BatchTensor*>(out)));
task_manager.wait();
return 0;
}
int thrd_initialize() {
if (_infer_thread_num > 0) {
......@@ -248,6 +285,9 @@ class ReloadableInferEngine : public InferEngine {
return -1;
}
if (_infer_thread_num > 0) {
im::bsf::TaskExecutor<TaskT>::instance()->stop();
}
return 0;
}
......@@ -398,6 +438,10 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
virtual int thrd_initialize_impl() {
// memory pool to be inited in non-serving-threads
if (MempoolWrapper::instance().thread_initialize() != 0) {
LOG(ERROR) << "Failed thread initialize mempool";
return -1;
}
ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
if (!md || load_data(md, _infer_engine_params) != 0) {
......@@ -407,12 +451,17 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
}
THREAD_SETSPECIFIC(_skey, md);
im::bsf::AutoMutex lock(_mutex);
_reload_vec.push_back(md);
return 0;
}
int thrd_clear_impl() {
// for non-serving-threads
if (MempoolWrapper::instance().thread_clear() != 0) {
LOG(ERROR) << "Failed thread clear mempool";
return -1;
}
return 0;
}
......@@ -510,6 +559,12 @@ class CloneDBReloadableInferEngine
}
virtual int thrd_initialize_impl() {
// memory pool to be inited in non-serving-threads
if (MempoolWrapper::instance().thread_initialize() != 0) {
LOG(ERROR) << "Failed thread initialize mempool";
return -1;
}
ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
if (!md || load_data(md, _pd->cores[_pd->current_idx]) != 0) {
LOG(ERROR) << "Failed clone thread data, origin_core["
......@@ -518,6 +573,7 @@ class CloneDBReloadableInferEngine
}
THREAD_SETSPECIFIC(DBReloadableInferEngine<EngineCore>::_skey, md);
im::bsf::AutoMutex lock(DBReloadableInferEngine<EngineCore>::_mutex);
DBReloadableInferEngine<EngineCore>::_reload_vec.push_back(md);
return 0;
}
......@@ -536,58 +592,86 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
public: // NOLINT
FluidInferEngine() {}
~FluidInferEngine() {}
std::vector<std::string> GetInputNames() {
FluidFamilyCore* core =
DBReloadableInferEngine<FluidFamilyCore>::get_core();
if (!core || !core->get()) {
LOG(ERROR) << "Failed get fluid core in GetInputHandle()";
}
return core->GetInputNames();
}
std::vector<std::string> GetOutputNames() {
FluidFamilyCore* core =
DBReloadableInferEngine<FluidFamilyCore>::get_core();
if (!core || !core->get()) {
LOG(ERROR) << "Failed get fluid core in GetInputHandle()";
}
return core->GetOutputNames();
}
std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
const std::string& name) {
FluidFamilyCore* core =
DBReloadableInferEngine<FluidFamilyCore>::get_core();
if (!core || !core->get()) {
LOG(ERROR) << "Failed get fluid core in GetInputHandle()";
}
return core->GetInputHandle(name);
}
std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
const std::string& name) {
FluidFamilyCore* core =
DBReloadableInferEngine<FluidFamilyCore>::get_core();
if (!core || !core->get()) {
LOG(ERROR) << "Failed get fluid core in GetOutputHandle()";
}
return core->GetOutputHandle(name);
}
int infer_impl() {
FluidFamilyCore* core =
DBReloadableInferEngine<FluidFamilyCore>::get_core();
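// New type-erased entry point: copy each input Tensor into a Paddle input
// handle, run the core, then copy every output handle back into the output
// container.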
int infer_impl1(const void* in, void* out, uint32_t batch_size = -1) {
FluidFamilyCore* core =
DBReloadableInferEngine<FluidFamilyCore>::get_core();
if (!core || !core->get()) {
LOG(ERROR) << "Failed get fluid core in infer_impl()";
return -1;
}
// set input handles: copy each input Tensor into the corresponding
// Paddle input handle
const BatchTensor* batchTensor_pointer_in =
reinterpret_cast<const BatchTensor*>(in);
for (size_t i = 0; i < batchTensor_pointer_in->count(); ++i) {
const Tensor& tensor_in_batchTensor = (*batchTensor_pointer_in)[i];
auto lod_tensor_in = core->GetInputHandle(tensor_in_batchTensor.name);
lod_tensor_in->SetLoD(tensor_in_batchTensor.lod);
lod_tensor_in->Reshape(tensor_in_batchTensor.shape);
void* origin_data = tensor_in_batchTensor.data.data();
if (tensor_in_batchTensor.type == FLOAT32) {
float* data = reinterpret_cast<float*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else if (tensor_in_batchTensor.type == INT64) {
int64_t* data = reinterpret_cast<int64_t*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
} else if (tensor_in_batchTensor.type == INT32) {
int32_t* data = reinterpret_cast<int32_t*>(origin_data);
lod_tensor_in->CopyFromCpu(data);
}
}
if (!core->Run()) {
LOG(ERROR) << "Failed run fluid family core";
return -1;
}
// get outputs and copy them into the caller's BatchTensor (void* out)
BatchTensor* batchTensor_pointer_out = reinterpret_cast<BatchTensor*>(out);
std::vector<std::string> outnames = core->GetOutputNames();
for (size_t i = 0; i < outnames.size(); ++i) {
auto lod_tensor_out = core->GetOutputHandle(outnames[i]);
std::vector<int> output_shape = lod_tensor_out->shape();
int out_num = std::accumulate(
output_shape.begin(), output_shape.end(), 1, std::multiplies<int>());
// relies on predictor::DataType and the Paddle dtype enum sharing numeric
// codes (0 = float32, 1 = int64, 2 = int32)
int dataType = lod_tensor_out->type();
void* databuf_data = NULL;
size_t databuf_size = 0;
if (dataType == FLOAT32) {
float* data_out = new float[out_num];
lod_tensor_out->CopyToCpu(data_out);
databuf_data = reinterpret_cast<void*>(data_out);
databuf_size = out_num * sizeof(float);
} else if (dataType == INT64) {
int64_t* data_out = new int64_t[out_num];
lod_tensor_out->CopyToCpu(data_out);
databuf_data = reinterpret_cast<void*>(data_out);
databuf_size = out_num * sizeof(int64_t);
} else if (dataType == INT32) {
int32_t* data_out = new int32_t[out_num];
lod_tensor_out->CopyToCpu(data_out);
databuf_data = reinterpret_cast<void*>(data_out);
databuf_size = out_num * sizeof(int32_t);
}
Tensor tensor_out;
tensor_out.name = outnames[i];
tensor_out.type = static_cast<DataType>(dataType);
tensor_out.shape.assign(output_shape.begin(), output_shape.end());
std::vector<std::vector<size_t>> out_lod = lod_tensor_out->lod();
for (size_t li = 0; li < out_lod.size(); ++li) {
std::vector<size_t> lod_element;
lod_element.assign(out_lod[li].begin(), out_lod[li].end());
tensor_out.lod.push_back(lod_element);
}
// DataBuf's third argument marks the buffer as owned by the DataBuf
tensor_out.data = DataBuf(databuf_data, databuf_size, true);
batchTensor_pointer_out->push_back(tensor_out);
}
return 0;
}
int infer_impl2(const BatchTensor& in, BatchTensor& out) { // NOLINT
return infer_impl1(&in, &out);
}
};
typedef FactoryPool<InferEngine> StaticInferFactory;
......@@ -713,45 +797,13 @@ class VersionedInferEngine : public InferEngine {
return _versions.begin()->second;
}
int infer() {
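// Forward the type-erased request to the default engine version.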
int infer(const void* in, void* out, uint32_t batch_size) {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
return -1;
}
return engine->infer();
}
std::vector<std::string> GetInputNames() {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
}
return engine->GetInputNames();
}
std::vector<std::string> GetOutputNames() {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
}
return engine->GetOutputNames();
}
std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
const std::string& name) {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
}
return engine->GetInputHandle(name);
}
std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
const std::string& name) {
InferEngine* engine = default_engine();
if (!engine) {
LOG(WARNING) << "fail to get default engine";
}
return engine->GetOutputHandle(name);
return engine->infer(in, out, batch_size);
}
template <typename T>
......@@ -770,47 +822,14 @@ class VersionedInferEngine : public InferEngine {
}
// versioned inference interface
int infer(uint64_t version) {
int infer(const void* in, void* out, uint32_t batch_size, uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
return -1;
}
return iter->second->infer();
}
std::vector<std::string> GetInputNames(uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
}
return iter->second->GetInputNames();
}
std::vector<std::string> GetOutputNames(uint64_t version) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
}
return iter->second->GetOutputNames();
}
std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
uint64_t version, const std::string& name) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
}
return iter->second->GetInputHandle(name);
}
std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
uint64_t version, const std::string& name) {
auto iter = _versions.find(version);
if (iter == _versions.end()) {
LOG(ERROR) << "Not found version engine: " << version;
}
return iter->second->GetOutputHandle(name);
return iter->second->infer(in, out, batch_size);
}
template <typename T>
......@@ -837,7 +856,10 @@ class VersionedInferEngine : public InferEngine {
int thrd_finalize_impl() { return -1; }
int thrd_clear_impl() { return -1; }
int proc_finalize_impl() { return -1; }
int infer_impl() { return -1; }
int infer_impl1(const void* in, void* out, uint32_t batch_size = -1) { return -1; }
int infer_impl2(const BatchTensor& in, BatchTensor& out) { // NOLINT
return -1;
} // NOLINT
private:
boost::unordered_map<uint64_t, InferEngine*> _versions;
......@@ -935,44 +957,16 @@ class InferManager {
}
// Inference interface
int infer(const char* model_name) {
int infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size = -1) {
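// in/out are type-erased tensor buffers; the selected engine casts them back
// to its concrete types (see ReloadableInferEngine::infer).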
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer();
}
std::vector<std::string> GetInputNames(const char* model_name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
}
return it->second->GetInputNames();
}
std::vector<std::string> GetOutputNames(const char* model_name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
}
return it->second->GetOutputNames();
}
std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
const char* model_name, const std::string& name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
}
return it->second->GetInputHandle(name);
}
std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
const char* model_name, const std::string& name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
}
return it->second->GetOutputHandle(name);
return it->second->infer(in, out, batch_size);
}
template <typename T>
......@@ -992,48 +986,19 @@ class InferManager {
}
// Versioned inference interface
int infer(const char* model_name, uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->infer(version);
}
std::vector<std::string> GetInputNames(const char* model_name,
uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
}
return it->second->GetInputNames(version);
}
std::vector<std::string> GetOutputNames(const char* model_name,
int infer(const char* model_name,
const void* in,
void* out,
uint32_t batch_size,
uint64_t version) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
return -1;
}
return it->second->GetOutputNames(version);
return it->second->infer(in, out, batch_size, version);
}
std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
const char* model_name, uint64_t version, const std::string& name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
}
return it->second->GetInputHandle(version, name);
}
std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
const char* model_name, uint64_t version, const std::string& name) {
auto it = _map.find(model_name);
if (it == _map.end()) {
LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
}
return it->second->GetOutputHandle(version, name);
}
template <typename T>
T* get_core(const char* model_name, uint64_t version) {
auto it = _map.find(model_name);
......
......@@ -21,7 +21,7 @@ namespace baidu {
namespace paddle_serving {
namespace predictor {
enum DataType { FLOAT32, INT64 };
enum DataType { FLOAT32, INT64, INT32 };
class DataBuf {
public:
......@@ -80,9 +80,12 @@ struct Tensor {
size_t ele_byte() const {
if (type == INT64) {
return sizeof(int64_t);
} else {
} else if (type == FLOAT32) {
return sizeof(float);
} else if (type == INT32) {
return sizeof(int32_t);
}
return sizeof(int32_t);
}
bool valid() const {
......