diff --git a/demo-client/src/ctr_prediction.cpp b/demo-client/src/ctr_prediction.cpp
index 70b0c841227e411b70ef8c7a6263837804a83b55..92e82a36203e3f39ed871e9f5afc47b619527e90 100644
--- a/demo-client/src/ctr_prediction.cpp
+++ b/demo-client/src/ctr_prediction.cpp
@@ -30,11 +30,17 @@ using baidu::paddle_serving::predictor::ctr_prediction::Response;
 using baidu::paddle_serving::predictor::ctr_prediction::CTRReqInstance;
 using baidu::paddle_serving::predictor::ctr_prediction::CTRResInstance;
 
-int batch_size = 16;
 int sparse_num = 26;
 int dense_num = 13;
-int thread_num = 1;
 int hash_dim = 1000001;
+
+DEFINE_int32(batch_size, 50, "Set the batch size of the test file.");
+DEFINE_int32(concurrency, 1, "Set the max concurrency of requests");
+DEFINE_int32(repeat, 1, "Number of iterations over the data samples. Default 1");
+DEFINE_bool(enable_profiling,
+            false,
+            "Enable profiling. Will suppress a lot of normal output");
+
 std::vector<float> cont_min = {0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
 std::vector<float> cont_diff = {
     20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50};
@@ -86,7 +92,7 @@ int64_t hash(std::string str) {
 
 int create_req(Request* req,
                const std::vector<std::string>& data_list,
-               int data_index,
+               int start_index,
                int batch_size) {
   for (int i = 0; i < batch_size; ++i) {
     CTRReqInstance* ins = req->add_instances();
@@ -94,12 +100,14 @@ int create_req(Request* req,
       LOG(ERROR) << "Failed create req instance";
       return -1;
     }
+
     // add data
     // avoid out of boundary
-    int cur_index = data_index + i;
+    int cur_index = start_index + i;
     if (cur_index >= data_list.size()) {
       cur_index = cur_index % data_list.size();
     }
+
     std::vector<std::string> feature_list = split(data_list[cur_index], "\t");
     for (int fi = 0; fi < dense_num; fi++) {
       if (feature_list[fi] == "") {
@@ -122,10 +130,10 @@ int create_req(Request* req,
   }
   return 0;
 }
+
 void print_res(const Request& req,
               const Response& res,
               std::string route_tag,
-               uint64_t mid_ms,
                uint64_t elapse_ms) {
   if (res.err_code() != 0) {
     LOG(ERROR) << "Get result fail :" << res.err_msg();
@@ -138,72 +146,90 @@ void print_res(const Request& req,
     LOG(INFO) << "Receive result " << oss.str();
   }
   LOG(INFO) << "Succ call predictor[ctr_prediction_service], the tag is: "
-            << route_tag << ", mid_ms: " << mid_ms
-            << ", elapse_ms: " << elapse_ms;
+            << route_tag << ", elapse_ms: " << elapse_ms;
 }
 
 void thread_worker(PredictorApi* api,
                    int thread_id,
-                   int batch_size,
-                   int server_concurrency,
                    const std::vector<std::string>& data_list) {
   // init
   Request req;
   Response res;
-  api->thrd_initialize();
   std::string line;
-  int turns = 0;
-  while (turns < 1000) {
-    timeval start;
-    gettimeofday(&start, NULL);
-    api->thrd_clear();
-    Predictor* predictor = api->fetch_predictor("ctr_prediction_service");
-    if (!predictor) {
-      LOG(ERROR) << "Failed fetch predictor: ctr_prediction_service";
-      return;
-    }
-    req.Clear();
-    res.Clear();
-    timeval mid;
-    gettimeofday(&mid, NULL);
-    uint64_t mid_ms = (mid.tv_sec * 1000 + mid.tv_usec / 1000) -
-                      (start.tv_sec * 1000 + start.tv_usec / 1000);
-    // wait for other thread
-    while (g_concurrency.load() >= server_concurrency) {
-    }
-    g_concurrency++;
-    LOG(INFO) << "Current concurrency " << g_concurrency.load();
-    int data_index = turns * batch_size;
-    if (create_req(&req, data_list, data_index, batch_size) != 0) {
-      return;
-    }
-    timeval start_run;
-    gettimeofday(&start_run, NULL);
-    if (predictor->inference(&req, &res) != 0) {
-      LOG(ERROR) << "failed call predictor with req:" << req.ShortDebugString();
-      return;
-    }
-    timeval end;
-    gettimeofday(&end, NULL);
-    uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000) -
-                         (start_run.tv_sec * 1000 + start_run.tv_usec / 1000);
-    response_time[thread_id].push_back(elapse_ms);
-    print_res(req, res, predictor->tag(), mid_ms, elapse_ms);
-    g_concurrency--;
-    LOG(INFO) << "Done. Current concurrency " << g_concurrency.load();
-    turns++;
-  }
-  //
+
+  api->thrd_initialize();
+
+  for (int i = 0; i < FLAGS_repeat; ++i) {
+    int start_index = 0;
+
+    while (true) {
+      if (start_index >= data_list.size()) {
+        break;
+      }
+
+      api->thrd_clear();
+
+      Predictor* predictor = api->fetch_predictor("ctr_prediction_service");
+      if (!predictor) {
+        LOG(ERROR) << "Failed fetch predictor: ctr_prediction_service";
+        return;
+      }
+
+      req.Clear();
+      res.Clear();
+
+      // wait for other thread
+      while (g_concurrency.load() >= FLAGS_concurrency) {
+      }
+      g_concurrency++;
+      LOG(INFO) << "Current concurrency " << g_concurrency.load();
+
+      if (create_req(&req, data_list, start_index, FLAGS_batch_size) != 0) {
+        return;
+      }
+      start_index += FLAGS_batch_size;
+      LOG(INFO) << "start_index = " << start_index;
+
+      timeval start;
+      gettimeofday(&start, NULL);
+
+      if (predictor->inference(&req, &res) != 0) {
+        LOG(ERROR) << "failed call predictor with req:"
+                   << req.ShortDebugString();
+        return;
+      }
+      g_concurrency--;
+
+      timeval end;
+      gettimeofday(&end, NULL);
+      uint64_t elapse_ms = (end.tv_sec * 1000 + end.tv_usec / 1000) -
+                           (start.tv_sec * 1000 + start.tv_usec / 1000);
+
+      response_time[thread_id].push_back(elapse_ms);
+
+      if (!FLAGS_enable_profiling) {
+        print_res(req, res, predictor->tag(), elapse_ms);
+      }
+
+      LOG(INFO) << "Done. Current concurrency " << g_concurrency.load();
+    }  // end while
+  }    // end for
+
   api->thrd_finalize();
 }
-void calc_time(int server_concurrency, int batch_size) {
+
+void calc_time() {
   std::vector<uint64_t> time_list;
   for (auto a : response_time) {
     time_list.insert(time_list.end(), a.begin(), a.end());
   }
+
   LOG(INFO) << "Total request : " << (time_list.size());
-  LOG(INFO) << "Batch size : " << batch_size;
-  LOG(INFO) << "Max concurrency : " << server_concurrency;
+  LOG(INFO) << "Batch size : " << FLAGS_batch_size;
+  LOG(INFO) << "Max concurrency : " << FLAGS_concurrency;
+  LOG(INFO) << "enable_profiling: " << FLAGS_enable_profiling;
+  LOG(INFO) << "repeat count: " << FLAGS_repeat;
+
   float total_time = 0;
   float max_time = 0;
   float min_time = 1000000;
@@ -212,21 +238,28 @@ void calc_time(int server_concurrency, int batch_size) {
     if (time_list[i] > max_time) max_time = time_list[i];
     if (time_list[i] < min_time) min_time = time_list[i];
   }
+
   float mean_time = total_time / (time_list.size());
   float var_time;
   for (int i = 0; i < time_list.size(); ++i) {
     var_time += (time_list[i] - mean_time) * (time_list[i] - mean_time);
   }
   var_time = var_time / time_list.size();
-  LOG(INFO) << "Total time : " << total_time / server_concurrency
-            << " Variance : " << var_time << " Max time : " << max_time
-            << " Min time : " << min_time;
+
+  LOG(INFO) << "Total time : " << total_time / FLAGS_concurrency << "ms";
+  LOG(INFO) << "Variance : " << var_time << "ms";
+  LOG(INFO) << "Max time : " << max_time << "ms";
+  LOG(INFO) << "Min time : " << min_time << "ms";
+
   float qps = 0.0;
-  if (total_time > 0)
-    qps = (time_list.size() * 1000) / (total_time / server_concurrency);
+  if (total_time > 0) {
+    qps = (time_list.size() * 1000) / (total_time / FLAGS_concurrency);
+  }
   LOG(INFO) << "QPS: " << qps << "/s";
+
   LOG(INFO) << "Latency statistics: ";
   sort(time_list.begin(), time_list.end());
+
   int percent_pos_50 = time_list.size() * 0.5;
   int percent_pos_80 = time_list.size() * 0.8;
   int percent_pos_90 = time_list.size() * 0.9;
@@ -244,11 +277,12 @@ void calc_time(int server_concurrency, int batch_size) {
   }
 }
 int main(int argc, char** argv) {
+  google::ParseCommandLineFlags(&argc, &argv, true);
+
   // initialize
   PredictorApi api;
-  response_time.resize(thread_num);
-  int server_concurrency = thread_num;
-// log set
+  response_time.resize(FLAGS_concurrency);
+
 #ifdef BCLOUD
   logging::LoggingSettings settings;
   settings.logging_dest = logging::LOG_TO_FILE;
@@ -282,32 +316,40 @@ int main(int argc, char** argv) {
     LOG(ERROR) << "Failed create predictors api!";
     return -1;
   }
+
+  LOG(INFO) << "data sample file: " << data_filename;
+
+  if (FLAGS_enable_profiling) {
+    LOG(INFO) << "In profiling mode, a lot of normal output will be suppressed. "
+              << "Use --enable_profiling=false to turn off this mode";
+  }
+
   // read data
   std::ifstream data_file(data_filename);
   if (!data_file) {
     std::cout << "read file error \n" << std::endl;
     return -1;
   }
+
   std::vector<std::string> data_list;
   std::string line;
   while (getline(data_file, line)) {
     data_list.push_back(line);
   }
+
   // create threads
   std::vector<std::thread*> thread_pool;
-  for (int i = 0; i < server_concurrency; ++i) {
-    thread_pool.push_back(new std::thread(thread_worker,
-                                          &api,
-                                          i,
-                                          batch_size,
-                                          server_concurrency,
-                                          std::ref(data_list)));
+  for (int i = 0; i < FLAGS_concurrency; ++i) {
+    thread_pool.push_back(new std::thread(thread_worker, &api, i, data_list));
   }
-  for (int i = 0; i < server_concurrency; ++i) {
+
+  for (int i = 0; i < FLAGS_concurrency; ++i) {
     thread_pool[i]->join();
     delete thread_pool[i];
   }
-  calc_time(server_concurrency, batch_size);
+
+  calc_time();
+
   api.destroy();
   return 0;
 }
diff --git a/demo-serving/op/ctr_prediction_op.cpp b/demo-serving/op/ctr_prediction_op.cpp
index a904562b6b303134d5198fbbe01ad2cb79c4ba97..b2166819a2a6b213ae008580349e870e97797984 100644
--- a/demo-serving/op/ctr_prediction_op.cpp
+++ b/demo-serving/op/ctr_prediction_op.cpp
@@ -23,6 +23,9 @@
 #include "predictor/framework/kv_manager.h"
 #include "predictor/framework/memory.h"
 
+// Flag to enable profiling mode
+DECLARE_bool(enable_ctr_profiling);
+
 namespace baidu {
 namespace paddle_serving {
 namespace serving {
@@ -46,6 +49,11 @@ const int CTR_PREDICTION_DENSE_SLOT_ID = 26;
 const int CTR_PREDICTION_DENSE_DIM = 13;
 const int CTR_PREDICTION_EMBEDDING_SIZE = 10;
 
+bthread::Mutex CTRPredictionOp::mutex_;
+int64_t CTRPredictionOp::cube_time_us_ = 0;
+int32_t CTRPredictionOp::cube_req_num_ = 0;
+int32_t CTRPredictionOp::cube_req_key_num_ = 0;
+
 void fill_response_with_message(Response *response,
                                 int err_code,
                                 std::string err_msg) {
@@ -135,7 +143,41 @@ int CTRPredictionOp::inference() {
     return 0;
   } else if (kvinfo->sparse_param_service_type ==
             configure::EngineDesc::REMOTE) {
-    int ret = cube->seek(table_name, keys, &values);
+    struct timeval start;
+    struct timeval end;
+
+    int ret;
+
+    gettimeofday(&start, NULL);
+    ret = cube->seek(table_name, keys, &values);
+    gettimeofday(&end, NULL);
+    uint64_t usec =
+        end.tv_sec * 1e6 + end.tv_usec - start.tv_sec * 1e6 - start.tv_usec;
+
+    // Statistics
+    mutex_.lock();
+    cube_time_us_ += usec;
+    ++cube_req_num_;
+    cube_req_key_num_ += keys.size();
+
+    if (cube_req_num_ >= 1000) {
+      LOG(INFO) << "Cube request count: " << cube_req_num_;
+      LOG(INFO) << "Cube request key count: " << cube_req_key_num_;
+      LOG(INFO) << "Cube request total time: " << cube_time_us_ << "us";
+      LOG(INFO) << "Average "
+                << static_cast<float>(cube_time_us_) / cube_req_num_
+                << "us/req";
+      LOG(INFO) << "Average "
+                << static_cast<float>(cube_time_us_) / cube_req_key_num_
+                << "us/key";
+
+      cube_time_us_ = 0;
+      cube_req_num_ = 0;
+      cube_req_key_num_ = 0;
+    }
+    mutex_.unlock();
+    // Statistics end
+
     if (ret != 0) {
       fill_response_with_message(res, -1, "Query cube for embeddings error");
       LOG(ERROR) << "Query cube for embeddings error";
diff --git a/demo-serving/op/ctr_prediction_op.h b/demo-serving/op/ctr_prediction_op.h
index 6bec7c64c1f580ee10419ec4743776df9729ef51..ee648151b4ecf4611502798308c2cd81db923bb3 100644
--- a/demo-serving/op/ctr_prediction_op.h
+++ b/demo-serving/op/ctr_prediction_op.h
@@ -55,6 +55,7 @@ static const char* CTR_PREDICTION_MODEL_NAME = "ctr_prediction";
  * and modifications we made
  *
  */
+
 class CTRPredictionOp
     : public baidu::paddle_serving::predictor::OpWithChannel<
          baidu::paddle_serving::predictor::ctr_prediction::Response> {
@@ -64,6 +65,12 @@ class CTRPredictionOp
   DECLARE_OP(CTRPredictionOp);
 
   int inference();
+
+ private:
+  static bthread::Mutex mutex_;
+  static int64_t cube_time_us_;
+  static int32_t cube_req_num_;
+  static int32_t cube_req_key_num_;
 };
 
 }  // namespace serving
diff --git a/predictor/common/constant.h b/predictor/common/constant.h
index da44103eb8e6d064a642520bb90dd2c9df293889..72509c8d9187f817cf4dd0dfef1bff06370ce537 100644
--- a/predictor/common/constant.h
+++ b/predictor/common/constant.h
@@ -40,8 +40,6 @@ DECLARE_int32(reload_interval_s);
 DECLARE_bool(enable_model_toolkit);
 DECLARE_string(enable_protocol_list);
 DECLARE_bool(enable_cube);
-DECLARE_string(cube_config_path);
-DECLARE_string(cube_config_file);
 
 // STATIC Variables
 extern const char* START_OP_NAME;
diff --git a/predictor/framework/infer.h b/predictor/framework/infer.h
index c2823f5e3d8cbd2484f02053ffd36e6a3a275846..c479479a271601b0d197d7f4fc4672ccc54c3801 100644
--- a/predictor/framework/infer.h
+++ b/predictor/framework/infer.h
@@ -632,7 +632,6 @@ class VersionedInferEngine : public InferEngine {
         LOG(ERROR) << "Failed thrd clear version engine: " << iter->first;
         return -1;
       }
-      LOG(INFO) << "Succ thrd clear version engine: " << iter->first;
     }
     return 0;
   }
diff --git a/predictor/framework/resource.cpp b/predictor/framework/resource.cpp
index 74e3c95204dfb4fb0dcf32201c244550b6df08c2..15a5022d69458eae76c6b3f75ab3076d365ed333 100644
--- a/predictor/framework/resource.cpp
+++ b/predictor/framework/resource.cpp
@@ -208,7 +208,6 @@ int Resource::thread_clear() {
     return -1;
   }
 
-  LOG(INFO) << bthread_self() << "Resource::thread_clear success";
   // ...
   return 0;
 }
diff --git a/predictor/src/pdserving.cpp b/predictor/src/pdserving.cpp
index a86b39abac7bd007a8fd401bd9a0b8aaaa5c5114..56ffee84aba6338bcd082d12e6bd4c304fe8ca80 100644
--- a/predictor/src/pdserving.cpp
+++ b/predictor/src/pdserving.cpp
@@ -51,8 +51,6 @@ using baidu::paddle_serving::predictor::FLAGS_port;
 using baidu::paddle_serving::configure::InferServiceConf;
 using baidu::paddle_serving::configure::read_proto_conf;
 
-DECLARE_bool(logtostderr);
-
 void print_revision(std::ostream& os, void*) {
 #if defined(PDSERVING_VERSION)
   os << PDSERVING_VERSION;
@@ -217,7 +215,8 @@ int main(int argc, char** argv) {
   }
   LOG(INFO) << "Succ initialize cube";
 
-  FLAGS_logtostderr = false;
+  // Only FATAL messages are output to stderr
+  FLAGS_stderrthreshold = 3;
 
   if (ServerManager::instance().start_and_wait() != 0) {
     LOG(ERROR) << "Failed start server and wait!";
diff --git a/sdk-cpp/src/endpoint.cpp b/sdk-cpp/src/endpoint.cpp
index d1c66124c6e7657db23905eb681bfa0b957be9d2..3a30a0de6465512e647321c07637692599f1890b 100644
--- a/sdk-cpp/src/endpoint.cpp
+++ b/sdk-cpp/src/endpoint.cpp
@@ -64,7 +64,6 @@ int Endpoint::thrd_clear() {
       return -1;
     }
   }
 
-  LOG(INFO) << "Succ thrd clear all vars: " << var_size;
   return 0;
 }
diff --git a/sdk-cpp/src/predictor_sdk.cpp b/sdk-cpp/src/predictor_sdk.cpp
index 214473f64204866febb7d842b53551aa1cfe225d..246ac66f2d07f3c1becd7ab6c05be929c5003a03 100644
--- a/sdk-cpp/src/predictor_sdk.cpp
+++ b/sdk-cpp/src/predictor_sdk.cpp
@@ -94,8 +94,6 @@ int PredictorApi::thrd_clear() {
      LOG(ERROR) << "Failed thrd clear endpoint:" << it->first;
      return -1;
    }
-
-    LOG(INFO) << "Succ thrd clear endpoint:" << it->first;
  }
  return 0;
}
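
For reference, the server-side change above accumulates cube seek latency in three static counters guarded by a bthread::Mutex and logs averages every 1000 requests. Below is a minimal sketch of the same pattern, assuming a hypothetical SeekLatencyStats class and using std::mutex in place of bthread::Mutex so it builds on its own; it is not part of the patch.

```cpp
#include <cstdint>
#include <cstddef>
#include <iostream>
#include <mutex>

// Hypothetical stand-in for the per-op counters added in CTRPredictionOp.
class SeekLatencyStats {
 public:
  // Record one cube seek: wall time in microseconds and number of keys.
  void record(uint64_t usec, size_t key_num) {
    std::lock_guard<std::mutex> lock(mutex_);
    time_us_ += usec;
    ++req_num_;
    key_num_ += key_num;
    if (req_num_ >= 1000) {  // same flush threshold as the patch
      std::cout << "cube reqs: " << req_num_ << " keys: " << key_num_
                << " avg us/req: " << static_cast<double>(time_us_) / req_num_
                << " avg us/key: " << static_cast<double>(time_us_) / key_num_
                << std::endl;
      time_us_ = 0;
      req_num_ = 0;
      key_num_ = 0;
    }
  }

 private:
  std::mutex mutex_;
  uint64_t time_us_ = 0;
  uint64_t req_num_ = 0;
  uint64_t key_num_ = 0;
};
```

The critical section only covers a few integer additions and an occasional flush, so the counters add little overhead to the seek path itself.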
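On the client side, calc_time() reports latency percentiles by sorting the collected per-request times and indexing at fixed fractions (percent_pos_50, percent_pos_80, percent_pos_90). A standalone sketch of that calculation follows, with a hypothetical percentile_ms helper; the real function also prints totals, variance, and QPS.

```cpp
#include <algorithm>
#include <cstdint>
#include <cstddef>
#include <iostream>
#include <vector>

// Return the latency at the given fraction of the sorted sample,
// mirroring the percent_pos_* indexing used in calc_time().
uint64_t percentile_ms(std::vector<uint64_t> samples, double fraction) {
  std::sort(samples.begin(), samples.end());
  size_t pos = static_cast<size_t>(samples.size() * fraction);
  if (pos >= samples.size()) pos = samples.size() - 1;
  return samples[pos];
}

int main() {
  std::vector<uint64_t> elapse_ms = {12, 15, 9, 30, 22, 18, 11, 25, 14, 40};
  for (double f : {0.5, 0.8, 0.9}) {
    std::cout << "p" << static_cast<int>(f * 100) << ": "
              << percentile_ms(elapse_ms, f) << "ms" << std::endl;
  }
  return 0;
}
```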