cube_cli.cpp 6.7 KB
Newer Older
Y
yangrui07 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <gflags/gflags.h>
#include <atomic>
X
xulongteng 已提交
17 18
#include <fstream>
#include <thread>  //NOLINT
G
guru4elephant 已提交
19
#include "core/cube/cube-api/include/cube_api.h"
Y
yangrui07 已提交
20 21 22 23 24 25 26 27 28 29 30
// TIME_FLAG(flag): declares a local `struct timeval` named `flag` and stamps
// it with the current wall-clock time via gettimeofday(). Used as a
// statement, e.g. `TIME_FLAG(seek_start);`.
#define TIME_FLAG(flag) \
  struct timeval flag;  \
  gettimeofday(&(flag), NULL);

// ---- Command-line flags -------------------------------------------------
// Config file handed to CubeAPI::init().
DEFINE_string(config_file, "./cube.conf", "m-cube config file");
// Path of the key file (one key per line).
DEFINE_string(keys, "keys", "keys to seek");
// Dictionary name passed to CubeAPI::seek().
DEFINE_string(dict, "dict", "dict to seek");
// Number of keys sent per seek() request.
DEFINE_uint64(batch, 500, "batch size");
// NOTE(review): timeout and retry are not read anywhere in this file.
DEFINE_int32(timeout, 200, "timeout in ms");
DEFINE_int32(retry, 3, "retry times");
// When set, every key/value pair returned by a successful seek is printed.
DEFINE_bool(print_output, false, "print output flag");
// Number of worker threads started by run_m().
DEFINE_int32(thread_num, 1, "thread num");

// Number of worker threads currently inside their seek loop.
std::atomic<int> g_concurrency{0};

// Per-thread results, indexed by thread id: average seek latency in
// microseconds, and number of seek requests issued.
std::vector<uint64_t> time_list;
std::vector<uint64_t> request_list;
Y
yangrui07 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60

namespace {
// Returns the time elapsed from start_time to end_time, in microseconds.
//
// The arithmetic is done in int64_t so that converting seconds to
// microseconds cannot overflow a 32-bit time_t/long. Timestamps taken with
// gettimeofday() are wall-clock time and may step backwards (e.g. on clock
// adjustment); a negative interval is clamped to 0 rather than wrapping
// around to an enormous unsigned latency.
inline uint64_t time_diff(const struct timeval& start_time,
                          const struct timeval& end_time) {
  const int64_t sec_delta = static_cast<int64_t>(end_time.tv_sec) -
                            static_cast<int64_t>(start_time.tv_sec);
  const int64_t usec_delta = static_cast<int64_t>(end_time.tv_usec) -
                             static_cast<int64_t>(start_time.tv_usec);
  const int64_t elapsed_us = sec_delta * 1000000 + usec_delta;
  return elapsed_us > 0 ? static_cast<uint64_t>(elapsed_us) : 0;
}
}  // namespace

