Commit aedfcb6d authored by wangguibao

elastic_ctr solution initial commit

Parent 1c050f12
@@ -122,3 +122,6 @@ add_subdirectory(inferencer-fluid-gpu)
endif()
add_subdirectory(demo-serving)
endif()
# Paddle Serving Solutions
add_subdirectory(elastic-ctr)
/home/work/image-class/bin/image_class --workflow_path=/home/work/image-class/conf/ --inferservice_path=/home/work/image-class/conf/ --logger_path=/home/work/image-class/conf/ --resource_path=/home/work/image-class/conf/
add_subdirectory(client)
add_subdirectory(serving)
include(proto/CMakeLists.txt)
file(GLOB sdk_cpp_srcs ${CMAKE_SOURCE_DIR}/sdk-cpp/src/*.cpp)
list(APPEND elasticctr_srcs ${elastic_ctr_cpp_srcs})
list(APPEND elasticctr_srcs ${sdk_cpp_srcs})
list(APPEND elasticctr_srcs
${CMAKE_CURRENT_LIST_DIR}/api/elastic_ctr_api.cpp)
add_library(elasticctr ${elasticctr_srcs})
target_link_libraries(elasticctr brpc configure protobuf leveldb)
add_executable(elastic_ctr_demo ${CMAKE_CURRENT_LIST_DIR}/demo/demo.cpp)
target_link_libraries(elastic_ctr_demo elasticctr -lpthread -lcrypto -lm -lrt -lssl -ldl -lz)
# install
install(TARGETS elastic_ctr_demo
RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/demo/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client/)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/demo/data/ctr_prediction DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client/data)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "elastic-ctr/client/api/elastic_ctr_api.h"
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <fstream>
#include <vector>
namespace baidu {
namespace paddle_serving {
namespace elastic_ctr {
const int VARIABLE_NAME_LEN = 256;
static void thread_resource_delete(void *d) {
#if 1
LOG(INFO) << "thread_resource_delete on " << bthread_self();
#endif
delete static_cast<ThreadResource *>(d);
}
std::set<std::string> ElasticCTRPredictorApi::slot_names_;
int ThreadResource::clear() {
request_.clear_instances();
response_.clear_predictions();
for (auto it : instance_map_) {
delete it.second;
}
// Drop the now-dangling pointers so a subsequent clear() cannot double-free
instance_map_.clear();
return 0;
}
ReqInstance *ThreadResource::add_instance() {
ReqInstance *instance = request_.add_instances();
InstanceInfo *instance_info = new InstanceInfo();
instance_map_[instance] = instance_info;
return instance;
}
int ThreadResource::add_slot(ReqInstance *instance,
const std::string &slot_name,
uint64_t value) {
auto instance_it = instance_map_.find(instance);
if (instance_it == instance_map_.end()) {
return -1;
}
InstanceInfo *instance_info = instance_it->second;
auto slot_it = instance_info->slot_map_.find(slot_name);
Slot *slot = NULL;
if (slot_it == instance_info->slot_map_.end()) {
slot = instance->add_slots();
instance_info->slot_map_[slot_name] = slot;
} else {
slot = slot_it->second;
}
slot->set_slot_name(slot_name);
slot->add_feasigns(value);
return 0;
}
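// Illustrative note (not part of the original source): repeated calls for
// the same slot accumulate feasigns in a single Slot message. E.g.
// add_slot(ins, "slot_1", 3) followed by add_slot(ins, "slot_1", 7) yields
// one Slot with slot_name "slot_1" and feasigns [3, 7], not two Slot
// messages.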
void ThreadResource::validate_request(const std::set<std::string> &slot_names) {
for (auto it : instance_map_) {
ReqInstance *req_instance = it.first;
InstanceInfo *instance_info = it.second;
for (auto slot_name : slot_names) {
if (instance_info->slot_map_.find(slot_name) ==
instance_info->slot_map_.end()) {
LOG(INFO) << "Missing values for slot " << slot_name.c_str();
add_slot(req_instance, slot_name, 0);
}
}
}
}
void ElasticCTRPredictorApi::read_slot_conf(const char *path,
const char *slot_conf_file) {
struct stat stat_buf;
char name[VARIABLE_NAME_LEN];
snprintf(name, VARIABLE_NAME_LEN, "%s/%s", path, slot_conf_file);
if (stat(name, &stat_buf) != 0) {
LOG(ERROR) << "Error stating file" << name;
return;
}
std::ifstream fs(name);
for (std::string line; std::getline(fs, line);) {
slot_names_.insert(line);
}
#if 1
for (auto x : slot_names_) {
LOG(INFO) << "slot: " << x.c_str();
}
#endif
}
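// For reference, slot.conf as parsed above is a plain-text file with one
// slot name per line, e.g. (names below are hypothetical):
//   click_id
//   user_id
//   item_id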
int ElasticCTRPredictorApi::init(const char *path,
const char *slot_conf_file,
const char *serving_conf_file) {
if (api_.create(path, serving_conf_file) != 0) {
LOG(ERROR) << "Failed to create predictor api";
return -1;
}
read_slot_conf(path, slot_conf_file);
// Thread-local storage
if (pthread_key_create(&tls_bspec_key_, thread_resource_delete) != 0) {
LOG(ERROR) << "unable to create tls_bthread_key of thrd_data";
return -1;
}
return 0;
}
int ElasticCTRPredictorApi::thrd_initialize() {
api_.thrd_initialize();
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
if (thread_resource == NULL) {
thread_resource = new (std::nothrow) ThreadResource;
if (thread_resource == NULL) {
LOG(ERROR) << "failed to create thread local resource";
return -1;
}
if (pthread_setspecific(tls_bspec_key_, thread_resource) != 0) {
LOG(ERROR) << "unable to set tls thread local resource";
delete thread_resource;
thread_resource = NULL;
return -1;
}
}
return 0;
}
int ElasticCTRPredictorApi::thrd_clear() {
api_.thrd_clear();
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
if (thread_resource == NULL) {
LOG(ERROR) << "ERROR: thread local resource is null";
return -1;
}
if (thread_resource->clear() != 0) {
LOG(ERROR) << "ElasticCTRPredictorApi: thrd_clear() fail";
}
return 0;
}
int ElasticCTRPredictorApi::thrd_finalize() {
api_.thrd_finalize();
return 0;
}
void ElasticCTRPredictorApi::destroy() {
pthread_key_delete(tls_bspec_key_);
return;
}
ReqInstance *ElasticCTRPredictorApi::add_instance() {
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
if (thread_resource == NULL) {
LOG(ERROR) << "ERROR: thread local resource is null";
return NULL;
}
ReqInstance *instance = thread_resource->add_instance();
return instance;
}
int ElasticCTRPredictorApi::add_slot(ReqInstance *instance,
const std::string &slot_name,
int64_t value) {
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
if (thread_resource == NULL) {
LOG(ERROR) << "ERROR: thread local resource is null";
return -1;
}
if (slot_names_.find(slot_name) == slot_names_.end()) {
LOG(ERROR) << "Slot name not match with those in slot.conf: "
<< slot_name.c_str();
return -1;
}
return thread_resource->add_slot(instance, slot_name, value);
}
void ElasticCTRPredictorApi::validate_request() {
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
if (thread_resource == NULL) {
LOG(ERROR) << "ERROR: thread local resource is null";
return;
}
thread_resource->validate_request(slot_names_);
}
int ElasticCTRPredictorApi::inference() {
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
if (thread_resource == NULL) {
LOG(ERROR) << "ERROR: thread local resource is null";
return -1;
}
Predictor *predictor = api_.fetch_predictor("ctr_prediction_service");
if (!predictor) {
LOG(ERROR) << "Failed fetch predictor: ctr_prediction_service";
return -1;
}
validate_request();
int ret = predictor->inference(thread_resource->get_request(),
thread_resource->get_response());
if (ret != 0) {
LOG(ERROR) << "Failed call predictor with req "
<< thread_resource->get_request()->ShortDebugString();
return ret;
}
return 0;
}
std::vector<Prediction> ElasticCTRPredictorApi::get_results() {
std::vector<Prediction> prediction_vec;
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
if (thread_resource == NULL) {
LOG(ERROR) << "ERROR: thread local resource is null";
return prediction_vec;
}
Response *response = thread_resource->get_response();
for (int i = 0; i < response->predictions_size(); ++i) {
const ResInstance &res_instance = response->predictions(i);
Prediction prediction;
prediction.prob0 = res_instance.prob0();
prediction.prob1 = res_instance.prob1();
prediction_vec.push_back(prediction);
}
return prediction_vec;
}
} // namespace elastic_ctr
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <map>
#include <set>
#include <string>
#include <vector>
#include "elastic-ctr/client/elastic_ctr_prediction.pb.h"
#include "sdk-cpp/include/common.h"
#include "sdk-cpp/include/predictor_sdk.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::predictor::elastic_ctr::Slot;
using baidu::paddle_serving::predictor::elastic_ctr::Request;
using baidu::paddle_serving::predictor::elastic_ctr::ReqInstance;
using baidu::paddle_serving::predictor::elastic_ctr::Response;
using baidu::paddle_serving::predictor::elastic_ctr::ResInstance;
namespace baidu {
namespace paddle_serving {
namespace elastic_ctr {
struct InstanceInfo {
std::map<std::string, Slot *> slot_map_;
};
class ThreadResource {
public:
int clear();
Request *get_request() { return &request_; }
Response *get_response() { return &response_; }
ReqInstance *add_instance();
int add_slot(ReqInstance *instance,
const std::string &slot_name,
uint64_t value);
void validate_request(const std::set<std::string> &slot_names);
private:
Request request_;
Response response_;
std::map<ReqInstance *, InstanceInfo *> instance_map_;
};
struct Prediction {
float prob0;
float prob1;
};
class ElasticCTRPredictorApi {
public:
ElasticCTRPredictorApi() {}
int init(const char *path,
const char *slot_conf_file,
const char *serving_conf_file);
int thrd_initialize();
int thrd_clear();
int thrd_finalize();
void destroy();
static ElasticCTRPredictorApi &instance() {
static ElasticCTRPredictorApi api;
return api;
}
public:
ReqInstance *add_instance();
int add_slot(ReqInstance *instance,
const std::string &slot_name,
int64_t value);
int inference();
std::vector<Prediction> get_results();
private:
static void read_slot_conf(const char *path, const char *slot_conf_file);
void validate_request();
private:
PredictorApi api_;
pthread_key_t tls_bspec_key_;
static std::set<std::string> slot_names_;
};
} // namespace elastic_ctr
} // namespace paddle_serving
} // namespace baidu
default_variant_conf {
tag: "default"
connection_conf {
connect_timeout_ms: 2000
rpc_timeout_ms: 20000
connect_retry_count: 2
max_connection_per_host: 100
hedge_request_timeout_ms: -1
hedge_fetch_retry_count: 2
connection_type: "pooled"
}
naming_conf {
cluster_filter_strategy: "Default"
load_balance_strategy: "la"
}
rpc_parameter {
compress_type: 0
package_size: 20
protocol: "baidu_std"
max_channel_per_request: 3
}
}
predictors {
name: "ctr_prediction_service"
service_name: "baidu.paddle_serving.predictor.elastic_ctr.ElasticCTRPredictionService"
endpoint_router: "WeightedRandomRender"
weighted_random_render_conf {
variant_weight_list: "50"
}
variants {
tag: "var1"
naming_conf {
cluster: "list://127.0.0.1:8010"
}
}
}
(Source diff not shown because the file is too large. View the blob instead.)
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstdlib>
#include <fstream>
#include <sstream>
#include <string>
#include <thread> // NOLINT
#include "elastic-ctr/client/api/elastic_ctr_api.h"
using baidu::paddle_serving::elastic_ctr::ElasticCTRPredictorApi;
using baidu::paddle_serving::elastic_ctr::Prediction;
DEFINE_int32(batch_size, 10, "Inference batch size");
DEFINE_string(test_file, "", "test file");
const int VARIABLE_NAME_LEN = 256;
const int CTR_EMBEDDING_TABLE_SIZE = 400000001;
struct Sample {
std::map<std::string, std::vector<uint64_t>> slots;
};
std::vector<Sample> samples;
int read_samples(const char* file) {
std::ifstream fs(file);
for (std::string line; std::getline(fs, line);) {
std::vector<std::string> tokens;
std::stringstream ss(line);
std::string token;
Sample sample;
while (std::getline(ss, token, ' ')) {
tokens.push_back(token);
}
if (tokens.size() <= 3) {
continue;
}
for (std::size_t i = 2; i < tokens.size(); ++i) {
std::size_t pos = tokens[i].find(':');
if (pos == std::string::npos) {
continue;
}
uint64_t x = std::strtoull(tokens[i].substr(0, pos).c_str(), NULL, 10);
std::string slot_name = tokens[i].substr(pos + 1);
x %= CTR_EMBEDDING_TABLE_SIZE;  // keep feasigns in table range for every occurrence, not just the first
auto it = sample.slots.find(slot_name);
if (it == sample.slots.end()) {
sample.slots[slot_name] = std::vector<uint64_t>(1, x);
} else {
it->second.push_back(x);
}
}
samples.push_back(sample);
}
LOG(INFO) << "Samples size = " << samples.size();
#if 1
for (std::size_t i = 0; i < samples.size(); ++i) {
LOG(INFO) << "=============Sample " << i << "=========";
for (auto slot : samples[i].slots) {
LOG(INFO) << "slot_name: " << slot.first.c_str();
for (auto x : slot.second) {
LOG(INFO) << x;
}
}
LOG(INFO) << "========================================";
}
#endif
return 0;
}
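// A hypothetical test_file line in the format read_samples() accepts: the
// first two tokens are skipped, and each remaining token is a
// <feasign>:<slot_name> pair split at the first ':'. Lines with three or
// fewer tokens are skipped entirely.
//   <tok0> <tok1> 10482:slot_1 88327:slot_1 4971:slot_2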
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
ElasticCTRPredictorApi api;
#ifdef BCLOUD
logging::LoggingSettings settings;
settings.logging_dest = logging::LOG_TO_FILE;
std::string log_filename(argv[0]);
log_filename = log_filename.substr(log_filename.find_last_of('/') + 1);
// Keep the path alive: assigning c_str() of a temporary string would dangle
std::string log_file_path = std::string("./log/") + log_filename + ".log";
settings.log_file = log_file_path.c_str();
settings.delete_old = logging::DELETE_OLD_LOG_FILE;
logging::InitLogging(settings);
logging::ComlogSinkOptions cso;
cso.process_name = log_filename;
cso.enable_wf_device = true;
logging::ComlogSink::GetInstance()->Setup(&cso);
#else
struct stat st_buf;
int ret = 0;
if ((ret = stat("./log", &st_buf)) != 0) {
mkdir("./log", 0777);
ret = stat("./log", &st_buf);
if (ret != 0) {
LOG(WARNING) << "Log path ./log not exist, and create fail";
return -1;
}
}
FLAGS_log_dir = "./log";
google::InitGoogleLogging(strdup(argv[0]));
FLAGS_logbufsecs = 0;
FLAGS_logbuflevel = -1;
#endif
// predictor conf
if (api.init("./conf", "slot.conf", "predictors.prototxt") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
api.thrd_initialize();
ret = read_samples(FLAGS_test_file.c_str());
std::size_t index = 0;
while (index < samples.size()) {
api.thrd_clear();
for (int i = 0; i < FLAGS_batch_size && index < samples.size(); ++i) {
ReqInstance* ins = api.add_instance();
if (!ins) {
LOG(ERROR) << "Failed create req instance";
return -1;
}
for (auto slot : samples[index].slots) {
for (auto x : slot.second) {
api.add_slot(ins, slot.first.c_str(), x);
}
}
++index;
}
if (api.inference() != 0) {
LOG(ERROR) << "failed call predictor";
return -1;
}
std::vector<Prediction> results = api.get_results();
#if 1
for (std::size_t i = 0; i < results.size(); ++i) {
LOG(INFO) << "sample " << i << ": [" << results[i].prob0 << ", "
<< results[i].prob1 << "]";
}
#endif
} // end while
api.thrd_finalize();
api.destroy();
return 0;
}
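// Example invocation (paths hypothetical), relying on ./conf/slot.conf and
// ./conf/predictors.prototxt as loaded by api.init() above:
//   ./elastic_ctr_demo --test_file=./data/ctr_prediction/data.txt --batch_size=10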
FILE(GLOB protos ${CMAKE_CURRENT_LIST_DIR}/*.proto)
list(APPEND protos ${CMAKE_SOURCE_DIR}/predictor/proto/pds_option.proto
${CMAKE_SOURCE_DIR}/predictor/proto/builtin_format.proto)
PROTOBUF_GENERATE_SERVING_CPP(FALSE PROTO_SRCS PROTO_HDRS ${protos})
LIST(APPEND elastic_ctr_cpp_srcs ${PROTO_SRCS})
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.elastic_ctr;
option cc_generic_services = true;
message Slot {
required string slot_name = 1;
repeated int64 feasigns = 2;
};
message ReqInstance { repeated Slot slots = 1; };
message Request { repeated ReqInstance instances = 1; };
message ResInstance {
required float prob0 = 1;
required float prob1 = 2;
};
message Response {
repeated ResInstance predictions = 1;
required int64 err_code = 2;
optional string err_msg = 3;
};
service ElasticCTRPredictionService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_stub = true;
};
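// Example Request in protobuf text format (values hypothetical): two
// instances, each carrying feasigns for the same slot set:
//   instances {
//     slots { slot_name: "slot_1" feasigns: 10482 feasigns: 88327 }
//     slots { slot_name: "slot_2" feasigns: 4971 }
//   }
//   instances {
//     slots { slot_name: "slot_1" feasigns: 33 }
//     slots { slot_name: "slot_2" feasigns: 4971 }
//   }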
if (NOT EXISTS
${CMAKE_CURRENT_LIST_DIR}/data/model/paddle/fluid/ctr_prediction)
execute_process(COMMAND wget --no-check-certificate
https://paddle-serving.bj.bcebos.com/data/ctr_prediction/ctr_prediction.tar.gz
--output-document
${CMAKE_CURRENT_LIST_DIR}/data/model/paddle/fluid/ctr_prediction.tar.gz)
execute_process(COMMAND ${CMAKE_COMMAND} -E tar xzf
"${CMAKE_CURRENT_LIST_DIR}/data/model/paddle/fluid/ctr_prediction.tar.gz"
WORKING_DIRECTORY
${CMAKE_CURRENT_LIST_DIR}/data/model/paddle/fluid)
endif()
include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../kvdb/include)
include(op/CMakeLists.txt)
include(proto/CMakeLists.txt)
add_executable(elastic_serving ${serving_srcs})
add_dependencies(elastic_serving pdcodegen fluid_cpu_engine pdserving paddle_fluid cube-api)
target_include_directories(elastic_serving PUBLIC
${CMAKE_CURRENT_BINARY_DIR}/../../predictor
)
target_link_libraries(elastic_serving -Wl,--whole-archive fluid_cpu_engine
-Wl,--no-whole-archive)
target_link_libraries(elastic_serving paddle_fluid ${paddle_depend_libs})
target_link_libraries(elastic_serving pdserving)
target_link_libraries(elastic_serving cube-api)
target_link_libraries(elastic_serving kvdb rocksdb)
target_link_libraries(elastic_serving -liomp5 -lmklml_intel -lmkldnn -lpthread
-lcrypto -lm -lrt -lssl -ldl -lz -lbz2)
install(TARGETS elastic_serving
RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/serving/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/serving/)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/data DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/serving/)
FILE(GLOB inc ${CMAKE_CURRENT_BINARY_DIR}/*.pb.h)
install(FILES ${inc}
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/serving)
if (${WITH_MKL})
install(FILES
${CMAKE_BINARY_DIR}/third_party/install/Paddle/third_party/install/mklml/lib/libmklml_intel.so
${CMAKE_BINARY_DIR}/third_party/install/Paddle/third_party/install/mklml/lib/libiomp5.so
${CMAKE_BINARY_DIR}/third_party/install/Paddle/third_party/install/mkldnn/lib/libmkldnn.so
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/serving/bin)
endif()
[{
"dict_name": "test_dict",
"shard": 2,
"dup": 1,
"timeout": 200,
"retry": 3,
"backup_request": 100,
"type": "ipport_list",
"load_balancer": "rr",
"nodes": [{
"ipport_list": "list://xxx.xxx.xxx.xxx:8000"
},{
"ipport_list": "list://xxx.xxx.xxx.xxx:8000"
}]
}]
--enable_model_toolkit
--enable_cube=true
engines {
name: "ctr_prediction"
type: "FLUID_CPU_ANALYSIS_DIR"
reloadable_meta: "./data/model/paddle/fluid_time_file"
reloadable_type: "timestamp_ne"
model_data_path: "./data/model/paddle/fluid/ctr_prediction"
runtime_thread_num: 0
batch_infer_size: 0
enable_batch_align: 0
sparse_param_service_type: REMOTE
sparse_param_service_table_name: "test_dict"
}
model_toolkit_path: "./conf/"
model_toolkit_file: "model_toolkit.prototxt"
cube_config_file: "./conf/cube.conf"
services {
name: "ElasticCTRPredictionService"
workflows: "workflow1"
}
workflows {
name: "workflow1"
workflow_type: "Sequence"
nodes {
name: "elastic_ctr_prediction_op"
type: "ElasticCTRPredictionOp"
}
}
FILE(GLOB op_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
LIST(APPEND serving_srcs ${op_srcs})
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "elastic-ctr/serving/op/elastic_ctr_prediction_op.h"
#include <algorithm>
#include <string>
#include "cube/cube-api/include/cube_api.h"
#include "predictor/framework/infer.h"
#include "predictor/framework/kv_manager.h"
#include "predictor/framework/memory.h"
// Flag where enable profiling mode
DECLARE_bool(enable_ctr_profiling);
namespace baidu {
namespace paddle_serving {
namespace serving {
using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::elastic_ctr::Slot;
using baidu::paddle_serving::predictor::elastic_ctr::ResInstance;
using baidu::paddle_serving::predictor::elastic_ctr::Response;
using baidu::paddle_serving::predictor::elastic_ctr::ReqInstance;
using baidu::paddle_serving::predictor::elastic_ctr::Request;
const int VARIABLE_NAME_LEN = 256;
const int CTR_PREDICTION_DENSE_DIM = 13;
const int CTR_PREDICTION_EMBEDDING_SIZE = 10;
bthread::Mutex ElasticCTRPredictionOp::mutex_;
int64_t ElasticCTRPredictionOp::cube_time_us_ = 0;
int32_t ElasticCTRPredictionOp::cube_req_num_ = 0;
int32_t ElasticCTRPredictionOp::cube_req_key_num_ = 0;
void fill_response_with_message(Response *response,
int err_code,
std::string err_msg) {
if (response == NULL) {
LOG(ERROR) << "response is NULL";
return;
}
response->set_err_code(err_code);
response->set_err_msg(err_msg);
return;
}
int ElasticCTRPredictionOp::inference() {
const Request *req = dynamic_cast<const Request *>(get_request_message());
TensorVector *in = butil::get_object<TensorVector>();
Response *res = mutable_data<Response>();
uint32_t sample_size = req->instances_size();
if (sample_size == 0) {
LOG(WARNING) << "No instances to run inference on!";
fill_response_with_message(res, -1, "Sample size invalid");
return 0;
}
// Verify all request instances have same slots
int slot_num = req->instances(0).slots_size();
#if 1
LOG(INFO) << "slot_num =" << slot_num;
#endif
for (int i = 1; i < req->instances_size(); ++i) {
if (req->instances(i).slots_size() != slot_num) {
LOG(WARNING) << "Req " << i
<< " has different slot count from that of req 0";
fill_response_with_message(
res, -1, "Req instance has varying slot numbers");
return 0;
}
}
// Query cube API for sparse embeddings
std::vector<uint64_t> keys;
std::vector<rec::mcube::CubeValue> values;
// How to organize feasigns in the above `keys` vector:
//
// Assuming N instances, each instance having M feature slots:
//
// ins1:
// slot_1: ins1_slot1_1|ins1_slot1_2 slot2: ins1_slot2_1|ins1_slot2_2
//
// ins2:
// slot_1: ins2_slot1_1 slot2: ins2_slot2_1|ins2_slot2_2
//
// ...
//
// insN:
// slot_1: insN_slot1_1|insN_slot1_2 slot2: insN_slot2_1
//
// We organize the features in such a way that all slot_1 features are before
// slot_2 features:
//
// ins1_slot1_1|ins1_slot1_2|ins2_slot1_1|...|insN_slot1_1|insN_slot1_2
// ins1_slot2_1|ins1_slot2_2|ins2_slot2_1|ins2_slot2_2|...|insN_slot2_1
//
// With this placement, after querying KV service, we can retrieve the
// embeddings for each feature slot from the returned `values` vector easily,
// as they are grouped together.
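//
// Concrete illustration (added for clarity): with N=2 instances and M=2
// slots, where ins1 holds {a1, a2} in slot_1 and {b1} in slot_2, and ins2
// holds {a3} in slot_1 and {b2} in slot_2, `keys` becomes
// [a1, a2, a3, b1, b2], feature_slot_lods = {{0, 2, 3}, {0, 1, 2}}, and
// feature_slot_sizes = {3, 2}.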
// Level of details of each feature slot
std::vector<std::vector<size_t>> feature_slot_lods;
feature_slot_lods.resize(slot_num);
// Number of feature signs in each slot
std::vector<int> feature_slot_sizes;
feature_slot_sizes.resize(slot_num);
// Iterate over each feature slot
for (int i = 0; i < slot_num; ++i) {
feature_slot_lods[i].push_back(0);
feature_slot_sizes[i] = 0;
// Extract feature i values from each instance si
for (int si = 0; si < sample_size; ++si) {
#if 1
LOG(INFO) << "slot " << i << " sample " << si;
#endif
const ReqInstance &req_instance = req->instances(si);
const Slot &slot = req_instance.slots(i);
feature_slot_lods[i].push_back(feature_slot_lods[i].back() +
slot.feasigns_size());
feature_slot_sizes[i] += slot.feasigns_size();
for (int j = 0; j < slot.feasigns_size(); ++j) {
keys.push_back(slot.feasigns(j));
}
}
}
#if 0
rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
predictor::KVManager &kv_manager = predictor::KVManager::instance();
const predictor::KVInfo *kvinfo =
kv_manager.get_kv_info(CTR_PREDICTION_MODEL_NAME);
if (kvinfo == NULL) {
LOG(ERROR) << "Sparse param service info not found for model "
<< CTR_PREDICTION_MODEL_NAME
<< ". Maybe forgot to specify sparse_param_service_type and "
<< "sparse_param_service_table_name in "
<< "conf/model_toolkit.prototxt";
fill_response_with_message(res, -1, "Sparse param service info not found");
return 0;
}
std::string table_name;
if (kvinfo->sparse_param_service_type != configure::EngineDesc::NONE) {
table_name = kvinfo->sparse_param_service_table_name;
if (table_name.empty()) {
LOG(ERROR) << "sparse_param_service_table_name not specified. "
<< "Please specify it in conf/model_toolkit.protxt for model "
<< CTR_PREDICTION_MODEL_NAME;
fill_response_with_message(
res, -1, "sparse_param_service_table_name not specified");
return 0;
}
}
if (kvinfo->sparse_param_service_type == configure::EngineDesc::LOCAL) {
// Query local KV service
LOG(ERROR) << "Local kv service not supported for model "
<< CTR_PREDICTION_MODEL_NAME;
fill_response_with_message(
res, -1, "Local kv service not supported for this model");
return 0;
} else if (kvinfo->sparse_param_service_type ==
configure::EngineDesc::REMOTE) {
struct timeval start;
struct timeval end;
int ret;
gettimeofday(&start, NULL);
ret = cube->seek(table_name, keys, &values);
gettimeofday(&end, NULL);
uint64_t usec =
end.tv_sec * 1e6 + end.tv_usec - start.tv_sec * 1e6 - start.tv_usec;
// Statistics
mutex_.lock();
cube_time_us_ += usec;
++cube_req_num_;
cube_req_key_num_ += keys.size();
if (cube_req_num_ >= 1000) {
LOG(INFO) << "Cube request count: " << cube_req_num_;
LOG(INFO) << "Cube request key count: " << cube_req_key_num_;
LOG(INFO) << "Cube request total time: " << cube_time_us_ << "us";
LOG(INFO) << "Average "
<< static_cast<float>(cube_time_us_) / cube_req_num_
<< "us/req";
LOG(INFO) << "Average "
<< static_cast<float>(cube_time_us_) / cube_req_key_num_
<< "us/key";
cube_time_us_ = 0;
cube_req_num_ = 0;
cube_req_key_num_ = 0;
}
mutex_.unlock();
// Statistics end
if (ret != 0) {
fill_response_with_message(res, -1, "Query cube for embeddings error");
LOG(ERROR) << "Query cube for embeddings error";
return 0;
}
}
if (values.size() != keys.size()) {
LOG(ERROR) << "Sparse embeddings not ready; "
<< "maybe forgot to set sparse_param_service_type and "
<< "sparse_param_sevice_table_name for "
<< CTR_PREDICTION_MODEL_NAME
<< " in conf/model_toolkit.prototxt";
fill_response_with_message(
res, -1, "Sparse param service not configured properly");
return 0;
}
for (int i = 0; i < keys.size(); ++i) {
std::ostringstream oss;
oss << keys[i] << ": ";
const char *value = (values[i].buff.data());
if (values[i].buff.size() !=
sizeof(float) * CTR_PREDICTION_EMBEDDING_SIZE) {
LOG(WARNING) << "Key " << keys[i] << " has values less than "
<< CTR_PREDICTION_EMBEDDING_SIZE;
}
#if 0
for (int j = 0; j < values[i].buff.size(); ++j) {
oss << std::hex << std::uppercase << std::setw(2) << std::setfill('0')
<< (static_cast<int>(value[j]) & 0xff);
}
LOG(INFO) << oss.str().c_str();
#endif
}
// Fill feature embedding into feed tensors
std::vector<paddle::PaddleTensor> lod_tensors;
lod_tensors.resize(slot_num);
const ReqInstance &instance = req->instances(0);
for (int i = 0; i < slot_num; ++i) {
paddle::PaddleTensor &lod_tensor = lod_tensors[i];
char name[VARIABLE_NAME_LEN];
snprintf(name,
VARIABLE_NAME_LEN,
"embedding_%s.tmp_0",
instance.slots(i).slot_name().c_str());
lod_tensor.name = std::string(name);
lod_tensors[i].dtype = paddle::PaddleDType::INT64;
std::vector<std::vector<size_t>> &lod = lod_tensors[i].lod;
lod.resize(1);
lod[0].push_back(0);
}
int base = 0;
// Iterate over all slots
for (int i = 0; i < slot_num; ++i) {
paddle::PaddleTensor &lod_tensor = lod_tensors[i];
std::vector<std::vector<size_t>> &lod = lod_tensor.lod;
lod[0] = feature_slot_lods[i];
lod_tensor.shape = {lod[0].back(), CTR_PREDICTION_EMBEDDING_SIZE};
lod_tensor.data.Resize(lod[0].back() * sizeof(float) *
CTR_PREDICTION_EMBEDDING_SIZE);
int offset = 0;
// Copy all slot i feature embeddings to lod_tensor[i]
for (uint32_t j = 0; j < feature_slot_sizes[i]; ++j) {
float *data_ptr = static_cast<float *>(lod_tensor.data.data()) + offset;
int idx = base + j;
if (values[idx].buff.size() !=
sizeof(float) * CTR_PREDICTION_EMBEDDING_SIZE) {
LOG(ERROR) << "Embedding vector size not expected";
fill_response_with_message(
res, -1, "Embedding vector size not expected");
return 0;
}
memcpy(data_ptr, values[idx].buff.data(), values[idx].buff.size());
offset += CTR_PREDICTION_EMBEDDING_SIZE;
}
in->push_back(lod_tensor);
// Bump base counter
base += feature_slot_sizes[i];
}
#else
// Fill all tensors
std::vector<paddle::PaddleTensor> lod_tensors;
lod_tensors.resize(slot_num);
const ReqInstance &instance = req->instances(0);
for (int i = 0; i < slot_num; ++i) {
paddle::PaddleTensor &lod_tensor = lod_tensors[i];
lod_tensor.name = instance.slots(i).slot_name();
LOG(INFO) << "slot " << i << "name: " << lod_tensor.name.c_str();
lod_tensors[i].dtype = paddle::PaddleDType::INT64;
}
// Iterate over all slots
int offset = 0;
for (int i = 0; i < slot_num; ++i) {
paddle::PaddleTensor &lod_tensor = lod_tensors[i];
std::vector<std::vector<size_t>> &lod = lod_tensor.lod;
lod.push_back(feature_slot_lods[i]);
lod_tensor.shape = {static_cast<int>(lod[0].back()), 1};
lod_tensor.data.Resize(lod[0].back() * sizeof(uint64_t));
// Copy all slot i features to lod_tensor[i]
uint64_t *keys_block = keys.data();
uint64_t *data_ptr = static_cast<uint64_t *>(lod_tensor.data.data());
memcpy(data_ptr,
keys_block + offset,
feature_slot_sizes[i] * sizeof(uint64_t));
in->push_back(lod_tensor);
// Advance past slot i's features; offset must persist across iterations,
// otherwise every tensor would be copied from the start of `keys`
offset += feature_slot_sizes[i];
}
#endif
TensorVector *out = butil::get_object<TensorVector>();
if (!out) {
LOG(ERROR) << "Failed get tls output object";
fill_response_with_message(res, -1, "Failed get thread local resource");
return 0;
}
// call paddle fluid model for inference
if (predictor::InferManager::instance().infer(
CTR_PREDICTION_MODEL_NAME, in, out, sample_size)) {
LOG(ERROR) << "Failed do infer in fluid model: "
<< CTR_PREDICTION_MODEL_NAME;
fill_response_with_message(res, -1, "Failed do infer in fluid model");
return 0;
}
if (out->size() != 1) {
LOG(ERROR) << "Model did not return exactly one fetch tensor";
fill_response_with_message(
res, -1, "Model did not return exactly one fetch tensor");
return 0;
}
int output_shape_dim = out->at(0).shape.size();
if (output_shape_dim != 2) {
LOG(ERROR) << "Fetch LoDTensor should be shape of [sample_size, 2]";
fill_response_with_message(
res, -1, "Fetch LoDTensor should be shape of [sample_size, 2]");
return 0;
}
if (out->at(0).dtype != paddle::PaddleDType::FLOAT32) {
LOG(ERROR) << "Fetch LoDTensor data type should be FLOAT32";
fill_response_with_message(
res, -1, "Fetch LoDTensor data type should be FLOAT32");
return 0;
}
int dim1 = out->at(0).shape[0];
int dim2 = out->at(0).shape[1];
if (dim1 != sample_size) {
LOG(ERROR) << "Returned result count not equal to sample_size";
fill_response_with_message(
res, -1, "Returned result count not equal to sample size");
return 0;
}
if (dim2 != 2) {
LOG(ERROR) << "Returned result is not expected, should be 2 floats for "
"each sample";
fill_response_with_message(
res, -1, "Retunred result is not 2 floats for each sample");
return 0;
}
float *data = static_cast<float *>(out->at(0).data.data());
for (int i = 0; i < dim1; ++i) {
ResInstance *res_instance = res->add_predictions();
res_instance->set_prob0(data[i * dim2]);
res_instance->set_prob1(data[i * dim2 + 1]);
}
for (size_t i = 0; i < in->size(); ++i) {
(*in)[i].shape.clear();
}
in->clear();
butil::return_object<TensorVector>(in);
for (size_t i = 0; i < out->size(); ++i) {
(*out)[i].shape.clear();
}
out->clear();
butil::return_object<TensorVector>(out);
res->set_err_code(0);
res->set_err_msg(std::string(""));
return 0;
}
DEFINE_OP(ElasticCTRPredictionOp);
} // namespace serving
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <vector>
#ifdef BCLOUD
#ifdef WITH_GPU
#include "paddle/paddle_inference_api.h"
#else
#include "paddle/fluid/inference/api/paddle_inference_api.h"
#endif
#else
#include "paddle_inference_api.h" // NOLINT
#endif
#include "elastic-ctr/serving/elastic_ctr_prediction.pb.h"
namespace baidu {
namespace paddle_serving {
namespace serving {
static const char* CTR_PREDICTION_MODEL_NAME = "elastic_ctr_prediction";
/**
* ElasticCTRPredictionOp: Serve CTR prediction requests.
*
*/
class ElasticCTRPredictionOp
: public baidu::paddle_serving::predictor::OpWithChannel<
baidu::paddle_serving::predictor::elastic_ctr::Response> {
public:
typedef std::vector<paddle::PaddleTensor> TensorVector;
DECLARE_OP(ElasticCTRPredictionOp);
int inference();
private:
static bthread::Mutex mutex_;
static int64_t cube_time_us_;
static int32_t cube_req_num_;
static int32_t cube_req_key_num_;
};
} // namespace serving
} // namespace paddle_serving
} // namespace baidu
LIST(APPEND protofiles
${CMAKE_CURRENT_LIST_DIR}/elastic_ctr_prediction.proto
)
PROTOBUF_GENERATE_SERVING_CPP(TRUE PROTO_SRCS PROTO_HDRS ${protofiles})
LIST(APPEND serving_srcs ${PROTO_SRCS})
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.elastic_ctr;
option cc_generic_services = true;
message Slot {
required string slot_name = 1;
repeated int64 feasigns = 2;
};
message ReqInstance { repeated Slot slots = 1; };
message Request { repeated ReqInstance instances = 1; };
message ResInstance {
required float prob0 = 1;
required float prob1 = 2;
};
message Response {
repeated ResInstance predictions = 1;
required int64 err_code = 2;
optional string err_msg = 3;
};
service ElasticCTRPredictionService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_impl = true;
};