/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
// #include <unistd.h>

#include "taos.h"
#include "taoserror.h"
#include "tlog.h"

// ANSI escape sequences used to highlight console diagnostics.
#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
// Function-like macro: each argument may be evaluated more than once.
#define min(a, b) (((a) < (b)) ? (a) : (b))

// Size in bytes of the heap buffer that holds one batched INSERT statement.
#define MAX_SQL_STR_LEN (1024 * 1024)
// Headroom kept free in that buffer: a batch is closed once fewer bytes than this remain.
#define MAX_ROW_STR_LEN (16 * 1024)

// Run modes selectable with the -a command-line option.
enum _RUN_MODE {
  TMQ_RUN_INSERT_AND_CONSUME = 0,  // write the data, then consume it
  TMQ_RUN_ONLY_INSERT = 1,         // write the data and stop
  TMQ_RUN_ONLY_CONSUME = 2,        // skip the write phase, only consume
  TMQ_RUN_MODE_BUTT = 3,           // one past the last real mode
};

// Run-time configuration of the demo, filled from the command line by parseArgument().
// The field order is relied upon by positional initializers, so do not reorder.
typedef struct {
  char    dbName[32];           // -d: database to create and use
  char    stbName[64];          // -s: super table name; child tables are named <stbName><index>
  char    resultFileName[256];  // -f: file the result table is appended to
  char    vnodeWalPath[256];    // -w: vnode WAL directory; empty string disables the WAL size measurement
  int32_t numOfThreads;         // -t: parsed and reported only
  int32_t numOfTables;          // -n: number of child tables
  int32_t numOfVgroups;         // -v: "vgroups" value of the created database
  int32_t runMode;              // -a: one of enum _RUN_MODE
  int32_t numOfColumn;          // -l: number of int columns besides the timestamp
  double  ratio;                // -q: rows of table 1 relative to totalRowsOfPerTbl
  int32_t batchNumOfRow;        // -b: maximum rows per INSERT statement
  int32_t totalRowsOfPerTbl;    // -r: rows written to each table (table 0 in ratio mode)
  int64_t startTimestamp;       // -m: timestamp of the first inserted row
  int32_t showMsgFlag;          // -g: non-zero asks for consumed messages to be shown
  int32_t simCase;              // -sim: non-zero switches to the terse output used by sim cases

  int32_t totalRowsOfT2;        // derived: totalRowsOfPerTbl * ratio
} SConfInfo;

static SConfInfo g_stConfInfo = {
    "tmqdb",
    "stb",
L
Liu Jicong 已提交
66
    "./tmqResult.txt",  // output_file
P
plum-lihui 已提交
67
    "",                 // /data2/dnode/data/vnode/vnode2/wal",
L
Liu Jicong 已提交
68 69 70 71 72 73 74 75 76 77 78
    1,                  // threads
    1,                  // tables
    1,                  // vgroups
    0,                  // run mode
    1,                  // columns
    1,                  // ratio
    1,                  // batch size
    10000,              // total rows for per table
    0,                  // 2020-01-01 00:00:00.000
    0,                  // show consume msg switch
    0,                  // if run in sim case
79 80 81
    10000,
};

L
Liu Jicong 已提交
82
char*     g_pRowValue = NULL;
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
TdFilePtr g_fp = NULL;

// Indentation unit of the help text.
static const char g_helpIndent[] = "        ";

// Prints one option whose default value is a string.
static void printHelpStrOpt(const char* flag, const char* desc, const char* value) {
  printf("%s%s\n", g_helpIndent, flag);
  printf("%s%s%s%s\n", g_helpIndent, g_helpIndent, desc, value);
}

// Prints one option whose default value is a 32-bit integer.
static void printHelpIntOpt(const char* flag, const char* desc, int32_t value) {
  printf("%s%s\n", g_helpIndent, flag);
  printf("%s%s%s%d\n", g_helpIndent, g_helpIndent, desc, value);
}