namespace rec {
namespace mcube {
// Renders every byte of `input` as two upper-case hexadecimal digits,
// high nibble first. An empty input yields an empty string.
std::string string_to_hex(const std::string& input) {
  static const char kHexDigits[] = "0123456789ABCDEF";
  std::string hex(input.size() * 2, '\0');
  std::size_t pos = 0;
  for (const char ch : input) {
    const unsigned char byte = static_cast<unsigned char>(ch);
    hex[pos++] = kHexDigits[byte >> 4];
    hex[pos++] = kHexDigits[byte & 0x0F];
  }
  return hex;
}

X
xulongteng 已提交
61
int run(int argc, char** argv, int thread_id) {
Y
yangrui07 已提交
62 63 64 65 66 67 68 69
  google::ParseCommandLineFlags(&argc, &argv, true);

  CubeAPI* cube = CubeAPI::instance();
  int ret = cube->init(FLAGS_config_file.c_str());
  if (ret != 0) {
    LOG(ERROR) << "init cube api failed err=" << ret;
    return ret;
  }
X
xulongteng 已提交
70 71 72 73 74 75 76
  /*
    FILE* key_file = fopen(FLAGS_keys.c_str(), "r");
    if (key_file == NULL) {
      LOG(ERROR) << "open key file [" << FLAGS_keys << "] failed";
      return -1;
    }
  */
Y
yangrui07 已提交
77 78 79 80 81 82 83 84 85
  std::atomic<uint64_t> seek_counter(0);
  std::atomic<uint64_t> seek_cost_total(0);
  uint64_t seek_cost_max = 0;
  uint64_t seek_cost_min = 500000;

  char buffer[1024];
  std::vector<uint64_t> keys;
  std::vector<CubeValue> values;

X
xulongteng 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
  std::string line;
  std::vector<int64_t> key_list;
  std::ifstream key_file(FLAGS_keys.c_str());
  while (getline(key_file, line)) {
    key_list.push_back(std::stoll(line));
  }

  uint64_t file_size = key_list.size();
  uint64_t index = 0;
  uint64_t request = 0;

  while (g_concurrency.load() >= FLAGS_thread_num) {
  }
  g_concurrency++;

  while (index < file_size) {
    // uint64_t key = strtoul(buffer, NULL, 10);

    keys.push_back(key_list[index]);
    index += 1;
Y
yangrui07 已提交
106
    int ret = 0;
M
MRXLT 已提交
107
    if (keys.size() >= FLAGS_batch) {
Y
yangrui07 已提交
108 109 110
      TIME_FLAG(seek_start);
      ret = cube->seek(FLAGS_dict, keys, &values);
      TIME_FLAG(seek_end);
X
xulongteng 已提交
111
      request += 1;
Y
yangrui07 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
      if (ret != 0) {
        LOG(WARNING) << "cube seek failed";
      } else if (FLAGS_print_output) {
        for (size_t i = 0; i < keys.size(); ++i) {
          fprintf(stdout,
                  "key:%lu value:%s\n",
                  keys[i],
                  string_to_hex(values[i].buff).c_str());
        }
      }
      ++seek_counter;
      uint64_t seek_cost = time_diff(seek_start, seek_end);
      seek_cost_total += seek_cost;
      if (seek_cost > seek_cost_max) {
        seek_cost_max = seek_cost;
      }
      if (seek_cost < seek_cost_min) {
        seek_cost_min = seek_cost;
      }

      keys.clear();
      values.clear();
    }
  }
X
xulongteng 已提交
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
  /*
    if (keys.size() > 0) {
      int ret = 0;
      values.resize(keys.size());
      TIME_FLAG(seek_start);
      ret = cube->seek(FLAGS_dict, keys, &values);
      TIME_FLAG(seek_end);
      if (ret != 0) {
        LOG(WARNING) << "cube seek failed";
      } else if (FLAGS_print_output) {
        for (size_t i = 0; i < keys.size(); ++i) {
          fprintf(stdout,
                  "key:%lu value:%s\n",
                  keys[i],
                  string_to_hex(values[i].buff).c_str());
        }
      }
Y
yangrui07 已提交
153

X
xulongteng 已提交
154 155 156 157 158 159 160 161
      ++seek_counter;
      uint64_t seek_cost = time_diff(seek_start, seek_end);
      seek_cost_total += seek_cost;
      if (seek_cost > seek_cost_max) {
        seek_cost_max = seek_cost;
      }
      if (seek_cost < seek_cost_min) {
        seek_cost_min = seek_cost;
Y
yangrui07 已提交
162 163
      }
    }
X
xulongteng 已提交
164 165
  */
  g_concurrency--;
Y
yangrui07 已提交
166

X
xulongteng 已提交
167
  // fclose(key_file);
Y
yangrui07 已提交
168

X
xulongteng 已提交
169
  // ret = cube->destroy();
Y
yangrui07 已提交
170 171 172 173 174 175 176 177 178
  if (ret != 0) {
    LOG(WARNING) << "destroy cube api failed err=" << ret;
  }

  uint64_t seek_cost_avg = seek_cost_total / seek_counter;
  LOG(INFO) << "seek cost avg = " << seek_cost_avg;
  LOG(INFO) << "seek cost max = " << seek_cost_max;
  LOG(INFO) << "seek cost min = " << seek_cost_min;

X
xulongteng 已提交
179 180 181
  time_list[thread_id] = seek_cost_avg;
  request_list[thread_id] = request;

Y
yangrui07 已提交
182 183 184
  return 0;
}

X
xulongteng 已提交
185 186 187 188 189 190
int run_m(int argc, char** argv) {
  google::ParseCommandLineFlags(&argc, &argv, true);
  int thread_num = FLAGS_thread_num;
  request_list.resize(thread_num);
  time_list.resize(thread_num);
  std::vector<std::thread*> thread_pool;
M
MRXLT 已提交
191
  TIME_FLAG(main_start);
X
xulongteng 已提交
192 193 194 195 196 197 198
  for (int i = 0; i < thread_num; i++) {
    thread_pool.push_back(new std::thread(run, argc, argv, i));
  }
  for (int i = 0; i < thread_num; i++) {
    thread_pool[i]->join();
    delete thread_pool[i];
  }
M
MRXLT 已提交
199
  TIME_FLAG(main_end);
X
xulongteng 已提交
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
  uint64_t sum_time = 0;
  uint64_t max_time = 0;
  uint64_t min_time = 1000000;
  uint64_t request_num = 0;
  for (int i = 0; i < thread_num; i++) {
    sum_time += time_list[i];
    if (time_list[i] > max_time) {
      max_time = time_list[i];
    }
    if (time_list[i] < min_time) {
      min_time = time_list[i];
    }
    request_num += request_list[i];
  }
  uint64_t mean_time = sum_time / thread_num;
M
MRXLT 已提交
215
  uint64_t main_time = time_diff(main_start, main_end);
X
xulongteng 已提交
216 217 218 219
  LOG(INFO) << thread_num << " thread seek cost"
            << " avg = " << std::to_string(mean_time)
            << " max = " << std::to_string(max_time)
            << " min = " << std::to_string(min_time);
M
MRXLT 已提交
220 221 222
  LOG(INFO) << " total_request = " << std::to_string(request_num)
            << " speed = " << std::to_string(request_num * 1000000 /
                                             main_time)  // mean_time us
X
xulongteng 已提交
223
            << " query per second";
B
barrierye 已提交
224
  return 0;
X
xulongteng 已提交
225 226
}

Y
yangrui07 已提交
227 228 229
}  // namespace mcube
}  // namespace rec

X
xulongteng 已提交
230
int main(int argc, char** argv) { return ::rec::mcube::run_m(argc, argv); }