提交 38a3f154 编写于 作者: X xulongteng

add mutli thread press for cube

上级 97f998db
......@@ -14,9 +14,9 @@
#include <gflags/gflags.h>
#include <atomic>
#include <fstream>
#include <thread> //NOLINT
#include "cube/cube-api/include/cube_api.h"
#define TIME_FLAG(flag) \
struct timeval flag; \
gettimeofday(&(flag), NULL);
......@@ -28,6 +28,11 @@ DEFINE_uint64(batch, 500, "batch size");
DEFINE_int32(timeout, 200, "timeout in ms");
DEFINE_int32(retry, 3, "retry times");
DEFINE_bool(print_output, false, "print output flag");
DEFINE_int32(thread_num, 1, "thread num");
std::atomic<int> g_concurrency(0);
std::vector<uint64_t> time_list;
std::vector<uint64_t> request_list;
namespace {
inline uint64_t time_diff(const struct timeval& start_time,
......@@ -53,7 +58,7 @@ std::string string_to_hex(const std::string& input) {
return output;
}
int run(int argc, char** argv) {
int run(int argc, char** argv, int thread_id) {
google::ParseCommandLineFlags(&argc, &argv, true);
CubeAPI* cube = CubeAPI::instance();
......@@ -62,13 +67,13 @@ int run(int argc, char** argv) {
LOG(ERROR) << "init cube api failed err=" << ret;
return ret;
}
FILE* key_file = fopen(FLAGS_keys.c_str(), "r");
if (key_file == NULL) {
LOG(ERROR) << "open key file [" << FLAGS_keys << "] failed";
return -1;
}
/*
FILE* key_file = fopen(FLAGS_keys.c_str(), "r");
if (key_file == NULL) {
LOG(ERROR) << "open key file [" << FLAGS_keys << "] failed";
return -1;
}
*/
std::atomic<uint64_t> seek_counter(0);
std::atomic<uint64_t> seek_cost_total(0);
uint64_t seek_cost_max = 0;
......@@ -78,14 +83,32 @@ int run(int argc, char** argv) {
std::vector<uint64_t> keys;
std::vector<CubeValue> values;
while (fgets(buffer, 1024, key_file)) {
uint64_t key = strtoul(buffer, NULL, 10);
keys.push_back(key);
std::string line;
std::vector<int64_t> key_list;
std::ifstream key_file(FLAGS_keys.c_str());
while (getline(key_file, line)) {
key_list.push_back(std::stoll(line));
}
uint64_t file_size = key_list.size();
uint64_t index = 0;
uint64_t request = 0;
while (g_concurrency.load() >= FLAGS_thread_num) {
}
g_concurrency++;
while (index < file_size) {
// uint64_t key = strtoul(buffer, NULL, 10);
keys.push_back(key_list[index]);
index += 1;
int ret = 0;
if (keys.size() > FLAGS_batch) {
TIME_FLAG(seek_start);
ret = cube->seek(FLAGS_dict, keys, &values);
TIME_FLAG(seek_end);
request += 1;
if (ret != 0) {
LOG(WARNING) << "cube seek failed";
} else if (FLAGS_print_output) {
......@@ -110,37 +133,40 @@ int run(int argc, char** argv) {
values.clear();
}
}
/*
if (keys.size() > 0) {
int ret = 0;
values.resize(keys.size());
TIME_FLAG(seek_start);
ret = cube->seek(FLAGS_dict, keys, &values);
TIME_FLAG(seek_end);
if (ret != 0) {
LOG(WARNING) << "cube seek failed";
} else if (FLAGS_print_output) {
for (size_t i = 0; i < keys.size(); ++i) {
fprintf(stdout,
"key:%lu value:%s\n",
keys[i],
string_to_hex(values[i].buff).c_str());
}
}
if (keys.size() > 0) {
int ret = 0;
values.resize(keys.size());
TIME_FLAG(seek_start);
ret = cube->seek(FLAGS_dict, keys, &values);
TIME_FLAG(seek_end);
if (ret != 0) {
LOG(WARNING) << "cube seek failed";
} else if (FLAGS_print_output) {
for (size_t i = 0; i < keys.size(); ++i) {
fprintf(stdout,
"key:%lu value:%s\n",
keys[i],
string_to_hex(values[i].buff).c_str());
++seek_counter;
uint64_t seek_cost = time_diff(seek_start, seek_end);
seek_cost_total += seek_cost;
if (seek_cost > seek_cost_max) {
seek_cost_max = seek_cost;
}
if (seek_cost < seek_cost_min) {
seek_cost_min = seek_cost;
}
}
*/
g_concurrency--;
++seek_counter;
uint64_t seek_cost = time_diff(seek_start, seek_end);
seek_cost_total += seek_cost;
if (seek_cost > seek_cost_max) {
seek_cost_max = seek_cost;
}
if (seek_cost < seek_cost_min) {
seek_cost_min = seek_cost;
}
}
fclose(key_file);
// fclose(key_file);
ret = cube->destroy();
// ret = cube->destroy();
if (ret != 0) {
LOG(WARNING) << "destroy cube api failed err=" << ret;
}
......@@ -150,10 +176,50 @@ int run(int argc, char** argv) {
LOG(INFO) << "seek cost max = " << seek_cost_max;
LOG(INFO) << "seek cost min = " << seek_cost_min;
time_list[thread_id] = seek_cost_avg;
request_list[thread_id] = request;
return 0;
}
int run_m(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
int thread_num = FLAGS_thread_num;
request_list.resize(thread_num);
time_list.resize(thread_num);
std::vector<std::thread*> thread_pool;
for (int i = 0; i < thread_num; i++) {
thread_pool.push_back(new std::thread(run, argc, argv, i));
}
for (int i = 0; i < thread_num; i++) {
thread_pool[i]->join();
delete thread_pool[i];
}
uint64_t sum_time = 0;
uint64_t max_time = 0;
uint64_t min_time = 1000000;
uint64_t request_num = 0;
for (int i = 0; i < thread_num; i++) {
sum_time += time_list[i];
if (time_list[i] > max_time) {
max_time = time_list[i];
}
if (time_list[i] < min_time) {
min_time = time_list[i];
}
request_num += request_list[i];
}
uint64_t mean_time = sum_time / thread_num;
LOG(INFO) << thread_num << " thread seek cost"
<< " avg = " << std::to_string(mean_time)
<< " max = " << std::to_string(max_time)
<< " min = " << std::to_string(min_time);
LOG(INFO) << " total_request = " << std::to_string(request_num)
<< " speed = " << std::to_string(1000000 * thread_num / mean_time)
<< " query per second";
}
} // namespace mcube
} // namespace rec
int main(int argc, char** argv) { return ::rec::mcube::run(argc, argv); }
int main(int argc, char** argv) { return ::rec::mcube::run_m(argc, argv); }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册