tmqDemo.c 23.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <stdio.h>
L
Liu Jicong 已提交
18
#include <stdlib.h>
19 20 21
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
L
Liu Jicong 已提交
22
#include <time.h>
wafwerar's avatar
wafwerar 已提交
23
// #include <unistd.h>
24 25 26 27 28

#include "taos.h"
#include "taoserror.h"
#include "tlog.h"

L
Liu Jicong 已提交
29 30
#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
31 32
#define min(a, b) (((a) < (b)) ? (a) : (b))

L
Liu Jicong 已提交
33 34
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
35

36 37 38 39 40 41
enum _RUN_MODE {
  TMQ_RUN_INSERT_AND_CONSUME,
  TMQ_RUN_ONLY_INSERT,
  TMQ_RUN_ONLY_CONSUME,
  TMQ_RUN_MODE_BUTT,
};
42 43

typedef struct {
L
Liu Jicong 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
  char    dbName[32];
  char    stbName[64];
  char    resultFileName[256];
  char    vnodeWalPath[256];
  int32_t numOfThreads;
  int32_t numOfTables;
  int32_t numOfVgroups;
  int32_t runMode;
  int32_t numOfColumn;
  double  ratio;
  int32_t batchNumOfRow;
  int32_t totalRowsOfPerTbl;
  int64_t startTimestamp;
  int32_t showMsgFlag;
  int32_t simCase;

  int32_t totalRowsOfT2;
61 62 63 64 65
} SConfInfo;

static SConfInfo g_stConfInfo = {
    "tmqdb",
    "stb",
L
Liu Jicong 已提交
66
    "./tmqResult.txt",  // output_file
P
plum-lihui 已提交
67
    "",                 // /data2/dnode/data/vnode/vnode2/wal",
L
Liu Jicong 已提交
68 69 70 71 72 73 74 75 76 77 78
    1,                  // threads
    1,                  // tables
    1,                  // vgroups
    0,                  // run mode
    1,                  // columns
    1,                  // ratio
    1,                  // batch size
    10000,              // total rows for per table
    0,                  // 2020-01-01 00:00:00.000
    0,                  // show consume msg switch
    0,                  // if run in sim case
79 80 81
    10000,
};

L
Liu Jicong 已提交
82
char*     g_pRowValue = NULL;
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
TdFilePtr g_fp = NULL;

static void printHelp() {
  char indent[10] = "        ";
  printf("Used to test the performance while create table\n");

  printf("%s%s\n", indent, "-c");
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
  printf("%s%s\n", indent, "-d");
  printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", g_stConfInfo.dbName);
  printf("%s%s\n", indent, "-s");
  printf("%s%s%s%s\n", indent, indent, "The name of the super table to be created, default is ", g_stConfInfo.stbName);
  printf("%s%s\n", indent, "-f");
  printf("%s%s%s%s\n", indent, indent, "The file of result, default is ", g_stConfInfo.resultFileName);
  printf("%s%s\n", indent, "-w");
  printf("%s%s%s%s\n", indent, indent, "The path of vnode of wal, default is ", g_stConfInfo.vnodeWalPath);
  printf("%s%s\n", indent, "-t");
  printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", g_stConfInfo.numOfThreads);
  printf("%s%s\n", indent, "-n");
  printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", g_stConfInfo.numOfTables);
  printf("%s%s\n", indent, "-v");
  printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", g_stConfInfo.numOfVgroups);
  printf("%s%s\n", indent, "-a");
  printf("%s%s%s%d\n", indent, indent, "runMode, default is ", g_stConfInfo.runMode);
  printf("%s%s\n", indent, "-l");
  printf("%s%s%s%d\n", indent, indent, "numOfColumn, default is ", g_stConfInfo.numOfColumn);
  printf("%s%s\n", indent, "-q");
  printf("%s%s%s%f\n", indent, indent, "ratio, default is ", g_stConfInfo.ratio);
  printf("%s%s\n", indent, "-b");
  printf("%s%s%s%d\n", indent, indent, "batchNumOfRow, default is ", g_stConfInfo.batchNumOfRow);
  printf("%s%s\n", indent, "-r");
  printf("%s%s%s%d\n", indent, indent, "totalRowsOfPerTbl, default is ", g_stConfInfo.totalRowsOfPerTbl);
  printf("%s%s\n", indent, "-m");
  printf("%s%s%s%" PRId64 "\n", indent, indent, "startTimestamp, default is ", g_stConfInfo.startTimestamp);
  printf("%s%s\n", indent, "-g");
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
P
plum-lihui 已提交
119 120
  printf("%s%s\n", indent, "-sim");
  printf("%s%s%s%d\n", indent, indent, "simCase, default is ", g_stConfInfo.simCase);
121 122 123 124

  exit(EXIT_SUCCESS);
}

