diff --git a/demo-serving/op/ctr_prediction_op.cpp b/demo-serving/op/ctr_prediction_op.cpp
index 04a6cebf0d0da2171b2bf146cd3e9f0e49973fa9..541e03fd5f8302280c501b0a8a085f6d636faa89 100644
--- a/demo-serving/op/ctr_prediction_op.cpp
+++ b/demo-serving/op/ctr_prediction_op.cpp
@@ -14,6 +14,7 @@
 #include "demo-serving/op/ctr_prediction_op.h"
 #include <string>
+#include <sstream>
 #include "predictor/framework/infer.h"
 #include "predictor/framework/memory.h"
@@ -27,31 +28,94 @@
 using baidu::paddle_serving::predictor::ctr_prediction::Response;
 using baidu::paddle_serving::predictor::ctr_prediction::CTRReqInstance;
 using baidu::paddle_serving::predictor::ctr_prediction::Request;
 
-const int CTR_PREDICTION_INPUT_SLOTS =
-    27;  // Total 26 sparse input + 1 dense input
-const int CTR_PREDICTION_SPARSE_SLOTS = 26;  // First 26: sparse input
-const int CTR_PREDICTION_DENSE_SLOT = 26;    // Last 1: dense input
+// Total 26 sparse input + 1 dense input
+const int CTR_PREDICTION_INPUT_SLOTS = 27;
+
+// First 26: sparse input
+const int CTR_PREDICTION_SPARSE_SLOTS = 26;
+
+// Last 1: dense input
+const int CTR_PREDICTION_DENSE_SLOT_ID = 26;
+const int CTR_PREDICTION_DENSE_DIM = 13;
+const int CTR_PREDICTION_EMBEDDING_SIZE = 10;
+
+#if 1
+// Stand-in for the cube client's value type, kept until the real cube API is
+// linked in (see the #if 0 block below).
+struct CubeValue {
+  int error;
+  std::string buff;
+};
+#endif
+
+void fill_response_with_message(Response *response,
+                                int err_code,
+                                const std::string &err_msg) {
+  if (response == NULL) {
+    LOG(ERROR) << "response is NULL";
+    return;
+  }
+
+  response->set_err_code(err_code);
+  response->set_err_msg(err_msg);
+}
 
 int CTRPredictionOp::inference() {
   const Request *req = dynamic_cast<const Request *>(get_request_message());
   TensorVector *in = butil::get_object<TensorVector>();
+  Response *res = mutable_data<Response>();
+
   uint32_t sample_size = req->instances_size();
   if (sample_size <= 0) {
     LOG(WARNING) << "No instances need to inference!";
+    fill_response_with_message(res, -1, "Sample size invalid");
     return -1;
   }
 
   paddle::PaddleTensor lod_tensors[CTR_PREDICTION_INPUT_SLOTS];
-  for (int i = 0; i < CTR_PREDICTION_INPUT_SLOTS; ++i) {
+  for (int i = 0; i < CTR_PREDICTION_SPARSE_SLOTS; ++i) {
     lod_tensors[i].dtype = paddle::PaddleDType::FLOAT32;
     std::vector<std::vector<size_t>> &lod = lod_tensors[i].lod;
     lod.resize(1);
     lod[0].push_back(0);
   }
-  lod_tensors[CTR_PREDICTION_SPARSE_SLOTS].dtype = paddle::PaddleDType::INT64;
 
+  // Query cube API for sparse embeddings
+  std::vector<uint64_t> keys;
+  std::vector<CubeValue> values;
+
+  for (uint32_t si = 0; si < sample_size; ++si) {
+    const CTRReqInstance &req_instance = req->instances(si);
+    if (req_instance.sparse_ids_size() != CTR_PREDICTION_SPARSE_SLOTS) {
+      std::ostringstream iss;
+      iss << "sparse input size != " << CTR_PREDICTION_SPARSE_SLOTS;
+      fill_response_with_message(res, -1, iss.str());
+      return -1;
+    }
+
+    for (int i = 0; i < req_instance.sparse_ids_size(); ++i) {
+      keys.push_back(req_instance.sparse_ids(i));
+    }
+  }
+
+#if 0
+  mCube::CubeAPI *cube = mCube::CubeAPI::instance();
+  int ret = cube->seek(keys, values);
+  if (ret != 0) {
+    fill_response_with_message(res, -1, "Query cube for embeddings error");
+    LOG(ERROR) << "Query cube for embeddings error";
+    return -1;
+  }
+#else
+  // Mocked lookup: every key gets the same dummy embedding. values must be
+  // sized to match keys before it is indexed.
+  float buff[CTR_PREDICTION_EMBEDDING_SIZE] = {
+      0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.00};
+  values.resize(keys.size());
+  for (size_t i = 0; i < keys.size(); ++i) {
+    values[i].error = 0;
+    values[i].buff = std::string(reinterpret_cast<char *>(buff), sizeof(buff));
+  }
+#endif
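+
+  // Layout note: values is sample-major; the embedding for sample si, sparse
+  // slot i lands at values[si * CTR_PREDICTION_SPARSE_SLOTS + i], matching
+  // the order the keys were pushed above.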
 
   // Sparse embeddings
   for (int i = 0; i < CTR_PREDICTION_SPARSE_SLOTS; ++i) {
     paddle::PaddleTensor lod_tensor = lod_tensors[i];
     std::vector<std::vector<size_t>> &lod = lod_tensor.lod;
@@ -61,31 +125,48 @@ int CTRPredictionOp::inference() {
       lod[0].push_back(lod[0].back() + 1);
     }
 
-    lod_tensor.shape = {lod[0].back(), 1};
-    lod_tensor.data.Resize(lod[0].back() * sizeof(int64_t));
+    lod_tensor.shape = {lod[0].back(), CTR_PREDICTION_EMBEDDING_SIZE};
+    lod_tensor.data.Resize(lod[0].back() * sizeof(float) *
+                           CTR_PREDICTION_EMBEDDING_SIZE);
 
     int offset = 0;
     for (uint32_t si = 0; si < sample_size; ++si) {
-      int64_t *data_ptr =
-          static_cast<int64_t *>(lod_tensor.data.data()) + offset;
+      float *data_ptr = static_cast<float *>(lod_tensor.data.data()) + offset;
       const CTRReqInstance &req_instance = req->instances(si);
-      int id_count = 1;
-      memcpy(data_ptr, &req_instance.sparse_ids().data()[i], sizeof(int64_t));
-      offset += 1;
+
+      int idx = si * CTR_PREDICTION_SPARSE_SLOTS + i;
+      if (values[idx].buff.size() !=
+          sizeof(float) * CTR_PREDICTION_EMBEDDING_SIZE) {
+        LOG(ERROR) << "Embedding vector size not expected";
+        fill_response_with_message(
+            res, -1, "Embedding vector size not expected");
+        return -1;
+      }
+
+      memcpy(data_ptr, values[idx].buff.data(), values[idx].buff.size());
+      offset += CTR_PREDICTION_EMBEDDING_SIZE;
     }
 
     in->push_back(lod_tensor);
   }
 
-  paddle::PaddleTensor lod_tensor = lod_tensors[CTR_PREDICTION_DENSE_SLOT];
+  // Dense features
+  paddle::PaddleTensor lod_tensor = lod_tensors[CTR_PREDICTION_DENSE_SLOT_ID];
+  lod_tensor.dtype = paddle::PaddleDType::INT64;
   std::vector<std::vector<size_t>> &lod = lod_tensor.lod;
+  // The init loop above only covers the sparse slots, so the dense slot's
+  // lod must be seeded here before lod[0].back() is read.
+  lod.resize(1);
+  lod[0].push_back(0);
 
   for (uint32_t si = 0; si < sample_size; ++si) {
     const CTRReqInstance &req_instance = req->instances(si);
+    if (req_instance.dense_ids_size() != CTR_PREDICTION_DENSE_DIM) {
+      std::ostringstream iss;
+      iss << "dense input size != " << CTR_PREDICTION_DENSE_DIM;
+      fill_response_with_message(res, -1, iss.str());
+      return -1;
+    }
     lod[0].push_back(lod[0].back() + req_instance.dense_ids_size());
   }
 
-  lod_tensor.shape = {lod[0].back(), 1};
+  // lod[0].back() counts individual ids, so divide by the feature width to
+  // get the row count; the buffer holds lod[0].back() int64 ids in total.
+  lod_tensor.shape = {lod[0].back() / CTR_PREDICTION_DENSE_DIM,
+                      CTR_PREDICTION_DENSE_DIM};
   lod_tensor.data.Resize(lod[0].back() * sizeof(int64_t));
 
   int offset = 0;
@@ -94,7 +175,7 @@ int CTRPredictionOp::inference() {
     const CTRReqInstance &req_instance = req->instances(si);
     int id_count = req_instance.dense_ids_size();
     memcpy(data_ptr,
-           req_instance.ids().data(),
+           req_instance.dense_ids().data(),
            sizeof(int64_t) * req_instance.dense_ids_size());
     offset += req_instance.dense_ids_size();
   }
@@ -104,6 +185,7 @@ int CTRPredictionOp::inference() {
   TensorVector *out = butil::get_object<TensorVector>();
   if (!out) {
     LOG(ERROR) << "Failed get tls output object";
+    fill_response_with_message(res, -1, "Failed get thread local resource");
     return -1;
   }
@@ -112,22 +194,23 @@ int CTRPredictionOp::inference() {
           CTR_PREDICTION_MODEL_NAME, in, out, sample_size)) {
     LOG(ERROR) << "Failed do infer in fluid model: "
                << CTR_PREDICTION_MODEL_NAME;
+    fill_response_with_message(res, -1, "Failed do infer in fluid model");
     return -1;
   }
 
   if (out->size() != in->size()) {
     LOG(ERROR) << "Output tensor size not equal that of input";
+    fill_response_with_message(res, -1, "Output size != input size");
     return -1;
   }
 
-  Response *res = mutable_data<Response>();
-
   for (size_t i = 0; i < out->size(); ++i) {
     int dim1 = out->at(i).shape[0];
     int dim2 = out->at(i).shape[1];
 
     if (out->at(i).dtype != paddle::PaddleDType::FLOAT32) {
       LOG(ERROR) << "Expected data type float";
+      fill_response_with_message(res, -1, "Expected data type float");
       return -1;
     }
@@ -150,6 +233,9 @@ int CTRPredictionOp::inference() {
   }
   out->clear();
   butil::return_object<TensorVector>(out);
+
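+  // Success path: err_code 0 with an empty message; every error path above
+  // has already filled the response and returned -1.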
+  res->set_err_code(0);
+  res->set_err_msg("");
   return 0;
 }
diff --git a/demo-serving/proto/CMakeLists.txt b/demo-serving/proto/CMakeLists.txt
index 2edd5e9eacde35b2add7fd11f676662e4c218e45..1790f1d6e770475a17532424022c819d3ba6cd49 100644
--- a/demo-serving/proto/CMakeLists.txt
+++ b/demo-serving/proto/CMakeLists.txt
@@ -6,6 +6,7 @@ LIST(APPEND protofiles
     ${CMAKE_CURRENT_LIST_DIR}/echo_kvdb_service.proto
     ${CMAKE_CURRENT_LIST_DIR}/int64tensor_service.proto
     ${CMAKE_CURRENT_LIST_DIR}/text_classification.proto
+    ${CMAKE_CURRENT_LIST_DIR}/ctr_prediction.proto
 )
 
 PROTOBUF_GENERATE_SERVING_CPP(TRUE PROTO_SRCS PROTO_HDRS ${protofiles})
diff --git a/demo-serving/proto/ctr_prediction.proto b/demo-serving/proto/ctr_prediction.proto
index 8efed93c698b9f3d3e63c0233994b29bb312bc81..ad1ccb06652dee6e027fdbe73eaa18f91ffefbe3 100644
--- a/demo-serving/proto/ctr_prediction.proto
+++ b/demo-serving/proto/ctr_prediction.proto
@@ -31,7 +31,11 @@ message CTRResInstance {
   required float prob1 = 2;
 };
 
-message Response { repeated CTRResInstance predictions = 1; };
+message Response {
+  repeated CTRResInstance predictions = 1;
+  // 0 on success; negative on failure, with details in err_msg
+  required int64 err_code = 2;
+  optional string err_msg = 3;
+};
 
 service CTRPredictionService {
   rpc inference(Request) returns (Response);