tmqDemo.c 23.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
L
Liu Jicong 已提交
17
#include <dirent.h>
18
#include <stdio.h>
L
Liu Jicong 已提交
19
#include <stdlib.h>
20 21 22
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
L
Liu Jicong 已提交
23
#include <time.h>
24 25 26 27 28 29
#include <unistd.h>

#include "taos.h"
#include "taoserror.h"
#include "tlog.h"

L
Liu Jicong 已提交
30 31
#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
32 33
#define min(a, b) (((a) < (b)) ? (a) : (b))

L
Liu Jicong 已提交
34 35
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
36

37 38 39 40 41 42
enum _RUN_MODE {
  TMQ_RUN_INSERT_AND_CONSUME,
  TMQ_RUN_ONLY_INSERT,
  TMQ_RUN_ONLY_CONSUME,
  TMQ_RUN_MODE_BUTT,
};
43 44

typedef struct {
L
Liu Jicong 已提交
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
  char    dbName[32];
  char    stbName[64];
  char    resultFileName[256];
  char    vnodeWalPath[256];
  int32_t numOfThreads;
  int32_t numOfTables;
  int32_t numOfVgroups;
  int32_t runMode;
  int32_t numOfColumn;
  double  ratio;
  int32_t batchNumOfRow;
  int32_t totalRowsOfPerTbl;
  int64_t startTimestamp;
  int32_t showMsgFlag;
  int32_t simCase;

  int32_t totalRowsOfT2;
62 63 64 65 66
} SConfInfo;

static SConfInfo g_stConfInfo = {
    "tmqdb",
    "stb",
L
Liu Jicong 已提交
67
    "./tmqResult.txt",  // output_file
P
plum-lihui 已提交
68
    "",                 // /data2/dnode/data/vnode/vnode2/wal",
L
Liu Jicong 已提交
69 70 71 72 73 74 75 76 77 78 79
    1,                  // threads
    1,                  // tables
    1,                  // vgroups
    0,                  // run mode
    1,                  // columns
    1,                  // ratio
    1,                  // batch size
    10000,              // total rows for per table
    0,                  // 2020-01-01 00:00:00.000
    0,                  // show consume msg switch
    0,                  // if run in sim case
80 81 82
    10000,
};

L
Liu Jicong 已提交
83
char*     g_pRowValue = NULL;
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
TdFilePtr g_fp = NULL;

static void printHelp() {
  char indent[10] = "        ";
  printf("Used to test the performance while create table\n");

  printf("%s%s\n", indent, "-c");
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
  printf("%s%s\n", indent, "-d");
  printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", g_stConfInfo.dbName);
  printf("%s%s\n", indent, "-s");
  printf("%s%s%s%s\n", indent, indent, "The name of the super table to be created, default is ", g_stConfInfo.stbName);
  printf("%s%s\n", indent, "-f");
  printf("%s%s%s%s\n", indent, indent, "The file of result, default is ", g_stConfInfo.resultFileName);
  printf("%s%s\n", indent, "-w");
  printf("%s%s%s%s\n", indent, indent, "The path of vnode of wal, default is ", g_stConfInfo.vnodeWalPath);
  printf("%s%s\n", indent, "-t");
  printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", g_stConfInfo.numOfThreads);
  printf("%s%s\n", indent, "-n");
  printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", g_stConfInfo.numOfTables);
  printf("%s%s\n", indent, "-v");
  printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", g_stConfInfo.numOfVgroups);
  printf("%s%s\n", indent, "-a");
  printf("%s%s%s%d\n", indent, indent, "runMode, default is ", g_stConfInfo.runMode);
  printf("%s%s\n", indent, "-l");
  printf("%s%s%s%d\n", indent, indent, "numOfColumn, default is ", g_stConfInfo.numOfColumn);
  printf("%s%s\n", indent, "-q");
  printf("%s%s%s%f\n", indent, indent, "ratio, default is ", g_stConfInfo.ratio);
  printf("%s%s\n", indent, "-b");
  printf("%s%s%s%d\n", indent, indent, "batchNumOfRow, default is ", g_stConfInfo.batchNumOfRow);
  printf("%s%s\n", indent, "-r");
  printf("%s%s%s%d\n", indent, indent, "totalRowsOfPerTbl, default is ", g_stConfInfo.totalRowsOfPerTbl);
  printf("%s%s\n", indent, "-m");
  printf("%s%s%s%" PRId64 "\n", indent, indent, "startTimestamp, default is ", g_stConfInfo.startTimestamp);
  printf("%s%s\n", indent, "-g");
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
P
plum-lihui 已提交
120 121
  printf("%s%s\n", indent, "-sim");
  printf("%s%s%s%d\n", indent, indent, "simCase, default is ", g_stConfInfo.simCase);
122 123 124 125

  exit(EXIT_SUCCESS);
}