L
Liu Jicong 已提交
125
void parseArgument(int32_t argc, char* argv[]) {
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
  g_stConfInfo.startTimestamp = 1640966400000;  // 2020-01-01 00:00:00.000

  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      strcpy(g_stConfInfo.dbName, argv[++i]);
    } else if (strcmp(argv[i], "-c") == 0) {
      strcpy(configDir, argv[++i]);
    } else if (strcmp(argv[i], "-s") == 0) {
      strcpy(g_stConfInfo.stbName, argv[++i]);
    } else if (strcmp(argv[i], "-w") == 0) {
      strcpy(g_stConfInfo.vnodeWalPath, argv[++i]);
    } else if (strcmp(argv[i], "-f") == 0) {
      strcpy(g_stConfInfo.resultFileName, argv[++i]);
    } else if (strcmp(argv[i], "-t") == 0) {
      g_stConfInfo.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-n") == 0) {
      g_stConfInfo.numOfTables = atoll(argv[++i]);
    } else if (strcmp(argv[i], "-v") == 0) {
      g_stConfInfo.numOfVgroups = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-a") == 0) {
      g_stConfInfo.runMode = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-b") == 0) {
      g_stConfInfo.batchNumOfRow = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-r") == 0) {
      g_stConfInfo.totalRowsOfPerTbl = atoi(argv[++i]);
L
Liu Jicong 已提交
154
    } else if (strcmp(argv[i], "-l") == 0) {
155 156 157 158 159 160 161
      g_stConfInfo.numOfColumn = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-q") == 0) {
      g_stConfInfo.ratio = atof(argv[++i]);
    } else if (strcmp(argv[i], "-m") == 0) {
      g_stConfInfo.startTimestamp = atol(argv[++i]);
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
P
plum-lihui 已提交
162 163
    } else if (strcmp(argv[i], "-sim") == 0) {
      g_stConfInfo.simCase = atol(argv[++i]);
164
    } else {
P
plum-lihui 已提交
165
      printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
L
Liu Jicong 已提交
166
      exit(-1);
167 168 169 170 171
    }
  }

  g_stConfInfo.totalRowsOfT2 = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.ratio;

P
plum-lihui 已提交
172
#if 0
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
  pPrint("%s stbName:%s %s", GREEN, g_stConfInfo.stbName, NC);
  pPrint("%s resultFileName:%s %s", GREEN, g_stConfInfo.resultFileName, NC);
  pPrint("%s vnodeWalPath:%s %s", GREEN, g_stConfInfo.vnodeWalPath, NC);
  pPrint("%s numOfTables:%d %s", GREEN, g_stConfInfo.numOfTables, NC);
  pPrint("%s numOfThreads:%d %s", GREEN, g_stConfInfo.numOfThreads, NC);
  pPrint("%s numOfVgroups:%d %s", GREEN, g_stConfInfo.numOfVgroups, NC);
  pPrint("%s runMode:%d %s", GREEN, g_stConfInfo.runMode, NC);
  pPrint("%s ratio:%f %s", GREEN, g_stConfInfo.ratio, NC);
  pPrint("%s numOfColumn:%d %s", GREEN, g_stConfInfo.numOfColumn, NC);
  pPrint("%s batchNumOfRow:%d %s", GREEN, g_stConfInfo.batchNumOfRow, NC);
  pPrint("%s totalRowsOfPerTbl:%d %s", GREEN, g_stConfInfo.totalRowsOfPerTbl, NC);
  pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC);
  pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC);
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
L
Liu Jicong 已提交
189
#endif
190 191
}

