Unverified · Commit e22486e5 authored by MRXLT, committed by GitHub

Merge pull request #1 from wangguibao/ctr_model_serving

Ctr model serving update
@@ -14,6 +14,7 @@
 #include "demo-serving/op/ctr_prediction_op.h"
 #include <algorithm>
+#include <sstream>
+#include <string>
 #include "predictor/framework/infer.h"
 #include "predictor/framework/memory.h"
@@ -27,31 +28,94 @@
 using baidu::paddle_serving::predictor::ctr_prediction::Response;
 using baidu::paddle_serving::predictor::ctr_prediction::CTRReqInstance;
 using baidu::paddle_serving::predictor::ctr_prediction::Request;

-const int CTR_PREDICTION_INPUT_SLOTS =
-    27;  // Total 26 sparse input + 1 dense input
-const int CTR_PREDICTION_SPARSE_SLOTS = 26;  // First 26: sparse input
-const int CTR_PREDICTION_DENSE_SLOT = 26;    // Last 1: dense input
+// Total 26 sparse input + 1 dense input
+const int CTR_PREDICTION_INPUT_SLOTS = 27;
+// First 26: sparse input
+const int CTR_PREDICTION_SPARSE_SLOTS = 26;
+// Last 1: dense input
+const int CTR_PREDICTION_DENSE_SLOT_ID = 26;
+
+const int CTR_PREDICTION_DENSE_DIM = 13;
+const int CTR_PREDICTION_EMBEDDING_SIZE = 10;
+
+#if 1
+// Stand-in for the mCube lookup result until the real cube API is wired in
+struct CubeValue {
+  int error;
+  std::string buff;
+};
+#endif
+
+void fill_response_with_message(Response *response,
+                                int err_code,
+                                std::string err_msg) {
+  if (response == NULL) {
+    LOG(ERROR) << "response is NULL";
+    return;
+  }
+
+  response->set_err_code(err_code);
+  response->set_err_msg(err_msg);
+}

 int CTRPredictionOp::inference() {
   const Request *req = dynamic_cast<const Request *>(get_request_message());
   TensorVector *in = butil::get_object<TensorVector>();
+  Response *res = mutable_data<Response>();

   uint32_t sample_size = req->instances_size();
   if (sample_size <= 0) {
     LOG(WARNING) << "No instances need to inference!";
+    fill_response_with_message(res, -1, "Sample size invalid");
     return -1;
   }

   paddle::PaddleTensor lod_tensors[CTR_PREDICTION_INPUT_SLOTS];
-  for (int i = 0; i < CTR_PREDICTION_INPUT_SLOTS; ++i) {
+  for (int i = 0; i < CTR_PREDICTION_SPARSE_SLOTS; ++i) {
     lod_tensors[i].dtype = paddle::PaddleDType::FLOAT32;
     std::vector<std::vector<size_t>> &lod = lod_tensors[i].lod;
     lod.resize(1);
     lod[0].push_back(0);
   }

-  lod_tensors[CTR_PREDICTION_SPARSE_SLOTS].dtype = paddle::PaddleDType::INT64;
+  // Query cube API for sparse embeddings
+  std::vector<int64_t> keys;
+  std::vector<CubeValue> values;
+
+  for (uint32_t si = 0; si < sample_size; ++si) {
+    const CTRReqInstance &req_instance = req->instances(si);
+    if (req_instance.sparse_ids_size() != CTR_PREDICTION_SPARSE_SLOTS) {
+      std::ostringstream iss;
+      iss << "sparse input size != " << CTR_PREDICTION_SPARSE_SLOTS;
+      fill_response_with_message(res, -1, iss.str());
+      return -1;
+    }
+
+    for (int i = 0; i < req_instance.sparse_ids_size(); ++i) {
+      keys.push_back(req_instance.sparse_ids(i));
+    }
+  }
+
+#if 0
+  mCube::CubeAPI *cube = CubeAPI::instance();
+  int ret = cube->seek(keys, values);
+  if (ret != 0) {
+    fill_response_with_message(res, -1, "Query cube for embeddings error");
+    LOG(ERROR) << "Query cube for embeddings error";
+    return -1;
+  }
+#else
+  // Stub: fill every key with the same fake embedding until cube is available
+  float buff[CTR_PREDICTION_EMBEDDING_SIZE] = {
+      0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.00};
+  values.resize(keys.size());
+  for (size_t i = 0; i < keys.size(); ++i) {
+    values[i].error = 0;
+    values[i].buff = std::string(reinterpret_cast<char *>(buff), sizeof(buff));
+  }
+#endif

+  // Sparse embeddings
   for (int i = 0; i < CTR_PREDICTION_SPARSE_SLOTS; ++i) {
     paddle::PaddleTensor lod_tensor = lod_tensors[i];
     std::vector<std::vector<size_t>> &lod = lod_tensor.lod;
@@ -61,31 +125,48 @@ int CTRPredictionOp::inference() {
       lod[0].push_back(lod[0].back() + 1);
     }

-    lod_tensor.shape = {lod[0].back(), 1};
-    lod_tensor.data.Resize(lod[0].back() * sizeof(int64_t));
+    lod_tensor.shape = {lod[0].back(), CTR_PREDICTION_EMBEDDING_SIZE};
+    lod_tensor.data.Resize(lod[0].back() * sizeof(float) *
+                           CTR_PREDICTION_EMBEDDING_SIZE);

     int offset = 0;
     for (uint32_t si = 0; si < sample_size; ++si) {
-      int64_t *data_ptr =
-          static_cast<int64_t *>(lod_tensor.data.data()) + offset;
+      float *data_ptr = static_cast<float *>(lod_tensor.data.data()) + offset;
       const CTRReqInstance &req_instance = req->instances(si);
-      int id_count = 1;
-      memcpy(data_ptr, &req_instance.sparse_ids().data()[i], sizeof(int64_t));
-      offset += 1;
+      int idx = si * CTR_PREDICTION_SPARSE_SLOTS + i;
+      if (values[idx].buff.size() !=
+          sizeof(float) * CTR_PREDICTION_EMBEDDING_SIZE) {
+        LOG(ERROR) << "Embedding vector size not expected";
+        fill_response_with_message(
+            res, -1, "Embedding vector size not expected");
+        return -1;
+      }
+      memcpy(data_ptr, values[idx].buff.data(), values[idx].buff.size());
+      offset += CTR_PREDICTION_EMBEDDING_SIZE;
     }

     in->push_back(lod_tensor);
   }

-  paddle::PaddleTensor lod_tensor = lod_tensors[CTR_PREDICTION_DENSE_SLOT];
+  // Dense features
+  paddle::PaddleTensor lod_tensor = lod_tensors[CTR_PREDICTION_DENSE_SLOT_ID];
+  lod_tensor.dtype = paddle::PaddleDType::INT64;
   std::vector<std::vector<size_t>> &lod = lod_tensor.lod;

   for (uint32_t si = 0; si < sample_size; ++si) {
     const CTRReqInstance &req_instance = req->instances(si);
+    if (req_instance.dense_ids_size() != CTR_PREDICTION_DENSE_DIM) {
+      std::ostringstream iss;
+      iss << "dense input size != " << CTR_PREDICTION_DENSE_DIM;
+      fill_response_with_message(res, -1, iss.str());
+      return -1;
+    }
     lod[0].push_back(lod[0].back() + req_instance.dense_ids_size());
   }

-  lod_tensor.shape = {lod[0].back(), 1};
+  lod_tensor.shape = {lod[0].back(), CTR_PREDICTION_DENSE_DIM};
   lod_tensor.data.Resize(lod[0].back() * sizeof(int64_t));

   int offset = 0;
@@ -94,7 +175,7 @@ int CTRPredictionOp::inference() {
     const CTRReqInstance &req_instance = req->instances(si);
     int id_count = req_instance.dense_ids_size();
     memcpy(data_ptr,
-           req_instance.ids().data(),
+           req_instance.dense_ids().data(),
            sizeof(int64_t) * req_instance.dense_ids_size());
     offset += req_instance.dense_ids_size();
   }
@@ -104,6 +185,7 @@ int CTRPredictionOp::inference() {
   TensorVector *out = butil::get_object<TensorVector>();
   if (!out) {
     LOG(ERROR) << "Failed get tls output object";
+    fill_response_with_message(res, -1, "Failed get thread local resource");
     return -1;
   }
@@ -112,22 +194,23 @@ int CTRPredictionOp::inference() {
           CTR_PREDICTION_MODEL_NAME, in, out, sample_size)) {
     LOG(ERROR) << "Failed do infer in fluid model: "
                << CTR_PREDICTION_MODEL_NAME;
+    fill_response_with_message(res, -1, "Failed do infer in fluid model");
     return -1;
   }

   if (out->size() != in->size()) {
     LOG(ERROR) << "Output tensor size not equal that of input";
+    fill_response_with_message(res, -1, "Output size != input size");
     return -1;
   }

-  Response *res = mutable_data<Response>();
-
   for (size_t i = 0; i < out->size(); ++i) {
     int dim1 = out->at(i).shape[0];
     int dim2 = out->at(i).shape[1];

     if (out->at(i).dtype != paddle::PaddleDType::FLOAT32) {
       LOG(ERROR) << "Expected data type float";
+      fill_response_with_message(res, -1, "Expected data type float");
       return -1;
     }
@@ -150,6 +233,9 @@ int CTRPredictionOp::inference() {
   }

   out->clear();
   butil::return_object<TensorVector>(out);

+  res->set_err_code(0);
+  res->set_err_msg(std::string(""));
+
   return 0;
 }
......
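The sparse-slot handling above is easier to follow outside the diff. Below is a minimal standalone sketch of the layout only — all names are illustrative, not the serving framework's API: every sample contributes exactly one embedding row per slot, so lod[0] = {0, 1, ..., sample_size} and each slot's buffer holds sample_size * CTR_PREDICTION_EMBEDDING_SIZE floats.

// Standalone sketch of one sparse slot's LoD offsets and flat float buffer.
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>

const int EMBEDDING_SIZE = 10;

int main() {
  const size_t sample_size = 3;

  // Fake cube lookup results: one serialized embedding per sample.
  float stub[EMBEDDING_SIZE] = {0.01f, 0.02f, 0.03f, 0.04f, 0.05f,
                                0.06f, 0.07f, 0.08f, 0.09f, 0.00f};
  std::vector<std::string> values(
      sample_size, std::string(reinterpret_cast<char *>(stub), sizeof(stub)));

  // LoD bookkeeping: each sample adds exactly one row.
  std::vector<size_t> lod = {0};
  for (size_t si = 0; si < sample_size; ++si) {
    lod.push_back(lod.back() + 1);
  }

  // Flat buffer of lod.back() * EMBEDDING_SIZE floats, filled row by row.
  std::vector<float> data(lod.back() * EMBEDDING_SIZE);
  size_t offset = 0;
  for (size_t si = 0; si < sample_size; ++si) {
    memcpy(&data[offset], values[si].data(), values[si].size());
    offset += EMBEDDING_SIZE;
  }

  printf("rows=%zu floats=%zu\n", lod.back(), data.size());  // rows=3 floats=30
  return 0;
}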
@@ -6,6 +6,7 @@ LIST(APPEND protofiles
     ${CMAKE_CURRENT_LIST_DIR}/echo_kvdb_service.proto
     ${CMAKE_CURRENT_LIST_DIR}/int64tensor_service.proto
     ${CMAKE_CURRENT_LIST_DIR}/text_classification.proto
+    ${CMAKE_CURRENT_LIST_DIR}/ctr_prediction.proto
 )
 PROTOBUF_GENERATE_SERVING_CPP(TRUE PROTO_SRCS PROTO_HDRS ${protofiles})
......
@@ -31,7 +31,11 @@ message CTRResInstance {
   required float prob1 = 2;
 };

-message Response { repeated CTRResInstance predictions = 1; };
+message Response {
+  repeated CTRResInstance predictions = 1;
+  required int64 err_code = 2;
+  optional string err_msg = 3;
+};

 service CTRPredictionService {
   rpc inference(Request) returns (Response);
......
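With err_code and err_msg added to Response, the op can report failures in-band instead of only through the RPC status. A hypothetical client-side check — the generated header name and the surrounding function are assumptions; only the fields come from the proto above:

// Hypothetical consumer of the extended Response message.
#include <cstdio>
#include "ctr_prediction.pb.h"  // assumed name of the generated header

using baidu::paddle_serving::predictor::ctr_prediction::Response;

void handle_response(const Response &res) {
  if (res.err_code() != 0) {
    // Op-level failure surfaced through the new fields.
    fprintf(stderr, "inference failed (%lld): %s\n",
            static_cast<long long>(res.err_code()),
            res.has_err_msg() ? res.err_msg().c_str() : "");
    return;
  }
  for (int i = 0; i < res.predictions_size(); ++i) {
    printf("prob1 = %f\n", res.predictions(i).prob1());
  }
}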
@@ -150,7 +150,7 @@ type: the type of the inference engine. See inferencer-fluid-cpu/src/fluid_cpu_engine.cp

 **Differences between the fluid Analysis API and the fluid Native API**

-During model loading, the Analysis API performs a variety of optimizations on the model's computation logic, including but not limited to zero-copy tensors and fusion of adjacent OPs
+During model loading, the Analysis API performs a variety of optimizations on the model's computation logic, including but not limited to zero-copy tensors and fusion of adjacent OPs. **However, these optimizations are not guaranteed to speed up every model, and can sometimes even hurt performance; trust actual measurements.**

 reloadable_meta: its content is currently meaningless; the file's mtime is used to judge whether the reload time threshold has been exceeded
......
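The reloadable_meta note above boils down to an mtime comparison. A minimal sketch of such a check — standalone and illustrative, not the framework's actual reload code; names and signature are assumptions:

#include <sys/stat.h>
#include <ctime>
#include <string>

// True when the meta file's mtime is more than threshold_s seconds past the
// last reload.
bool need_reload(const std::string &reloadable_meta,
                 time_t last_reload,
                 time_t threshold_s) {
  struct stat st;
  if (stat(reloadable_meta.c_str(), &st) != 0) {
    return false;  // No meta file: nothing to trigger a reload.
  }
  return st.st_mtime > last_reload + threshold_s;
}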
@@ -760,6 +760,7 @@ class InferManager {
     }
     LOG(WARNING) << "Succ proc finalize engine, name: " << it->first;
   }
+  _map.clear();
   return 0;
 }
......
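The added _map.clear() makes proc_finalize safe to call more than once: after every engine has been finalized, stale map entries must not be visited again. Schematically — container, ownership, and types are simplified assumptions, not the real InferManager:

#include <map>
#include <string>

struct Engine {
  int proc_finalize() { return 0; }  // stand-in for the real engine teardown
};

// Finalize every engine, then drop the now-dangling entries so a repeated
// call is a harmless no-op.
int proc_finalize(std::map<std::string, Engine *> &_map) {
  for (std::map<std::string, Engine *>::iterator it = _map.begin();
       it != _map.end(); ++it) {
    it->second->proc_finalize();
    delete it->second;
  }
  _map.clear();
  return 0;
}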