L
Liu Jicong 已提交
126
void parseArgument(int32_t argc, char* argv[]) {
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
  g_stConfInfo.startTimestamp = 1640966400000;  // 2020-01-01 00:00:00.000

  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      strcpy(g_stConfInfo.dbName, argv[++i]);
    } else if (strcmp(argv[i], "-c") == 0) {
      strcpy(configDir, argv[++i]);
    } else if (strcmp(argv[i], "-s") == 0) {
      strcpy(g_stConfInfo.stbName, argv[++i]);
    } else if (strcmp(argv[i], "-w") == 0) {
      strcpy(g_stConfInfo.vnodeWalPath, argv[++i]);
    } else if (strcmp(argv[i], "-f") == 0) {
      strcpy(g_stConfInfo.resultFileName, argv[++i]);
    } else if (strcmp(argv[i], "-t") == 0) {
      g_stConfInfo.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-n") == 0) {
      g_stConfInfo.numOfTables = atoll(argv[++i]);
    } else if (strcmp(argv[i], "-v") == 0) {
      g_stConfInfo.numOfVgroups = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-a") == 0) {
      g_stConfInfo.runMode = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-b") == 0) {
      g_stConfInfo.batchNumOfRow = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-r") == 0) {
      g_stConfInfo.totalRowsOfPerTbl = atoi(argv[++i]);
L
Liu Jicong 已提交
155
    } else if (strcmp(argv[i], "-l") == 0) {
156 157 158 159 160 161 162
      g_stConfInfo.numOfColumn = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-q") == 0) {
      g_stConfInfo.ratio = atof(argv[++i]);
    } else if (strcmp(argv[i], "-m") == 0) {
      g_stConfInfo.startTimestamp = atol(argv[++i]);
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
P
plum-lihui 已提交
163 164
    } else if (strcmp(argv[i], "-sim") == 0) {
      g_stConfInfo.simCase = atol(argv[++i]);
165
    } else {
P
plum-lihui 已提交
166
      printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
L
Liu Jicong 已提交
167
      exit(-1);
168 169 170 171 172
    }
  }

  g_stConfInfo.totalRowsOfT2 = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.ratio;

P
plum-lihui 已提交
173
#if 0
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
  pPrint("%s stbName:%s %s", GREEN, g_stConfInfo.stbName, NC);
  pPrint("%s resultFileName:%s %s", GREEN, g_stConfInfo.resultFileName, NC);
  pPrint("%s vnodeWalPath:%s %s", GREEN, g_stConfInfo.vnodeWalPath, NC);
  pPrint("%s numOfTables:%d %s", GREEN, g_stConfInfo.numOfTables, NC);
  pPrint("%s numOfThreads:%d %s", GREEN, g_stConfInfo.numOfThreads, NC);
  pPrint("%s numOfVgroups:%d %s", GREEN, g_stConfInfo.numOfVgroups, NC);
  pPrint("%s runMode:%d %s", GREEN, g_stConfInfo.runMode, NC);
  pPrint("%s ratio:%f %s", GREEN, g_stConfInfo.ratio, NC);
  pPrint("%s numOfColumn:%d %s", GREEN, g_stConfInfo.numOfColumn, NC);
  pPrint("%s batchNumOfRow:%d %s", GREEN, g_stConfInfo.batchNumOfRow, NC);
  pPrint("%s totalRowsOfPerTbl:%d %s", GREEN, g_stConfInfo.totalRowsOfPerTbl, NC);
  pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC);
  pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC);
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
L
Liu Jicong 已提交
190
#endif
191 192
}