L
Liu Jicong 已提交
192
static int running = 1;
L
Liu Jicong 已提交
193
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
194 195

// calc dir size (not include itself 4096Byte)
L
Liu Jicong 已提交
196 197 198 199
int64_t getDirectorySize(char* dir) {
  TdDirPtr      pDir;
  TdDirEntryPtr pDirEntry;
  int64_t       totalSize = 0;
200

L
Liu Jicong 已提交
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
  if ((pDir = taosOpenDir(dir)) == NULL) {
    fprintf(stderr, "Cannot open dir: %s\n", dir);
    return -1;
  }

  // lstat(dir, &statbuf);
  // totalSize+=statbuf.st_size;

  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char  subdir[1024];
    char* fileName = taosGetDirEntryName(pDirEntry);
    sprintf(subdir, "%s/%s", dir, fileName);

    // printf("===d_name: %s\n", entry->d_name);
    if (taosIsDir(subdir)) {
      if (strcmp(".", fileName) == 0 || strcmp("..", fileName) == 0) {
        continue;
      }

      int64_t subDirSize = getDirectorySize(subdir);
      totalSize += subDirSize;
    } else if (0 == strcmp(strchr(fileName, '.'), ".log")) {  // only calc .log file size, and not include .idx file
      int64_t file_size = 0;
      taosStatFile(subdir, &file_size, NULL);
      totalSize += file_size;
226
    }
L
Liu Jicong 已提交
227
  }
228

wafwerar's avatar
wafwerar 已提交
229
  taosCloseDir(&pDir);
L
Liu Jicong 已提交
230
  return totalSize;
231 232
}

L
Liu Jicong 已提交
233 234 235 236 237 238 239 240 241 242 243
int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = taos_query(taos, command);
  int       code = taos_errno(pRes);
  // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
  if (code != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(code), command);
    taos_free_result(pRes);
    return -1;
  }
  taos_free_result(pRes);
  return 0;
244 245 246 247
}

int32_t init_env() {
  char sqlStr[1024] = {0};
L
Liu Jicong 已提交
248

249 250 251 252
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }
L
Liu Jicong 已提交
253

254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
  sprintf(sqlStr, "create database if not exists %s vgroups %d", g_stConfInfo.dbName, g_stConfInfo.numOfVgroups);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in create db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  // create row value
wafwerar's avatar
wafwerar 已提交
271
  g_pRowValue = (char*)taosMemoryCalloc(1, g_stConfInfo.numOfColumn * 16 + 128);
272 273 274 275 276 277
  if (NULL == g_pRowValue) {
    return -1;
  }

  int32_t dataLen = 0;
  int32_t sqlLen = 0;
L
Liu Jicong 已提交
278
  sqlLen += sprintf(sqlStr + sqlLen, "create stable if not exists %s (ts timestamp, ", g_stConfInfo.stbName);
279
  for (int32_t i = 0; i < g_stConfInfo.numOfColumn; i++) {
L
Liu Jicong 已提交
280 281 282 283 284 285 286 287 288 289 290
    if (i == g_stConfInfo.numOfColumn - 1) {
      sqlLen += sprintf(sqlStr + sqlLen, "c%d int) ", i);
      memcpy(g_pRowValue + dataLen, "66778899", strlen("66778899"));
      dataLen += strlen("66778899");
    } else {
      sqlLen += sprintf(sqlStr + sqlLen, "c%d int, ", i);
      memcpy(g_pRowValue + dataLen, "66778899, ", strlen("66778899, "));
      dataLen += strlen("66778899, ");
    }
  }
  sqlLen += sprintf(sqlStr + sqlLen, "tags (t0 int)");
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308

  pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table %s, reason:%s\n", g_stConfInfo.stbName, taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  for (int32_t i = 0; i < g_stConfInfo.numOfTables; i++) {
    sprintf(sqlStr, "create table if not exists %s%d using %s tags(1)", g_stConfInfo.stbName, i, g_stConfInfo.stbName);
    pRes = taos_query(pConn, sqlStr);
    if (taos_errno(pRes) != 0) {
      printf("failed to create child table %s%d, reason:%s\n", g_stConfInfo.stbName, i, taos_errstr(pRes));
      return -1;
    }
    taos_free_result(pRes);
  }

