cube_cli.cpp 5.7 KB
Newer Older
Y
yangrui07 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <sys/time.h>

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <limits>
#include <string>
#include <thread>  //NOLINT
#include <vector>

#include <gflags/gflags.h>

#include "core/cube/cube-api/include/cube_api.h"
#define TIME_FLAG(flag) \
  struct timeval flag;  \
  gettimeofday(&(flag), NULL);

DEFINE_string(config_file, "./cube.conf", "m-cube config file");
DEFINE_string(keys, "keys", "keys to seek");
DEFINE_string(dict, "dict", "dict to seek");
DEFINE_uint64(batch, 500, "batch size");
DEFINE_int32(timeout, 200, "timeout in ms");
DEFINE_int32(retry, 3, "retry times");
DEFINE_bool(print_output, false, "print output flag");
X
xulongteng 已提交
31 32 33
DEFINE_int32(thread_num, 1, "thread num");
std::atomic<int> g_concurrency(0);

M
MRXLT 已提交
34
std::vector<std::vector<uint64_t>> time_list;
X
xulongteng 已提交
35
std::vector<uint64_t> request_list;
M
MRXLT 已提交
36
int turns = 1000000 / FLAGS_batch;
Y
yangrui07 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61

namespace {
// Returns the time elapsed from start_time to end_time in microseconds.
//
// The timestamps come from gettimeofday() (see TIME_FLAG), which reports
// wall-clock time and may step backwards (e.g. on a clock adjustment). A
// negative interval is clamped to 0 instead of wrapping around to a huge
// unsigned value that would poison the max/avg latency statistics.
inline uint64_t time_diff(const struct timeval& start_time,
                          const struct timeval& end_time) {
  const int64_t elapsed_us =
      static_cast<int64_t>(end_time.tv_sec - start_time.tv_sec) * 1000000 +
      static_cast<int64_t>(end_time.tv_usec - start_time.tv_usec);
  return elapsed_us > 0 ? static_cast<uint64_t>(elapsed_us) : 0;
}
}  // namespace