L
Liu Jicong 已提交
193
static int running = 1;
L
Liu Jicong 已提交
194
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
195 196

// calc dir size (not include itself 4096Byte)
L
Liu Jicong 已提交
197 198 199 200
int64_t getDirectorySize(char* dir) {
  TdDirPtr      pDir;
  TdDirEntryPtr pDirEntry;
  int64_t       totalSize = 0;
201

L
Liu Jicong 已提交
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
  if ((pDir = taosOpenDir(dir)) == NULL) {
    fprintf(stderr, "Cannot open dir: %s\n", dir);
    return -1;
  }

  // lstat(dir, &statbuf);
  // totalSize+=statbuf.st_size;

  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char  subdir[1024];
    char* fileName = taosGetDirEntryName(pDirEntry);
    sprintf(subdir, "%s/%s", dir, fileName);

    // printf("===d_name: %s\n", entry->d_name);
    if (taosIsDir(subdir)) {
      if (strcmp(".", fileName) == 0 || strcmp("..", fileName) == 0) {
        continue;
      }

      int64_t subDirSize = getDirectorySize(subdir);
      totalSize += subDirSize;
    } else if (0 == strcmp(strchr(fileName, '.'), ".log")) {  // only calc .log file size, and not include .idx file
      int64_t file_size = 0;
      taosStatFile(subdir, &file_size, NULL);
      totalSize += file_size;
227
    }
L
Liu Jicong 已提交
228
  }
229

L
Liu Jicong 已提交
230 231
  taosCloseDir(pDir);
  return totalSize;
232 233
}

L
Liu Jicong 已提交
234 235 236 237 238 239 240 241 242 243 244
int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = taos_query(taos, command);
  int       code = taos_errno(pRes);
  // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
  if (code != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(code), command);
    taos_free_result(pRes);
    return -1;
  }
  taos_free_result(pRes);
  return 0;
245 246 247 248
}

int32_t init_env() {
  char sqlStr[1024] = {0};
L
Liu Jicong 已提交
249

250 251 252 253
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }
L
Liu Jicong 已提交
254

255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
  sprintf(sqlStr, "create database if not exists %s vgroups %d", g_stConfInfo.dbName, g_stConfInfo.numOfVgroups);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in create db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  // create row value
wafwerar's avatar
wafwerar 已提交
272
  g_pRowValue = (char*)taosMemoryCalloc(1, g_stConfInfo.numOfColumn * 16 + 128);
273 274 275 276 277 278
  if (NULL == g_pRowValue) {
    return -1;
  }

  int32_t dataLen = 0;
  int32_t sqlLen = 0;
L
Liu Jicong 已提交
279
  sqlLen += sprintf(sqlStr + sqlLen, "create stable if not exists %s (ts timestamp, ", g_stConfInfo.stbName);
280
  for (int32_t i = 0; i < g_stConfInfo.numOfColumn; i++) {
L
Liu Jicong 已提交
281 282 283 284 285 286 287 288 289 290 291
    if (i == g_stConfInfo.numOfColumn - 1) {
      sqlLen += sprintf(sqlStr + sqlLen, "c%d int) ", i);
      memcpy(g_pRowValue + dataLen, "66778899", strlen("66778899"));
      dataLen += strlen("66778899");
    } else {
      sqlLen += sprintf(sqlStr + sqlLen, "c%d int, ", i);
      memcpy(g_pRowValue + dataLen, "66778899, ", strlen("66778899, "));
      dataLen += strlen("66778899, ");
    }
  }
  sqlLen += sprintf(sqlStr + sqlLen, "tags (t0 int)");
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309

  pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table %s, reason:%s\n", g_stConfInfo.stbName, taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  for (int32_t i = 0; i < g_stConfInfo.numOfTables; i++) {
    sprintf(sqlStr, "create table if not exists %s%d using %s tags(1)", g_stConfInfo.stbName, i, g_stConfInfo.stbName);
    pRes = taos_query(pConn, sqlStr);
    if (taos_errno(pRes) != 0) {
      printf("failed to create child table %s%d, reason:%s\n", g_stConfInfo.stbName, i, taos_errstr(pRes));
      return -1;
    }
    taos_free_result(pRes);
  }