L
Liu Jicong 已提交
309
  // const char* sql = "select * from tu1";
L
Liu Jicong 已提交
310
  sprintf(sqlStr, "create topic test_stb_topic_1 as select ts,c0 from %s", g_stConfInfo.stbName);
L
Liu Jicong 已提交
311 312
  /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
  pRes = taos_query(pConn, sqlStr);
313 314 315 316 317 318 319 320 321 322
  if (taos_errno(pRes) != 0) {
    printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
  taos_close(pConn);
  return 0;
}

tmq_t* build_consumer() {
L
Liu Jicong 已提交
323
#if 0
324 325 326 327 328 329 330 331 332 333 334
  char sqlStr[1024] = {0};
  
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
  }
  taos_free_result(pRes);
L
Liu Jicong 已提交
335
#endif
336 337 338

  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "tg2");
L
Liu Jicong 已提交
339 340 341
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
L
Liu Jicong 已提交
342
  tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
343
  assert(tmq);
L
Liu Jicong 已提交
344
  tmq_conf_destroy(conf);
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
  return tmq;
}

tmq_list_t* build_topic_list() {
  tmq_list_t* topic_list = tmq_list_new();
  tmq_list_append(topic_list, "test_stb_topic_1");
  return topic_list;
}

void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
  static const int MIN_COMMIT_COUNT = 1000;

  int            msg_count = 0;
  tmq_resp_err_t err;

  if ((err = tmq_subscribe(tmq, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
    return;
  }

  while (running) {
L
Liu Jicong 已提交
366
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1);
367
    if (tmqmessage) {
L
Liu Jicong 已提交
368
      /*msg_process(tmqmessage);*/
L
Liu Jicong 已提交
369
      taos_free_result(tmqmessage);
370

L
Liu Jicong 已提交
371
      if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit_sync(tmq, NULL);
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
    }
  }

  err = tmq_consumer_close(tmq);
  if (err)
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
  else
    fprintf(stderr, "%% Consumer closed\n");
}

