diff --git a/core/general-server/op/general_infer_op.cpp b/core/general-server/op/general_infer_op.cpp
index 84fae0b99805db6a0ee118966d0f6c26ae3d7b70..c3ca099bf9be0b90f0a6b7f2ceefb3473615467c 100755
--- a/core/general-server/op/general_infer_op.cpp
+++ b/core/general-server/op/general_infer_op.cpp
@@ -47,9 +47,18 @@ int GeneralInferOp::inference() {
   const std::string pre_name = pre_node_names[0];
 
   const GeneralBlob *input_blob = get_depend_argument<GeneralBlob>(pre_name);
+  if(!input_blob){
+    LOG(ERROR) << "input_blob is nullptr,error";
+    return -1;
+  }
   uint64_t log_id = input_blob->GetLogId();
   VLOG(2) << "(logid=" << log_id << ") Get precedent op name: " << pre_name;
+  GeneralBlob *output_blob = mutable_data<GeneralBlob>();
+  if(!output_blob){
+    LOG(ERROR) << "output_blob is nullptr,error";
+    return -1;
+  }
   output_blob->SetLogId(log_id);
 
   if (!input_blob) {
diff --git a/core/general-server/op/general_reader_op.cpp b/core/general-server/op/general_reader_op.cpp
index 6f8947db166db8a348a96f4a9bee422012525ad6..135ae63007135116310f3fe24bceaa32834a3457 100755
--- a/core/general-server/op/general_reader_op.cpp
+++ b/core/general-server/op/general_reader_op.cpp
@@ -32,7 +32,7 @@ using baidu::paddle_serving::predictor::general_model::Tensor;
 using baidu::paddle_serving::predictor::general_model::Request;
 using baidu::paddle_serving::predictor::general_model::FeedInst;
 using baidu::paddle_serving::predictor::PaddleGeneralModelConfig;
-
+enum ProtoDataType { P_INT64, P_FLOAT32, P_INT32 };
 int conf_check(const Request *req,
                const std::shared_ptr<PaddleGeneralModelConfig> &model_config) {
   int var_num = req->insts(0).tensor_array_size();
@@ -80,7 +80,10 @@ int GeneralReaderOp::inference() {
   GeneralBlob *res = mutable_data<GeneralBlob>();
   TensorVector *out = &res->tensor_vector;
-
+  if(!res){
+    LOG(ERROR) << "res is nullptr,error";
+    return -1;
+  }
   res->SetLogId(log_id);
 
   if (!res) {
@@ -124,13 +127,13 @@ int GeneralReaderOp::inference() {
     paddle::PaddleTensor lod_tensor;
     elem_type[i] = req->insts(0).tensor_array(i).elem_type();
     VLOG(2) << "var[" << i << "] has elem type: " << elem_type[i];
-    if (elem_type[i] == 0) {  // int64
+    if (elem_type[i] == P_INT64) {  // int64
       elem_size[i] = sizeof(int64_t);
       lod_tensor.dtype = paddle::PaddleDType::INT64;
-    } else if (elem_type[i] == 1) {
+    } else if (elem_type[i] == P_FLOAT32) {
       elem_size[i] = sizeof(float);
       lod_tensor.dtype = paddle::PaddleDType::FLOAT32;
-    } else if (elem_type[i] == 2) {
+    } else if (elem_type[i] == P_INT32) {
       elem_size[i] = sizeof(int32_t);
       lod_tensor.dtype = paddle::PaddleDType::INT32;
     }
@@ -205,30 +208,42 @@ int GeneralReaderOp::inference() {
   // fill the data into output general_blob
   for (int i = 0; i < var_num; ++i) {
-    if (elem_type[i] == 0) {
+    if (elem_type[i] == P_INT64) {
      int64_t *dst_ptr = static_cast<int64_t *>(out->at(i).data.data());
      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
              << "] is " << req->insts(0).tensor_array(i).int64_data(0);
      int offset = 0;
      int elem_num = req->insts(0).tensor_array(i).int64_data_size();
+      if(!dst_ptr){
+        LOG(ERROR) << "dst_ptr is nullptr";
+        return -1;
+      }
      for (int k = 0; k < elem_num; ++k) {
        dst_ptr[offset + k] = req->insts(0).tensor_array(i).int64_data(k);
      }
-    } else if (elem_type[i] == 1) {
+    } else if (elem_type[i] == P_FLOAT32) {
      float *dst_ptr = static_cast<float *>(out->at(i).data.data());
      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
              << "] is " << req->insts(0).tensor_array(i).float_data(0);
      int offset = 0;
      int elem_num = req->insts(0).tensor_array(i).float_data_size();
+      if(!dst_ptr){
+        LOG(ERROR) << "dst_ptr is nullptr";
+        return -1;
+      }
      for (int k = 0; k < elem_num; ++k) {
        dst_ptr[offset + k] = req->insts(0).tensor_array(i).float_data(k);
      }
-    } else if (elem_type[i] == 2) {
+    } else if (elem_type[i] == P_INT32) {
      int32_t *dst_ptr = static_cast<int32_t *>(out->at(i).data.data());
      VLOG(2) << "(logid=" << log_id << ") first element data in var[" << i
              << "] is " << req->insts(0).tensor_array(i).int_data(0);
      int offset = 0;
      int elem_num = req->insts(0).tensor_array(i).int_data_size();
+      if(!dst_ptr){
+        LOG(ERROR) << "dst_ptr is nullptr";
+        return -1;
+      }
      for (int k = 0; k < elem_num; ++k) {
        dst_ptr[offset + k] = req->insts(0).tensor_array(i).int_data(k);
      }
diff --git a/core/predictor/framework/infer.h b/core/predictor/framework/infer.h
index f70c2b6c800672323fc2c7da65d42e49cb4f0b55..14a1dfd13a0184f4b93c9e9d856f2c925893666c 100755
--- a/core/predictor/framework/infer.h
+++ b/core/predictor/framework/infer.h
@@ -121,7 +121,7 @@ class InferEngine {
   virtual int thrd_initialize() { return thrd_initialize_impl(); }
   virtual int thrd_clear() { return thrd_clear_impl(); }
   virtual int thrd_finalize() { return thrd_finalize_impl(); }
-  virtual int infer(const void* in, void* out, uint32_t batch_size = -1) { return infer_impl1(in, out, batch_size); }
+  virtual int infer(const void* in, void* out, uint32_t batch_size = -1) { return infer_impl(in, out, batch_size); }
 
   virtual int reload() = 0;
 
@@ -134,10 +134,10 @@ class InferEngine {
   virtual int thrd_finalize_impl() = 0;
   virtual int thrd_clear_impl() = 0;
   virtual int proc_finalize_impl() = 0;
-  virtual int infer_impl1(const void* in,
+  virtual int infer_impl(const void* in,
                           void* out,
                           uint32_t batch_size = -1) = 0;
-  virtual int infer_impl2(const BatchTensor& in,
+  virtual int task_infer_impl(const BatchTensor& in,
                           BatchTensor& out) = 0;  // NOLINT
   // end: framework inner call
 
@@ -236,7 +236,7 @@ class ReloadableInferEngine : public InferEngine {
       im::bsf::TaskExecutor::instance()->set_thread_reset_fn(
          boost::bind(&InferEngine::thrd_clear_impl, this));
      im::bsf::TaskExecutor::instance()->set_thread_callback_fn(
-          boost::bind(&InferEngine::infer_impl2, this, _1, _2));
+          boost::bind(&InferEngine::task_infer_impl, this, _1, _2));
      im::bsf::TaskExecutor::instance()->set_batch_size(_infer_batch_size);
      im::bsf::TaskExecutor::instance()->set_batch_align(
          _infer_batch_align);
@@ -254,7 +254,7 @@ class ReloadableInferEngine : public InferEngine {
   int infer(const void* in, void* out, uint32_t batch_size = -1) {
     if (_infer_thread_num <= 0) {
-      return infer_impl1(in, out, batch_size);
+      return infer_impl(in, out, batch_size);
     }
 
     im::bsf::TaskManager task_manager;
@@ -594,20 +594,24 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
   FluidInferEngine() {}
   ~FluidInferEngine() {}
   typedef std::vector<paddle::PaddleTensor> TensorVector;
-  int infer_impl1(const void* in, void* out, uint32_t batch_size = -1) {
+  int infer_impl(const void* in, void* out, uint32_t batch_size = -1) {
+    // First of all, get the real core according to the template parameter 'FluidFamilyCore'.
     FluidFamilyCore* core =DBReloadableInferEngine<FluidFamilyCore>::get_core();
     if (!core || !core->get()) {
       LOG(ERROR) << "Failed get fluid core in infer_impl()";
       return -1;
     }
-
-    //set inputHandle
+    // We use a for loop to process the input data.
+    // In each iteration, use in[i]->name as inputName and call 'core->GetInputHandle(inputName)' to get the pointer to InputData.
+    // Set the lod and shape information of InputData first, then copy the data from CPU to the core.
     const TensorVector* tensorVector_in_pointer = reinterpret_cast<const TensorVector*>(in);
     for(int i =0; i< tensorVector_in_pointer->size();++i){
       auto lod_tensor_in = core->GetInputHandle((*tensorVector_in_pointer)[i].name);
       lod_tensor_in->SetLoD((*tensorVector_in_pointer)[i].lod);
       lod_tensor_in->Reshape((*tensorVector_in_pointer)[i].shape);
       void* origin_data = (*tensorVector_in_pointer)[i].data.data();
+      // Because the core needs to determine the size of the memory space according to the data type passed in,
+      // the pointer type of data must be one of float*, int64_t*, int32_t* instead of void*.
       if((*tensorVector_in_pointer)[i].dtype == paddle::PaddleDType::FLOAT32){
         float* data = static_cast<float*>(origin_data);
         lod_tensor_in->CopyFromCpu(data);
@@ -619,12 +623,14 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
         lod_tensor_in->CopyFromCpu(data);
       }
     }
+    // After the input data is passed in, call 'core->Run()' to perform the prediction.
     if (!core->Run()) {
       LOG(ERROR) << "Failed run fluid family core";
       return -1;
     }
-    //get out and copy to void* out
-    TensorVector* tensorVector_out_pointer = reinterpret_cast<TensorVector*>(out);
+
+    // To get the results, first call 'core->GetOutputNames()' to get the output names (a dict like {OutputName: pointer of OutputValue}).
+    // Then use a for loop to get each OutputValue by calling 'core->GetOutputHandle'.
     std::vector<std::string> outnames = core->GetOutputNames();
     std::vector<int> output_shape;
     int out_num =0;
@@ -632,6 +638,13 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
     void* databuf_data = NULL;
     char* databuf_char = NULL;
     size_t databuf_size = 0;
+    TensorVector* tensorVector_out_pointer = reinterpret_cast<TensorVector*>(out);
+    if(!tensorVector_out_pointer){
+      LOG(ERROR) << "tensorVector_out_pointer is nullptr,error";
+      return -1;
+    }
+    // Get the type and shape information of OutputData first, then copy the data from the core to CPU.
+    // The pointer type of data_out must be one of float*, int64_t*, int32_t* instead of void*.
     for (int i = 0; i < outnames.size(); ++i){
       auto lod_tensor_out = core->GetOutputHandle(outnames[i]);
       output_shape = lod_tensor_out->shape();
@@ -639,28 +652,27 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
       dataType = lod_tensor_out->type();
       if(dataType == paddle::PaddleDType::FLOAT32){
         databuf_size = out_num*sizeof(float);
-        void* databuf_data = MempoolWrapper::instance().malloc(databuf_size);
+        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
         if (!databuf_data) {
           LOG(ERROR) << "Malloc failed, size: " << databuf_size;
           return -1;
         }
         float* data_out = reinterpret_cast<float*>(databuf_data);
-        //float* data_out = new float[out_num];
         lod_tensor_out->CopyToCpu(data_out);
         databuf_char = reinterpret_cast<char*>(data_out);
       }else if(dataType == paddle::PaddleDType::INT64){
         databuf_size = out_num*sizeof(int64_t);
-        void* databuf_data = MempoolWrapper::instance().malloc(databuf_size);
+        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
         if (!databuf_data) {
           LOG(ERROR) << "Malloc failed, size: " << databuf_size;
           return -1;
         }
-        int64_t* data_out = reinterpret_cast<int64_t*>(data_out);
+        int64_t* data_out = reinterpret_cast<int64_t*>(databuf_data);
         lod_tensor_out->CopyToCpu(data_out);
         databuf_char = reinterpret_cast<char*>(data_out);
       }else if(dataType == paddle::PaddleDType::INT32){
         databuf_size = out_num*sizeof(int32_t);
-        void* databuf_data = MempoolWrapper::instance().malloc(databuf_size);
+        databuf_data = MempoolWrapper::instance().malloc(databuf_size);
         if (!databuf_data) {
           LOG(ERROR) << "Malloc failed, size: " << databuf_size;
           return -1;
@@ -669,6 +681,9 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
         lod_tensor_out->CopyToCpu(data_out);
         databuf_char = reinterpret_cast<char*>(data_out);
       }
+      // Because task scheduling requires OPs to use 'Channel' (a data structure) to transfer data between OPs,
+      // we need to copy the processed data into the 'Channel' for the next OP.
+      // In this function, that means copying 'databuf_char' to the pointer 'void* out' (which is also called 'tensorVector_out_pointer').
       paddle::PaddleTensor tensor_out;
       tensor_out.name = outnames[i];
       tensor_out.dtype = paddle::PaddleDType(dataType);
@@ -679,15 +694,15 @@ class FluidInferEngine : public CloneDBReloadableInferEngine<FluidFamilyCore> {
         lod_element.assign(out_lod[li].begin(), out_lod[li].end());
         tensor_out.lod.push_back(lod_element);
       }
-      paddle::PaddleBuf paddleBuf(databuf_char,databuf_size);
+      paddle::PaddleBuf paddleBuf(databuf_char, databuf_size);
       tensor_out.data = paddleBuf;
       tensorVector_out_pointer->push_back(tensor_out);
     }
     return 0;
   }
 
-  int infer_impl2(const BatchTensor& in, BatchTensor& out) {  // NOLINT
-    return infer_impl1(&in, &out);
+  int task_infer_impl(const BatchTensor& in, BatchTensor& out) {  // NOLINT
+    return infer_impl(&in, &out);
   }
 
@@ -875,8 +890,8 @@ class VersionedInferEngine : public InferEngine {
   int thrd_finalize_impl() { return -1; }
   int thrd_clear_impl() { return -1; }
   int proc_finalize_impl() { return -1; }
-  int infer_impl1(const void* in, void* out, uint32_t batch_size = -1) { return -1; }
-  int infer_impl2(const BatchTensor& in, BatchTensor& out) {  // NOLINT
+  int infer_impl(const void* in, void* out, uint32_t batch_size = -1) { return -1; }
+  int task_infer_impl(const BatchTensor& in, BatchTensor& out) {  // NOLINT
     return -1;
   }  // NOLINT
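
A note on the `ProtoDataType` enum introduced in general_reader_op.cpp: it only names the wire values 0/1/2 that the request's `elem_type` field already carried, so the reader's branches read as types rather than magic numbers. The sketch below is not part of the patch; it restates the mapping with a hypothetical helper (`ElemTypeInfo` and `ResolveElemType` are illustrative names), assuming `paddle::PaddleDType` comes from the usual Paddle inference header.

#include <cstddef>
#include <cstdint>
#include "paddle_inference_api.h"  // assumed include path for paddle::PaddleDType

// Illustrative only: the elem_type -> (dtype, element size) mapping that
// general_reader_op.cpp now spells out with named ProtoDataType constants.
enum ProtoDataType { P_INT64, P_FLOAT32, P_INT32 };

struct ElemTypeInfo {
  paddle::PaddleDType dtype;
  size_t elem_size;
};

inline ElemTypeInfo ResolveElemType(int elem_type) {
  switch (elem_type) {
    case P_INT64:
      return {paddle::PaddleDType::INT64, sizeof(int64_t)};
    case P_FLOAT32:
      return {paddle::PaddleDType::FLOAT32, sizeof(float)};
    case P_INT32:
    default:  // the reader op only handles these three proto types
      return {paddle::PaddleDType::INT32, sizeof(int32_t)};
  }
}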
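The `infer_impl1`/`infer_impl2` renames in infer.h are mechanical, but the intent is easier to see in isolation: `infer_impl` is the single real implementation, `infer` is the synchronous entry point that calls it directly when no worker threads are configured, and `task_infer_impl` is the adapter that the batch-scheduling executor binds as its thread callback. A minimal sketch of that relationship, with simplified stand-ins for `BatchTensor` and the engine class (names other than the three methods are illustrative):

#include <cstdint>

// Simplified stand-in for the framework's BatchTensor.
struct BatchTensor {};

class EngineSketch {
 public:
  virtual ~EngineSketch() = default;

  // Synchronous path: called directly when no worker threads are configured.
  int infer(const void* in, void* out, uint32_t batch_size = -1) {
    return infer_impl(in, out, batch_size);
  }

 protected:
  // Renamed from infer_impl1: the one real implementation.
  virtual int infer_impl(const void* in, void* out, uint32_t batch_size) = 0;

  // Renamed from infer_impl2: the adapter signature expected by the task
  // executor callback; it simply forwards to infer_impl.
  virtual int task_infer_impl(const BatchTensor& in, BatchTensor& out) {
    return infer_impl(&in, &out, -1);
  }
};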
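The output-copy changes in `infer_impl` fix more than style: each dtype branch previously declared its own `void* databuf_data`, shadowing the outer variable, and the INT64 branch initialized `data_out` by casting `data_out` itself rather than `databuf_data`. A condensed sketch of the corrected pattern, reduced to two dtypes, with `Alloc` standing in for `MempoolWrapper::instance().malloc`:

#include <cstddef>
#include <cstdint>
#include <cstdlib>

// Stand-in for MempoolWrapper::instance().malloc(); illustrative only.
static void* Alloc(size_t size) { return std::malloc(size); }

// One outer databuf_data is reused by every branch, so the buffer handed
// back is always the one that was actually allocated and filled.
static char* CopyOutput(bool is_float, int64_t out_num, size_t* databuf_size) {
  void* databuf_data = nullptr;  // single definition, no per-branch shadowing
  char* databuf_char = nullptr;
  if (is_float) {
    *databuf_size = out_num * sizeof(float);
    databuf_data = Alloc(*databuf_size);
    if (!databuf_data) return nullptr;
    float* data_out = reinterpret_cast<float*>(databuf_data);
    // lod_tensor_out->CopyToCpu(data_out) in the real code
    databuf_char = reinterpret_cast<char*>(data_out);
  } else {
    *databuf_size = out_num * sizeof(int64_t);
    databuf_data = Alloc(*databuf_size);
    if (!databuf_data) return nullptr;
    // Cast from databuf_data; the old code cast data_out from itself.
    int64_t* data_out = reinterpret_cast<int64_t*>(databuf_data);
    databuf_char = reinterpret_cast<char*>(data_out);
  }
  return databuf_char;
}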