L
Liu Jicong 已提交
310
  // const char* sql = "select * from tu1";
L
Liu Jicong 已提交
311
  sprintf(sqlStr, "create topic test_stb_topic_1 as select ts,c0 from %s", g_stConfInfo.stbName);
L
Liu Jicong 已提交
312 313
  /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
  pRes = taos_query(pConn, sqlStr);
314 315 316 317 318 319 320 321 322 323
  if (taos_errno(pRes) != 0) {
    printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
  taos_close(pConn);
  return 0;
}

tmq_t* build_consumer() {
L
Liu Jicong 已提交
324
#if 0
325 326 327 328 329 330 331 332 333 334 335
  char sqlStr[1024] = {0};
  
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
  }
  taos_free_result(pRes);
L
Liu Jicong 已提交
336
#endif
337 338 339

  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "tg2");
L
Liu Jicong 已提交
340 341 342 343
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
  tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
344
  assert(tmq);
L
Liu Jicong 已提交
345
  tmq_conf_destroy(conf);
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
  return tmq;
}

tmq_list_t* build_topic_list() {
  tmq_list_t* topic_list = tmq_list_new();
  tmq_list_append(topic_list, "test_stb_topic_1");
  return topic_list;
}

void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
  static const int MIN_COMMIT_COUNT = 1000;

  int            msg_count = 0;
  tmq_resp_err_t err;

  if ((err = tmq_subscribe(tmq, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
    return;
  }

  while (running) {
L
Liu Jicong 已提交
367
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1);
368
    if (tmqmessage) {
L
Liu Jicong 已提交
369
      /*msg_process(tmqmessage);*/
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
      tmq_message_destroy(tmqmessage);

      if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
    }
  }

  err = tmq_consumer_close(tmq);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) {
  tmq_resp_err_t err;

  if ((err = tmq_subscribe(tmq, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
    printf("subscribe err\n");
    return;
  }
L
Liu Jicong 已提交
391
  /*taosSsleep(3);*/
392 393 394 395
  int32_t batchCnt = 0;
  int32_t skipLogNum = 0;
  int64_t startTime = taosGetTimestampUs();
  while (running) {
L
Liu Jicong 已提交
396
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 3000);
397 398
    if (tmqmessage) {
      batchCnt++;
L
Liu Jicong 已提交
399
      /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
L
Liu Jicong 已提交
400
      if (0 != g_stConfInfo.showMsgFlag) {
L
Liu Jicong 已提交
401
        /*msg_process(tmqmessage);*/
L
Liu Jicong 已提交
402
      }
403 404 405 406 407 408
      tmq_message_destroy(tmqmessage);
    } else {
      break;
    }
  }
  int64_t endTime = taosGetTimestampUs();
L
Liu Jicong 已提交
409
  double  consumeTime = (double)(endTime - startTime) / 1000000;
410 411

  if (batchCnt != totalMsgs) {
L
Liu Jicong 已提交
412 413
    printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
    /*exit(-1);*/
414 415
  }

P
plum-lihui 已提交
416 417 418 419 420
  if (0 == g_stConfInfo.simCase) {
    printf("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime);
  } else {
    printf("{consume success: %d}", totalMsgs);
  }
L
Liu Jicong 已提交
421 422 423
  taosFprintfFile(g_fp, "|%10d    |   %10.3f    |  %8.2f  |  %10.2f|    %10.2f    |\n", batchCnt, consumeTime,
                  (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime,
                  (double)walLogSize / 1024.0 / batchCnt);
424 425 426 427

  err = tmq_consumer_close(tmq);
  if (err) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
428
    exit(-1);
429 430 431 432 433 434 435
  }
}

// sync insertion
int32_t syncWriteData() {
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
L
Liu Jicong 已提交
436
    return -1;
437 438 439 440 441 442 443 444 445 446 447 448
  }

  char sqlStr[1024] = {0};
  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  char* buffer = NULL;
wafwerar's avatar
wafwerar 已提交
449
  buffer = (char*)taosMemoryMalloc(MAX_SQL_STR_LEN);
450 451 452 453 454
  if (NULL == buffer) {
    return -1;
  }

  int32_t totalMsgs = 0;
L
Liu Jicong 已提交
455

456 457 458
  int64_t time_counter = g_stConfInfo.startTimestamp;
  for (int i = 0; i < g_stConfInfo.totalRowsOfPerTbl;) {
    for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
L
Liu Jicong 已提交
459
      int     inserted = i;
460 461 462 463 464 465 466 467 468 469 470
      int64_t tmp_time = time_counter;

      int32_t data_len = 0;
      data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID);
      int k;
      for (k = 0; k < g_stConfInfo.batchNumOfRow;) {
        data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue);
        inserted++;
        k++;

        if (inserted >= g_stConfInfo.totalRowsOfPerTbl) {
L
Liu Jicong 已提交
471
          break;
472 473
        }

L
Liu Jicong 已提交
474
        if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
475
          break;
L
Liu Jicong 已提交
476
        }
477 478 479
      }

      int code = queryDB(pConn, buffer);