void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) {
  tmq_resp_err_t err;

  if ((err = tmq_subscribe(tmq, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
    printf("subscribe err\n");
    return;
  }
L
Liu Jicong 已提交
390
  /*taosSsleep(3);*/
391 392 393 394
  int32_t batchCnt = 0;
  int32_t skipLogNum = 0;
  int64_t startTime = taosGetTimestampUs();
  while (running) {
L
Liu Jicong 已提交
395
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 3000);
396 397
    if (tmqmessage) {
      batchCnt++;
L
Liu Jicong 已提交
398
      /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
L
Liu Jicong 已提交
399
      if (0 != g_stConfInfo.showMsgFlag) {
L
Liu Jicong 已提交
400
        /*msg_process(tmqmessage);*/
L
Liu Jicong 已提交
401
      }
L
Liu Jicong 已提交
402
      taos_free_result(tmqmessage);
403 404 405 406 407
    } else {
      break;
    }
  }
  int64_t endTime = taosGetTimestampUs();
L
Liu Jicong 已提交
408
  double  consumeTime = (double)(endTime - startTime) / 1000000;
409 410

  if (batchCnt != totalMsgs) {
L
Liu Jicong 已提交
411 412
    printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
    /*exit(-1);*/
413 414
  }

P
plum-lihui 已提交
415 416 417 418 419
  if (0 == g_stConfInfo.simCase) {
    printf("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime);
  } else {
    printf("{consume success: %d}", totalMsgs);
  }
L
Liu Jicong 已提交
420 421 422
  taosFprintfFile(g_fp, "|%10d    |   %10.3f    |  %8.2f  |  %10.2f|    %10.2f    |\n", batchCnt, consumeTime,
                  (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime,
                  (double)walLogSize / 1024.0 / batchCnt);
423 424 425 426

  err = tmq_consumer_close(tmq);
  if (err) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
427
    exit(-1);
428 429 430 431 432 433 434
  }
}

// sync insertion
int32_t syncWriteData() {
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
L
Liu Jicong 已提交
435
    return -1;
436 437 438 439 440 441 442 443 444 445 446 447
  }

  char sqlStr[1024] = {0};
  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  char* buffer = NULL;
wafwerar's avatar
wafwerar 已提交
448
  buffer = (char*)taosMemoryMalloc(MAX_SQL_STR_LEN);
449 450 451 452 453
  if (NULL == buffer) {
    return -1;
  }

  int32_t totalMsgs = 0;
L
Liu Jicong 已提交
454

455 456 457
  int64_t time_counter = g_stConfInfo.startTimestamp;
  for (int i = 0; i < g_stConfInfo.totalRowsOfPerTbl;) {
    for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
L
Liu Jicong 已提交
458
      int     inserted = i;
459 460 461 462 463 464 465 466 467 468 469
      int64_t tmp_time = time_counter;

      int32_t data_len = 0;
      data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID);
      int k;
      for (k = 0; k < g_stConfInfo.batchNumOfRow;) {
        data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue);
        inserted++;
        k++;

        if (inserted >= g_stConfInfo.totalRowsOfPerTbl) {
L
Liu Jicong 已提交
470
          break;
471 472
        }

L
Liu Jicong 已提交
473
        if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
474
          break;
L
Liu Jicong 已提交
475
        }
476 477 478
      }

      int code = queryDB(pConn, buffer);
L
Liu Jicong 已提交
479
      if (0 != code) {
480
        fprintf(stderr, "insert data error!\n");
L
Liu Jicong 已提交
481 482 483
        taosMemoryFreeClear(buffer);
        return -1;
      }
484

L
Liu Jicong 已提交
485
      totalMsgs++;
486 487 488 489 490 491 492

      if (tID == g_stConfInfo.numOfTables - 1) {
        i = inserted;
        time_counter = tmp_time;
      }
    }
  }
wafwerar's avatar
wafwerar 已提交
493
  taosMemoryFreeClear(buffer);
494 495 496 497 498 499 500
  return totalMsgs;
}

// sync insertion
int32_t syncWriteDataByRatio() {
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
L
Liu Jicong 已提交
501
    return -1;
502 503 504 505 506 507 508 509 510 511 512 513
  }

  char sqlStr[1024] = {0};
  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  char* buffer = NULL;
wafwerar's avatar
wafwerar 已提交
514
  buffer = (char*)taosMemoryMalloc(MAX_SQL_STR_LEN);
515 516 517 518 519 520 521
  if (NULL == buffer) {
    return -1;
  }

  int32_t totalMsgs = 0;

  int32_t insertedOfT1 = 0;
L
Liu Jicong 已提交
522
  int32_t insertedOfT2 = 0;
523 524 525 526

  int64_t tsOfT1 = g_stConfInfo.startTimestamp;
  int64_t tsOfT2 = g_stConfInfo.startTimestamp;
  int64_t tmp_time;
L
Liu Jicong 已提交
527

528
  for (;;) {
L
Liu Jicong 已提交
529
    if ((insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) && (insertedOfT2 >= g_stConfInfo.totalRowsOfT2)) {
530
      break;
L
Liu Jicong 已提交
531 532
    }

533 534
    for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
      if (0 == tID) {
L
Liu Jicong 已提交
535
        tmp_time = tsOfT1;
536
        if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
L
Liu Jicong 已提交
537
          continue;
538
        }
L
Liu Jicong 已提交
539 540
      } else if (1 == tID) {
        tmp_time = tsOfT2;
541
        if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
L
Liu Jicong 已提交
542
          continue;
543 544 545 546 547 548 549 550 551
        }
      }

      int32_t data_len = 0;
      data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID);
      int k;
      for (k = 0; k < g_stConfInfo.batchNumOfRow;) {
        data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue);
        k++;