// Prints every supported option together with its current default, then terminates
// the process with EXIT_SUCCESS. This function does not return.
static void printHelp(void) {
  printf("Used to test the performance while create table\n");

  printHelpStrOpt("-c", "Configuration directory, default is ", configDir);
  printHelpStrOpt("-d", "The name of the database to be created, default is ", g_stConfInfo.dbName);
  printHelpStrOpt("-s", "The name of the super table to be created, default is ", g_stConfInfo.stbName);
  printHelpStrOpt("-f", "The file of result, default is ", g_stConfInfo.resultFileName);
  printHelpStrOpt("-w", "The path of vnode of wal, default is ", g_stConfInfo.vnodeWalPath);
  printHelpIntOpt("-t", "numOfThreads, default is ", g_stConfInfo.numOfThreads);
  printHelpIntOpt("-n", "numOfTables, default is ", g_stConfInfo.numOfTables);
  printHelpIntOpt("-v", "numOfVgroups, default is ", g_stConfInfo.numOfVgroups);
  printHelpIntOpt("-a", "runMode, default is ", g_stConfInfo.runMode);
  printHelpIntOpt("-l", "numOfColumn, default is ", g_stConfInfo.numOfColumn);

  // The two options below are not 32-bit integers, so they are printed inline.
  printf("%s%s\n", g_helpIndent, "-q");
  printf("%s%s%s%f\n", g_helpIndent, g_helpIndent, "ratio, default is ", g_stConfInfo.ratio);

  printHelpIntOpt("-b", "batchNumOfRow, default is ", g_stConfInfo.batchNumOfRow);
  printHelpIntOpt("-r", "totalRowsOfPerTbl, default is ", g_stConfInfo.totalRowsOfPerTbl);

  printf("%s%s\n", g_helpIndent, "-m");
  printf("%s%s%s%" PRId64 "\n", g_helpIndent, g_helpIndent, "startTimestamp, default is ",
         g_stConfInfo.startTimestamp);

  printHelpIntOpt("-g", "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
  printHelpIntOpt("-sim", "simCase, default is ", g_stConfInfo.simCase);

  exit(EXIT_SUCCESS);
}

L
Liu Jicong 已提交
125
void parseArgument(int32_t argc, char* argv[]) {
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
  g_stConfInfo.startTimestamp = 1640966400000;  // 2020-01-01 00:00:00.000

  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      strcpy(g_stConfInfo.dbName, argv[++i]);
    } else if (strcmp(argv[i], "-c") == 0) {
      strcpy(configDir, argv[++i]);
    } else if (strcmp(argv[i], "-s") == 0) {
      strcpy(g_stConfInfo.stbName, argv[++i]);
    } else if (strcmp(argv[i], "-w") == 0) {
      strcpy(g_stConfInfo.vnodeWalPath, argv[++i]);
    } else if (strcmp(argv[i], "-f") == 0) {
      strcpy(g_stConfInfo.resultFileName, argv[++i]);
    } else if (strcmp(argv[i], "-t") == 0) {
      g_stConfInfo.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-n") == 0) {
      g_stConfInfo.numOfTables = atoll(argv[++i]);
    } else if (strcmp(argv[i], "-v") == 0) {
      g_stConfInfo.numOfVgroups = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-a") == 0) {
      g_stConfInfo.runMode = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-b") == 0) {
      g_stConfInfo.batchNumOfRow = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-r") == 0) {
      g_stConfInfo.totalRowsOfPerTbl = atoi(argv[++i]);
L
Liu Jicong 已提交
154
    } else if (strcmp(argv[i], "-l") == 0) {
155 156 157 158 159 160 161
      g_stConfInfo.numOfColumn = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-q") == 0) {
      g_stConfInfo.ratio = atof(argv[++i]);
    } else if (strcmp(argv[i], "-m") == 0) {
      g_stConfInfo.startTimestamp = atol(argv[++i]);
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
P
plum-lihui 已提交
162 163
    } else if (strcmp(argv[i], "-sim") == 0) {
      g_stConfInfo.simCase = atol(argv[++i]);
164
    } else {
P
plum-lihui 已提交
165
      printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
L
Liu Jicong 已提交
166
      exit(-1);
167 168 169 170 171
    }
  }

  g_stConfInfo.totalRowsOfT2 = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.ratio;