L
Liu Jicong 已提交
480
      if (0 != code) {
481
        fprintf(stderr, "insert data error!\n");
L
Liu Jicong 已提交
482 483 484
        taosMemoryFreeClear(buffer);
        return -1;
      }
485

L
Liu Jicong 已提交
486
      totalMsgs++;
487 488 489 490 491 492 493

      if (tID == g_stConfInfo.numOfTables - 1) {
        i = inserted;
        time_counter = tmp_time;
      }
    }
  }
wafwerar's avatar
wafwerar 已提交
494
  taosMemoryFreeClear(buffer);
495 496 497 498 499 500 501
  return totalMsgs;
}

// sync insertion
int32_t syncWriteDataByRatio() {
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
L
Liu Jicong 已提交
502
    return -1;
503 504 505 506 507 508 509 510 511 512 513 514
  }

  char sqlStr[1024] = {0};
  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  char* buffer = NULL;
wafwerar's avatar
wafwerar 已提交
515
  buffer = (char*)taosMemoryMalloc(MAX_SQL_STR_LEN);
516 517 518 519 520 521 522
  if (NULL == buffer) {
    return -1;
  }

  int32_t totalMsgs = 0;

  int32_t insertedOfT1 = 0;
L
Liu Jicong 已提交
523
  int32_t insertedOfT2 = 0;
524 525 526 527

  int64_t tsOfT1 = g_stConfInfo.startTimestamp;
  int64_t tsOfT2 = g_stConfInfo.startTimestamp;
  int64_t tmp_time;
L
Liu Jicong 已提交
528

529
  for (;;) {
L
Liu Jicong 已提交
530
    if ((insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) && (insertedOfT2 >= g_stConfInfo.totalRowsOfT2)) {
531
      break;
L
Liu Jicong 已提交
532 533
    }

534 535
    for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
      if (0 == tID) {
L
Liu Jicong 已提交
536
        tmp_time = tsOfT1;
537
        if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
L
Liu Jicong 已提交
538
          continue;
539
        }
L
Liu Jicong 已提交
540 541
      } else if (1 == tID) {
        tmp_time = tsOfT2;
542
        if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
L
Liu Jicong 已提交
543
          continue;
544 545 546 547 548 549 550 551 552
        }
      }

      int32_t data_len = 0;
      data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID);
      int k;
      for (k = 0; k < g_stConfInfo.batchNumOfRow;) {
        data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue);
        k++;