L
Liu Jicong 已提交
552
        if (0 == tID) {
553
          insertedOfT1++;
L
Liu Jicong 已提交
554 555 556 557
          if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
            break;
          }
        } else if (1 == tID) {
558
          insertedOfT2++;
L
Liu Jicong 已提交
559 560 561 562
          if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
            break;
          }
        }
563

L
Liu Jicong 已提交
564
        if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
565
          break;
L
Liu Jicong 已提交
566
        }
567 568 569
      }

      int code = queryDB(pConn, buffer);
L
Liu Jicong 已提交
570
      if (0 != code) {
571
        fprintf(stderr, "insert data error!\n");
L
Liu Jicong 已提交
572 573 574 575
        taosMemoryFreeClear(buffer);
        return -1;
      }

576
      if (0 == tID) {
L
Liu Jicong 已提交
577 578 579
        tsOfT1 = tmp_time;
      } else if (1 == tID) {
        tsOfT2 = tmp_time;
580 581
      }

L
Liu Jicong 已提交
582
      totalMsgs++;
583 584
    }
  }
L
Liu Jicong 已提交
585 586
  pPrint("expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]\n", g_stConfInfo.totalRowsOfPerTbl,
         g_stConfInfo.totalRowsOfT2, insertedOfT1, insertedOfT2);
wafwerar's avatar
wafwerar 已提交
587
  taosMemoryFreeClear(buffer);
588 589 590 591 592
  return totalMsgs;
}