namespace rec {
namespace mcube {
// Encodes every byte of `input` as two uppercase hexadecimal digits,
// e.g. "\x01\xAB" -> "01AB". The output is exactly twice as long as the input.
std::string string_to_hex(const std::string& input) {
  static const char kHexDigits[] = "0123456789ABCDEF";
  std::string hex(input.size() * 2, '\0');
  std::string::size_type pos = 0;
  for (const char ch : input) {
    // Work on the unsigned byte value so high-bit bytes index correctly.
    const unsigned char byte = static_cast<unsigned char>(ch);
    hex[pos++] = kHexDigits[byte >> 4];
    hex[pos++] = kHexDigits[byte & 0x0F];
  }
  return hex;
}

X
xulongteng 已提交
62
int run(int argc, char** argv, int thread_id) {
Y
yangrui07 已提交
63 64 65 66 67 68 69 70
  google::ParseCommandLineFlags(&argc, &argv, true);

  CubeAPI* cube = CubeAPI::instance();
  int ret = cube->init(FLAGS_config_file.c_str());
  if (ret != 0) {
    LOG(ERROR) << "init cube api failed err=" << ret;
    return ret;
  }
X
xulongteng 已提交
71 72 73 74 75 76 77
  /*
    FILE* key_file = fopen(FLAGS_keys.c_str(), "r");
    if (key_file == NULL) {
      LOG(ERROR) << "open key file [" << FLAGS_keys << "] failed";
      return -1;
    }
  */
Y
yangrui07 已提交
78 79 80 81 82 83 84 85 86
  std::atomic<uint64_t> seek_counter(0);
  std::atomic<uint64_t> seek_cost_total(0);
  uint64_t seek_cost_max = 0;
  uint64_t seek_cost_min = 500000;

  char buffer[1024];
  std::vector<uint64_t> keys;
  std::vector<CubeValue> values;

X
xulongteng 已提交
87 88 89 90 91 92 93 94 95 96 97 98 99 100
  std::string line;
  std::vector<int64_t> key_list;
  std::ifstream key_file(FLAGS_keys.c_str());
  while (getline(key_file, line)) {
    key_list.push_back(std::stoll(line));
  }

  uint64_t file_size = key_list.size();
  uint64_t index = 0;
  uint64_t request = 0;

  while (g_concurrency.load() >= FLAGS_thread_num) {
  }
  g_concurrency++;
M
MRXLT 已提交
101
  time_list[thread_id].resize(turns);
X
xulongteng 已提交
102 103 104 105 106
  while (index < file_size) {
    // uint64_t key = strtoul(buffer, NULL, 10);

    keys.push_back(key_list[index]);
    index += 1;
Y
yangrui07 已提交
107
    int ret = 0;
M
MRXLT 已提交
108
    if (keys.size() >= FLAGS_batch) {
Y
yangrui07 已提交
109 110 111
      TIME_FLAG(seek_start);
      ret = cube->seek(FLAGS_dict, keys, &values);
      TIME_FLAG(seek_end);
X
xulongteng 已提交
112
      request += 1;
Y
yangrui07 已提交
113 114 115 116 117 118 119 120 121 122 123 124
      if (ret != 0) {
        LOG(WARNING) << "cube seek failed";
      } else if (FLAGS_print_output) {
        for (size_t i = 0; i < keys.size(); ++i) {
          fprintf(stdout,
                  "key:%lu value:%s\n",
                  keys[i],
                  string_to_hex(values[i].buff).c_str());
        }
      }
      ++seek_counter;
      uint64_t seek_cost = time_diff(seek_start, seek_end);
M
MRXLT 已提交
125
      time_list[thread_id][request - 1] = seek_cost;
Y
yangrui07 已提交
126 127 128 129 130

      keys.clear();
      values.clear();
    }
  }
X
xulongteng 已提交
131
  g_concurrency--;
Y
yangrui07 已提交
132

X
xulongteng 已提交
133
  // fclose(key_file);
Y
yangrui07 已提交
134

X
xulongteng 已提交
135
  // ret = cube->destroy();
Y
yangrui07 已提交
136 137 138 139
  if (ret != 0) {
    LOG(WARNING) << "destroy cube api failed err=" << ret;
  }

X
xulongteng 已提交
140 141
  request_list[thread_id] = request;

Y
yangrui07 已提交
142 143 144
  return 0;
}

X
xulongteng 已提交
145 146 147 148 149 150
int run_m(int argc, char** argv) {
  google::ParseCommandLineFlags(&argc, &argv, true);
  int thread_num = FLAGS_thread_num;
  request_list.resize(thread_num);
  time_list.resize(thread_num);
  std::vector<std::thread*> thread_pool;
M
MRXLT 已提交
151
  TIME_FLAG(main_start);
X
xulongteng 已提交
152 153 154 155 156 157 158
  for (int i = 0; i < thread_num; i++) {
    thread_pool.push_back(new std::thread(run, argc, argv, i));
  }
  for (int i = 0; i < thread_num; i++) {
    thread_pool[i]->join();
    delete thread_pool[i];
  }
M
MRXLT 已提交
159
  TIME_FLAG(main_end);
X
xulongteng 已提交
160 161 162 163 164
  uint64_t sum_time = 0;
  uint64_t max_time = 0;
  uint64_t min_time = 1000000;
  uint64_t request_num = 0;
  for (int i = 0; i < thread_num; i++) {
M
MRXLT 已提交
165 166 167 168 169 170 171 172
    for (int j = 0; j < request_list[i]; j++) {
      sum_time += time_list[i][j];
      if (time_list[i][j] > max_time) {
        max_time = time_list[i][j];
      }
      if (time_list[i][j] < min_time) {
        min_time = time_list[i][j];
      }
X
xulongteng 已提交
173 174 175
    }
    request_num += request_list[i];
  }
M
MRXLT 已提交
176
  uint64_t mean_time = sum_time / (thread_num * turns);
M
MRXLT 已提交
177
  uint64_t main_time = time_diff(main_start, main_end);
M
MRXLT 已提交
178 179 180 181 182 183 184 185
  LOG(INFO) << "\n"
            << thread_num << " thread seek cost"
            << "\navg = " << std::to_string(mean_time)
            << "\nmax = " << std::to_string(max_time)
            << "\nmin = " << std::to_string(min_time);
  LOG(INFO) << "\ntotal_request = " << std::to_string(request_num)
            << "\nspeed = " << std::to_string(request_num * 1000000 /
                                              main_time)  // mean_time us
X
xulongteng 已提交
186
            << " query per second";
B
barrierye 已提交
187
  return 0;
X
xulongteng 已提交
188 189
}

Y
yangrui07 已提交
190 191 192
}  // namespace mcube
}  // namespace rec

X
xulongteng 已提交
193
int main(int argc, char** argv) { return ::rec::mcube::run_m(argc, argv); }