P
plum-lihui 已提交
172
#if 0
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
  pPrint("%s stbName:%s %s", GREEN, g_stConfInfo.stbName, NC);
  pPrint("%s resultFileName:%s %s", GREEN, g_stConfInfo.resultFileName, NC);
  pPrint("%s vnodeWalPath:%s %s", GREEN, g_stConfInfo.vnodeWalPath, NC);
  pPrint("%s numOfTables:%d %s", GREEN, g_stConfInfo.numOfTables, NC);
  pPrint("%s numOfThreads:%d %s", GREEN, g_stConfInfo.numOfThreads, NC);
  pPrint("%s numOfVgroups:%d %s", GREEN, g_stConfInfo.numOfVgroups, NC);
  pPrint("%s runMode:%d %s", GREEN, g_stConfInfo.runMode, NC);
  pPrint("%s ratio:%f %s", GREEN, g_stConfInfo.ratio, NC);
  pPrint("%s numOfColumn:%d %s", GREEN, g_stConfInfo.numOfColumn, NC);
  pPrint("%s batchNumOfRow:%d %s", GREEN, g_stConfInfo.batchNumOfRow, NC);
  pPrint("%s totalRowsOfPerTbl:%d %s", GREEN, g_stConfInfo.totalRowsOfPerTbl, NC);
  pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC);
  pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC);
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
L
Liu Jicong 已提交
189
#endif
190 191
}

// Loop flag polled by the consume loops; nothing in this file clears it.
static int running = 1;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/

// calc dir size (not include itself 4096Byte)
L
Liu Jicong 已提交
196 197 198 199
int64_t getDirectorySize(char* dir) {
  TdDirPtr      pDir;
  TdDirEntryPtr pDirEntry;
  int64_t       totalSize = 0;
200

L
Liu Jicong 已提交
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
  if ((pDir = taosOpenDir(dir)) == NULL) {
    fprintf(stderr, "Cannot open dir: %s\n", dir);
    return -1;
  }

  // lstat(dir, &statbuf);
  // totalSize+=statbuf.st_size;

  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char  subdir[1024];
    char* fileName = taosGetDirEntryName(pDirEntry);
    sprintf(subdir, "%s/%s", dir, fileName);

    // printf("===d_name: %s\n", entry->d_name);
    if (taosIsDir(subdir)) {
      if (strcmp(".", fileName) == 0 || strcmp("..", fileName) == 0) {
        continue;
      }

      int64_t subDirSize = getDirectorySize(subdir);
      totalSize += subDirSize;
    } else if (0 == strcmp(strchr(fileName, '.'), ".log")) {  // only calc .log file size, and not include .idx file
      int64_t file_size = 0;
      taosStatFile(subdir, &file_size, NULL);
      totalSize += file_size;
226
    }
L
Liu Jicong 已提交
227
  }
228

wafwerar's avatar
wafwerar 已提交
229
  taosCloseDir(&pDir);
L
Liu Jicong 已提交
230
  return totalSize;
231 232
}

L
Liu Jicong 已提交
233 234 235 236 237 238 239 240 241 242
int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = taos_query(taos, command);
  int       code = taos_errno(pRes);
  if (code != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(code), command);
    taos_free_result(pRes);
    return -1;
  }
  taos_free_result(pRes);
  return 0;
243 244 245 246
}

int32_t init_env() {
  char sqlStr[1024] = {0};
L
Liu Jicong 已提交
247

248 249 250 251
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    return -1;
  }
L
Liu Jicong 已提交
252

253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
  sprintf(sqlStr, "create database if not exists %s vgroups %d", g_stConfInfo.dbName, g_stConfInfo.numOfVgroups);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in create db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  // create row value
wafwerar's avatar
wafwerar 已提交
270
  g_pRowValue = (char*)taosMemoryCalloc(1, g_stConfInfo.numOfColumn * 16 + 128);
271 272 273 274 275 276
  if (NULL == g_pRowValue) {
    return -1;
  }

  int32_t dataLen = 0;
  int32_t sqlLen = 0;
L
Liu Jicong 已提交
277
  sqlLen += sprintf(sqlStr + sqlLen, "create stable if not exists %s (ts timestamp, ", g_stConfInfo.stbName);
