Commit 0ba28a79 authored by M MRXLT

Merge remote-tracking branch 'upstream/develop' into develop

......@@ -50,6 +50,7 @@ option(WITH_AVX "Compile Paddle Serving with AVX intrinsics" ${AVX_FOUND}
option(WITH_MKL "Compile Paddle Serving with MKL support." ${AVX_FOUND})
option(WITH_GPU "Compile Paddle Serving with NVIDIA GPU" ${CUDA_FOUND})
option(CLIENT_ONLY "Compile client libraries and demos only" FALSE)
option(WITH_ELASTIC_CTR "Compile ELASTIC-CTR solution" FALSE)
set(WITH_MKLML ${WITH_MKL})
if (NOT DEFINED WITH_MKLDNN)
......@@ -123,3 +124,8 @@ add_subdirectory(inferencer-fluid-gpu)
endif()
add_subdirectory(demo-serving)
endif()
# Paddle Serving Solutions
if (WITH_ELASTIC_CTR)
add_subdirectory(elastic-ctr)
endif()
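The new WITH_ELASTIC_CTR option above gates the elastic-ctr subdirectory. A minimal configure-and-build sketch with the option enabled (the exact cmake invocation is elided by this diff, so treat the flags shown as an assumption):

```shell
# Assumed invocation; only -DWITH_ELASTIC_CTR itself comes from this diff
$ cmake -DWITH_ELASTIC_CTR=ON ..
$ make -j4 && make install
```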
/home/work/image-class/bin/image_class --workflow_path=/home/work/image-class/conf/ --inferservice_path=/home/work/image-class/conf/ --logger_path=/home/work/image-class/conf/ --resource_path=/home/work/image-class/conf/
......@@ -20,10 +20,12 @@ bzip2-devel
## Compilation
We recommend compiling Paddle Serving with Docker. [Guide to compiling with Docker](./DOCKER.md)
We recommend preparing the Paddle Serving build environment with Docker. [Guide to compiling with Docker](./DOCKER.md)
The following commands download the latest Paddle Serving code and compile it:
```shell
$ git clone https://github.com/PaddlePaddle/serving.git
$ git clone https://github.com/PaddlePaddle/Serving.git
$ cd serving
$ mkdir build
$ cd build
......@@ -32,39 +34,31 @@ $ make -j4
$ make install
```
`make install` will install the build outputs to /path/to/paddle-serving/build/output/, with the following directory structure:
`make install` will install the build outputs to /path/to/Paddle-Serving/build/output/, with the following directory structure:
```
.
|-- bin # Directory of pdcodegen, the Paddle Serving protobuf compiler plugin
|-- bin # Directory of Paddle Serving tools and pdcodegen, the protobuf compiler plugin
|-- conf
|-- demo # Top-level demo directory
| |-- client
| |-- client # Demo client side
| | |-- bert # Client for the bert model
| | |-- ctr_prediction # Client for the CTR prediction model
| | |-- dense_format # dense_format client
| | | |-- bin # bin/dense_format is the dense_format client binary
| | | `-- conf
| | |-- echo # Client for the echo service
| | | |-- bin # bin/echo is the echo client binary
| | | \-- conf
| | |-- image_classification # Client for the image_classification service
| | | |-- bin # bin/ximage is the image_classification client binary
| | | |-- conf
| | | |-- data
| | | `-- images
| | |-- int64tensor_format # Client for the int64tensor_format service
| | | |-- bin # bin/int64tensor_format is the client binary
| | | `-- conf
| | `-- sparse_format # sparse_format client
| | |-- bin # bin/sparse_format is the client binary
| | `-- conf
| `-- serving # Serving side, providing all 5 demo services: echo/dense_format/sparse_format/int64tensor_format/image_class
| |-- bin # bin/serving is the serving executable
| |-- conf # Configuration directory
| |-- data
| | `-- model
| | `-- paddle
| | `-- fluid
| | `-- SE_ResNeXt50_32x4d # image_classification model
`-- lib # Static libraries produced by Paddle Serving: libpdserving.a, libsdk-cpp.a, libconfigure.a, libfluid_cpu_engine.a
| | |-- echo # Client for the simplest echo service
| | |-- echo_kvdb # Demo client for local KV lookup
| | |-- image_classification # Client for the image classification task
| | |-- int64tensor_format # int64tensor_format example client
| | |-- sparse_format # sparse_format example client
| | `-- text_classification # Example client for the text classification task
| |-- db_func
| |-- db_thread
| |-- kvdb_test
| `-- serving # Demo serving side; one serving binary responds to all demo client requests
|-- include # Header files released by Paddle Serving
|-- lib # Libraries released by Paddle Serving
`-- tool # Tools released by Paddle Serving
```
To write a new prediction service, see [Writing a prediction service from scratch](CREATING.md).
......
add_subdirectory(client)
add_subdirectory(serving)
include(proto/CMakeLists.txt)
file(GLOB sdk_cpp_srcs ${CMAKE_SOURCE_DIR}/sdk-cpp/src/*.cpp)
list(APPEND elasticctr_srcs ${elastic_ctr_cpp_srcs})
list(APPEND elasticctr_srcs ${sdk_cpp_srcs})
list(APPEND elasticctr_srcs
${CMAKE_CURRENT_LIST_DIR}/api/elastic_ctr_api.cpp)
add_library(elasticctr SHARED ${elasticctr_srcs})
target_link_libraries(elasticctr brpc configure protobuf leveldb -lcrypto
-lssl -lz -lrt)
set_target_properties(elasticctr PROPERTIES INTERFACE_LINK_LIBRARIES "")
add_executable(elastic_ctr_demo ${CMAKE_CURRENT_LIST_DIR}/demo/demo.cpp)
set_target_properties(elastic_ctr_demo PROPERTIES LINK_LIBRARIES "")
target_link_libraries(elastic_ctr_demo elasticctr -lpthread -lcrypto -lm -lrt
-lssl -ldl -lz)
# install
install(TARGETS elasticctr elastic_ctr_demo
RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client/bin
LIBRARY DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/demo/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client/)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/demo/data/ctr_prediction DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client/data)
install(TARGETS elasticctr
LIBRARY DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/api/lib)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/api/elastic_ctr_api.h
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/api/include/elastic-ctr/client/api/)
install(FILES
${CMAKE_BINARY_DIR}/elastic-ctr/client/elastic_ctr_prediction.pb.h
${CMAKE_BINARY_DIR}/elastic-ctr/client/pds_option.pb.h
${CMAKE_BINARY_DIR}/elastic-ctr/client/builtin_format.pb.h
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/api/include/elastic-ctr/client/)
install(DIRECTORY
${CMAKE_SOURCE_DIR}/sdk-cpp/include
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/api/include/sdk-cpp/)
install(DIRECTORY
${CMAKE_SOURCE_DIR}/configure/include
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/api/include/configure)
install(FILES
${CMAKE_BINARY_DIR}/configure/sdk_configure.pb.h
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/api/include/configure)
install(DIRECTORY
${CMAKE_BINARY_DIR}/third_party/install/protobuf/include/google
${CMAKE_BINARY_DIR}/third_party/install/brpc/include/brpc
${CMAKE_BINARY_DIR}/third_party/install/brpc/include/butil
${CMAKE_BINARY_DIR}/third_party/install/brpc/include/bthread
${CMAKE_BINARY_DIR}/third_party/install/brpc/include/bvar
${CMAKE_BINARY_DIR}/third_party/install/brpc/include/json2pb
${CMAKE_BINARY_DIR}/third_party/install/gflags/include/gflags
${CMAKE_BINARY_DIR}/third_party/install/glog/include/glog
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/api/include)
# Python client API
install(FILES ${CMAKE_CURRENT_LIST_DIR}/api/python/elasticctr/elastic_ctr_api.py
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/api/lib)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/api/python/elasticctr/elastic_ctr_api.py
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client/bin)
install(FILES ${CMAKE_CURRENT_LIST_DIR}/demo/elastic_ctr.py
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client/bin)
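The rules above install a self-contained client tree: the demo binary and Python scripts under elastic_ctr/client/bin, configs under elastic_ctr/client/conf, and test data under elastic_ctr/client/data. A hedged sketch of running the installed C++ demo from that tree (demo.cpp below hard-codes ./conf, slot.conf, and predictors.prototxt; the data file name is hypothetical):

```shell
$ cd ${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/client
$ ./bin/elastic_ctr_demo --test_file=./data/ctr_prediction/data.txt --batch_size=10
```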
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "elastic-ctr/client/api/elastic_ctr_api.h"
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <fstream>
#include <vector>
namespace baidu {
namespace paddle_serving {
namespace elastic_ctr {
const int VARIABLE_NAME_LEN = 256;
static void thread_resource_delete(void *d) {
#if 1
LOG(INFO) << "thread_resource_delete on " << bthread_self();
#endif
delete static_cast<ThreadResource *>(d);
}
std::set<std::string> ElasticCTRPredictorApi::slot_names_;
int ThreadResource::clear() {
  request_.clear_instances();
  response_.clear_predictions();
  for (auto it : instance_map_) {
    delete it.second;
  }
  // Reset the map so a later clear() does not double-delete the entries
  instance_map_.clear();
  return 0;
}
ReqInstance *ThreadResource::add_instance() {
ReqInstance *instance = request_.add_instances();
InstanceInfo *instance_info = new InstanceInfo();
instance_map_[instance] = instance_info;
return instance;
}
int ThreadResource::add_slot(ReqInstance *instance,
const std::string &slot_name,
uint64_t value) {
auto instance_it = instance_map_.find(instance);
if (instance_it == instance_map_.end()) {
return -1;
}
InstanceInfo *instance_info = instance_it->second;
auto slot_it = instance_info->slot_map_.find(slot_name);
Slot *slot = NULL;
if (slot_it == instance_info->slot_map_.end()) {
slot = instance->add_slots();
instance_info->slot_map_[slot_name] = slot;
} else {
slot = slot_it->second;
}
slot->set_slot_name(slot_name);
slot->add_feasigns(value);
return 0;
}
void ThreadResource::validate_request(const std::set<std::string> &slot_names) {
for (auto it : instance_map_) {
ReqInstance *req_instance = it.first;
InstanceInfo *instance_info = it.second;
for (auto slot_name : slot_names) {
if (instance_info->slot_map_.find(slot_name) ==
instance_info->slot_map_.end()) {
LOG(INFO) << "Missing values for slot " << slot_name.c_str();
add_slot(req_instance, slot_name, 0);
}
}
}
}
int ElasticCTRPredictorApi::read_slot_conf(const char *path,
const char *slot_conf_file) {
struct stat stat_buf;
char name[VARIABLE_NAME_LEN];
snprintf(name, VARIABLE_NAME_LEN, "%s/%s", path, slot_conf_file);
if (stat(name, &stat_buf) != 0) {
LOG(ERROR) << "Error stating file" << name;
return -1;
}
std::ifstream fs(name);
for (std::string line; std::getline(fs, line);) {
slot_names_.insert(line);
}
#if 1
for (auto x : slot_names_) {
LOG(INFO) << "slot: " << x.c_str();
}
#endif
return 0;
}
int ElasticCTRPredictorApi::init(const char *path,
const char *slot_conf_file,
const char *serving_conf_file) {
int ret = api_.create(path, serving_conf_file);
if (ret != 0) {
return ret;
}
ret = read_slot_conf(path, slot_conf_file);
if (ret != 0) {
return ret;
}
// Thread-local storage
if (pthread_key_create(&tls_bspec_key_, thread_resource_delete) != 0) {
LOG(ERROR) << "unable to create tls_bthread_key of thrd_data";
return -1;
}
return 0;
}
int ElasticCTRPredictorApi::thrd_initialize() {
api_.thrd_initialize();
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
if (thread_resource == NULL) {
thread_resource = new (std::nothrow) ThreadResource;
if (thread_resource == NULL) {
LOG(ERROR) << "failed to create thread local resource";
return -1;
}
if (pthread_setspecific(tls_bspec_key_, thread_resource) != 0) {
LOG(ERROR) << "unable to set tls thread local resource";
delete thread_resource;
thread_resource = NULL;
return -1;
}
}
return 0;
}
int ElasticCTRPredictorApi::thrd_clear() {
api_.thrd_clear();
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
  if (thread_resource == NULL) {
    LOG(ERROR) << "ERROR: thread local resource is null";
    return -1;
  }
if (thread_resource->clear() != 0) {
LOG(ERROR) << "ElasticCTRPredictorApi: thrd_clear() fail";
}
return 0;
}
int ElasticCTRPredictorApi::thrd_finalize() {
api_.thrd_finalize();
return 0;
}
void ElasticCTRPredictorApi::destroy() {
pthread_key_delete(tls_bspec_key_);
return;
}
ReqInstance *ElasticCTRPredictorApi::add_instance() {
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
  if (thread_resource == NULL) {
    LOG(ERROR) << "ERROR: thread local resource is null";
    return NULL;
  }
ReqInstance *instance = thread_resource->add_instance();
return instance;
}
int ElasticCTRPredictorApi::add_slot(ReqInstance *instance,
const std::string slot_name,
int64_t value) {
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
  if (thread_resource == NULL) {
    LOG(ERROR) << "ERROR: thread local resource is null";
    return -1;
  }
if (slot_names_.find(slot_name) == slot_names_.end()) {
LOG(ERROR) << "Slot name not match with those in slot.conf: "
<< slot_name.c_str();
return -1;
}
return thread_resource->add_slot(instance, slot_name, value);
}
void ElasticCTRPredictorApi::validate_request() {
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
  if (thread_resource == NULL) {
    LOG(ERROR) << "ERROR: thread local resource is null";
    return;
  }
thread_resource->validate_request(slot_names_);
}
int ElasticCTRPredictorApi::inference(
std::vector<std::vector<float>> &results_vec) {
ThreadResource *thread_resource =
reinterpret_cast<ThreadResource *>(pthread_getspecific(tls_bspec_key_));
  if (thread_resource == NULL) {
    LOG(ERROR) << "ERROR: thread local resource is null";
    return -1;
  }
Predictor *predictor = api_.fetch_predictor("ctr_prediction_service");
if (!predictor) {
LOG(ERROR) << "Failed fetch predictor: ctr_prediction_service";
return -1;
}
validate_request();
int ret = predictor->inference(thread_resource->get_request(),
thread_resource->get_response());
if (ret != 0) {
LOG(ERROR) << "Failed call predictor with req "
<< thread_resource->get_request()->ShortDebugString();
return ret;
}
Response *response = thread_resource->get_response();
for (int i = 0; i < response->predictions_size(); ++i) {
const ResInstance &res_instance = response->predictions(i);
std::vector<float> res;
res.push_back(res_instance.prob0());
res.push_back(res_instance.prob1());
results_vec.push_back(res);
}
return 0;
}
} // namespace elastic_ctr
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#define BRPC_WITH_GLOG 1  // To make sure proper glog inclusion
#include <map>
#include <set>
#include <string>
#include <vector>
#include "elastic-ctr/client/elastic_ctr_prediction.pb.h"
#include "sdk-cpp/include/predictor_sdk.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::predictor::elastic_ctr::Slot;
using baidu::paddle_serving::predictor::elastic_ctr::Request;
using baidu::paddle_serving::predictor::elastic_ctr::ReqInstance;
using baidu::paddle_serving::predictor::elastic_ctr::Response;
using baidu::paddle_serving::predictor::elastic_ctr::ResInstance;
namespace baidu {
namespace paddle_serving {
namespace elastic_ctr {
struct InstanceInfo {
std::map<std::string, Slot *> slot_map_;
};
class ThreadResource {
public:
int clear();
Request *get_request() { return &request_; }
Response *get_response() { return &response_; }
ReqInstance *add_instance();
int add_slot(ReqInstance *instance,
const std::string &slot_name,
uint64_t value);
void validate_request(const std::set<std::string> &slot_names);
private:
Request request_;
Response response_;
std::map<ReqInstance *, InstanceInfo *> instance_map_;
};
struct Prediction {
float prob0;
float prob1;
};
class ElasticCTRPredictorApi {
public:
ElasticCTRPredictorApi() {}
int init(const char *path,
const char *slot_conf_file,
const char *serving_conf_file);
int thrd_initialize();
int thrd_clear();
int thrd_finalize();
void destroy();
static ElasticCTRPredictorApi &instance() {
static ElasticCTRPredictorApi api;
return api;
}
public:
ReqInstance *add_instance();
int add_slot(ReqInstance *instance,
const std::string slot_name,
int64_t value);
int inference(std::vector<std::vector<float>> &results_vec); // NOLINT
private:
static int read_slot_conf(const char *path, const char *slot_conf_file);
void validate_request();
private:
PredictorApi api_;
pthread_key_t tls_bspec_key_;
static std::set<std::string> slot_names_;
};
} // namespace elastic_ctr
} // namespace paddle_serving
} // namespace baidu
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import json
import sys
import os
if sys.version_info[0] == 2:
import httplib
elif sys.version_info[0] == 3:
import http.client
class ElasticCTRAPI(object):
def __init__(self, serving_ip, serving_port):
self._instances = []
self._slots = []
self._conn = self.conn(serving_ip, serving_port)
def read_slots_conf(self, slots_conf_file):
if not os.path.exists(slots_conf_file):
print("Path %s not exist" % sltos_conf_file)
return -1
with open(slots_conf_file, "r") as f:
for line in f:
self._slots.append(line.rstrip('\n'))
return 0
def conn(self, ip, port):
if sys.version_info[0] == 2:
return httplib.HTTPConnection(ip, port)
elif sys.version_info[0] == 3:
return http.client.HTTPConnection(ip, port)
def add_instance(self):
feature_slots = []
instance = [{"slots": feature_slots}]
self._instances += instance
return instance
def clear(self):
self._instances = []
def add_slot(self, instance, slot, feasigns):
if not isinstance(instance, list):
print("add slot: parameter invalid: instance should be list")
return -1
if not isinstance(feasigns, list):
print("add slot: value format invalid: feasigns should be list")
return -1
kv = [{"slot_name": slot, "feasigns": feasigns}]
instance[0]["slots"] += kv
def inference(self):
for instance in self._instances:
feature_slots = instance["slots"]
keys = []
for x in feature_slots:
keys += [x["slot_name"]]
for slot in self._slots:
if not slot in keys:
feature_slots += [{"slot_name": slot, "feasigns": [0]}]
req = {"instances": self._instances}
request_json = json.dumps(req)
if sys.version_info[0] == 2:
try:
self._conn.request(
'POST', "/ElasticCTRPredictionService/inference",
request_json, {"Content-Type": "application/json"})
response = self._conn.getresponse()
return response.read()
except httplib.HTTPException as e:
print(e.reason)
elif sys.version_info[0] == 3:
try:
self._conn.request(
'POST', "/ElasticCTRPredictionService/inference",
request_json, {"Content-Type": "application/json"})
response = self._conn.getresponse()
return response.read()
except http.client.HTTPException as e:
print(e.reason)
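The client above posts JSON to the brpc HTTP endpoint /ElasticCTRPredictionService/inference. A minimal sketch of the equivalent raw request with curl, assuming a serving instance on 127.0.0.1:8010 as configured in the predictors.prototxt below; the slot name and feasign are placeholder values:

```shell
$ curl -H "Content-Type: application/json" \
    -d '{"instances": [{"slots": [{"slot_name": "0", "feasigns": [0]}]}]}' \
    http://127.0.0.1:8010/ElasticCTRPredictionService/inference
```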
default_variant_conf {
tag: "default"
connection_conf {
connect_timeout_ms: 2000
rpc_timeout_ms: 20000
connect_retry_count: 2
max_connection_per_host: 100
hedge_request_timeout_ms: -1
hedge_fetch_retry_count: 2
connection_type: "pooled"
}
naming_conf {
cluster_filter_strategy: "Default"
load_balance_strategy: "la"
}
rpc_parameter {
compress_type: 0
package_size: 20
protocol: "baidu_std"
max_channel_per_request: 3
}
}
predictors {
name: "ctr_prediction_service"
service_name: "baidu.paddle_serving.predictor.elastic_ctr.ElasticCTRPredictionService"
endpoint_router: "WeightedRandomRender"
weighted_random_render_conf {
variant_weight_list: "50"
}
variants {
tag: "var1"
naming_conf {
cluster: "list://127.0.0.1:8010"
}
}
}
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
This diff is collapsed.
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstdlib>
#include <fstream>
#include <sstream>
#include <string>
#include <thread> // NOLINT
#include "elastic-ctr/client/api/elastic_ctr_api.h"
using baidu::paddle_serving::elastic_ctr::ElasticCTRPredictorApi;
using baidu::paddle_serving::elastic_ctr::Prediction;
DEFINE_int32(batch_size, 10, "Inference batch_size");
DEFINE_string(test_file, "", "test file");
const int VARIABLE_NAME_LEN = 256;
const int CTR_EMBEDDING_TABLE_SIZE = 100000001;
struct Sample {
std::map<std::string, std::vector<uint64_t>> slots;
};
std::vector<Sample> samples;
int read_samples(const char* file) {
std::ifstream fs(file);
for (std::string line; std::getline(fs, line);) {
std::vector<std::string> tokens;
std::stringstream ss(line);
std::string token;
Sample sample;
while (std::getline(ss, token, ' ')) {
tokens.push_back(token);
}
if (tokens.size() <= 3) {
continue;
}
for (std::size_t i = 2; i < tokens.size(); ++i) {
std::size_t pos = tokens[i].find(':');
if (pos == std::string::npos) {
continue;
}
uint64_t x = std::strtoull(tokens[i].substr(0, pos).c_str(), NULL, 10);
std::string slot_name = tokens[i].substr(pos + 1);
if (sample.slots.find(slot_name) == sample.slots.end()) {
std::vector<uint64_t> values;
values.push_back(x % CTR_EMBEDDING_TABLE_SIZE);
sample.slots[slot_name] = values;
    } else {
      auto it = sample.slots.find(slot_name);
      it->second.push_back(x % CTR_EMBEDDING_TABLE_SIZE);
    }
}
samples.push_back(sample);
}
LOG(INFO) << "Samples size = " << samples.size();
#if 1
for (std::size_t i = 0; i < samples.size(); ++i) {
LOG(INFO) << "=============Sample " << i << "=========";
for (auto slot : samples[i].slots) {
LOG(INFO) << "slot_name: " << slot.first.c_str();
for (auto x : slot.second) {
LOG(INFO) << x;
}
}
LOG(INFO) << "========================================";
}
#endif
return 0;
}
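read_samples() expects one sample per line: a leading id token, an integer label, then space-separated <feasign>:<slot_name> pairs (lines with three or fewer tokens are skipped, and feasigns are hashed into the embedding table by modulo). A hypothetical two-line input sketch:

```shell
$ cat test.txt   # hypothetical file name
1 0 12345:0 67890:1 13579:2
2 1 24680:0 11223:1 44556:2
```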
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
ElasticCTRPredictorApi api;
#ifdef BCLOUD
logging::LoggingSettings settings;
settings.logging_dest = logging::LOG_TO_FILE;
std::string log_filename(argv[0]);
log_filename = log_filename.substr(log_filename.find_last_of('/') + 1);
settings.log_file = (std::string("./log/") + log_filename + ".log").c_str();
settings.delete_old = logging::DELETE_OLD_LOG_FILE;
logging::InitLogging(settings);
logging::ComlogSinkOptions cso;
cso.process_name = log_filename;
cso.enable_wf_device = true;
logging::ComlogSink::GetInstance()->Setup(&cso);
#else
struct stat st_buf;
int ret = 0;
if ((ret = stat("./log", &st_buf)) != 0) {
mkdir("./log", 0777);
ret = stat("./log", &st_buf);
if (ret != 0) {
LOG(WARNING) << "Log path ./log not exist, and create fail";
return -1;
}
}
FLAGS_log_dir = "./log";
google::InitGoogleLogging(strdup(argv[0]));
FLAGS_logbufsecs = 0;
FLAGS_logbuflevel = -1;
#endif
// predictor conf
if (api.init("./conf", "slot.conf", "predictors.prototxt") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
api.thrd_initialize();
ret = read_samples(FLAGS_test_file.c_str());
std::size_t index = 0;
while (index < samples.size()) {
api.thrd_clear();
for (int i = 0; i < FLAGS_batch_size && index < samples.size(); ++i) {
ReqInstance* ins = api.add_instance();
if (!ins) {
LOG(ERROR) << "Failed create req instance";
return -1;
}
for (auto slot : samples[index].slots) {
for (auto x : slot.second) {
api.add_slot(ins, slot.first.c_str(), x);
}
}
++index;
}
std::vector<std::vector<float>> results_vec;
if (api.inference(results_vec) != 0) {
LOG(ERROR) << "failed call predictor";
return -1;
}
#if 1
for (std::size_t i = 0; i < results_vec.size(); ++i) {
LOG(INFO) << "sample " << i << ": [" << results_vec[i].at(0) << ", "
<< results_vec[i].at(1) << "]";
}
#endif
} // end while
api.thrd_finalize();
api.destroy();
return 0;
}
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import json
import sys
import os
from elastic_ctr_api import ElasticCTRAPI
BATCH_SIZE = 3
SERVING_IP = "127.0.0.1"
SLOT_CONF_FILE = "./conf/slot.conf"
CTR_EMBEDDING_TABLE_SIZE = 100000001
SLOTS = []
def str2long(str):
if sys.version_info[0] == 2:
return long(str)
elif sys.version_info[0] == 3:
return int(str)
def data_reader(data_file, samples, labels):
if not os.path.exists(data_file):
print("Path %s not exist" % data_file)
return -1
with open(data_file, "r") as f:
for line in f:
sample = {}
line = line.rstrip('\n')
feature_slots = line.split(' ')
labels.append(int(feature_slots[1]))
feature_slots = feature_slots[2:]
feature_slot_maps = [x.split(':') for x in feature_slots]
features = [x[0] for x in feature_slot_maps]
slots = [x[1] for x in feature_slot_maps]
for i in range(0, len(features)):
                if slots[i] in sample:
                    sample[slots[i]].append(
                        str2long(features[i]) % CTR_EMBEDDING_TABLE_SIZE)
                else:
                    sample[slots[i]] = [
                        str2long(features[i]) % CTR_EMBEDDING_TABLE_SIZE
                    ]
for x in SLOTS:
if not x in sample:
sample[x] = [0]
samples.append(sample)
if __name__ == "__main__":
""" main
"""
if len(sys.argv) != 5:
print(
"Usage: python elastic_ctr.py SERVING_IP SERVING_PORT SLOT_CONF_FILE DATA_FILE"
)
sys.exit(-1)
samples = []
labels = []
SERVING_IP = sys.argv[1]
SERVING_PORT = sys.argv[2]
SLOT_CONF_FILE = sys.argv[3]
api = ElasticCTRAPI(SERVING_IP, SERVING_PORT)
ret = api.read_slots_conf(SLOT_CONF_FILE)
if ret != 0:
sys.exit(-1)
ret = data_reader(sys.argv[4], samples, labels)
correct = 0
for i in range(0, len(samples) - BATCH_SIZE, BATCH_SIZE):
api.clear()
batch = samples[i:i + BATCH_SIZE]
instances = []
for sample in batch:
instance = api.add_instance()
if sys.version_info[0] == 2:
for k, v in sample.iteritems():
api.add_slot(instance, k, v)
elif sys.version_info[0] == 3:
for k, v in sample.items():
api.add_slot(instance, k, v)
ret = api.inference()
ret = json.loads(ret)
predictions = ret["predictions"]
idx = 0
for x in predictions:
if x["prob0"] >= x["prob1"]:
pred = 0
else:
pred = 1
if labels[i + idx] == pred:
correct += 1
else:
print("id=%d predict incorrect: pred=%d label=%d (%f %f)" %
(i + idx, pred, labels[i + idx], x["prob0"], x["prob1"]))
idx = idx + 1
print("Acc=%f" % (float(correct) / len(samples)))
FILE(GLOB protos ${CMAKE_CURRENT_LIST_DIR}/*.proto)
list(APPEND protos ${CMAKE_SOURCE_DIR}/predictor/proto/pds_option.proto
${CMAKE_SOURCE_DIR}/predictor/proto/builtin_format.proto)
PROTOBUF_GENERATE_SERVING_CPP(FALSE PROTO_SRCS PROTO_HDRS ${protos})
LIST(APPEND elastic_ctr_cpp_srcs ${PROTO_SRCS})
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.elastic_ctr;
option cc_generic_services = true;
message Slot {
required string slot_name = 1;
repeated int64 feasigns = 2;
};
message ReqInstance { repeated Slot slots = 1; };
message Request { repeated ReqInstance instances = 1; };
message ResInstance {
required float prob0 = 1;
required float prob1 = 2;
};
message Response {
repeated ResInstance predictions = 1;
required int64 err_code = 2;
optional string err_msg = 3;
};
service ElasticCTRPredictionService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_stub = true;
};
if (NOT EXISTS
${CMAKE_CURRENT_LIST_DIR}/data/model/paddle/fluid/ctr_prediction)
execute_process(COMMAND wget --no-check-certificate
https://paddle-serving.bj.bcebos.com/data/ctr_prediction/elastic_ctr_model.tar.gz
--output-document
${CMAKE_CURRENT_LIST_DIR}/data/model/paddle/fluid/elastic_ctr_model.tar.gz)
execute_process(COMMAND ${CMAKE_COMMAND} -E tar xzf
"${CMAKE_CURRENT_LIST_DIR}/data/model/paddle/fluid/elastic_ctr_model.tar.gz"
WORKING_DIRECTORY
${CMAKE_CURRENT_LIST_DIR}/data/model/paddle/fluid)
execute_process(COMMAND ${CMAKE_COMMAND} -E rename inference_only ctr_prediction
WORKING_DIRECTORY
${CMAKE_CURRENT_LIST_DIR}/data/model/paddle/fluid)
endif()
include_directories(SYSTEM ${CMAKE_CURRENT_LIST_DIR}/../kvdb/include)
include(op/CMakeLists.txt)
include(proto/CMakeLists.txt)
add_executable(elastic_serving ${serving_srcs})
add_dependencies(elastic_serving pdcodegen fluid_cpu_engine pdserving paddle_fluid cube-api)
target_include_directories(elastic_serving PUBLIC
${CMAKE_CURRENT_BINARY_DIR}/../../predictor
)
target_link_libraries(elastic_serving -Wl,--whole-archive fluid_cpu_engine
-Wl,--no-whole-archive)
target_link_libraries(elastic_serving paddle_fluid ${paddle_depend_libs})
target_link_libraries(elastic_serving pdserving)
target_link_libraries(elastic_serving cube-api)
target_link_libraries(elastic_serving kvdb rocksdb)
target_link_libraries(elastic_serving -liomp5 -lmklml_intel -lmkldnn -lpthread
-lcrypto -lm -lrt -lssl -ldl -lz -lbz2)
install(TARGETS elastic_serving
RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/serving/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/serving/)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/data DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/serving/)
FILE(GLOB inc ${CMAKE_CURRENT_BINARY_DIR}/*.pb.h)
install(FILES ${inc}
DESTINATION ${PADDLE_SERVING_INSTALL_DIR}/include/serving)
if (${WITH_MKL})
install(FILES
${CMAKE_BINARY_DIR}/third_party/install/Paddle/third_party/install/mklml/lib/libmklml_intel.so
${CMAKE_BINARY_DIR}/third_party/install/Paddle/third_party/install/mklml/lib/libiomp5.so
${CMAKE_BINARY_DIR}/third_party/install/Paddle/third_party/install/mkldnn/lib/libmkldnn.so.0
DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/serving/bin)
endif()
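By analogy with the image_class launch command near the top of this diff (the predictor framework reads its workflow, inferservice, logger, and resource configs from a conf directory), a hedged sketch for starting the installed serving side:

```shell
$ cd ${PADDLE_SERVING_INSTALL_DIR}/elastic_ctr/serving
$ ./bin/elastic_serving --workflow_path=./conf --inferservice_path=./conf \
    --logger_path=./conf --resource_path=./conf
```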
[{
"dict_name": "test_dict",
"shard": 2,
"dup": 1,
"timeout": 200,
"retry": 3,
"backup_request": 100,
"type": "ipport_list",
"load_balancer": "rr",
"nodes": [{
"ipport_list": "list://xxx.xxx.xxx.xxx:8000"
},{
"ipport_list": "list://xxx.xxx.xxx.xxx:8000"
}]
}]
--enable_model_toolkit
--enable_cube=true
engines {
name: "elastic_ctr_prediction"
type: "FLUID_CPU_ANALYSIS_DIR"
reloadable_meta: "./data/model/paddle/fluid_time_file"
reloadable_type: "timestamp_ne"
model_data_path: "./data/model/paddle/fluid/ctr_prediction"
runtime_thread_num: 0
batch_infer_size: 0
enable_batch_align: 0
sparse_param_service_type: REMOTE
sparse_param_service_table_name: "test_dict"
}
model_toolkit_path: "./conf/"
model_toolkit_file: "model_toolkit.prototxt"
cube_config_file: "./conf/cube.conf"
services {
name: "ElasticCTRPredictionService"
workflows: "workflow1"
}
workflows {
name: "workflow1"
workflow_type: "Sequence"
nodes {
name: "elastic_ctr_prediction_op"
type: "ElasticCTRPredictionOp"
}
}
FILE(GLOB op_srcs ${CMAKE_CURRENT_LIST_DIR}/*.cpp)
LIST(APPEND serving_srcs ${op_srcs})
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "elastic-ctr/serving/op/elastic_ctr_prediction_op.h"
#include <algorithm>
#include <string>
#include "cube/cube-api/include/cube_api.h"
#include "predictor/framework/infer.h"
#include "predictor/framework/kv_manager.h"
#include "predictor/framework/memory.h"
// Flag to enable profiling mode
DECLARE_bool(enable_ctr_profiling);
namespace baidu {
namespace paddle_serving {
namespace serving {
using baidu::paddle_serving::predictor::MempoolWrapper;
using baidu::paddle_serving::predictor::elastic_ctr::Slot;
using baidu::paddle_serving::predictor::elastic_ctr::ResInstance;
using baidu::paddle_serving::predictor::elastic_ctr::Response;
using baidu::paddle_serving::predictor::elastic_ctr::ReqInstance;
using baidu::paddle_serving::predictor::elastic_ctr::Request;
const int VARIABLE_NAME_LEN = 256;
const int CTR_PREDICTION_DENSE_DIM = 13;
const int CTR_PREDICTION_EMBEDDING_SIZE = 9;
bthread::Mutex ElasticCTRPredictionOp::mutex_;
int64_t ElasticCTRPredictionOp::cube_time_us_ = 0;
int32_t ElasticCTRPredictionOp::cube_req_num_ = 0;
int32_t ElasticCTRPredictionOp::cube_req_key_num_ = 0;
void fill_response_with_message(Response *response,
int err_code,
std::string err_msg) {
if (response == NULL) {
LOG(ERROR) << "response is NULL";
return;
}
response->set_err_code(err_code);
response->set_err_msg(err_msg);
return;
}
int ElasticCTRPredictionOp::inference() {
const Request *req = dynamic_cast<const Request *>(get_request_message());
TensorVector *in = butil::get_object<TensorVector>();
Response *res = mutable_data<Response>();
uint32_t sample_size = req->instances_size();
if (sample_size <= 0) {
LOG(WARNING) << "No instances need to inference!";
fill_response_with_message(res, -1, "Sample size invalid");
return 0;
}
// Verify all request instances have same slots
int slot_num = req->instances(0).slots_size();
#if 1
LOG(INFO) << "slot_num =" << slot_num;
#endif
  for (int i = 1; i < req->instances_size(); ++i) {
    if (req->instances(i).slots_size() != slot_num) {
      LOG(WARNING) << "Req " << i
                   << " has different slot num with that of req 0";
      fill_response_with_message(
          res, -1, "Req instance has varying slot numbers");
      return 0;
    }
  }
// Query cube API for sparse embeddings
std::vector<uint64_t> keys;
std::vector<rec::mcube::CubeValue> values;
// How to organize feasigns in the above `keys` vector:
//
// Assuming N instances, each instance having M feature slots:
//
// ins1:
// slot_1: ins1_slot1_1|ins1_slot1_2 slot2: ins1_slot2_1|ins1_slot2_2
//
// ins2:
// slot_1: ins2_slot1_1 slot2: ins2_slot2_1|ins2_slot2_2
//
// ...
//
// insN:
// slot_1: insN_slot1_1|insN_slot1_2 slot2: insN_slot2_1
//
// We organize the features in such a way that all slot_1 features are before
// slot_2 features:
//
// ins1_slot1_1|ins1_slot1_2|ins2_slot1_1|...|insN_slot1_1|insN_slot1_2
// ins1_slot2_1|ins1_slot2_2|ins2_slot2_1|ins2_slot2_2|...|insN_slot2_1
//
// With this placement, after querying KV service, we can retrieve the
// embeddings for each feature slot from the returned `values` vector easily,
// as they are grouped together.
// Level of details of each feature slot
std::vector<std::vector<size_t>> feature_slot_lods;
feature_slot_lods.resize(slot_num);
// Number of feature signs in each slot
std::vector<int> feature_slot_sizes;
feature_slot_sizes.resize(slot_num);
// Iterate over each feature slot
for (int i = 0; i < slot_num; ++i) {
feature_slot_lods[i].push_back(0);
feature_slot_sizes[i] = 0;
// Extract feature i values from each instance si
for (int si = 0; si < sample_size; ++si) {
#if 1
LOG(INFO) << "slot " << i << " sample " << si;
#endif
const ReqInstance &req_instance = req->instances(si);
const Slot &slot = req_instance.slots(i);
feature_slot_lods[i].push_back(feature_slot_lods[i].back() +
slot.feasigns_size());
feature_slot_sizes[i] += slot.feasigns_size();
for (int j = 0; j < slot.feasigns_size(); ++j) {
keys.push_back(slot.feasigns(j));
}
}
}
#if 1
rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
predictor::KVManager &kv_manager = predictor::KVManager::instance();
const predictor::KVInfo *kvinfo =
kv_manager.get_kv_info(CTR_PREDICTION_MODEL_NAME);
if (kvinfo == NULL) {
LOG(ERROR) << "Sparse param service info not found for model "
<< CTR_PREDICTION_MODEL_NAME
<< ". Maybe forgot to specify sparse_param_service_type and "
<< "sparse_param_service_table_name in "
<< "conf/model_toolkit.prototxt";
fill_response_with_message(res, -1, "Sparse param service info not found");
return 0;
}
std::string table_name;
if (kvinfo->sparse_param_service_type != configure::EngineDesc::NONE) {
table_name = kvinfo->sparse_param_service_table_name;
if (table_name.empty()) {
LOG(ERROR) << "sparse_param_service_table_name not specified. "
<< "Please specify it in conf/model_toolkit.protxt for model "
<< CTR_PREDICTION_MODEL_NAME;
fill_response_with_message(
res, -1, "sparse_param_service_table_name not specified");
return 0;
}
}
if (kvinfo->sparse_param_service_type == configure::EngineDesc::LOCAL) {
// Query local KV service
LOG(ERROR) << "Local kv service not supported for model "
<< CTR_PREDICTION_MODEL_NAME;
fill_response_with_message(
res, -1, "Local kv service not supported for this model");
return 0;
} else if (kvinfo->sparse_param_service_type ==
configure::EngineDesc::REMOTE) {
struct timeval start;
struct timeval end;
int ret;
gettimeofday(&start, NULL);
ret = cube->seek(table_name, keys, &values);
gettimeofday(&end, NULL);
uint64_t usec =
end.tv_sec * 1e6 + end.tv_usec - start.tv_sec * 1e6 - start.tv_usec;
// Statistics
mutex_.lock();
cube_time_us_ += usec;
++cube_req_num_;
cube_req_key_num_ += keys.size();
if (cube_req_num_ >= 1000) {
LOG(INFO) << "Cube request count: " << cube_req_num_;
LOG(INFO) << "Cube request key count: " << cube_req_key_num_;
LOG(INFO) << "Cube request total time: " << cube_time_us_ << "us";
LOG(INFO) << "Average "
<< static_cast<float>(cube_time_us_) / cube_req_num_
<< "us/req";
LOG(INFO) << "Average "
<< static_cast<float>(cube_time_us_) / cube_req_key_num_
<< "us/key";
cube_time_us_ = 0;
cube_req_num_ = 0;
cube_req_key_num_ = 0;
}
mutex_.unlock();
// Statistics end
if (ret != 0) {
fill_response_with_message(res, -1, "Query cube for embeddings error");
LOG(ERROR) << "Query cube for embeddings error";
return 0;
}
}
if (values.size() != keys.size()) {
LOG(ERROR) << "Sparse embeddings not ready; "
<< "maybe forgot to set sparse_param_service_type and "
<< "sparse_param_sevice_table_name for "
<< CTR_PREDICTION_MODEL_NAME
<< " in conf/model_toolkit.prototxt";
fill_response_with_message(
res, -1, "Sparse param service not configured properly");
return 0;
}
for (int i = 0; i < keys.size(); ++i) {
std::ostringstream oss;
oss << keys[i] << ": ";
const char *value = (values[i].buff.data());
if (values[i].buff.size() !=
sizeof(float) * CTR_PREDICTION_EMBEDDING_SIZE) {
LOG(WARNING) << "Key " << keys[i] << " has values less than "
<< CTR_PREDICTION_EMBEDDING_SIZE;
}
#if 0
for (int j = 0; j < values[i].buff.size(); ++j) {
oss << std::hex << std::uppercase << std::setw(2) << std::setfill('0')
<< (static_cast<int>(value[j]) & 0xff);
}
LOG(INFO) << oss.str().c_str();
#endif
}
// Fill feature embedding into feed tensors
std::vector<paddle::PaddleTensor> lod_tensors;
lod_tensors.resize(slot_num);
const ReqInstance &instance = req->instances(0);
for (int i = 0; i < slot_num; ++i) {
paddle::PaddleTensor &lod_tensor = lod_tensors[i];
char name[VARIABLE_NAME_LEN];
snprintf(name,
VARIABLE_NAME_LEN,
"embedding_%s.tmp_0",
instance.slots(i).slot_name().c_str());
lod_tensor.name = std::string(name);
lod_tensors[i].dtype = paddle::PaddleDType::FLOAT32;
std::vector<std::vector<size_t>> &lod = lod_tensors[i].lod;
lod.resize(1);
lod[0].push_back(0);
}
int base = 0;
// Iterate over all slots
for (int i = 0; i < slot_num; ++i) {
paddle::PaddleTensor &lod_tensor = lod_tensors[i];
std::vector<std::vector<size_t>> &lod = lod_tensor.lod;
lod[0] = feature_slot_lods[i];
lod_tensor.shape = {lod[0].back(), CTR_PREDICTION_EMBEDDING_SIZE};
lod_tensor.data.Resize(lod[0].back() * sizeof(float) *
CTR_PREDICTION_EMBEDDING_SIZE);
int offset = 0;
// Copy all slot i feature embeddings to lod_tensor[i]
for (uint32_t j = 0; j < feature_slot_sizes[i]; ++j) {
float *data_ptr = static_cast<float *>(lod_tensor.data.data()) + offset;
int idx = base + j;
if (values[idx].buff.size() !=
sizeof(float) * CTR_PREDICTION_EMBEDDING_SIZE) {
#if 0
LOG(ERROR) << "Embedding vector size not expected";
fill_response_with_message(
res, -1, "Embedding vector size not expected");
return 0;
#else
// sizeof(float) * CTR_PREDICTION_EMBEDDING_SIZE = 36
values[idx].buff.append(36, '0');
#endif
}
memcpy(data_ptr, values[idx].buff.data(), values[idx].buff.size());
offset += CTR_PREDICTION_EMBEDDING_SIZE;
}
in->push_back(lod_tensor);
// Bump base counter
base += feature_slot_sizes[i];
}
#else
// Fill all tensors
std::vector<paddle::PaddleTensor> lod_tensors;
lod_tensors.resize(slot_num);
const ReqInstance &instance = req->instances(0);
for (int i = 0; i < slot_num; ++i) {
paddle::PaddleTensor &lod_tensor = lod_tensors[i];
lod_tensor.name = instance.slots(i).slot_name();
LOG(INFO) << "slot " << i << "name: " << lod_tensor.name.c_str();
lod_tensors[i].dtype = paddle::PaddleDType::INT64;
}
  // Iterate over all slots
  int offset = 0;
  for (int i = 0; i < slot_num; ++i) {
    paddle::PaddleTensor &lod_tensor = lod_tensors[i];
    std::vector<std::vector<size_t>> &lod = lod_tensor.lod;
    lod.push_back(feature_slot_lods[i]);
    lod_tensor.shape = {lod[0].back(), 1};
    lod_tensor.data.Resize(lod[0].back() * sizeof(uint64_t));
    // Copy all slot i features from the flattened keys vector to lod_tensor[i]
    uint64_t *keys_block = keys.data();
    uint64_t *data_ptr = static_cast<uint64_t *>(lod_tensor.data.data());
    memcpy(data_ptr,
           keys_block + offset,
           feature_slot_sizes[i] * sizeof(uint64_t));
    in->push_back(lod_tensor);
    // Advance to the first key of the next slot
    offset += feature_slot_sizes[i];
  }
#endif
TensorVector *out = butil::get_object<TensorVector>();
if (!out) {
LOG(ERROR) << "Failed get tls output object";
fill_response_with_message(res, -1, "Failed get thread local resource");
return 0;
}
// call paddle fluid model for inference
if (predictor::InferManager::instance().infer(
CTR_PREDICTION_MODEL_NAME, in, out, sample_size)) {
LOG(ERROR) << "Failed do infer in fluid model: "
<< CTR_PREDICTION_MODEL_NAME;
fill_response_with_message(res, -1, "Failed do infer in fluid model");
return 0;
}
if (out->size() != 1) {
LOG(ERROR) << "Model returned number of fetch tensor more than 1";
fill_response_with_message(
res, -1, "Model returned number of fetch tensor more than 1");
return 0;
}
int output_shape_dim = out->at(0).shape.size();
if (output_shape_dim != 2) {
LOG(ERROR) << "Fetch LoDTensor should be shape of [sample_size, 2]";
fill_response_with_message(
res, -1, "Fetch LoDTensor should be shape of [sample_size, 2]");
return 0;
}
if (out->at(0).dtype != paddle::PaddleDType::FLOAT32) {
LOG(ERROR) << "Fetch LoDTensor data type should be FLOAT32";
fill_response_with_message(
res, -1, "Fetch LoDTensor data type should be FLOAT32");
return 0;
}
int dim1 = out->at(0).shape[0];
int dim2 = out->at(0).shape[1];
if (dim1 != sample_size) {
LOG(ERROR) << "Returned result count not equal to sample_size";
fill_response_with_message(
res, -1, "Returned result count not equal to sample size");
return 0;
}
if (dim2 != 2) {
LOG(ERROR) << "Returned result is not expected, should be 2 floats for "
"each sample";
fill_response_with_message(
res, -1, "Retunred result is not 2 floats for each sample");
return 0;
}
float *data = static_cast<float *>(out->at(0).data.data());
for (int i = 0; i < dim1; ++i) {
ResInstance *res_instance = res->add_predictions();
res_instance->set_prob0(data[i * dim2]);
res_instance->set_prob1(data[i * dim2 + 1]);
}
for (size_t i = 0; i < in->size(); ++i) {
(*in)[i].shape.clear();
}
in->clear();
butil::return_object<TensorVector>(in);
for (size_t i = 0; i < out->size(); ++i) {
(*out)[i].shape.clear();
}
out->clear();
butil::return_object<TensorVector>(out);
res->set_err_code(0);
res->set_err_msg(std::string(""));
return 0;
}
DEFINE_OP(ElasticCTRPredictionOp);
} // namespace serving
} // namespace paddle_serving
} // namespace baidu
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <vector>
#ifdef BCLOUD
#ifdef WITH_GPU
#include "paddle/paddle_inference_api.h"
#else
#include "paddle/fluid/inference/api/paddle_inference_api.h"
#endif
#else
#include "paddle_inference_api.h" // NOLINT
#endif
#include "elastic-ctr/serving/elastic_ctr_prediction.pb.h"
namespace baidu {
namespace paddle_serving {
namespace serving {
static const char* CTR_PREDICTION_MODEL_NAME = "elastic_ctr_prediction";
/**
* ElasticCTRPredictionOp: Serve CTR prediction requests.
*
*/
class ElasticCTRPredictionOp
: public baidu::paddle_serving::predictor::OpWithChannel<
baidu::paddle_serving::predictor::elastic_ctr::Response> {
public:
typedef std::vector<paddle::PaddleTensor> TensorVector;
DECLARE_OP(ElasticCTRPredictionOp);
int inference();
private:
static bthread::Mutex mutex_;
static int64_t cube_time_us_;
static int32_t cube_req_num_;
static int32_t cube_req_key_num_;
};
} // namespace serving
} // namespace paddle_serving
} // namespace baidu
LIST(APPEND protofiles
${CMAKE_CURRENT_LIST_DIR}/elastic_ctr_prediction.proto
)
PROTOBUF_GENERATE_SERVING_CPP(TRUE PROTO_SRCS PROTO_HDRS ${protofiles})
LIST(APPEND serving_srcs ${PROTO_SRCS})
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.elastic_ctr;
option cc_generic_services = true;
message Slot {
required string slot_name = 1;
repeated int64 feasigns = 2;
};
message ReqInstance { repeated Slot slots = 1; };
message Request { repeated ReqInstance instances = 1; };
message ResInstance {
required float prob0 = 1;
required float prob1 = 2;
};
message Response {
repeated ResInstance predictions = 1;
required int64 err_code = 2;
optional string err_msg = 3;
};
service ElasticCTRPredictionService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_impl = true;
};
......@@ -23,7 +23,6 @@
#include <unistd.h>
#include <exception>
#include "boost/unordered_map.hpp"
#include "gflags/gflags.h"
#include "google/protobuf/message.h"
......@@ -35,7 +34,7 @@
#include "base/logging.h"
#include "base/object_pool.h"
#include "base/time.h"
#include "bthread.h"
#include "bthread.h" // NOLINT
#else
#include "brpc/channel.h"
#include "brpc/parallel_channel.h"
......@@ -49,7 +48,7 @@
#include "bvar/bvar.h"
#ifdef BCLOUD
#include "json_to_pb.h"
#include "json_to_pb.h" // NOLINT
#else
#include "json2pb/json_to_pb.h"
#endif
......