L
Liu Jicong 已提交
553
        if (0 == tID) {
554
          insertedOfT1++;
L
Liu Jicong 已提交
555 556 557 558
          if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
            break;
          }
        } else if (1 == tID) {
559
          insertedOfT2++;
L
Liu Jicong 已提交
560 561 562 563
          if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
            break;
          }
        }
564

L
Liu Jicong 已提交
565
        if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
566
          break;
L
Liu Jicong 已提交
567
        }
568 569 570
      }

      int code = queryDB(pConn, buffer);
L
Liu Jicong 已提交
571
      if (0 != code) {
572
        fprintf(stderr, "insert data error!\n");
L
Liu Jicong 已提交
573 574 575 576
        taosMemoryFreeClear(buffer);
        return -1;
      }

577
      if (0 == tID) {
L
Liu Jicong 已提交
578 579 580
        tsOfT1 = tmp_time;
      } else if (1 == tID) {
        tsOfT2 = tmp_time;
581 582
      }

L
Liu Jicong 已提交
583
      totalMsgs++;
584 585
    }
  }
L
Liu Jicong 已提交
586 587
  pPrint("expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]\n", g_stConfInfo.totalRowsOfPerTbl,
         g_stConfInfo.totalRowsOfT2, insertedOfT1, insertedOfT2);
wafwerar's avatar
wafwerar 已提交
588
  taosMemoryFreeClear(buffer);
589 590 591 592 593
  return totalMsgs;
}