278
  for (int32_t i = 0; i < g_stConfInfo.numOfColumn; i++) {
L
Liu Jicong 已提交
279 280 281 282 283 284 285 286 287 288 289
    if (i == g_stConfInfo.numOfColumn - 1) {
      sqlLen += sprintf(sqlStr + sqlLen, "c%d int) ", i);
      memcpy(g_pRowValue + dataLen, "66778899", strlen("66778899"));
      dataLen += strlen("66778899");
    } else {
      sqlLen += sprintf(sqlStr + sqlLen, "c%d int, ", i);
      memcpy(g_pRowValue + dataLen, "66778899, ", strlen("66778899, "));
      dataLen += strlen("66778899, ");
    }
  }
  sqlLen += sprintf(sqlStr + sqlLen, "tags (t0 int)");
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307

  pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("failed to create super table %s, reason:%s\n", g_stConfInfo.stbName, taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  for (int32_t i = 0; i < g_stConfInfo.numOfTables; i++) {
    sprintf(sqlStr, "create table if not exists %s%d using %s tags(1)", g_stConfInfo.stbName, i, g_stConfInfo.stbName);
    pRes = taos_query(pConn, sqlStr);
    if (taos_errno(pRes) != 0) {
      printf("failed to create child table %s%d, reason:%s\n", g_stConfInfo.stbName, i, taos_errstr(pRes));
      return -1;
    }
    taos_free_result(pRes);
  }

L
Liu Jicong 已提交
308
  // const char* sql = "select * from tu1";
L
Liu Jicong 已提交
309
  sprintf(sqlStr, "create topic test_stb_topic_1 as select ts,c0 from %s", g_stConfInfo.stbName);
L
Liu Jicong 已提交
310 311
  /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
  pRes = taos_query(pConn, sqlStr);
312 313 314 315 316 317 318 319 320 321
  if (taos_errno(pRes) != 0) {
    printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);
  taos_close(pConn);
  return 0;
}

tmq_t* build_consumer() {
L
Liu Jicong 已提交
322
#if 0
323 324 325 326 327 328 329 330 331 332 333
  char sqlStr[1024] = {0};
  
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
  }
  taos_free_result(pRes);
L
Liu Jicong 已提交
334
#endif
335 336 337

  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "tg2");
L
Liu Jicong 已提交
338 339 340
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
L
Liu Jicong 已提交
341
  tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
342
  assert(tmq);
L
Liu Jicong 已提交
343
  tmq_conf_destroy(conf);
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364
  return tmq;
}

// Builds the subscription list holding the single topic "test_stb_topic_1".
// The list is handed to the caller.
tmq_list_t* build_topic_list() {
  tmq_list_t* topics = tmq_list_new();

  tmq_list_append(topics, "test_stb_topic_1");

  return topics;
}

// Subscribes to `topics` and polls for messages while `running` is non-zero,
// committing synchronously after every MIN_COMMIT_COUNT received messages.
// The consumer is closed before returning; a failed subscription returns immediately.
//
// tmq:    consumer to poll with.
// topics: topic list to subscribe to.
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
  static const int MIN_COMMIT_COUNT = 1000;

  tmq_resp_err_t err = tmq_subscribe(tmq, topics);
  if (err) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
    return;
  }

  int msg_count = 0;
  while (running) {
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1);
    if (!tmqmessage) {
      continue;
    }

    /*msg_process(tmqmessage);*/
    taos_free_result(tmqmessage);

    msg_count++;
    if ((msg_count % MIN_COMMIT_COUNT) == 0) {
      tmq_commit_sync(tmq, NULL);
    }
  }

  err = tmq_consumer_close(tmq);
  if (err) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
  } else {
    fprintf(stderr, "%% Consumer closed\n");
  }
}

