From 38a3f15444a62f138d76b03e5ca870dc1dd4823d Mon Sep 17 00:00:00 2001 From: xulongteng Date: Tue, 15 Oct 2019 18:41:53 +0800 Subject: [PATCH] add mutli thread press for cube --- cube/cube-api/src/cube_cli.cpp | 146 ++++++++++++++++++++++++--------- 1 file changed, 106 insertions(+), 40 deletions(-) diff --git a/cube/cube-api/src/cube_cli.cpp b/cube/cube-api/src/cube_cli.cpp index 6df52710..fb2b67fa 100644 --- a/cube/cube-api/src/cube_cli.cpp +++ b/cube/cube-api/src/cube_cli.cpp @@ -14,9 +14,9 @@ #include #include - +#include +#include //NOLINT #include "cube/cube-api/include/cube_api.h" - #define TIME_FLAG(flag) \ struct timeval flag; \ gettimeofday(&(flag), NULL); @@ -28,6 +28,11 @@ DEFINE_uint64(batch, 500, "batch size"); DEFINE_int32(timeout, 200, "timeout in ms"); DEFINE_int32(retry, 3, "retry times"); DEFINE_bool(print_output, false, "print output flag"); +DEFINE_int32(thread_num, 1, "thread num"); +std::atomic g_concurrency(0); + +std::vector time_list; +std::vector request_list; namespace { inline uint64_t time_diff(const struct timeval& start_time, @@ -53,7 +58,7 @@ std::string string_to_hex(const std::string& input) { return output; } -int run(int argc, char** argv) { +int run(int argc, char** argv, int thread_id) { google::ParseCommandLineFlags(&argc, &argv, true); CubeAPI* cube = CubeAPI::instance(); @@ -62,13 +67,13 @@ int run(int argc, char** argv) { LOG(ERROR) << "init cube api failed err=" << ret; return ret; } - - FILE* key_file = fopen(FLAGS_keys.c_str(), "r"); - if (key_file == NULL) { - LOG(ERROR) << "open key file [" << FLAGS_keys << "] failed"; - return -1; - } - + /* + FILE* key_file = fopen(FLAGS_keys.c_str(), "r"); + if (key_file == NULL) { + LOG(ERROR) << "open key file [" << FLAGS_keys << "] failed"; + return -1; + } + */ std::atomic seek_counter(0); std::atomic seek_cost_total(0); uint64_t seek_cost_max = 0; @@ -78,14 +83,32 @@ int run(int argc, char** argv) { std::vector keys; std::vector values; - while (fgets(buffer, 1024, key_file)) { - uint64_t key = strtoul(buffer, NULL, 10); - keys.push_back(key); + std::string line; + std::vector key_list; + std::ifstream key_file(FLAGS_keys.c_str()); + while (getline(key_file, line)) { + key_list.push_back(std::stoll(line)); + } + + uint64_t file_size = key_list.size(); + uint64_t index = 0; + uint64_t request = 0; + + while (g_concurrency.load() >= FLAGS_thread_num) { + } + g_concurrency++; + + while (index < file_size) { + // uint64_t key = strtoul(buffer, NULL, 10); + + keys.push_back(key_list[index]); + index += 1; int ret = 0; if (keys.size() > FLAGS_batch) { TIME_FLAG(seek_start); ret = cube->seek(FLAGS_dict, keys, &values); TIME_FLAG(seek_end); + request += 1; if (ret != 0) { LOG(WARNING) << "cube seek failed"; } else if (FLAGS_print_output) { @@ -110,37 +133,40 @@ int run(int argc, char** argv) { values.clear(); } } + /* + if (keys.size() > 0) { + int ret = 0; + values.resize(keys.size()); + TIME_FLAG(seek_start); + ret = cube->seek(FLAGS_dict, keys, &values); + TIME_FLAG(seek_end); + if (ret != 0) { + LOG(WARNING) << "cube seek failed"; + } else if (FLAGS_print_output) { + for (size_t i = 0; i < keys.size(); ++i) { + fprintf(stdout, + "key:%lu value:%s\n", + keys[i], + string_to_hex(values[i].buff).c_str()); + } + } - if (keys.size() > 0) { - int ret = 0; - values.resize(keys.size()); - TIME_FLAG(seek_start); - ret = cube->seek(FLAGS_dict, keys, &values); - TIME_FLAG(seek_end); - if (ret != 0) { - LOG(WARNING) << "cube seek failed"; - } else if (FLAGS_print_output) { - for (size_t i = 0; i < keys.size(); ++i) { - fprintf(stdout, - "key:%lu value:%s\n", - keys[i], - string_to_hex(values[i].buff).c_str()); + ++seek_counter; + uint64_t seek_cost = time_diff(seek_start, seek_end); + seek_cost_total += seek_cost; + if (seek_cost > seek_cost_max) { + seek_cost_max = seek_cost; + } + if (seek_cost < seek_cost_min) { + seek_cost_min = seek_cost; } } + */ + g_concurrency--; - ++seek_counter; - uint64_t seek_cost = time_diff(seek_start, seek_end); - seek_cost_total += seek_cost; - if (seek_cost > seek_cost_max) { - seek_cost_max = seek_cost; - } - if (seek_cost < seek_cost_min) { - seek_cost_min = seek_cost; - } - } - fclose(key_file); + // fclose(key_file); - ret = cube->destroy(); + // ret = cube->destroy(); if (ret != 0) { LOG(WARNING) << "destroy cube api failed err=" << ret; } @@ -150,10 +176,50 @@ int run(int argc, char** argv) { LOG(INFO) << "seek cost max = " << seek_cost_max; LOG(INFO) << "seek cost min = " << seek_cost_min; + time_list[thread_id] = seek_cost_avg; + request_list[thread_id] = request; + return 0; } +int run_m(int argc, char** argv) { + google::ParseCommandLineFlags(&argc, &argv, true); + int thread_num = FLAGS_thread_num; + request_list.resize(thread_num); + time_list.resize(thread_num); + std::vector thread_pool; + for (int i = 0; i < thread_num; i++) { + thread_pool.push_back(new std::thread(run, argc, argv, i)); + } + for (int i = 0; i < thread_num; i++) { + thread_pool[i]->join(); + delete thread_pool[i]; + } + uint64_t sum_time = 0; + uint64_t max_time = 0; + uint64_t min_time = 1000000; + uint64_t request_num = 0; + for (int i = 0; i < thread_num; i++) { + sum_time += time_list[i]; + if (time_list[i] > max_time) { + max_time = time_list[i]; + } + if (time_list[i] < min_time) { + min_time = time_list[i]; + } + request_num += request_list[i]; + } + uint64_t mean_time = sum_time / thread_num; + LOG(INFO) << thread_num << " thread seek cost" + << " avg = " << std::to_string(mean_time) + << " max = " << std::to_string(max_time) + << " min = " << std::to_string(min_time); + LOG(INFO) << " total_request = " << std::to_string(request_num) + << " speed = " << std::to_string(1000000 * thread_num / mean_time) + << " query per second"; +} + } // namespace mcube } // namespace rec -int main(int argc, char** argv) { return ::rec::mcube::run(argc, argv); } +int main(int argc, char** argv) { return ::rec::mcube::run_m(argc, argv); } -- GitLab