void printParaIntoFile() {
  // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
L
Liu Jicong 已提交
594 595
  TdFilePtr pFile =
      taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
596 597
  if (NULL == pFile) {
    fprintf(stderr, "Failed to open %s for save result\n", g_stConfInfo.resultFileName);
L
Liu Jicong 已提交
598
    exit - 1;
599 600 601
  };
  g_fp = pFile;

L
Liu Jicong 已提交
602
  time_t    tTime = taosGetTimestampSec();
603
  struct tm tm = *taosLocalTime(&tTime, NULL);
604 605

  taosFprintfFile(pFile, "###################################################################\n");
L
Liu Jicong 已提交
606 607 608 609 610 611 612 613 614 615 616 617 618
  taosFprintfFile(pFile, "# configDir:                %s\n", configDir);
  taosFprintfFile(pFile, "# dbName:                   %s\n", g_stConfInfo.dbName);
  taosFprintfFile(pFile, "# stbName:                  %s\n", g_stConfInfo.stbName);
  taosFprintfFile(pFile, "# vnodeWalPath:             %s\n", g_stConfInfo.vnodeWalPath);
  taosFprintfFile(pFile, "# numOfTables:              %d\n", g_stConfInfo.numOfTables);
  taosFprintfFile(pFile, "# numOfThreads:             %d\n", g_stConfInfo.numOfThreads);
  taosFprintfFile(pFile, "# numOfVgroups:             %d\n", g_stConfInfo.numOfVgroups);
  taosFprintfFile(pFile, "# runMode:                  %d\n", g_stConfInfo.runMode);
  taosFprintfFile(pFile, "# ratio:                    %f\n", g_stConfInfo.ratio);
  taosFprintfFile(pFile, "# numOfColumn:              %d\n", g_stConfInfo.numOfColumn);
  taosFprintfFile(pFile, "# batchNumOfRow:            %d\n", g_stConfInfo.batchNumOfRow);
  taosFprintfFile(pFile, "# totalRowsOfPerTbl:        %d\n", g_stConfInfo.totalRowsOfPerTbl);
  taosFprintfFile(pFile, "# totalRowsOfT2:            %d\n", g_stConfInfo.totalRowsOfT2);
619
  taosFprintfFile(pFile, "# Test time:                %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
L
Liu Jicong 已提交
620
                  tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
621
  taosFprintfFile(pFile, "###################################################################\n");
L
Liu Jicong 已提交
622 623 624 625 626 627 628
  taosFprintfFile(pFile,
                  "|-------------------------------insert "
                  "info-----------------------------|--------------------------------consume "
                  "info---------------------------------|\n");
  taosFprintfFile(pFile,
                  "|batch size| insert msgs | insert time(s) |   msgs/s   | walLogSize(MB) | consume msgs | consume "
                  "time(s) |   msgs/s   |    MB/s    | avg msg size(KB) |\n");
629 630 631
  taosFprintfFile(g_fp, "|%10d", g_stConfInfo.batchNumOfRow);
}

L
Liu Jicong 已提交
632
int main(int32_t argc, char* argv[]) {
633 634 635 636 637 638 639 640 641
  parseArgument(argc, argv);
  printParaIntoFile();

  int64_t walLogSize = 0;

  int code;
  code = init_env();
  if (code != 0) {
    fprintf(stderr, "%% init_env error!\n");
L
Liu Jicong 已提交
642
    return -1;
643 644 645 646 647 648 649
  }

  int32_t totalMsgs = 0;

  if (g_stConfInfo.runMode != TMQ_RUN_ONLY_CONSUME) {
    int64_t startTs = taosGetTimestampUs();
    if (1 == g_stConfInfo.ratio) {
L
Liu Jicong 已提交
650
      totalMsgs = syncWriteData();
651
    } else {
L
Liu Jicong 已提交
652 653 654
      totalMsgs = syncWriteDataByRatio();
    }

655
    if (totalMsgs <= 0) {
L
Liu Jicong 已提交
656
      pError("inset data error!\n");
657
      return -1;
P
plum-lihui 已提交
658
    }
L
Liu Jicong 已提交
659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688
    int64_t endTs = taosGetTimestampUs();
    int64_t delay = endTs - startTs;

    int32_t totalRows = 0;
    if (1 == g_stConfInfo.ratio) {
      totalRows = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.numOfTables;
    } else {
      totalRows = g_stConfInfo.totalRowsOfPerTbl * (1 + g_stConfInfo.ratio);
    }

    float seconds = delay / 1000000.0;
    float rowsSpeed = totalRows / seconds;
    float msgsSpeed = totalMsgs / seconds;

    if ((0 == g_stConfInfo.simCase) && (strlen(g_stConfInfo.vnodeWalPath))) {
      walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
      if (walLogSize <= 0) {
        printf("%s size incorrect!", g_stConfInfo.vnodeWalPath);
        exit(-1);
      } else {
        pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize / (1024 * 1024.0));
      }
    }

    if (0 == g_stConfInfo.simCase) {
      pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows,
             totalMsgs, seconds, rowsSpeed, msgsSpeed);
    }
    taosFprintfFile(g_fp, "|%10d   |   %10.3f   |  %8.2f  |   %10.3f   ", totalMsgs, seconds, msgsSpeed,
                    (double)walLogSize / (1024 * 1024.0));
689 690 691 692 693
  }

  if (g_stConfInfo.runMode == TMQ_RUN_ONLY_INSERT) {
    return 0;
  }
L
Liu Jicong 已提交
694 695

  tmq_t*      tmq = build_consumer();
696
  tmq_list_t* topic_list = build_topic_list();
L
Liu Jicong 已提交
697
  if ((NULL == tmq) || (NULL == topic_list)) {
698 699
    return -1;
  }
L
Liu Jicong 已提交
700

701 702
  perf_loop(tmq, topic_list, totalMsgs, walLogSize);

wafwerar's avatar
wafwerar 已提交
703
  taosMemoryFreeClear(g_pRowValue);
L
Liu Jicong 已提交
704 705
  taosFprintfFile(g_fp, "\n");
  taosCloseFile(&g_fp);
706 707 708
  return 0;
}