// Consumes every available message and reports the throughput.
//
// tmq:        consumer to poll with.
// topics:     topic list to subscribe to.
// totalMsgs:  number of insert messages written beforehand, used for the mismatch
//             warning and for the sim-case output.
// walLogSize: WAL size in bytes used for the MB/s and average-size columns.
//
// Polling stops at the first poll that yields no message. The consume figures are
// appended to the result file (g_fp). The process exits if the consumer cannot be closed.
void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) {
  tmq_resp_err_t err;

  if ((err = tmq_subscribe(tmq, topics))) {
    fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
    printf("subscribe err\n");
    return;
  }

  int32_t batchCnt = 0;
  int32_t skipLogNum = 0;  // always 0: the skip-log accounting is disabled
  int64_t startTime = taosGetTimestampUs();
  while (running) {
    TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 3000);
    if (!tmqmessage) {
      break;
    }

    batchCnt++;
    /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
    if (0 != g_stConfInfo.showMsgFlag) {
      /*msg_process(tmqmessage);*/
    }
    taos_free_result(tmqmessage);
  }
  int64_t endTime = taosGetTimestampUs();
  // NOTE(review): the interval appears to include the final poll that returned no
  // message - confirm whether that wait should count as consume time.
  double consumeTime = (double)(endTime - startTime) / 1000000;

  if (batchCnt != totalMsgs) {
    printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
    /*exit(-1);*/
  }

  if (0 == g_stConfInfo.simCase) {
    printf("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime);
  } else {
    printf("{consume success: %d}", totalMsgs);
  }

  // Report 0 instead of inf/NaN when nothing was consumed or no time elapsed.
  double msgsPerSec = (consumeTime > 0) ? (double)batchCnt / consumeTime : 0.0;
  double mbPerSec = (consumeTime > 0) ? (double)walLogSize / (1024 * 1024.0) / consumeTime : 0.0;
  double avgMsgKb = (batchCnt > 0) ? (double)walLogSize / 1024.0 / batchCnt : 0.0;
  taosFprintfFile(g_fp, "|%10d    |   %10.3f    |  %8.2f  |  %10.2f|    %10.2f    |\n", batchCnt, consumeTime,
                  msgsPerSec, mbPerSec, avgMsgKb);

  err = tmq_consumer_close(tmq);
  if (err) {
    fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
    exit(-1);
  }
}

// sync insertion
int32_t syncWriteData() {
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
L
Liu Jicong 已提交
434
    return -1;
435 436 437 438 439 440 441 442 443 444 445 446
  }

  char sqlStr[1024] = {0};
  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  char* buffer = NULL;
wafwerar's avatar
wafwerar 已提交
447
  buffer = (char*)taosMemoryMalloc(MAX_SQL_STR_LEN);
448 449 450 451 452
  if (NULL == buffer) {
    return -1;
  }

  int32_t totalMsgs = 0;
L
Liu Jicong 已提交
453

454 455 456
  int64_t time_counter = g_stConfInfo.startTimestamp;
  for (int i = 0; i < g_stConfInfo.totalRowsOfPerTbl;) {
    for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
L
Liu Jicong 已提交
457
      int     inserted = i;
458 459 460 461 462 463 464 465 466 467 468
      int64_t tmp_time = time_counter;

      int32_t data_len = 0;
      data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID);
      int k;
      for (k = 0; k < g_stConfInfo.batchNumOfRow;) {
        data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue);
        inserted++;
        k++;

        if (inserted >= g_stConfInfo.totalRowsOfPerTbl) {
L
Liu Jicong 已提交
469
          break;
470 471
        }

L
Liu Jicong 已提交
472
        if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
473
          break;
L
Liu Jicong 已提交
474
        }
475 476 477
      }

      int code = queryDB(pConn, buffer);
L
Liu Jicong 已提交
478
      if (0 != code) {
479
        fprintf(stderr, "insert data error!\n");
L
Liu Jicong 已提交
480 481 482
        taosMemoryFreeClear(buffer);
        return -1;
      }
483

L
Liu Jicong 已提交
484
      totalMsgs++;
485 486 487 488 489 490 491

      if (tID == g_stConfInfo.numOfTables - 1) {
        i = inserted;
        time_counter = tmp_time;
      }
    }
  }
wafwerar's avatar
wafwerar 已提交
492
  taosMemoryFreeClear(buffer);
493 494 495 496 497 498 499
  return totalMsgs;
}

// sync insertion
int32_t syncWriteDataByRatio() {
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
L
Liu Jicong 已提交
500
    return -1;
501 502 503 504 505 506 507 508 509 510 511 512
  }

  char sqlStr[1024] = {0};
  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
    return -1;
  }
  taos_free_result(pRes);

  char* buffer = NULL;
wafwerar's avatar
wafwerar 已提交
513
  buffer = (char*)taosMemoryMalloc(MAX_SQL_STR_LEN);
514 515 516 517 518 519 520
  if (NULL == buffer) {
    return -1;
  }

  int32_t totalMsgs = 0;

  int32_t insertedOfT1 = 0;
L
Liu Jicong 已提交
521
  int32_t insertedOfT2 = 0;
522 523 524 525

  int64_t tsOfT1 = g_stConfInfo.startTimestamp;
  int64_t tsOfT2 = g_stConfInfo.startTimestamp;
  int64_t tmp_time;