void printParaIntoFile() {
  // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
L
Liu Jicong 已提交
593 594
  TdFilePtr pFile =
      taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
595 596
  if (NULL == pFile) {
    fprintf(stderr, "Failed to open %s for save result\n", g_stConfInfo.resultFileName);
wafwerar's avatar
wafwerar 已提交
597 598
    exit(-1);
  }
599 600
  g_fp = pFile;

L
Liu Jicong 已提交
601
  time_t    tTime = taosGetTimestampSec();
602
  struct tm tm = *taosLocalTime(&tTime, NULL);
603 604

  taosFprintfFile(pFile, "###################################################################\n");
L
Liu Jicong 已提交
605 606 607 608 609 610 611 612 613 614 615 616 617
  taosFprintfFile(pFile, "# configDir:                %s\n", configDir);
  taosFprintfFile(pFile, "# dbName:                   %s\n", g_stConfInfo.dbName);
  taosFprintfFile(pFile, "# stbName:                  %s\n", g_stConfInfo.stbName);
  taosFprintfFile(pFile, "# vnodeWalPath:             %s\n", g_stConfInfo.vnodeWalPath);
  taosFprintfFile(pFile, "# numOfTables:              %d\n", g_stConfInfo.numOfTables);
  taosFprintfFile(pFile, "# numOfThreads:             %d\n", g_stConfInfo.numOfThreads);
  taosFprintfFile(pFile, "# numOfVgroups:             %d\n", g_stConfInfo.numOfVgroups);
  taosFprintfFile(pFile, "# runMode:                  %d\n", g_stConfInfo.runMode);
  taosFprintfFile(pFile, "# ratio:                    %f\n", g_stConfInfo.ratio);
  taosFprintfFile(pFile, "# numOfColumn:              %d\n", g_stConfInfo.numOfColumn);
  taosFprintfFile(pFile, "# batchNumOfRow:            %d\n", g_stConfInfo.batchNumOfRow);
  taosFprintfFile(pFile, "# totalRowsOfPerTbl:        %d\n", g_stConfInfo.totalRowsOfPerTbl);
  taosFprintfFile(pFile, "# totalRowsOfT2:            %d\n", g_stConfInfo.totalRowsOfT2);
618
  taosFprintfFile(pFile, "# Test time:                %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
L
Liu Jicong 已提交
619
                  tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
620
  taosFprintfFile(pFile, "###################################################################\n");
L
Liu Jicong 已提交
621 622 623 624 625 626 627
  taosFprintfFile(pFile,
                  "|-------------------------------insert "
                  "info-----------------------------|--------------------------------consume "
                  "info---------------------------------|\n");
  taosFprintfFile(pFile,
                  "|batch size| insert msgs | insert time(s) |   msgs/s   | walLogSize(MB) | consume msgs | consume "
                  "time(s) |   msgs/s   |    MB/s    | avg msg size(KB) |\n");
628 629 630
  taosFprintfFile(g_fp, "|%10d", g_stConfInfo.batchNumOfRow);
}

L
Liu Jicong 已提交
631
int main(int32_t argc, char* argv[]) {
632 633 634 635 636 637 638 639 640
  parseArgument(argc, argv);
  printParaIntoFile();

  int64_t walLogSize = 0;

  int code;
  code = init_env();
  if (code != 0) {
    fprintf(stderr, "%% init_env error!\n");
L
Liu Jicong 已提交
641
    return -1;
642 643 644 645 646 647 648
  }

  int32_t totalMsgs = 0;

  if (g_stConfInfo.runMode != TMQ_RUN_ONLY_CONSUME) {
    int64_t startTs = taosGetTimestampUs();
    if (1 == g_stConfInfo.ratio) {
L
Liu Jicong 已提交
649
      totalMsgs = syncWriteData();
650
    } else {
L
Liu Jicong 已提交
651 652 653
      totalMsgs = syncWriteDataByRatio();
    }

654
    if (totalMsgs <= 0) {
L
Liu Jicong 已提交
655
      pError("inset data error!\n");
656
      return -1;
P
plum-lihui 已提交
657
    }
L
Liu Jicong 已提交
658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687
    int64_t endTs = taosGetTimestampUs();
    int64_t delay = endTs - startTs;

    int32_t totalRows = 0;
    if (1 == g_stConfInfo.ratio) {
      totalRows = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.numOfTables;
    } else {
      totalRows = g_stConfInfo.totalRowsOfPerTbl * (1 + g_stConfInfo.ratio);
    }

    float seconds = delay / 1000000.0;
    float rowsSpeed = totalRows / seconds;
    float msgsSpeed = totalMsgs / seconds;

    if ((0 == g_stConfInfo.simCase) && (strlen(g_stConfInfo.vnodeWalPath))) {
      walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
      if (walLogSize <= 0) {
        printf("%s size incorrect!", g_stConfInfo.vnodeWalPath);
        exit(-1);
      } else {
        pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize / (1024 * 1024.0));
      }
    }

    if (0 == g_stConfInfo.simCase) {
      pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows,
             totalMsgs, seconds, rowsSpeed, msgsSpeed);
    }
    taosFprintfFile(g_fp, "|%10d   |   %10.3f   |  %8.2f  |   %10.3f   ", totalMsgs, seconds, msgsSpeed,
                    (double)walLogSize / (1024 * 1024.0));
688 689 690 691 692
  }

  if (g_stConfInfo.runMode == TMQ_RUN_ONLY_INSERT) {
    return 0;
  }
L
Liu Jicong 已提交
693 694

  tmq_t*      tmq = build_consumer();
695
  tmq_list_t* topic_list = build_topic_list();
L
Liu Jicong 已提交
696
  if ((NULL == tmq) || (NULL == topic_list)) {
697 698
    return -1;
  }
L
Liu Jicong 已提交
699

700 701
  perf_loop(tmq, topic_list, totalMsgs, walLogSize);

wafwerar's avatar
wafwerar 已提交
702
  taosMemoryFreeClear(g_pRowValue);
L
Liu Jicong 已提交
703 704
  taosFprintfFile(g_fp, "\n");
  taosCloseFile(&g_fp);
705 706 707
  return 0;
}