From cd0aca04f5e7294a7d5ae380f0666072fa438e8f Mon Sep 17 00:00:00 2001
From: HexToString <506181616@qq.com>
Date: Wed, 10 Mar 2021 11:37:49 +0000
Subject: [PATCH] try to fix 2.0API bug

---
 core/general-server/op/general_infer_op.cpp    |  40 +-
 core/general-server/op/general_reader_op.cpp   | 160 ++++++--
 .../general-server/op/general_response_op.cpp  | 184 +++++++---
 core/predictor/framework/infer.h               | 343 ++++++++----------
 core/predictor/framework/infer_data.h          |   7 +-
 5 files changed, 464 insertions(+), 270 deletions(-)
 mode change 100644 => 100755 core/general-server/op/general_infer_op.cpp
 mode change 100644 => 100755 core/general-server/op/general_reader_op.cpp
 mode change 100644 => 100755 core/general-server/op/general_response_op.cpp
 mode change 100644 => 100755 core/predictor/framework/infer.h
 mode change 100644 => 100755 core/predictor/framework/infer_data.h

diff --git a/core/general-server/op/general_infer_op.cpp b/core/general-server/op/general_infer_op.cpp
old mode 100644
new mode 100755
index 5b9df806..84fae0b9
--- a/core/general-server/op/general_infer_op.cpp
+++ b/core/general-server/op/general_infer_op.cpp
@@ -44,13 +44,49 @@ int GeneralInferOp::inference() {
             << pre_node_names.size();
     return -1;
   }
-  if (InferManager::instance().infer(engine_name().c_str())) {
+  const std::string pre_name = pre_node_names[0];
+
+  const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
+  uint64_t log_id = input_blob->GetLogId();
+  VLOG(2) << "(logid=" << log_id << ") Get precedent op name: " << pre_name;
+  GeneralBlob *output_blob = mutable_data<GeneralBlob>();
+  output_blob->SetLogId(log_id);
+
+  if (!input_blob) {
+    LOG(ERROR) << "(logid=" << log_id
+               << ") Failed mutable depended argument, op:" << pre_name;
     return -1;
   }
+
+  const TensorVector *in = &input_blob->tensor_vector;
+  TensorVector *out = &output_blob->tensor_vector;
+
+  int batch_size = input_blob->_batch_size;
+  VLOG(2) << "(logid=" << log_id << ") input batch size: " << batch_size;
+
+  output_blob->_batch_size = batch_size;
+
+  VLOG(2) << "(logid=" << log_id << ") infer batch size: " << batch_size;
+
+  Timer timeline;
+  int64_t start = timeline.TimeStampUS();
+  timeline.Start();
+
+  if (InferManager::instance().infer(
+          engine_name().c_str(), in, out, batch_size)) {
+    LOG(ERROR) << "(logid=" << log_id
+               << ") Failed do infer in fluid model: " << engine_name().c_str();
+    return -1;
+  }
+
+  int64_t end = timeline.TimeStampUS();
+  CopyBlobInfo(input_blob, output_blob);
+  AddBlobInfo(output_blob, start);
+  AddBlobInfo(output_blob, end);
   return 0;
 }
 DEFINE_OP(GeneralInferOp);

 }  // namespace serving
 }  // namespace paddle_serving
-}  // namespace baidu
+}  // namespace baidu
\ No newline at end of file
diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp
old mode 100644
new mode 100755
index b46071a3..6f8947db
--- a/core/general-server/op/general_reader_op.cpp
+++ b/core/general-server/op/general_reader_op.cpp
@@ -20,7 +20,6 @@
 #include "core/general-server/op/general_infer_helper.h"
 #include "core/predictor/framework/infer.h"
 #include "core/predictor/framework/memory.h"
-#include "core/predictor/framework/resource.h"
 #include "core/util/include/timer.h"

 namespace baidu {
@@ -33,7 +32,6 @@ using baidu::paddle_serving::predictor::general_model::Tensor;
 using baidu::paddle_serving::predictor::general_model::Request;
 using baidu::paddle_serving::predictor::general_model::FeedInst;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
-using baidu::paddle_serving::predictor::InferManager;

 int conf_check(const Request *req,
                const std::shared_ptr<PaddleGeneralModelConfig> &model_config) {
@@ -73,34 +71,75 @@ int conf_check(const Request *req,

 int GeneralReaderOp::inference() {
   // reade request from client
-  // TODO: only support one engine here
-  std::string engine_name = "general_infer_0";
   const Request *req = dynamic_cast<const Request *>(get_request_message());
   uint64_t log_id = req->log_id();
   int input_var_num = 0;
   std::vector<int64_t> elem_type;
   std::vector<int64_t> elem_size;
   std::vector<int64_t> capacity;
+
+  GeneralBlob *res = mutable_data<GeneralBlob>();
+  TensorVector *out = &res->tensor_vector;
+
+  res->SetLogId(log_id);
+
+  if (!res) {
+    LOG(ERROR) << "(logid=" << log_id
+               << ") Failed get op tls reader object output";
+  }
+
+  Timer timeline;
+  int64_t start = timeline.TimeStampUS();
   int var_num = req->insts(0).tensor_array_size();
+  VLOG(2) << "(logid=" << log_id << ") var num: " << var_num;
+
+  VLOG(2) << "(logid=" << log_id
+          << ") start to call load general model_conf op";
   baidu::paddle_serving::predictor::Resource &resource =
       baidu::paddle_serving::predictor::Resource::instance();
+
+  VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
   std::shared_ptr<PaddleGeneralModelConfig> model_config =
       resource.get_general_model_config();
+
+  VLOG(2) << "(logid=" << log_id << ") print general model config done.";
+
+  // TODO(guru4elephant): how to do conditional check?
+  /*
+  int ret = conf_check(req, model_config);
+  if (ret != 0) {
+    LOG(ERROR) << "model conf of server:";
+    resource.print_general_model_config(model_config);
+    return 0;
+  }
+  */
+  // package tensor
+
   elem_type.resize(var_num);
   elem_size.resize(var_num);
   capacity.resize(var_num);
+  // prepare basic information for input
   for (int i = 0; i < var_num; ++i) {
-    std::string tensor_name = model_config->_feed_name[i];
-    VLOG(2) << "(logid=" << log_id << ") get tensor name: " << tensor_name;
-    auto lod_tensor = InferManager::instance().GetInputHandle(
-        engine_name.c_str(), tensor_name.c_str());
-    std::vector<std::vector<size_t>> lod;
-    std::vector<int> shape;
-    // get lod info here
+    paddle::PaddleTensor lod_tensor;
+    elem_type[i] = req->insts(0).tensor_array(i).elem_type();
+    VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
+    if (elem_type[i] == 0) {  // int64
+      elem_size[i] = sizeof(int64_t);
+      lod_tensor.dtype = paddle::PaddleDType::INT64;
+    } else if (elem_type[i] == 1) {
+      elem_size[i] = sizeof(float);
+      lod_tensor.dtype = paddle::PaddleDType::FLOAT32;
+    } else if (elem_type[i] == 2) {
+      elem_size[i] = sizeof(int32_t);
+      lod_tensor.dtype = paddle::PaddleDType::INT32;
+    }
+    // implement lod tensor here
     if (req->insts(0).tensor_array(i).lod_size() > 0) {
-      lod.resize(1);
+      VLOG(2) << "(logid=" << log_id << ") var[" << i << "] is lod_tensor";
+      lod_tensor.lod.resize(1);
       for (int k = 0; k < req->insts(0).tensor_array(i).lod_size(); ++k) {
-        lod[0].push_back(req->insts(0).tensor_array(i).lod(k));
+        lod_tensor.lod[0].push_back(req->insts(0).tensor_array(i).lod(k));
       }
       capacity[i] = 1;
       for (int k = 0; k < req->insts(0).tensor_array(i).shape_size(); ++k) {
@@ -108,7 +147,7 @@ int GeneralReaderOp::inference() {
         VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
                 << "]: " << dim;
         capacity[i] *= dim;
-        shape.push_back(dim);
+        lod_tensor.shape.push_back(dim);
       }
       VLOG(2) << "(logid=" << log_id << ") var[" << i
               << "] is tensor, capacity: " << capacity[i];
@@ -119,44 +158,95 @@ int GeneralReaderOp::inference() {
         VLOG(2) << "(logid=" << log_id << ") shape for var[" << i
                 << "]: " << dim;
         capacity[i] *= dim;
-        shape.push_back(dim);
+        lod_tensor.shape.push_back(dim);
       }
       VLOG(2) << "(logid=" << log_id << ") var[" << i
              << "] is tensor, capacity: " << capacity[i];
     }
-    lod_tensor->SetLoD(lod);
-    lod_tensor->Reshape(shape);
-    // insert data here
-    if (req->insts(0).tensor_array(i).elem_type() == 0) {
-      // TODO: Copy twice here, can optimize
+    lod_tensor.name = model_config->_feed_name[i];
+    out->push_back(lod_tensor);
+  }
+  // specify the memory needed for output tensor_vector
+  for (int i = 0; i < var_num; ++i) {
+    if (out->at(i).lod.size() == 1) {
+      int tensor_size = 0;
+      const Tensor &tensor = req->insts(0).tensor_array(i);
+      int data_len = 0;
+      if (tensor.int64_data_size() > 0) {
+        data_len = tensor.int64_data_size();
+      } else if (tensor.float_data_size() > 0) {
+        data_len = tensor.float_data_size();
+      } else if (tensor.int_data_size() > 0) {
+        data_len = tensor.int_data_size();
+      }
+      VLOG(2) << "(logid=" << log_id << ") tensor size for var[" << i
+              << "]: " << data_len;
+      tensor_size += data_len;
+
+      int cur_len = out->at(i).lod[0].back();
+      VLOG(2) << "(logid=" << log_id << ") current len: " << cur_len;
+
+      int sample_len = 0;
+      if (tensor.shape_size() == 1) {
+        sample_len = data_len;
+      } else {
+        sample_len = tensor.shape(0);
+      }
+      VLOG(2) << "(logid=" << log_id << ") new len: " << cur_len + sample_len;
+      out->at(i).data.Resize(tensor_size * elem_size[i]);
+      VLOG(2) << "(logid=" << log_id << ") var[" << i
+              << "] is lod_tensor and len=" << out->at(i).lod[0].back();
+    } else {
+      out->at(i).data.Resize(capacity[i] * elem_size[i]);
+      VLOG(2) << "(logid=" << log_id << ") var[" << i
+              << "] is tensor and capacity=" << capacity[i];
+    }
+  }
+
+  // fill the data into output general_blob
+  for (int i = 0; i < var_num; ++i) {
+    if (elem_type[i] == 0) {
+      int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
+      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
+              << "] is " << req->insts(0).tensor_array(i).int64_data(0);
+      int offset = 0;
       int elem_num = req->insts(0).tensor_array(i).int64_data_size();
-      std::vector<int64_t> data(elem_num);
-      int64_t *dst_ptr = data.data();
       for (int k = 0; k < elem_num; ++k) {
-        dst_ptr[k] = req->insts(0).tensor_array(i).int64_data(k);
+        dst_ptr[offset + k] = req->insts(0).tensor_array(i).int64_data(k);
       }
-      lod_tensor->CopyFromCpu(dst_ptr);
-    } else if (req->insts(0).tensor_array(i).elem_type() == 1) {
+    } else if (elem_type[i] == 1) {
+      float *dst_ptr = static_cast<float *>(out->at(i).data.data());
+      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
+              << "] is " << req->insts(0).tensor_array(i).float_data(0);
+      int offset = 0;
       int elem_num = req->insts(0).tensor_array(i).float_data_size();
-      std::vector<float> data(elem_num);
-      float *dst_ptr = data.data();
       for (int k = 0; k < elem_num; ++k) {
-        dst_ptr[k] = req->insts(0).tensor_array(i).float_data(k);
+        dst_ptr[offset + k] = req->insts(0).tensor_array(i).float_data(k);
       }
-      lod_tensor->CopyFromCpu(dst_ptr);
-    } else if (req->insts(0).tensor_array(i).elem_type() == 2) {
+    } else if (elem_type[i] == 2) {
+      int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
+      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
+              << "] is " << req->insts(0).tensor_array(i).int_data(0);
+      int offset = 0;
       int elem_num = req->insts(0).tensor_array(i).int_data_size();
-      std::vector<int32_t> data(elem_num);
-      int32_t *dst_ptr = data.data();
       for (int k = 0; k < elem_num; ++k) {
-        dst_ptr[k] = req->insts(0).tensor_array(i).int_data(k);
+        dst_ptr[offset + k] = req->insts(0).tensor_array(i).int_data(k);
      }
-      lod_tensor->CopyFromCpu(dst_ptr);
    }
  }
+
+  VLOG(2) << "(logid=" << log_id << ") output size: " << out->size();
+  timeline.Pause();
+  int64_t end = timeline.TimeStampUS();
+  res->p_size = 0;
+  res->_batch_size = 1;
+  AddBlobInfo(res, start);
+  AddBlobInfo(res, end);
+
+  VLOG(2) << "(logid=" << log_id << ") read data from client success";
   return 0;
 }
 DEFINE_OP(GeneralReaderOp);

 }  // namespace serving
 }  // namespace paddle_serving
-}  // namespace baidu
+}  // namespace baidu
\ No newline at end of file
diff --git a/core/general-server/op/general_response_op.cpp b/core/general-server/op/general_response_op.cpp
old mode 100644
new mode 100755
index dbc24c4c..e424ada5
--- a/core/general-server/op/general_response_op.cpp
+++ b/core/general-server/op/general_response_op.cpp
@@ -40,60 +40,160 @@ using baidu::paddle_serving::predictor::InferManager;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;

 int GeneralResponseOp::inference() {
+  const std::vector<std::string> pre_node_names = pre_names();
+  VLOG(2) << "pre node names size: " << pre_node_names.size();
+  const GeneralBlob *input_blob;
+  uint64_t log_id =
+      get_depend_argument<GeneralBlob>(pre_node_names[0])->GetLogId();
+
   const Request *req = dynamic_cast<const Request *>(get_request_message());
   // response inst with only fetch_var_names
   Response *res = mutable_data<Response>();
+
+  Timer timeline;
+  // double response_time = 0.0;
+  // timeline.Start();
+  int64_t start = timeline.TimeStampUS();
+
+  VLOG(2) << "(logid=" << log_id
+          << ") start to call load general model_conf op";
   baidu::paddle_serving::predictor::Resource &resource =
       baidu::paddle_serving::predictor::Resource::instance();
+
+  VLOG(2) << "(logid=" << log_id << ") get resource pointer done.";
   std::shared_ptr<PaddleGeneralModelConfig> model_config =
       resource.get_general_model_config();
-  std::vector<int> capacity(req->fetch_var_names_size(), 1);
-  std::string engine_name = "general_infer_0";
-  ModelOutput *output = res->add_outputs();
-  FetchInst *fetch_inst = output->add_insts();
-  FetchInst *fetch_p = output->mutable_insts(0);
-  std::vector<std::string> outs =
-      InferManager::instance().GetOutputNames(engine_name.c_str());
+
+  VLOG(2) << "(logid=" << log_id
+          << ") max body size : " << brpc::fLU64::FLAGS_max_body_size;
+
+  std::vector<int> fetch_index;
+  fetch_index.resize(req->fetch_var_names_size());
   for (int i = 0; i < req->fetch_var_names_size(); ++i) {
-    Tensor *tensor = fetch_inst->add_tensor_array();
-    std::string tensor_name = outs[i];
-    auto lod_tensor = InferManager::instance().GetOutputHandle(
-        engine_name.c_str(), tensor_name.c_str());
-    std::vector<int> shape = lod_tensor->shape();
-    for (int k = 0; k < shape.size(); ++k) {
-      capacity[i] *= shape[k];
-      tensor->add_shape(shape[k]);
+    fetch_index[i] =
+        model_config->_fetch_alias_name_to_index[req->fetch_var_names(i)];
+  }
+
+  for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
+    const std::string &pre_name = pre_node_names[pi];
+    VLOG(2) << "(logid=" << log_id << ") pre names[" << pi << "]: " << pre_name
+            << " (" << pre_node_names.size() << ")";
+    input_blob = get_depend_argument<GeneralBlob>(pre_name);
+    // fprintf(stderr, "input(%s) blob address %x\n", pre_names.c_str(),
+    //         input_blob);
+    if (!input_blob) {
+      LOG(ERROR) << "(logid=" << log_id
+                 << ") Failed mutable depended argument, op: " << pre_name;
+      return -1;
     }
-    auto dtype = lod_tensor->type();
-    if (dtype == paddle::PaddleDType::INT64) {
-      std::vector<int64_t> datas(capacity[i]);
-      int64_t *data_ptr = datas.data();
-      lod_tensor->CopyToCpu(data_ptr);
-      google::protobuf::RepeatedField<int64_t> tmp_data(data_ptr,
-                                                        data_ptr + capacity[i]);
-      tensor->mutable_int64_data()->Swap(&tmp_data);
-    } else if (dtype == paddle::PaddleDType::FLOAT32) {
-      std::vector<float> datas(capacity[i]);
-      float *data_ptr = datas.data();
-      lod_tensor->CopyToCpu(data_ptr);
-      google::protobuf::RepeatedField<float> tmp_data(data_ptr,
-                                                      data_ptr + capacity[i]);
-      tensor->mutable_float_data()->Swap(&tmp_data);
-    } else if (dtype == paddle::PaddleDType::INT32) {
-      std::vector<int32_t> datas(capacity[i]);
-      int32_t *data_ptr = datas.data();
-      lod_tensor->CopyToCpu(data_ptr);
-      google::protobuf::RepeatedField<int32_t> tmp_data(data_ptr,
-                                                        data_ptr + capacity[i]);
-      tensor->mutable_int_data()->Swap(&tmp_data);
+
+    const TensorVector *in = &input_blob->tensor_vector;
+
+    ModelOutput *output = res->add_outputs();
+    // To get the order of model return values
+    output->set_engine_name(pre_name);
+    FetchInst *fetch_inst = output->add_insts();
+
+    for (auto &idx : fetch_index) {
+      Tensor *tensor = fetch_inst->add_tensor_array();
+      if (model_config->_is_lod_fetch[idx]) {
+        VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] "
+                << model_config->_fetch_name[idx] << " is lod_tensor";
+        for (int k = 0; k < in->at(idx).shape.size(); ++k) {
+          VLOG(2) << "(logid=" << log_id << ") shape[" << k
+                  << "]: " << in->at(idx).shape[k];
+          tensor->add_shape(in->at(idx).shape[k]);
+        }
+      } else {
+        VLOG(2) << "(logid=" << log_id << ") out[" << idx << "] "
+                << model_config->_fetch_name[idx] << " is tensor";
+        for (int k = 0; k < in->at(idx).shape.size(); ++k) {
+          VLOG(2) << "(logid=" << log_id << ") shape[" << k
+                  << "]: " << in->at(idx).shape[k];
+          tensor->add_shape(in->at(idx).shape[k]);
+        }
+      }
     }
-    std::vector<std::vector<size_t>> lod = lod_tensor->lod();
-    if (lod.size() > 0) {
-      for (int j = 0; j < lod[0].size(); ++j) {
-        tensor->add_lod(lod[0][j]);
+
+    int var_idx = 0;
+    for (auto &idx : fetch_index) {
+      int cap = 1;
+      for (int j = 0; j < in->at(idx).shape.size(); ++j) {
+        cap *= in->at(idx).shape[j];
      }
+
+      FetchInst *fetch_p = output->mutable_insts(0);
+      auto dtype = in->at(idx).dtype;
+
+      if (dtype == paddle::PaddleDType::INT64) {
+        VLOG(2) << "(logid=" << log_id << ") Prepare int64 var ["
+                << model_config->_fetch_name[idx] << "].";
+        int64_t *data_ptr = static_cast<int64_t *>(in->at(idx).data.data());
+        // from
+        // https://stackoverflow.com/questions/15499641/copy-a-stdvector-to-a-repeated-field-from-protobuf-with-memcpy
+        // `Swap` method is faster than `{}` method.
+        google::protobuf::RepeatedField<int64_t> tmp_data(data_ptr,
+                                                          data_ptr + cap);
+        fetch_p->mutable_tensor_array(var_idx)->mutable_int64_data()->Swap(
+            &tmp_data);
+      } else if (dtype == paddle::PaddleDType::FLOAT32) {
+        VLOG(2) << "(logid=" << log_id << ") Prepare float var ["
+                << model_config->_fetch_name[idx] << "].";
+        float *data_ptr = static_cast<float *>(in->at(idx).data.data());
+        google::protobuf::RepeatedField<float> tmp_data(data_ptr,
+                                                        data_ptr + cap);
+        fetch_p->mutable_tensor_array(var_idx)->mutable_float_data()->Swap(
+            &tmp_data);
+      } else if (dtype == paddle::PaddleDType::INT32) {
+        VLOG(2) << "(logid=" << log_id << ")Prepare int32 var ["
+                << model_config->_fetch_name[idx] << "].";
+        int32_t *data_ptr = static_cast<int32_t *>(in->at(idx).data.data());
+        google::protobuf::RepeatedField<int32_t> tmp_data(data_ptr,
+                                                          data_ptr + cap);
+        fetch_p->mutable_tensor_array(var_idx)->mutable_int_data()->Swap(
+            &tmp_data);
+      }
+
+      if (model_config->_is_lod_fetch[idx]) {
+        if (in->at(idx).lod.size() > 0) {
+          for (int j = 0; j < in->at(idx).lod[0].size(); ++j) {
+            fetch_p->mutable_tensor_array(var_idx)->add_lod(
+                in->at(idx).lod[0][j]);
+          }
+        }
+      }
+
+      VLOG(2) << "(logid=" << log_id << ") fetch var ["
+              << model_config->_fetch_name[idx] << "] ready";
+      var_idx++;
    }
  }
+
+  if (req->profile_server()) {
+    int64_t end = timeline.TimeStampUS();
+    // TODO(barriery): multi-model profile_time.
+    // At present, only the response_op is multi-input, so here we get
+    // the profile_time by hard coding. It needs to be replaced with
+    // a more elegant way.
+    for (uint32_t pi = 0; pi < pre_node_names.size(); ++pi) {
+      input_blob = get_depend_argument<GeneralBlob>(pre_node_names[pi]);
+      VLOG(2) << "(logid=" << log_id
+              << ") p size for input blob: " << input_blob->p_size;
+      int profile_time_idx = -1;
+      if (pi == 0) {
+        profile_time_idx = 0;
+      } else {
+        profile_time_idx = input_blob->p_size - 2;
+      }
+      for (; profile_time_idx < input_blob->p_size; ++profile_time_idx) {
+        res->add_profile_time(input_blob->time_stamp[profile_time_idx]);
+      }
+    }
+    // TODO(guru4elephant): find more elegant way to do this
+    res->add_profile_time(start);
+    res->add_profile_time(end);
+  }
+
   return 0;
 }

@@ -101,4 +201,4 @@ DEFINE_OP(GeneralResponseOp);

 }  // namespace serving
 }  // namespace paddle_serving
-}  // namespace baidu
+}  // namespace baidu
\ No newline at end of file
diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h
old mode 100644
new mode 100755
index ba0c18e0..6dd9e3d8
--- a/core/predictor/framework/infer.h
+++ b/core/predictor/framework/infer.h
@@ -20,9 +20,10 @@
 #include <string>
 #include <vector>
 #include "core/predictor/common/inner_common.h"
+#include "core/predictor/framework/bsf.h"
 #include "core/predictor/framework/factory.h"
 #include "core/predictor/framework/infer_data.h"
-#include "paddle_inference_api.h"  // NOLINT
+//#include "paddle_inference_api.h"  // NOLINT
 namespace baidu {
 namespace paddle_serving {
 namespace predictor {
@@ -119,7 +120,7 @@ class InferEngine {
   virtual int thrd_initialize() { return thrd_initialize_impl(); }
   virtual int thrd_clear() { return thrd_clear_impl(); }
   virtual int thrd_finalize() { return thrd_finalize_impl(); }
-  virtual int infer() { return infer_impl(); }
+  virtual int infer(const void* in, void* out, uint32_t batch_size = -1) { return infer_impl1(in, out, batch_size); }

   virtual int reload() = 0;

@@ -132,13 +133,12 @@ class InferEngine {
   virtual int thrd_finalize_impl() = 0;
   virtual int thrd_clear_impl() = 0;
   virtual int proc_finalize_impl() = 0;
-  virtual std::vector<std::string> GetInputNames() = 0;
-  virtual std::vector<std::string> GetOutputNames() = 0;
-  virtual std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
-      const std::string& name) = 0;
-  virtual std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
-      const std::string& name) = 0;
-  virtual int infer_impl() = 0;
+  virtual int infer_impl1(const void* in,
+                          void* out,
+                          uint32_t batch_size = -1) = 0;
+  virtual int infer_impl2(const BatchTensor& in,
+                          BatchTensor& out) = 0;  // NOLINT
+
   // end: framework inner call
 };

@@ -152,6 +152,8 @@ class ReloadableInferEngine : public InferEngine {
     uint64_t last_revision;
   };

+  typedef im::bsf::Task<Tensor, Tensor> TaskT;
+
   virtual int load(const InferEngineCreationParams& params) = 0;

   int proc_initialize_impl(const configure::EngineDesc& conf, bool version) {
@@ -221,10 +223,45 @@ class ReloadableInferEngine : public InferEngine {
       LOG(ERROR) << "Failed proc initialize impl";
       return -1;
     }
+
+    // init bsf framework
+    if (_infer_thread_num <= 0) {
+      return 0;
+    }
+
+    // init bsf framework
+    im::bsf::TaskExecutor<TaskT>::instance()->set_thread_init_fn(
+        boost::bind(&InferEngine::thrd_initialize_impl, this));
+    im::bsf::TaskExecutor<TaskT>::instance()->set_thread_reset_fn(
+        boost::bind(&InferEngine::thrd_clear_impl, this));
+    im::bsf::TaskExecutor<TaskT>::instance()->set_thread_callback_fn(
+        boost::bind(&InferEngine::infer_impl2, this, _1, _2));
+    im::bsf::TaskExecutor<TaskT>::instance()->set_batch_size(_infer_batch_size);
+    im::bsf::TaskExecutor<TaskT>::instance()->set_batch_align(
+        _infer_batch_align);
+    if (im::bsf::TaskExecutor<TaskT>::instance()->start(_infer_thread_num) !=
+        0) {
+      LOG(ERROR) << "Failed start bsf executor, threads:" << _infer_thread_num;
+      return -1;
+    }
+
+    LOG(WARNING) << "Enable batch schedule framework, thread_num:"
+                 << _infer_thread_num << ", batch_size:" << _infer_batch_size
+                 << ", enable_batch_align:" << _infer_batch_align;
     return 0;
   }

-  int infer() { return infer_impl(); }
+  int infer(const void* in, void* out, uint32_t batch_size = -1) {
+    if (_infer_thread_num <= 0) {
+      return infer_impl1(in, out, batch_size);
+    }
+
+    im::bsf::TaskManager<Tensor, Tensor> task_manager;
+    task_manager.schedule(*(reinterpret_cast<const BatchTensor*>(in)),
+                          *(reinterpret_cast<BatchTensor*>(out)));
+    task_manager.wait();
+    return 0;
+  }

   int thrd_initialize() {
     if (_infer_thread_num > 0) {
@@ -248,6 +285,9 @@ class ReloadableInferEngine : public InferEngine {
       return -1;
     }

+    if (_infer_thread_num > 0) {
+      im::bsf::TaskExecutor<TaskT>::instance()->stop();
+    }
     return 0;
   }

@@ -398,6 +438,10 @@ class DBReloadableInferEngine : public ReloadableInferEngine {
   virtual int thrd_initialize_impl() {
     // memory pool to be inited in non-serving-threads
+    if (MempoolWrapper::instance().thread_initialize() != 0) {
+      LOG(ERROR) << "Failed thread initialize mempool";
+      return -1;
+    }
     ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
     if (!md || load_data(md, _infer_engine_params) != 0) {
@@ -407,12 +451,17 @@
     }

     THREAD_SETSPECIFIC(_skey, md);
+    im::bsf::AutoMutex lock(_mutex);
     _reload_vec.push_back(md);
     return 0;
   }

   int thrd_clear_impl() {
     // for non-serving-threads
+    if (MempoolWrapper::instance().thread_clear() != 0) {
+      LOG(ERROR) << "Failed thread clear mempool";
+      return -1;
+    }
     return 0;
   }

@@ -510,6 +559,12 @@ class CloneDBReloadableInferEngine
   }

   virtual int thrd_initialize_impl() {
+    // memory pool to be inited in non-serving-threads
+    if (MempoolWrapper::instance().thread_initialize() != 0) {
+      LOG(ERROR) << "Failed thread initialize mempool";
+      return -1;
+    }
+
     ModelData<EngineCore>* md = new (std::nothrow) ModelData<EngineCore>;
     if (!md || load_data(md, _pd->cores[_pd->current_idx]) != 0) {
       LOG(ERROR) << "Failed clone thread data, origin_core["
origin_core[" @@ -518,6 +573,7 @@ class CloneDBReloadableInferEngine } THREAD_SETSPECIFIC(DBReloadableInferEngine::_skey, md); + im::bsf::AutoMutex lock(DBReloadableInferEngine::_mutex); DBReloadableInferEngine::_reload_vec.push_back(md); return 0; } @@ -536,58 +592,86 @@ class FluidInferEngine : public CloneDBReloadableInferEngine { public: // NOLINT FluidInferEngine() {} ~FluidInferEngine() {} - std::vector GetInputNames() { - FluidFamilyCore* core = - DBReloadableInferEngine::get_core(); - if (!core || !core->get()) { - LOG(ERROR) << "Failed get fluid core in GetInputHandle()"; - } - return core->GetInputNames(); - } - std::vector GetOutputNames() { - FluidFamilyCore* core = - DBReloadableInferEngine::get_core(); + int infer_impl1(const void* in, void* out, uint32_t batch_size = -1) { + FluidFamilyCore* core =DBReloadableInferEngine::get_core(); if (!core || !core->get()) { - LOG(ERROR) << "Failed get fluid core in GetInputHandle()"; + LOG(ERROR) << "Failed get fluid core in infer_impl()"; + return -1; } - return core->GetOutputNames(); - } - std::unique_ptr GetInputHandle( - const std::string& name) { - FluidFamilyCore* core = - DBReloadableInferEngine::get_core(); - if (!core || !core->get()) { - LOG(ERROR) << "Failed get fluid core in GetInputHandle()"; + //set inputHandle + BatchTensor* batchTensor_pointer_in = reinterpret_cast(in); + for(int i =0; i< batchTensor_pointer_in->count();++i){ + Tensor tensor_in_batchTensor = (*batchTensor_pointer_in)[i]; + auto lod_tensor_in = core.GetInputHandle(tensor_in_batchTensor.name); + lod_tensor_in->SetLoD(tensor_in_batchTensor.lod); + lod_tensor_in->Reshape(tensor_in_batchTensor.shape); + void* origin_data = tensor_in_batchTensor.data().data(); + if(tensor_in_batchTensor.type == FLOAT32){ + float* data = reinterpret_cast(origin_data); + lod_tensor_in->CopyFromCpu(data); + }else if(tensor_in_batchTensor.type == INT64){ + int64_t* data = reinterpret_cast(origin_data); + lod_tensor_in->CopyFromCpu(data); + }else if(tensor_in_batchTensor.type == INT32){ + int32_t* data = reinterpret_cast(origin_data); + lod_tensor_in->CopyFromCpu(data); + } } - return core->GetInputHandle(name); - } - - std::unique_ptr GetOutputHandle( - const std::string& name) { - FluidFamilyCore* core = - DBReloadableInferEngine::get_core(); - if (!core || !core->get()) { - LOG(ERROR) << "Failed get fluid core in GetOutputHandle()"; + if (!core->Run()) { + LOG(ERROR) << "Failed run fluid family core"; + return -1; } - return core->GetOutputHandle(name); - } - int infer_impl() { - FluidFamilyCore* core = - DBReloadableInferEngine::get_core(); - if (!core || !core->get()) { - LOG(ERROR) << "Failed get fluid core in infer_impl()"; - return -1; - } + //get out and copy to void* out + BatchTensor* batchTensor_pointer_out = reinterpret_cast(out); + std::vector outnames = core.GetOutputNames(); + for (int i = 0; i < outnames.size(); ++i){ + auto lod_tensor_out = core.GetOutputHandle(outnames[i]); + std::vector output_shape = lod_tensor_out->shape(); + int out_num = std::accumulate(output_shape.begin(), output_shape.end(), 1, std::multiplies()); + int dataType = lod_tensor_out->type(); + void* databuf_data = NULL; + size_t databuf_size = 0; + if(dataType == FLOAT32){ + float* data_out = new float[out_num]; + lod_tensor_out->CopyToCpu(data_out); + databuf_data = reinterpret_cast(data_out); + databuf_size = sizeof(float); + }else if (dataType == INT64){ + int64_t* data_out = new int64_t[out_num]; + lod_tensor_out->CopyToCpu(data_out); + databuf_data = reinterpret_cast(data_out); + 
+        databuf_size = sizeof(int64_t);
+      } else if (dataType == INT32) {
+        int32_t* data_out = new int32_t[out_num];
+        lod_tensor_out->CopyToCpu(data_out);
+        databuf_data = reinterpret_cast<void*>(data_out);
+        databuf_size = sizeof(int32_t);
+      }
+      Tensor tensor_out;
+      tensor_out.name = outnames[i];
+      tensor_out.type = dataType;
+      tensor_out.shape.assign(output_shape.begin(), output_shape.end());
+      std::vector<std::vector<size_t>> out_lod = lod_tensor_out->lod();
+      for (int li = 0; li < out_lod.size(); ++li) {
+        std::vector<size_t> lod_element;
+        lod_element.assign(out_lod[li].begin(), out_lod[li].end());
+        tensor_out.lod.push_back(lod_element);
+      }
+      tensor_out.data = DataBuf(databuf_data, databuf_size, true);
-    if (!core->Run()) {
-      LOG(ERROR) << "Failed run fluid family core";
-      return -1;
+      batchTensor_pointer_out->push_back(tensor_out);
     }
     return 0;
   }
+
+  int infer_impl2(const BatchTensor& in, BatchTensor& out) {  // NOLINT
+    return infer_impl1(&in, &out);
+  }
+
+
 };

 typedef FactoryPool<InferEngine> StaticInferFactory;

@@ -713,45 +797,13 @@ class VersionedInferEngine : public InferEngine {
     return _versions.begin()->second;
   }

-  int infer() {
+  int infer(const void* in, void* out, uint32_t batch_size) {
     InferEngine* engine = default_engine();
     if (!engine) {
       LOG(WARNING) << "fail to get default engine";
       return -1;
     }
-    return engine->infer();
-  }
-
-  std::vector<std::string> GetInputNames() {
-    InferEngine* engine = default_engine();
-    if (!engine) {
-      LOG(WARNING) << "fail to get default engine";
-    }
-    return engine->GetInputNames();
-  }
-  std::vector<std::string> GetOutputNames() {
-    InferEngine* engine = default_engine();
-    if (!engine) {
-      LOG(WARNING) << "fail to get default engine";
-    }
-    return engine->GetOutputNames();
-  }
-  std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
-      const std::string& name) {
-    InferEngine* engine = default_engine();
-    if (!engine) {
-      LOG(WARNING) << "fail to get default engine";
-    }
-    return engine->GetInputHandle(name);
-  }
-
-  std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
-      const std::string& name) {
-    InferEngine* engine = default_engine();
-    if (!engine) {
-      LOG(WARNING) << "fail to get default engine";
-    }
-    return engine->GetOutputHandle(name);
+    return engine->infer(in, out, batch_size);
   }

   template <typename T>
@@ -770,47 +822,14 @@ class VersionedInferEngine : public InferEngine {
   }

   // versioned inference interface
-  int infer(uint64_t version) {
+  int infer(const void* in, void* out, uint32_t batch_size, uint64_t version) {
     auto iter = _versions.find(version);
     if (iter == _versions.end()) {
       LOG(ERROR) << "Not found version engine: " << version;
       return -1;
     }
-    return iter->second->infer();
-  }
-  std::vector<std::string> GetInputNames(uint64_t version) {
-    auto iter = _versions.find(version);
-    if (iter == _versions.end()) {
-      LOG(ERROR) << "Not found version engine: " << version;
-    }
-    return iter->second->GetInputNames();
-  }
-
-  std::vector<std::string> GetOutputNames(uint64_t version) {
-    auto iter = _versions.find(version);
-    if (iter == _versions.end()) {
-      LOG(ERROR) << "Not found version engine: " << version;
-    }
-    return iter->second->GetOutputNames();
-  }
-
-  std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
-      uint64_t version, const std::string& name) {
-    auto iter = _versions.find(version);
-    if (iter == _versions.end()) {
-      LOG(ERROR) << "Not found version engine: " << version;
-    }
-    return iter->second->GetInputHandle(name);
-  }
-
-  std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
-      uint64_t version, const std::string& name) {
-    auto iter = _versions.find(version);
-    if (iter == _versions.end()) {
-      LOG(ERROR) << "Not found version engine: " << version;
-    }
-    return iter->second->GetOutputHandle(name);
+    return iter->second->infer(in, out, batch_size);
   }

   template <typename T>
@@ -837,7 +856,10 @@ class VersionedInferEngine : public InferEngine {
   int thrd_finalize_impl() { return -1; }
   int thrd_clear_impl() { return -1; }
   int proc_finalize_impl() { return -1; }
-  int infer_impl() { return -1; }
+  int infer_impl1(const void* in, void* out, uint32_t batch_size = -1) { return -1; }
+  int infer_impl2(const BatchTensor& in, BatchTensor& out) {  // NOLINT
+    return -1;
+  }  // NOLINT
  private:
   boost::unordered_map<uint64_t, InferEngine*> _versions;
 };
@@ -935,44 +957,16 @@ class InferManager {
   }

   // Inference interface
-  int infer(const char* model_name) {
+  int infer(const char* model_name,
+            const void* in,
+            void* out,
+            uint32_t batch_size = -1) {
     auto it = _map.find(model_name);
     if (it == _map.end()) {
       LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
       return -1;
     }
-    return it->second->infer();
-  }
-
-  std::vector<std::string> GetInputNames(const char* model_name) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-    }
-    return it->second->GetInputNames();
-  }
-  std::vector<std::string> GetOutputNames(const char* model_name) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-    }
-    return it->second->GetOutputNames();
-  }
-  std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
-      const char* model_name, const std::string& name) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-    }
-    return it->second->GetInputHandle(name);
-  }
-  std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
-      const char* model_name, const std::string& name) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-    }
-    return it->second->GetOutputHandle(name);
+    return it->second->infer(in, out, batch_size);
   }

   template <typename T>
@@ -992,48 +986,19 @@ class InferManager {
   }

   // Versioned inference interface
-  int infer(const char* model_name, uint64_t version) {
+  int infer(const char* model_name,
+            const void* in,
+            void* out,
+            uint32_t batch_size,
+            uint64_t version) {
     auto it = _map.find(model_name);
     if (it == _map.end()) {
       LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
       return -1;
     }
-    return it->second->infer(version);
-  }
-  std::vector<std::string> GetInputNames(const char* model_name,
-                                         uint64_t version) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-    }
-    return it->second->GetInputNames(version);
+    return it->second->infer(in, out, batch_size, version);
   }
-  std::vector<std::string> GetOutputNames(const char* model_name,
-                                          uint64_t version) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-    }
-    return it->second->GetOutputNames(version);
-  }
-
-  std::unique_ptr<paddle_infer::Tensor> GetInputHandle(
-      const char* model_name, uint64_t version, const std::string& name) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-    }
-    return it->second->GetInputHandle(version, name);
-  }
-  std::unique_ptr<paddle_infer::Tensor> GetOutputHandle(
-      const char* model_name, uint64_t version, const std::string& name) {
-    auto it = _map.find(model_name);
-    if (it == _map.end()) {
-      LOG(WARNING) << "Cannot find engine in map, model name:" << model_name;
-    }
-    return it->second->GetOutputHandle(version, name);
-  }

   template <typename T>
   T* get_core(const char* model_name, uint64_t version) {
     auto it = _map.find(model_name);
diff --git a/core/predictor/framework/infer_data.h b/core/predictor/framework/infer_data.h
old mode 100644
new mode 100755
index 1ad62ce6..93791204
--- a/core/predictor/framework/infer_data.h
+++ b/core/predictor/framework/infer_data.h
@@ -21,7 +21,7 @@ namespace baidu {
 namespace paddle_serving {
 namespace predictor {

-enum DataType { FLOAT32, INT64 };
+enum DataType { FLOAT32, INT64, INT32 };

 class DataBuf {
  public:
@@ -80,9 +80,12 @@ struct Tensor {
   size_t ele_byte() const {
     if (type == INT64) {
       return sizeof(int64_t);
-    } else {
+    } else if (type == FLOAT32) {
       return sizeof(float);
+    } else if (type == INT32) {
+      return sizeof(int32_t);
     }
+    return sizeof(int32_t);
   }

   bool valid() const {
--
GitLab
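
Note on usage (not part of the patch): the sketch below summarizes how a serving op is expected to drive the reworked inference entry point after this change, mirroring GeneralInferOp::inference() above. The engine name "general_infer_0", the helper name run_infer_once, and the local TensorVector alias (std::vector of paddle::PaddleTensor, matching what the general ops pass in) are illustrative assumptions rather than anything this patch defines.

    // Sketch only: call the new InferManager::infer(name, in, out, batch_size)
    // signature the way GeneralInferOp does; in/out travel as void* and are
    // unpacked inside the engine.
    #include <vector>
    #include "core/predictor/framework/infer.h"
    #include "paddle_inference_api.h"  // paddle::PaddleTensor (assumed include)

    using baidu::paddle_serving::predictor::InferManager;
    typedef std::vector<paddle::PaddleTensor> TensorVector;  // assumed alias

    int run_infer_once(const TensorVector* in, TensorVector* out, int batch_size) {
      // A non-zero return means the named engine is missing or its Run() failed.
      if (InferManager::instance().infer("general_infer_0", in, out, batch_size)) {
        return -1;
      }
      return 0;
    }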
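
A second note (also not part of the patch): GeneralReaderOp above maps the request's elem_type onto a PaddleDType and an element width (0 = int64, 1 = float32, 2 = int32), which is also why INT32 is added to the DataType enum in infer_data.h. A compact restatement of that mapping as a hypothetical helper:

    // Sketch only: the elem_type -> dtype/element-size mapping used by
    // GeneralReaderOp in this patch; the helper and struct names are illustrative.
    #include <cstddef>
    #include <cstdint>
    #include "paddle_inference_api.h"  // paddle::PaddleDType

    struct ElemTypeInfo {
      paddle::PaddleDType dtype;
      size_t elem_size;
    };

    inline ElemTypeInfo LookupElemType(int elem_type) {
      switch (elem_type) {
        case 0: return {paddle::PaddleDType::INT64, sizeof(int64_t)};
        case 1: return {paddle::PaddleDType::FLOAT32, sizeof(float)};
        case 2: return {paddle::PaddleDType::INT32, sizeof(int32_t)};
        default: return {paddle::PaddleDType::FLOAT32, sizeof(float)};  // not handled by the op
      }
    }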