L
Liu Jicong 已提交
526

527
  for (;;) {
L
Liu Jicong 已提交
528
    if ((insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) && (insertedOfT2 >= g_stConfInfo.totalRowsOfT2)) {
529
      break;
L
Liu Jicong 已提交
530 531
    }

532 533
    for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
      if (0 == tID) {
L
Liu Jicong 已提交
534
        tmp_time = tsOfT1;
535
        if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
L
Liu Jicong 已提交
536
          continue;
537
        }
L
Liu Jicong 已提交
538 539
      } else if (1 == tID) {
        tmp_time = tsOfT2;
540
        if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
L
Liu Jicong 已提交
541
          continue;
542 543 544 545 546 547 548 549 550
        }
      }

      int32_t data_len = 0;
      data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID);
      int k;
      for (k = 0; k < g_stConfInfo.batchNumOfRow;) {
        data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue);
        k++;
L
Liu Jicong 已提交
551
        if (0 == tID) {
552
          insertedOfT1++;
L
Liu Jicong 已提交
553 554 555 556
          if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
            break;
          }
        } else if (1 == tID) {
557
          insertedOfT2++;
L
Liu Jicong 已提交
558 559 560 561
          if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
            break;
          }
        }
562

L
Liu Jicong 已提交
563
        if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
564
          break;
L
Liu Jicong 已提交
565
        }
566 567 568
      }

      int code = queryDB(pConn, buffer);
L
Liu Jicong 已提交
569
      if (0 != code) {
570
        fprintf(stderr, "insert data error!\n");
L
Liu Jicong 已提交
571 572 573 574
        taosMemoryFreeClear(buffer);
        return -1;
      }

575
      if (0 == tID) {
L
Liu Jicong 已提交
576 577 578
        tsOfT1 = tmp_time;
      } else if (1 == tID) {
        tsOfT2 = tmp_time;
579 580
      }

L
Liu Jicong 已提交
581
      totalMsgs++;
582 583
    }
  }
L
Liu Jicong 已提交
584 585
  pPrint("expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]\n", g_stConfInfo.totalRowsOfPerTbl,
         g_stConfInfo.totalRowsOfT2, insertedOfT1, insertedOfT2);
wafwerar's avatar
wafwerar 已提交
586
  taosMemoryFreeClear(buffer);
587 588 589 590 591
  return totalMsgs;
}

