未验证 提交 c29df1db 编写于 作者: W Wang Guibao 提交者: GitHub

Merge pull request #1 from MRXLT/ctr_model_serving

add cube init and ctr demo
......@@ -35,6 +35,8 @@ message ModelToolkitConf { repeated EngineDesc engines = 1; };
// File locations read by the predictor's resource layer at startup.
message ResourceConf {
// Location of the model toolkit config (presumably directory + file name;
// confirm against Resource::initialize).
required string model_toolkit_path = 1;
required string model_toolkit_file = 2;
// Optional cube settings, read by Resource::cube_initialize() when the
// enable_cube flag is set. Only cube_config_file is handed to
// CubeAPI::init(); cube_config_path is used for log output there.
optional string cube_config_path = 3;
optional string cube_config_file = 4;
};
// DAG node dependency info
......
......@@ -57,6 +57,10 @@ add_executable(text_classification_press
target_link_libraries(text_classification_press -Wl,--whole-archive sdk-cpp -Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl
-lz)
# CTR prediction client demo, built from a single source file and linked
# against sdk-cpp with the same flags as the text_classification_press target.
add_executable(ctr_prediction
${CMAKE_CURRENT_LIST_DIR}/src/ctr_prediction.cpp)
target_link_libraries(ctr_prediction -Wl,--whole-archive sdk-cpp
-Wl,--no-whole-archive -lpthread -lcrypto -lm -lrt -lssl -ldl -lz)
# install
install(TARGETS ximage
RUNTIME DESTINATION
......@@ -104,3 +108,11 @@ install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/text_classification/)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/data/text_classification DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/text_classification/data)
# Install the ctr_prediction demo: binary into bin/, the shared client conf
# directory next to it, and its sample data under data/.
install(TARGETS ctr_prediction
RUNTIME DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/ctr_prediction/bin)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/conf DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/ctr_prediction/)
install(DIRECTORY ${CMAKE_CURRENT_LIST_DIR}/data/ctr_prediction DESTINATION
${PADDLE_SERVING_INSTALL_DIR}/demo/client/ctr_prediction/data)
......@@ -124,3 +124,18 @@ predictors {
}
}
}
# Predictor entry for the CTR prediction demo. The name is the key the client
# passes to fetch_predictor(); a single variant "var1" points at a local
# server on port 8010.
predictors {
name: "ctr_prediction_service"
service_name: "baidu.paddle_serving.predictor.ctr_prediction.CTRPredictionService"
endpoint_router: "WeightedRandomRender"
weighted_random_render_conf {
variant_weight_list: "50"
}
variants {
tag: "var1"
naming_conf {
cluster: "list://127.0.0.1:8010"
}
}
}
因为 它太大了无法显示 source diff 。你可以改为 查看blob
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <thread> // NOLINT
#include <vector>
#include "sdk-cpp/ctr_prediction.pb.h"
#include "sdk-cpp/include/common.h"
#include "sdk-cpp/include/predictor_sdk.h"
using baidu::paddle_serving::sdk_cpp::Predictor;
using baidu::paddle_serving::sdk_cpp::PredictorApi;
using baidu::paddle_serving::predictor::ctr_prediction::Request;
using baidu::paddle_serving::predictor::ctr_prediction::Response;
using baidu::paddle_serving::predictor::ctr_prediction::CTRReqInstance;
using baidu::paddle_serving::predictor::ctr_prediction::CTRResInstance;
// Benchmark configuration and shared state for the CTR prediction client.
int batch_size = 1;      // instances packed into each request
int sparse_num = 26;     // sparse feature columns per sample
int dense_num = 13;      // dense feature columns per sample
int thread_num = 1;      // worker threads started by main()
int hash_dim = 1000001;  // modulus applied to hashed sparse feature ids
// Per-column offset and scale for dense features; create_req() sends
// (value - cont_min[i]) / cont_diff[i].
std::vector<float> cont_min = {0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
std::vector<float> cont_diff = {
    20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50};
// String literals are immutable; binding one to a non-const char* is
// ill-formed since C++11, so the pointer is to const.
const char* data_filename = "./data/ctr_prediction/data.txt";
// Number of requests currently in flight across all worker threads.
std::atomic<int> g_concurrency(0);
// Per-thread request latencies in milliseconds, indexed by thread id.
std::vector<std::vector<int>> response_time;
// Splits `str` on every occurrence of the delimiter `pattern`.
//
// Empty fields are preserved (leading, trailing and consecutive delimiters
// each yield an empty string). An empty `str` yields an empty vector; an
// empty `pattern` yields `str` as the single field.
//
// The delimiter may be longer than one character: the scan resumes
// `pattern.size()` characters past each match rather than a fixed one
// character, and it walks the input in place instead of re-copying the
// remaining tail on every match.
std::vector<std::string> split(const std::string& str,
                               const std::string& pattern) {
  std::vector<std::string> res;
  if (str.empty()) return res;
  if (pattern.empty()) {
    // Nothing to split on; searching for "" would match at every position.
    res.push_back(str);
    return res;
  }
  std::string::size_type begin = 0;
  std::string::size_type pos = str.find(pattern, begin);
  while (pos != std::string::npos) {
    res.push_back(str.substr(begin, pos - begin));
    begin = pos + pattern.size();
    pos = str.find(pattern, begin);
  }
  // Remainder after the last delimiter (possibly empty).
  res.push_back(str.substr(begin));
  return res;
}
// Hashes `str` to a signed 64-bit value.
//
// The state is seeded with the first byte shifted left by 7, then for every
// byte multiplied by 1000003 and XOR-ed with that byte, and finally XOR-ed
// with the string length. A result of -1 is remapped to -2.
// NOTE(review): this looks like the classic CPython 2 string hash, which the
// caller's "diff between c++ and python" comment suggests it must match —
// confirm against the feature pipeline that produced the model.
//
// The arithmetic is done in uint64_t so the multiplication wraps modulo 2^64;
// on int64_t the same overflow would be undefined behaviour. The final
// conversion back to int64_t is modular on all mainstream compilers.
int64_t hash(std::string str) {
  // c_str() is NUL-terminated, so reading the first byte is safe even for an
  // empty string (the seed is then 0).
  const unsigned char* bytes =
      reinterpret_cast<const unsigned char*>(str.c_str());
  uint64_t x = static_cast<uint64_t>(bytes[0]) << 7;
  for (std::string::size_type i = 0; i < str.size(); ++i) {
    x = (1000003ULL * x) ^ bytes[i];
  }
  x ^= static_cast<uint64_t>(str.size());
  int64_t result = static_cast<int64_t>(x);
  if (result == -1) {
    result = -2;
  }
  return result;
}
// Appends `batch_size` instances to `req`, built from consecutive samples of
// `data_list` starting at `data_index`.
//
// Each sample is a tab-separated line whose first `dense_num` fields are dense
// features and whose next `sparse_num` fields are sparse features:
//   * dense: an empty field becomes 0.0; otherwise the value is normalized as
//     (value - cont_min[i]) / cont_diff[i];
//   * sparse: the column index is prepended to the raw field, the result is
//     hashed and reduced modulo `hash_dim` into [0, hash_dim).
//
// Returns 0 on success, -1 if a sample index is outside `data_list`, a line
// has too few fields, or an instance cannot be allocated. Validation happens
// before the instance is added, so a rejected sample does not leave a
// half-filled instance behind.
// NOTE: std::stof still throws on a non-numeric dense field.
int create_req(Request* req,
               const std::vector<std::string>& data_list,
               int data_index,
               int batch_size) {
  const size_t feature_num = static_cast<size_t>(dense_num + sparse_num);
  for (int i = 0; i < batch_size; ++i) {
    const int64_t line_index = static_cast<int64_t>(data_index) + i;
    if (line_index < 0 ||
        static_cast<size_t>(line_index) >= data_list.size()) {
      LOG(ERROR) << "Sample index " << line_index << " out of range, only "
                 << data_list.size() << " samples loaded";
      return -1;
    }
    const std::vector<std::string> feature_list =
        split(data_list[static_cast<size_t>(line_index)], "\t");
    if (feature_list.size() < feature_num) {
      LOG(ERROR) << "Sample " << line_index << " has " << feature_list.size()
                 << " fields, expected at least " << feature_num;
      return -1;
    }

    CTRReqInstance* ins = req->add_instances();
    if (!ins) {
      LOG(ERROR) << "Failed create req instance";
      return -1;
    }
    // dense features
    for (int fi = 0; fi < dense_num; fi++) {
      if (feature_list[fi].empty()) {
        ins->add_dense_ids(0.0);
      } else {
        float dense_id = std::stof(feature_list[fi]);
        dense_id = (dense_id - cont_min[fi]) / cont_diff[fi];
        ins->add_dense_ids(dense_id);
      }
    }
    // sparse features
    for (int fi = dense_num; fi < (dense_num + sparse_num); fi++) {
      int64_t sparse_id =
          hash(std::to_string(fi) + feature_list[fi]) % hash_dim;
      if (sparse_id < 0) {
        // C++ '%' keeps the sign of the dividend (unlike Python), so shift
        // negative remainders into [0, hash_dim).
        sparse_id += hash_dim;
      }
      ins->add_sparse_ids(sparse_id);
    }
  }
  return 0;
}
void print_res(const Request& req,
const Response& res,
std::string route_tag,
uint64_t mid_ms,
uint64_t elapse_ms) {
if (res.err_code() != 0) {
LOG(ERROR) << "Get result fail :" << res.err_msg();
return;
}
for (uint32_t i = 0; i < res.predictions_size(); ++i) {
const CTRResInstance& res_ins = res.predictions(i);
std::ostringstream oss;
oss << res_ins.prob0() << " ";
LOG(INFO) << "Receive result " << oss.str();
}
LOG(INFO) << "Succ call predictor[ctr_prediction_service], the tag is: "
<< route_tag << ", mid_ms: " << mid_ms
<< ", elapse_ms: " << elapse_ms;
}
// Body of one benchmark thread: sends up to 1000 requests of `batch_size`
// samples each to "ctr_prediction_service" and records the latency of every
// successful call in response_time[thread_id].
//
// At most `server_concurrency` requests are in flight across all threads;
// g_concurrency counts them. The loop stops early when the predictor cannot
// be fetched, the samples run out, a request cannot be built, or the call
// fails. Every exit path releases the concurrency slot it holds and reaches
// thrd_finalize().
void thread_worker(PredictorApi* api,
                   int thread_id,
                   int batch_size,
                   int server_concurrency,
                   const std::vector<std::string>& data_list) {
  const int kMaxTurns = 1000;
  Request req;
  Response res;
  api->thrd_initialize();

  for (int turns = 0; turns < kMaxTurns; ++turns) {
    // Turn t consumes samples [t * batch_size, (t + 1) * batch_size); stop
    // instead of reading past the end of the loaded data.
    const int data_index = turns * batch_size;
    if (static_cast<int64_t>(data_index) + batch_size >
        static_cast<int64_t>(data_list.size())) {
      LOG(WARNING) << "Only " << data_list.size()
                   << " samples loaded, stopping after " << turns << " turns";
      break;
    }

    timeval start;
    gettimeofday(&start, NULL);
    api->thrd_clear();
    Predictor* predictor = api->fetch_predictor("ctr_prediction_service");
    if (!predictor) {
      LOG(ERROR) << "Failed fetch predictor: ctr_prediction_service";
      break;
    }
    req.Clear();
    res.Clear();
    timeval mid;
    gettimeofday(&mid, NULL);
    uint64_t mid_ms = (mid.tv_sec * 1000 + mid.tv_usec / 1000) -
                      (start.tv_sec * 1000 + start.tv_usec / 1000);

    // Acquire a concurrency slot. The compare-exchange makes "check the limit
    // and increment" a single atomic step, so two threads cannot both slip
    // past the limit; yield while waiting instead of burning a core.
    int in_flight = g_concurrency.load();
    for (;;) {
      if (in_flight >= server_concurrency) {
        std::this_thread::yield();
        in_flight = g_concurrency.load();
      } else if (g_concurrency.compare_exchange_weak(in_flight,
                                                     in_flight + 1)) {
        break;
      }
    }
    LOG(INFO) << "Current concurrency " << g_concurrency.load();

    bool ok = create_req(&req, data_list, data_index, batch_size) == 0;
    if (ok) {
      timeval start_run;
      gettimeofday(&start_run, NULL);
      if (predictor->inference(&req, &res) != 0) {
        LOG(ERROR) << "failed call predictor with req:"
                   << req.ShortDebugString();
        ok = false;
      } else {
        timeval end;
        gettimeofday(&end, NULL);
        uint64_t elapse_ms =
            (end.tv_sec * 1000 + end.tv_usec / 1000) -
            (start_run.tv_sec * 1000 + start_run.tv_usec / 1000);
        response_time[thread_id].push_back(elapse_ms);
        print_res(req, res, predictor->tag(), mid_ms, elapse_ms);
      }
    }

    // Always give the slot back, including on failure, so other workers are
    // not throttled by a slot nobody is using.
    g_concurrency--;
    if (!ok) {
      break;
    }
    LOG(INFO) << "Done. Current concurrency " << g_concurrency.load();
  }

  api->thrd_finalize();
}
// Aggregates the latencies recorded in `response_time` by all worker threads
// and logs request count, total/min/max time, variance, QPS and the
// 50/80/90/99/99.9 percentile latencies (all in milliseconds).
//
// `server_concurrency` scales the summed latency down to an approximate
// wall-clock time; `batch_size` is only reported.
//
// The variance accumulator starts at zero (it was previously read
// uninitialized), and mean/variance are only computed when at least one
// latency exists, so an empty run reports zeros instead of dividing by zero.
void calc_time(int server_concurrency, int batch_size) {
  std::vector<int> time_list;
  for (const auto& per_thread : response_time) {
    time_list.insert(time_list.end(), per_thread.begin(), per_thread.end());
  }
  const size_t total = time_list.size();
  LOG(INFO) << "Total request : " << total;
  LOG(INFO) << "Batch size : " << batch_size;
  LOG(INFO) << "Max concurrency : " << server_concurrency;

  float total_time = 0;
  float max_time = 0;
  float min_time = 1000000;
  for (int t : time_list) {
    total_time += t;
    if (t > max_time) max_time = t;
    if (t < min_time) min_time = t;
  }

  float mean_time = 0;
  float var_time = 0;
  if (total > 0) {
    mean_time = total_time / total;
    for (int t : time_list) {
      var_time += (t - mean_time) * (t - mean_time);
    }
    var_time = var_time / total;
  }
  LOG(INFO) << "Total time : " << total_time / server_concurrency
            << " Variance : " << var_time << " Max time : " << max_time
            << " Min time : " << min_time;

  float qps = 0.0;
  if (total_time > 0)
    qps = (total * 1000) / (total_time / server_concurrency);
  LOG(INFO) << "QPS: " << qps << "/s";

  LOG(INFO) << "Latency statistics: ";
  std::sort(time_list.begin(), time_list.end());
  if (total != 0) {
    // Truncating total * p keeps every index strictly below total for p < 1.
    const int percent_pos_50 = total * 0.5;
    const int percent_pos_80 = total * 0.8;
    const int percent_pos_90 = total * 0.9;
    const int percent_pos_99 = total * 0.99;
    const int percent_pos_999 = total * 0.999;
    LOG(INFO) << "Mean time : " << mean_time;
    LOG(INFO) << "50 percent ms: " << time_list[percent_pos_50];
    LOG(INFO) << "80 percent ms: " << time_list[percent_pos_80];
    LOG(INFO) << "90 percent ms: " << time_list[percent_pos_90];
    LOG(INFO) << "99 percent ms: " << time_list[percent_pos_99];
    LOG(INFO) << "99.9 percent ms: " << time_list[percent_pos_999];
  } else {
    LOG(INFO) << "N/A";
  }
}
int main(int argc, char** argv) {
// initialize
PredictorApi api;
response_time.resize(thread_num);
int server_concurrency = thread_num;
// log set
#ifdef BCLOUD
logging::LoggingSettings settings;
settings.logging_dest = logging::LOG_TO_FILE;
std::string log_filename(argv[0]);
log_filename = log_filename.substr(log_filename.find_last_of('/') + 1);
settings.log_file = (std::string("./log/") + log_filename + ".log").c_str();
settings.delete_old = logging::DELETE_OLD_LOG_FILE;
logging::InitLogging(settings);
logging::ComlogSinkOptions cso;
cso.process_name = log_filename;
cso.enable_wf_device = true;
logging::ComlogSink::GetInstance()->Setup(&cso);
#else
struct stat st_buf;
int ret = 0;
if ((ret = stat("./log", &st_buf)) != 0) {
mkdir("./log", 0777);
ret = stat("./log", &st_buf);
if (ret != 0) {
LOG(WARNING) << "Log path ./log not exist, and create fail";
return -1;
}
}
FLAGS_log_dir = "./log";
google::InitGoogleLogging(strdup(argv[0]));
FLAGS_logbufsecs = 0;
FLAGS_logbuflevel = -1;
#endif
// predictor conf
if (api.create("./conf", "predictors.prototxt") != 0) {
LOG(ERROR) << "Failed create predictors api!";
return -1;
}
// read data
std::ifstream data_file(data_filename);
if (!data_file) {
std::cout << "read file error \n" << std::endl;
return -1;
}
std::vector<std::string> data_list;
std::string line;
while (getline(data_file, line)) {
data_list.push_back(line);
}
// create threads
std::vector<std::thread*> thread_pool;
for (int i = 0; i < server_concurrency; ++i) {
thread_pool.push_back(new std::thread(thread_worker,
&api,
i,
batch_size,
server_concurrency,
std::ref(data_list)));
}
for (int i = 0; i < server_concurrency; ++i) {
thread_pool[i]->join();
delete thread_pool[i];
}
calc_time(server_concurrency, batch_size);
api.destroy();
return 0;
}
......@@ -18,7 +18,7 @@ include(op/CMakeLists.txt)
include(proto/CMakeLists.txt)
add_executable(serving ${serving_srcs})
add_dependencies(serving pdcodegen fluid_cpu_engine pdserving paddle_fluid
opencv_imgcodecs)
opencv_imgcodecs cube-api)
if (WITH_GPU)
add_dependencies(serving fluid_gpu_engine)
endif()
......@@ -40,6 +40,7 @@ target_link_libraries(serving opencv_imgcodecs
${opencv_depend_libs})
target_link_libraries(serving pdserving)
target_link_libraries(serving cube-api)
target_link_libraries(serving kvdb rocksdb)
......
[{
"dict_name": "dict",
"shard": 2,
"dup": 1,
"timeout": 200,
"retry": 3,
"backup_request": 100,
"type": "ipport_list",
"load_balancer": "rr",
"nodes": [{
"ipport_list": "list://xxx.xxx.xxx.xxx:8000"
},{
"ipport_list": "list://xxx.xxx.xxx.xxx:8000"
}]
}]
--enable_model_toolkit
--enable_cube=false
model_toolkit_path: "./conf/"
model_toolkit_file: "model_toolkit.prototxt"
cube_config_file: "./conf/cube.conf"
......@@ -15,6 +15,7 @@
#include "demo-serving/op/ctr_prediction_op.h"
#include <algorithm>
#include <string>
#include "cube/cube-api/include/cube_api.h"
#include "predictor/framework/infer.h"
#include "predictor/framework/memory.h"
......@@ -41,12 +42,8 @@ const int CTR_PREDICTION_DENSE_SLOT_ID = 26;
const int CTR_PREDICTION_DENSE_DIM = 13;
const int CTR_PREDICTION_EMBEDDING_SIZE = 10;
#if 1
struct CubeValue {
int error;
std::string buff;
};
#endif
// dict name
const char dict_name[] = "dict";
void fill_response_with_message(Response *response,
int err_code,
......@@ -83,8 +80,8 @@ int CTRPredictionOp::inference() {
}
// Query cube API for sparse embeddings
std::vector<int64_t> keys;
std::vector<CubeValue> values;
std::vector<uint64_t> keys;
std::vector<rec::mcube::CubeValue> values;
for (uint32_t si = 0; si < sample_size; ++si) {
const CTRReqInstance &req_instance = req->instances(si);
......@@ -100,24 +97,13 @@ int CTRPredictionOp::inference() {
}
}
#if 0
mCube::CubeAPI* cube = CubeAPI::instance();
int ret = cube->seek(keys, values);
rec::mcube::CubeAPI *cube = rec::mcube::CubeAPI::instance();
int ret = cube->seek(dict_name, keys, &values);
if (ret != 0) {
fill_response_with_message(res, -1, "Query cube for embeddings error");
LOG(ERROR) << "Query cube for embeddings error";
return -1;
}
#else
float buff[CTR_PREDICTION_EMBEDDING_SIZE] = {
0.01, 0.02, 0.03, 0.04, 0.05, 0.06, 0.07, 0.08, 0.09, 0.00};
for (int i = 0; i < keys.size(); ++i) {
CubeValue value;
value.error = 0;
value.buff = std::string(reinterpret_cast<char *>(buff), sizeof(buff));
values.push_back(value);
}
#endif
// Sparse embeddings
for (int i = 0; i < CTR_PREDICTION_SPARSE_SLOTS; ++i) {
......
......@@ -40,6 +40,7 @@ DEFINE_int32(
DEFINE_int32(reload_interval_s, 10, "");
DEFINE_bool(enable_model_toolkit, false, "enable model toolkit");
DEFINE_string(enable_protocol_list, "baidu_std", "set protocol list");
DEFINE_bool(enable_cube, false, "enable cube");
const char* START_OP_NAME = "startup_op";
} // namespace predictor
......
......@@ -39,6 +39,9 @@ DECLARE_int32(num_threads);
DECLARE_int32(reload_interval_s);
DECLARE_bool(enable_model_toolkit);
DECLARE_string(enable_protocol_list);
DECLARE_bool(enable_cube);
DECLARE_string(cube_config_path);
DECLARE_string(cube_config_file);
// STATIC Variables
extern const char* START_OP_NAME;
......
......@@ -22,7 +22,7 @@ namespace paddle_serving {
namespace predictor {
using configure::ResourceConf;
using rec::mcube::CubeAPI;
// __thread bool p_thread_initialized = false;
static void dynamic_resource_deleter(void* d) {
......@@ -91,6 +91,44 @@ int Resource::initialize(const std::string& path, const std::string& file) {
return 0;
}
int Resource::cube_initialize(const std::string& path,
const std::string& file) {
// cube
if (!FLAGS_enable_cube) {
return 0;
}
ResourceConf resource_conf;
if (configure::read_proto_conf(path, file, &resource_conf) != 0) {
LOG(ERROR) << "Failed initialize resource from: " << path << "/" << file;
return -1;
}
int err = 0;
std::string cube_config_path = resource_conf.cube_config_path();
if (err != 0) {
LOG(ERROR) << "reade cube_config_path failed, path[" << path << "], file["
<< cube_config_path << "]";
return -1;
}
std::string cube_config_file = resource_conf.cube_config_file();
if (err != 0) {
LOG(ERROR) << "reade cube_config_file failed, path[" << path << "], file["
<< cube_config_file << "]";
return -1;
}
err = CubeAPI::instance()->init(cube_config_file.c_str());
if (err != 0) {
LOG(ERROR) << "failed initialize cube, config: " << cube_config_path << "/"
<< cube_config_file << " error code : " << err;
return -1;
}
LOG(INFO) << "Successfully initialize cube";
return 0;
}
int Resource::thread_initialize() {
// mempool
if (MempoolWrapper::instance().thread_initialize() != 0) {
......@@ -192,7 +230,10 @@ int Resource::finalize() {
LOG(ERROR) << "Failed proc finalize infer manager";
return -1;
}
if (CubeAPI::instance()->destroy() != 0) {
LOG(ERROR) << "Destory cube api failed ";
return -1;
}
THREAD_KEY_DELETE(_tls_bspec_key);
return 0;
......
......@@ -13,7 +13,9 @@
// limitations under the License.
#pragma once
#include <memory>
#include <string>
#include "cube/cube-api/include/cube_api.h"
#include "kvdb/paddle_rocksdb.h"
#include "predictor/common/inner_common.h"
#include "predictor/framework/memory.h"
......@@ -45,7 +47,7 @@ class Resource {
}
int initialize(const std::string& path, const std::string& file);
int cube_initialize(const std::string& path, const std::string& file);
int thread_initialize();
int thread_clear();
......
......@@ -209,6 +209,14 @@ int main(int argc, char** argv) {
}
LOG(INFO) << "Succ call pthread worker start function";
if (Resource::instance().cube_initialize(FLAGS_resource_path,
FLAGS_resource_file) != 0) {
LOG(ERROR) << "Failed initialize cube, conf: " << FLAGS_resource_path << "/"
<< FLAGS_resource_file;
return -1;
}
LOG(INFO) << "Succ initialize cube";
FLAGS_logtostderr = false;
if (ServerManager::instance().start_and_wait() != 0) {
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto2";
import "pds_option.proto";
import "builtin_format.proto";
package baidu.paddle_serving.predictor.ctr_prediction;
option cc_generic_services = true;
// One sample of a CTR prediction request.
message CTRReqInstance {
// Sparse feature ids; the demo client sends hashed ids reduced modulo its
// hash dimension, one per sparse column.
repeated int64 sparse_ids = 1;
// Dense feature values; the demo client sends them already normalized, one
// per dense column.
repeated float dense_ids = 2;
};
// A batch of samples to score in a single call.
message Request { repeated CTRReqInstance instances = 1; };
// Prediction for one sample.
// NOTE(review): prob0/prob1 are presumably the scores of the two output
// classes — confirm against the serving op. The demo client logs prob0 only.
message CTRResInstance {
required float prob0 = 1;
required float prob1 = 2;
};
// Result of a prediction call.
message Response {
// One entry per scored sample.
repeated CTRResInstance predictions = 1;
// 0 on success; the demo client treats any other value as a failure and
// logs err_msg.
required int64 err_code = 2;
// Human-readable description accompanying a non-zero err_code.
optional string err_msg = 3;
};
// RPC service for CTR prediction. Both methods take a Request and return a
// Response; stub generation is enabled through the pds option below.
service CTRPredictionService {
rpc inference(Request) returns (Response);
rpc debug(Request) returns (Response);
option (pds.options).generate_stub = true;
};
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册