void printParaIntoFile() {
  // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
L
Liu Jicong 已提交
592 593
  TdFilePtr pFile =
      taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
594 595
  if (NULL == pFile) {
    fprintf(stderr, "Failed to open %s for save result\n", g_stConfInfo.resultFileName);
wafwerar's avatar
wafwerar 已提交
596 597
    exit(-1);
  }
598 599
  g_fp = pFile;

L
Liu Jicong 已提交
600
  time_t    tTime = taosGetTimestampSec();
601
  struct tm tm = *taosLocalTime(&tTime, NULL);
602 603

  taosFprintfFile(pFile, "###################################################################\n");
L
Liu Jicong 已提交
604 605 606 607 608 609 610 611 612 613 614 615 616
  taosFprintfFile(pFile, "# configDir:                %s\n", configDir);
  taosFprintfFile(pFile, "# dbName:                   %s\n", g_stConfInfo.dbName);
  taosFprintfFile(pFile, "# stbName:                  %s\n", g_stConfInfo.stbName);
  taosFprintfFile(pFile, "# vnodeWalPath:             %s\n", g_stConfInfo.vnodeWalPath);
  taosFprintfFile(pFile, "# numOfTables:              %d\n", g_stConfInfo.numOfTables);
  taosFprintfFile(pFile, "# numOfThreads:             %d\n", g_stConfInfo.numOfThreads);
  taosFprintfFile(pFile, "# numOfVgroups:             %d\n", g_stConfInfo.numOfVgroups);
  taosFprintfFile(pFile, "# runMode:                  %d\n", g_stConfInfo.runMode);
  taosFprintfFile(pFile, "# ratio:                    %f\n", g_stConfInfo.ratio);
  taosFprintfFile(pFile, "# numOfColumn:              %d\n", g_stConfInfo.numOfColumn);
  taosFprintfFile(pFile, "# batchNumOfRow:            %d\n", g_stConfInfo.batchNumOfRow);
  taosFprintfFile(pFile, "# totalRowsOfPerTbl:        %d\n", g_stConfInfo.totalRowsOfPerTbl);
  taosFprintfFile(pFile, "# totalRowsOfT2:            %d\n", g_stConfInfo.totalRowsOfT2);
617
  taosFprintfFile(pFile, "# Test time:                %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
L
Liu Jicong 已提交
618
                  tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
619
  taosFprintfFile(pFile, "###################################################################\n");
L
Liu Jicong 已提交
620 621 622 623 624 625 626
  taosFprintfFile(pFile,
                  "|-------------------------------insert "
                  "info-----------------------------|--------------------------------consume "
                  "info---------------------------------|\n");
  taosFprintfFile(pFile,
                  "|batch size| insert msgs | insert time(s) |   msgs/s   | walLogSize(MB) | consume msgs | consume "
                  "time(s) |   msgs/s   |    MB/s    | avg msg size(KB) |\n");
627 628 629
  taosFprintfFile(g_fp, "|%10d", g_stConfInfo.batchNumOfRow);
}

L
Liu Jicong 已提交
630
int main(int32_t argc, char* argv[]) {
631 632 633 634 635 636 637 638 639
  parseArgument(argc, argv);
  printParaIntoFile();

  int64_t walLogSize = 0;

  int code;
  code = init_env();
  if (code != 0) {
    fprintf(stderr, "%% init_env error!\n");
L
Liu Jicong 已提交
640
    return -1;
641 642 643 644 645 646 647
  }

  int32_t totalMsgs = 0;

  if (g_stConfInfo.runMode != TMQ_RUN_ONLY_CONSUME) {
    int64_t startTs = taosGetTimestampUs();
    if (1 == g_stConfInfo.ratio) {
L
Liu Jicong 已提交
648
      totalMsgs = syncWriteData();
649
    } else {
L
Liu Jicong 已提交
650 651 652
      totalMsgs = syncWriteDataByRatio();
    }

653
    if (totalMsgs <= 0) {
L
Liu Jicong 已提交
654
      pError("inset data error!\n");
655
      return -1;
P
plum-lihui 已提交
656
    }
L
Liu Jicong 已提交
657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686
    int64_t endTs = taosGetTimestampUs();
    int64_t delay = endTs - startTs;

    int32_t totalRows = 0;
    if (1 == g_stConfInfo.ratio) {
      totalRows = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.numOfTables;
    } else {
      totalRows = g_stConfInfo.totalRowsOfPerTbl * (1 + g_stConfInfo.ratio);
    }

    float seconds = delay / 1000000.0;
    float rowsSpeed = totalRows / seconds;
    float msgsSpeed = totalMsgs / seconds;

    if ((0 == g_stConfInfo.simCase) && (strlen(g_stConfInfo.vnodeWalPath))) {
      walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
      if (walLogSize <= 0) {
        printf("%s size incorrect!", g_stConfInfo.vnodeWalPath);
        exit(-1);
      } else {
        pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize / (1024 * 1024.0));
      }
    }

    if (0 == g_stConfInfo.simCase) {
      pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows,
             totalMsgs, seconds, rowsSpeed, msgsSpeed);
    }
    taosFprintfFile(g_fp, "|%10d   |   %10.3f   |  %8.2f  |   %10.3f   ", totalMsgs, seconds, msgsSpeed,
                    (double)walLogSize / (1024 * 1024.0));
687 688 689 690 691
  }

  if (g_stConfInfo.runMode == TMQ_RUN_ONLY_INSERT) {
    return 0;
  }
L
Liu Jicong 已提交
692 693

  tmq_t*      tmq = build_consumer();
694
  tmq_list_t* topic_list = build_topic_list();
L
Liu Jicong 已提交
695
  if ((NULL == tmq) || (NULL == topic_list)) {
696 697
    return -1;
  }
L
Liu Jicong 已提交
698

699 700
  perf_loop(tmq, topic_list, totalMsgs, walLogSize);

wafwerar's avatar
wafwerar 已提交
701
  taosMemoryFreeClear(g_pRowValue);
L
Liu Jicong 已提交
702 703
  taosFprintfFile(g_fp, "\n");
  taosCloseFile(&g_fp);
704 705 706
  return 0;
}