tmqSim.c 29.8 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>

#include "taos.h"
P
plum-lihui 已提交
25
#include "taosdef.h"
P
plum-lihui 已提交
26 27
#include "taoserror.h"
#include "tlog.h"
P
plum-lihui 已提交
28
#include "types.h"
P
plum-lihui 已提交
29 30 31 32 33 34 35 36

// ANSI escape sequences: GREEN switches the terminal to bold green, NC resets the attributes.
#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
// Smaller of two values. Function-like macro: each argument may be evaluated twice.
#define min(a, b) (((a) < (b)) ? (a) : (b))

// Capacity limits. MAX_CONSUMER_THREAD_CNT sizes the SConfInfo.stThreads array.
#define MAX_SQL_STR_LEN         (1024 * 1024)
#define MAX_ROW_STR_LEN         (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
P
plum-lihui 已提交
37
// Number of distinct vgroups one consumer can track in SThreadInfo.rowsOfPerVgroups.
#define MAX_VGROUP_CNT          (32)
P
plum-lihui 已提交
38

39 40 41 42 43
// Command ids written to the notifyinfo table by notifyMainScript().
typedef enum {
  NOTIFY_CMD_START_CONSUM = 0,  // sent by loop_consume() after the first polled message
  NOTIFY_CMD_START_COMMIT = 1,  // sent by tmq_commit_cb_print() on the first commit callback
  NOTIFY_CMD_ID_BUTT = 2,       // end marker, not a real command
} NOTIFY_CMD_ID;
44

P
plum-lihui 已提交
45 46 47 48
typedef struct {
  TdThread thread;
  int32_t  consumerId;

L
Liu Jicong 已提交
49 50 51 52
  int32_t ifManualCommit;
  // int32_t  autoCommitIntervalMs;  // 1000 ms
  // char     autoCommit[8];         // true, false
  // char     autoOffsetRest[16];    // none, earliest, latest
P
plum-lihui 已提交
53

P
plum-lihui 已提交
54
  TdFilePtr pConsumeRowsFile;
P
plum-lihui 已提交
55 56
  int32_t   ifCheckData;
  int64_t   expectMsgCnt;
P
plum-lihui 已提交
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73

  int64_t consumeMsgCnt;
  int64_t consumeRowCnt;
  int32_t checkresult;

  char topicString[1024];
  char keyString[1024];

  int32_t numOfTopic;
  char    topics[32][64];

  int32_t numOfKey;
  char    key[32][64];
  char    value[32][64];

  tmq_t*      tmq;
  tmq_list_t* topicList;
L
Liu Jicong 已提交
74

P
plum-lihui 已提交
75 76 77
  int32_t numOfVgroups;
  int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2];  // [i][0]: vgroup id, [i][1]: rows of consume
  int64_t ts;
P
plum-lihui 已提交
78

79 80
  TAOS* taos;

P
plum-lihui 已提交
81 82 83 84 85 86 87
} SThreadInfo;

typedef struct {
  // input from argvs
  char        cdbName[32];
  char        dbName[32];
  int32_t     showMsgFlag;
L
Liu Jicong 已提交
88
  int32_t     showRowFlag;
P
plum-lihui 已提交
89
  int32_t     saveRowFlag;
P
plum-lihui 已提交
90 91
  int32_t     consumeDelay;  // unit s
  int32_t     numOfThread;
P
plum-lihui 已提交
92
  int32_t     useSnapshot;
93
  int64_t     nowTime;
P
plum-lihui 已提交
94 95 96 97 98
  SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo;

// Process-wide configuration, filled in by parseArgument().
static SConfInfo g_stConfInfo;
// Log file opened by initLogFile(); every taosFprintfFile(g_fp, ...) call writes here.
TdFilePtr        g_fp = NULL;
// Cleared by the SIGINT handler (tmqStop) and polled by the consume loop. It is volatile
// so the compiler cannot cache the value across loop iterations.
static volatile int running = 1;
P
plum-lihui 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115

// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;

// Prints the command line usage (one entry per option handled by parseArgument())
// and terminates the process with EXIT_SUCCESS.
static void printHelp() {
  char indent[10] = "        ";
  printf("Used to test the tmq feature with sim cases\n");

  printf("%s%s\n", indent, "-c");
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
  printf("%s%s\n", indent, "-d");
  printf("%s%s%s\n", indent, indent, "The name of the database for consumer, no default ");
  printf("%s%s\n", indent, "-w");
  printf("%s%s%s\n", indent, indent, "The name of the database holding the consume info/result tables, no default ");
  printf("%s%s\n", indent, "-g");
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
  printf("%s%s\n", indent, "-r");
  printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
  printf("%s%s\n", indent, "-s");
  printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
  printf("%s%s\n", indent, "-y");
  printf("%s%s%s%d\n", indent, indent, "consume delay in seconds, default is ", g_stConfInfo.consumeDelay);
  printf("%s%s\n", indent, "-e");
  printf("%s%s%s%d\n", indent, indent, "useSnapshot, default is ", g_stConfInfo.useSnapshot);
  exit(EXIT_SUCCESS);
}

P
plum-lihui 已提交
123
char* getCurrentTimeString(char* timeString) {
L
Liu Jicong 已提交
124
  time_t    tTime = taosGetTimestampSec();
P
plum-lihui 已提交
125
  struct tm tm = *taosLocalTime(&tTime, NULL);
L
Liu Jicong 已提交
126 127
  sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
          tm.tm_min, tm.tm_sec);
P
plum-lihui 已提交
128 129 130 131

  return timeString;
}

132
static void tmqStop(int signum, void* info, void* ctx) {
P
plum-lihui 已提交
133 134
  running = 0;
  char tmpString[128];
135
  taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum);
P
plum-lihui 已提交
136 137
}

138
// Registers tmqStop() as the SIGINT handler.
static void tmqSetSignalHandle() {
  taosSetSignal(SIGINT, tmqStop);
}
P
plum-lihui 已提交
139

P
plum-lihui 已提交
140
void initLogFile() {
P
plum-lihui 已提交
141
  char filename[256];
L
Liu Jicong 已提交
142
  char tmpString[128];
P
plum-lihui 已提交
143

P
plum-lihui 已提交
144 145 146
  pid_t process_id = getpid();

  sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
wafwerar's avatar
wafwerar 已提交
147 148 149 150 151 152
#ifdef WINDOWS
  for (int i = 2; i < sizeof(filename); i++) {
    if (filename[i] == ':') filename[i] = '-';
    if (filename[i] == '\0') break;
  }
#endif
153
  TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
P
plum-lihui 已提交
154
  if (NULL == pFile) {
155
    fprintf(stderr, "Failed to open %s for save result\n", filename);
P
plum-lihui 已提交
156 157 158 159 160 161 162 163 164 165 166 167
    exit(-1);
  }
  g_fp = pFile;
}

void saveConfigToLogFile() {
  taosFprintfFile(g_fp, "###################################################################\n");
  taosFprintfFile(g_fp, "# configDir:           %s\n", configDir);
  taosFprintfFile(g_fp, "# dbName:              %s\n", g_stConfInfo.dbName);
  taosFprintfFile(g_fp, "# cdbName:             %s\n", g_stConfInfo.cdbName);
  taosFprintfFile(g_fp, "# showMsgFlag:         %d\n", g_stConfInfo.showMsgFlag);
  taosFprintfFile(g_fp, "# showRowFlag:         %d\n", g_stConfInfo.showRowFlag);
P
plum-lihui 已提交
168
  taosFprintfFile(g_fp, "# saveRowFlag:         %d\n", g_stConfInfo.saveRowFlag);
P
plum-lihui 已提交
169 170 171 172 173
  taosFprintfFile(g_fp, "# consumeDelay:        %d\n", g_stConfInfo.consumeDelay);
  taosFprintfFile(g_fp, "# numOfThread:         %d\n", g_stConfInfo.numOfThread);

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
L
Liu Jicong 已提交
174 175 176
    // taosFprintfFile(g_fp, "  auto commit:              %s\n", g_stConfInfo.stThreads[i].autoCommit);
    // taosFprintfFile(g_fp, "  auto commit interval ms:  %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
    // taosFprintfFile(g_fp, "  auto offset rest:         %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
P
plum-lihui 已提交
177 178 179 180 181 182 183 184 185 186
    taosFprintfFile(g_fp, "  Topics: ");
    for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
      taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
    }
    taosFprintfFile(g_fp, "\n");
    taosFprintfFile(g_fp, "  Key: ");
    for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) {
      taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
    }
    taosFprintfFile(g_fp, "\n");
P
plum-lihui 已提交
187
    taosFprintfFile(g_fp, "  expect rows: %d\n", g_stConfInfo.stThreads[i].expectMsgCnt);
P
plum-lihui 已提交
188 189
  }

P
plum-lihui 已提交
190 191
  char tmpString[128];
  taosFprintfFile(g_fp, "# Test time:                %s\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
192 193 194 195 196 197 198
  taosFprintfFile(g_fp, "###################################################################\n");
}

// Returns the value following the option at argv[*pIndex] and advances *pIndex to it.
// Exits with an error when the option is the last argument and therefore has no value.
static const char* tmqSimNextArgValue(int32_t argc, char* argv[], int32_t* pIndex) {
  if (*pIndex + 1 >= argc) {
    pError("%s option %s requires a value %s", GREEN, argv[*pIndex], NC);
    exit(-1);
  }
  *pIndex += 1;
  return argv[*pIndex];
}

// Resets g_stConfInfo to its defaults, applies the command line options, opens the log file
// and echoes the resulting configuration. Unknown options and options without a value
// terminate the process.
void parseArgument(int32_t argc, char* argv[]) {
  memset(&g_stConfInfo, 0, sizeof(SConfInfo));
  g_stConfInfo.showMsgFlag = 0;
  g_stConfInfo.showRowFlag = 0;
  g_stConfInfo.saveRowFlag = 0;
  g_stConfInfo.consumeDelay = 5;

  g_stConfInfo.nowTime = taosGetTimestampMs();

  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();  // does not return
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      // Bounded copy: dbName/cdbName are small fixed-size fields.
      snprintf(g_stConfInfo.dbName, sizeof(g_stConfInfo.dbName), "%s", tmqSimNextArgValue(argc, argv, &i));
    } else if (strcmp(argv[i], "-w") == 0) {
      snprintf(g_stConfInfo.cdbName, sizeof(g_stConfInfo.cdbName), "%s", tmqSimNextArgValue(argc, argv, &i));
    } else if (strcmp(argv[i], "-c") == 0) {
      // NOTE(review): configDir is declared outside this file, so its capacity is unknown here
      // and the copy stays unbounded - confirm the size and bound it.
      strcpy(configDir, tmqSimNextArgValue(argc, argv, &i));
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(tmqSimNextArgValue(argc, argv, &i));
    } else if (strcmp(argv[i], "-r") == 0) {
      g_stConfInfo.showRowFlag = atol(tmqSimNextArgValue(argc, argv, &i));
    } else if (strcmp(argv[i], "-s") == 0) {
      g_stConfInfo.saveRowFlag = atol(tmqSimNextArgValue(argc, argv, &i));
    } else if (strcmp(argv[i], "-y") == 0) {
      g_stConfInfo.consumeDelay = atol(tmqSimNextArgValue(argc, argv, &i));
    } else if (strcmp(argv[i], "-e") == 0) {
      g_stConfInfo.useSnapshot = atol(tmqSimNextArgValue(argc, argv, &i));
    } else {
      // Report the offending argument itself, not the one after it.
      pError("%s unknow para: %s %s", GREEN, argv[i], NC);
      exit(-1);
    }
  }

  initLogFile();

  taosFprintfFile(g_fp, "====parseArgument() success\n");

  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
  pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
  pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
  pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
  pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
}

// Splits str in place on any character of del (strtok semantics: empty pieces are skipped)
// and stores a pointer to each piece in consecutive slots of arr. The caller must provide
// enough slots; no terminator is written after the last piece.
void splitStr(char** arr, char* str, const char* del) {
  for (char* piece = strtok(str, del); piece != NULL; piece = strtok(NULL, del)) {
    *arr = piece;
    arr++;
  }
}

// Removes leading whitespace from str in place. NULL and empty strings are left untouched.
void ltrim(char* str) {
  if (str == NULL || *str == '\0') {
    return;
  }

  char* p = str;
  // isspace() takes an unsigned char value (or EOF); a negative plain char is undefined behavior.
  while (*p != '\0' && isspace((unsigned char)*p)) {
    ++p;
  }
  if (p != str) {
    memmove(str, p, strlen(p) + 1);  // +1 moves the terminator too
  }
}

P
plum-lihui 已提交
267 268 269 270 271
// Adds `rows` to the running total kept for `vgroupId` in pInfo->rowsOfPerVgroups, creating
// a new entry the first time a vgroup is seen. Exits the process when a new vgroup would
// exceed MAX_VGROUP_CNT entries.
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
  for (int32_t i = 0; i < pInfo->numOfVgroups; i++) {
    if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
      pInfo->rowsOfPerVgroups[i][1] += rows;
      return;
    }
  }

  // Check the capacity BEFORE writing: slot [numOfVgroups] only exists while
  // numOfVgroups < MAX_VGROUP_CNT.
  if (pInfo->numOfVgroups >= MAX_VGROUP_CNT) {
    taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
                    pInfo->numOfVgroups + 1, vgroupId);
    taosCloseFile(&g_fp);
    exit(-1);
  }

  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] = rows;
  pInfo->numOfVgroups++;

  taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
}

// Inserts one consumed row (already rendered as text in buf) into <cdbName>.content_<consumerId>,
// using pInfo->ts as the row timestamp and incrementing it afterwards.
// Returns 0 on success; any failure is logged and terminates the process.
// NOTE(review): buf is pasted into the SQL text unescaped, so a quote inside it breaks the statement.
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
  // Room for the longest accepted row (1024 bytes) plus the statement around it.
  char   sqlStr[1024 + 256] = {0};
  size_t rowLen = strlen(buf);

  if (rowLen > 1024) {
    taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", (int32_t)rowLen);
    taosCloseFile(&g_fp);
    exit(-1);
  }

  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pConn == NULL) {
    // Explicit check: an assert would vanish in NDEBUG builds.
    pError("taos_connect() fail, can not save consume content\n");
    taosFprintfFile(g_fp, "taos_connect() fail, can not save consume content\n");
    taosCloseFile(&g_fp);
    exit(-1);
  }

  snprintf(sqlStr, sizeof(sqlStr), "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName,
           pInfo->consumerId, pInfo->ts++, buf);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
    taosFprintfFile(g_fp, "error in insert consume result, reason:%s\n", taos_errstr(pRes));
    taosCloseFile(&g_fp);
    taos_free_result(pRes);
    taos_close(pConn);
    exit(-1);
  }

  taos_free_result(pRes);
  // The connection is opened per call, so it has to be closed per call as well.
  taos_close(pConn);

  return 0;
}

P
plum-lihui 已提交
317 318 319 320 321
// Renders the timestamp `val` (expressed in `precision` units since the epoch) into buf as
// "YYYY-MM-DD hh:mm:ss.<fraction>" in local time and returns buf. The fraction has 9, 6 or 3
// digits for nano, micro and every other precision respectively.
static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
  int64_t unitsPerSec = 1000;
  int     fracDigits = 3;
  if (precision == TSDB_TIME_PRECISION_NANO) {
    unitsPerSec = 1000000000;
    fracDigits = 9;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    unitsPerSec = 1000000;
    fracDigits = 6;
  }

  time_t  seconds = (time_t)(val / unitsPerSec);
  int32_t fraction = (int32_t)(val % unitsPerSec);

  // Clamping negative seconds to zero is only done on Windows: per the original note it
  // protects the localtime call there, while doing it everywhere broke some test cases.
#ifdef WINDOWS
  if (seconds < 0) seconds = 0;
#endif

  // C division truncates toward zero, so a pre-epoch value yields a negative fraction;
  // borrow one second to make the fraction non-negative.
  if (seconds <= 0 && fraction < 0) {
    seconds--;
    fraction += (int32_t)unitsPerSec;
  }

  struct tm* ptm = taosLocalTime(&seconds, NULL);
  size_t     pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
  sprintf(buf + pos, ".%0*d", fracDigits, fraction);

  return buf;
}

P
plum-lihui 已提交
373 374
// Writes one column value to pFile in text form, chosen by field->type.
//   val       - raw column data, or NULL for a NULL value (written as TSDB_DATA_NULL_STR)
//   length    - byte length of the value; used for string-like types and as the double width
//   precision - timestamp precision handed to shellFormatTimestamp()
// String-like values and timestamps are wrapped in single quotes. Unknown types write nothing.
static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length,
                                 int32_t precision) {
  if (val == NULL) {
    taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR);
    return;
  }

  int  n;
  char buf[TSDB_MAX_BYTES_PER_ROW];
  switch (field->type) {
    case TSDB_DATA_TYPE_BOOL:
      taosFprintfFile(pFile, "%d", ((((int32_t)(*((const char*)val))) == 1) ? 1 : 0));
      break;
    case TSDB_DATA_TYPE_TINYINT:
      taosFprintfFile(pFile, "%d", *((const int8_t*)val));
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      taosFprintfFile(pFile, "%u", *((const uint8_t*)val));
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      taosFprintfFile(pFile, "%d", *((const int16_t*)val));
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      taosFprintfFile(pFile, "%u", *((const uint16_t*)val));
      break;
    case TSDB_DATA_TYPE_INT:
      taosFprintfFile(pFile, "%d", *((const int32_t*)val));
      break;
    case TSDB_DATA_TYPE_UINT:
      taosFprintfFile(pFile, "%u", *((const uint32_t*)val));
      break;
    case TSDB_DATA_TYPE_BIGINT:
      taosFprintfFile(pFile, "%" PRId64, *((const int64_t*)val));
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      taosFprintfFile(pFile, "%" PRIu64, *((const uint64_t*)val));
      break;
    case TSDB_DATA_TYPE_FLOAT:
      taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      // Fixed notation first; fall back to exponent notation when that gets too wide.
      n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val));
      if (n > TMAX(25, length)) {
        taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
      } else {
        taosFprintfFile(pFile, "%s", buf);
      }
      break;
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
    case TSDB_DATA_TYPE_JSON:
      // The value is not NUL terminated; print at most `length` bytes straight from it
      // instead of copying into buf, where buf[length] could land past the end.
      taosFprintfFile(pFile, "\'%.*s\'", (length < 0 ? 0 : (int)length), val);
      break;
    case TSDB_DATA_TYPE_TIMESTAMP:
      shellFormatTimestamp(buf, *(const int64_t*)val, precision);
      taosFprintfFile(pFile, "'%s'", buf);
      break;
    default:
      break;
  }
}

P
plum-lihui 已提交
437 438
// Writes one result row to pFile as a single comma separated line, rendering every column
// with shellDumpFieldToFile().
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields,
                               int32_t precision) {
  int32_t col = 0;
  while (col < num_fields) {
    if (col != 0) {
      taosFprintfFile(pFile, ",");  // separator goes between columns only
    }
    shellDumpFieldToFile(pFile, (const char*)row[col], &fields[col], length[col], precision);
    col++;
  }
  taosFprintfFile(pFile, "\n");
}

P
plum-lihui 已提交
448
// Handles one polled message: logs its origin, dumps every row to the consumer's row file,
// optionally logs each row (showRowFlag), and adds the row count to the per-vgroup totals.
// Returns the number of rows contained in the message.
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
  // taos_print_row() takes no buffer size, so give it the file's row limit rather than 1 KB.
  // NOTE(review): rows longer than MAX_ROW_STR_LEN would still overflow - confirm the limit.
  char    buf[MAX_ROW_STR_LEN];
  int32_t totalRows = 0;

  int32_t     vgroupId = tmq_get_vgroup_id(msg);
  const char* dbName = tmq_get_db_name(msg);

  // msgIndex is int32_t, so it is printed with %d.
  taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);
  taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
                  tmq_get_topic_name(msg), vgroupId);

  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);

    if (row == NULL) break;

    TAOS_FIELD* fields = taos_fetch_fields(msg);
    int32_t     numOfFields = taos_field_count(msg);
    int32_t*    length = taos_fetch_lengths(msg);
    int32_t     precision = taos_result_precision(msg);
    const char* tbName = tmq_get_table_name(msg);

#if 0
    // get schema
    //============================== stub =================================================//
    for (int32_t i = 0; i < numOfFields; i++) {
      taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
    }
    //============================== stub =================================================//
#endif

    dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);

    if (0 != g_stConfInfo.showRowFlag) {
      // Render the row only when it is actually going to be logged.
      taos_print_row(buf, row, fields, numOfFields);
      taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
    }

    totalRows++;
  }

  addRowsToVgroupId(pInfo, vgroupId, totalRows);

  return totalRows;
}

// Executes `command` on the connection `taos`. Returns 0 on success and -1 on failure,
// after reporting the failure through pError. The result set is always released.
int queryDB(TAOS* taos, char* command) {
  TAOS_RES* result = taos_query(taos, command);
  int       errCode = taos_errno(result);
  int       ret = 0;

  if (errCode != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(errCode), command);
    ret = -1;
  }

  taos_free_result(result);
  return ret;
}

P
plum-lihui 已提交
511
// Completion callback that does nothing; passed to taos_query_a() by notifyMainScript().
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
512 513 514 515 516

// Queues an asynchronous insert of (timestamp, cmdId, consumerId) into <cdbName>.notifyinfo
// on pInfo->taos. The timestamp comes from the shared g_stConfInfo.nowTime counter, bumped
// atomically per call. Always returns 0: the insert result is not observed, because the
// completion callback does nothing.
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
  char sqlStr[1024] = {0};

  // values: ts, cmdId, consumerId
  snprintf(sqlStr, sizeof(sqlStr), "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName,
           atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId, pInfo->consumerId);

  taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);

  taosFprintfFile(g_fp, "notifyMainScript success, sql: %s\n", sqlStr);

  return 0;
}

static int32_t g_once_commit_flag = 0;
P
plum-lihui 已提交
528
static void    tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
529
  pError("tmq_commit_cb_print() commit %d\n", code);
530

531 532 533
  if (0 == g_once_commit_flag) {
    g_once_commit_flag = 1;
    notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
534
  }
535

536 537
  char tmpString[128];
  taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
538 539 540 541 542 543 544 545 546 547
}

// Creates the tmq consumer for this thread and stores it in pInfo->tmq.
// Settings are applied in this order: fixed credentials, the auto-commit callback, every
// key/value pair configured for the consumer, table names in messages, and finally the
// snapshot switch when -e was given.
void build_consumer(SThreadInfo* pInfo) {
  tmq_conf_t* pConf = tmq_conf_new();

  tmq_conf_set(pConf, "td.connect.user", "root");
  tmq_conf_set(pConf, "td.connect.pass", "taosdata");

  tmq_conf_set_auto_commit_cb(pConf, tmq_commit_cb_print, pInfo);

  // Consumer specific settings collected by parseConsumeInfo().
  int32_t keyIdx = 0;
  while (keyIdx < pInfo->numOfKey) {
    tmq_conf_set(pConf, pInfo->key[keyIdx], pInfo->value[keyIdx]);
    keyIdx++;
  }

  tmq_conf_set(pConf, "msg.with.table.name", "true");

  if (g_stConfInfo.useSnapshot != 0) {
    tmq_conf_set(pConf, "experimental.snapshot.enable", "true");
  }

  pInfo->tmq = tmq_consumer_new(pConf, NULL, 0);

  // The configuration object is released right after the consumer has been created from it.
  tmq_conf_destroy(pConf);
}

// Creates pInfo->topicList and appends every configured topic name to it.
void build_topic_list(SThreadInfo* pInfo) {
  tmq_list_t* pList = tmq_list_new();
  pInfo->topicList = pList;

  int32_t idx = 0;
  while (idx < pInfo->numOfTopic) {
    tmq_list_append(pList, pInfo->topics[idx]);
    idx++;
  }
}

// Stores this consumer's totals in <cdbName>.consumeresult through pInfo->taos and logs the
// statement. Returns 0 on success; a failed insert terminates the process.
int32_t saveConsumeResult(SThreadInfo* pInfo) {
  char sqlStr[1024] = {0};
  char timeBuf[128];

  // values: ts, consumerId, consumeMsgCnt, consumeRowCnt, checkresult
  sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
          g_stConfInfo.cdbName, atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), pInfo->consumerId, pInfo->consumeMsgCnt,
          pInfo->consumeRowCnt, pInfo->checkresult);

  taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(timeBuf), pInfo->consumerId, sqlStr);

  TAOS_RES* result = taos_query(pInfo->taos, sqlStr);
  int       failed = (taos_errno(result) != 0);
  if (failed) {
    pError("error in save consumeinfo, reason:%s\n", taos_errstr(result));
  }
  taos_free_result(result);
  if (failed) {
    exit(-1);
  }

  return 0;
}

// Polls pInfo->tmq until one of the following happens:
//   - a poll returns no message within the configured delay,
//   - the number of rows or messages reaches pInfo->expectMsgCnt,
//   - the stop signal clears `running`.
// Messages are processed by msg_process() only when showMsgFlag is set; rows are counted
// only in that case. The totals are stored in pInfo->consumeMsgCnt / consumeRowCnt.
// When ifCheckData is set, the row dump file is opened first; if that fails the function
// logs the problem and returns without consuming.
void loop_consume(SThreadInfo* pInfo) {
  int32_t once_flag = 0;
  int64_t totalMsgs = 0;
  int64_t totalRows = 0;
  char    tmpString[128];

  taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
                  pInfo->consumerId);

  pInfo->ts = taosGetTimestampMs();

  if (pInfo->ifCheckData) {
    char filename[256] = {0};
    // Bounded formatting: configDir comes from the command line.
    int len = snprintf(filename, sizeof(filename), "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
    if (len < 0 || (size_t)len >= sizeof(filename)) {
      taosFprintfFile(g_fp, "%s file name is too long for save rows content\n", getCurrentTimeString(tmpString));
      return;
    }
    pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
    if (pInfo->pConsumeRowsFile == NULL) {
      taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString));
      return;
    }
  }

  int64_t lastTotalMsgs = 0;
  int64_t lastPrintTime = taosGetTimestampMs();

  // consumeDelay is configured in seconds; -1 is passed through to the poll call unchanged.
  int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
  while (running) {
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
    if (tmqMsg == NULL) {
      taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
      break;
    }

    if (0 != g_stConfInfo.showMsgFlag) {
      totalRows += msg_process(tmqMsg, pInfo, (int32_t)totalMsgs);
    }

    taos_free_result(tmqMsg);

    totalMsgs++;

    // Progress line, at most once every 10 seconds.
    int64_t currentPrintTime = taosGetTimestampMs();
    if (currentPrintTime - lastPrintTime > 10 * 1000) {
      taosFprintfFile(
          g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
          pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / (currentPrintTime - lastPrintTime));
      lastPrintTime = currentPrintTime;
      lastTotalMsgs = totalMsgs;
    }

    // Tell the driving script once that consuming has started.
    if (0 == once_flag) {
      once_flag = 1;
      notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
    }

    if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) {
      taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
      break;
    }
  }

  if (0 == running) {
    taosFprintfFile(g_fp, "receive stop signal and not continue consume\n");
  }

  pInfo->consumeMsgCnt = totalMsgs;
  pInfo->consumeRowCnt = totalRows;

  taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
                  pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
}

void* consumeThreadFunc(void* param) {
  SThreadInfo* pInfo = (SThreadInfo*)param;

695 696 697
  pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pInfo->taos == NULL) {
    taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
698
    ASSERT(0);
699
    return NULL;
700 701
  }

P
plum-lihui 已提交
702 703 704
  build_consumer(pInfo);
  build_topic_list(pInfo);
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
705
    taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
D
dapan1121 已提交
706
    assert(0);
P
plum-lihui 已提交
707 708 709
    return NULL;
  }

L
Liu Jicong 已提交
710 711
  int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
  if (err != 0) {
P
plum-lihui 已提交
712
    pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
713
    taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
714 715
    assert(0);
    return NULL;
P
plum-lihui 已提交
716
  }
L
Liu Jicong 已提交
717

P
plum-lihui 已提交
718 719 720 721 722
  tmq_list_destroy(pInfo->topicList);
  pInfo->topicList = NULL;

  loop_consume(pInfo);

P
plum-lihui 已提交
723
  if (pInfo->ifManualCommit) {
L
Liu Jicong 已提交
724 725 726
    pPrint("tmq_commit() manual commit when consume end.\n");
    /*tmq_commit(pInfo->tmq, NULL, 0);*/
    tmq_commit_sync(pInfo->tmq, NULL);
L
Liu Jicong 已提交
727
    taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
L
Liu Jicong 已提交
728
    pPrint("tmq_commit() manual commit over.\n");
P
plum-lihui 已提交
729
  }
L
Liu Jicong 已提交
730

P
plum-lihui 已提交
731
  err = tmq_unsubscribe(pInfo->tmq);
L
Liu Jicong 已提交
732
  if (err != 0) {
P
plum-lihui 已提交
733
    pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
734
    taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
735
  }
L
Liu Jicong 已提交
736

P
plum-lihui 已提交
737
  err = tmq_consumer_close(pInfo->tmq);
L
Liu Jicong 已提交
738
  if (err != 0) {
P
plum-lihui 已提交
739
    pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
740
    taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
741 742 743 744 745 746
  }
  pInfo->tmq = NULL;

  // save consume result into consumeresult table
  saveConsumeResult(pInfo);

P
plum-lihui 已提交
747 748 749 750 751 752
  // save rows from per vgroup
  taosFprintfFile(g_fp, "======== consumerId: %d, consume rows from per vgroups ========\n", pInfo->consumerId);
  for (int32_t i = 0; i < pInfo->numOfVgroups; i++) {
    taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
  }

753 754 755
  taos_close(pInfo->taos);
  pInfo->taos = NULL;

P
plum-lihui 已提交
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803
  return NULL;
}

/*
 * Split the raw per-thread configuration strings into their parsed arrays.
 *
 * For every thread entry in g_stConfInfo.stThreads[0 .. numOfThread):
 *   - topicString is split on ',' ; each piece is copied into topics[] and
 *     passed through ltrim(), and numOfTopic is incremented;
 *   - keyString is split on ',' ; each piece is passed through ltrim() and
 *     then split at the first ':' into key[] / value[], and numOfKey is
 *     incremented.
 *
 * topicString and keyString are modified in place by strtok(). strtok() keeps
 * hidden static state, so this function is not reentrant.
 *
 * NOTE(review): the capacities of topics[], key[] and value[] are not visible
 * from here, so the copies below are still unbounded — confirm the table
 * contents can never exceed them.
 */
void parseConsumeInfo() {
  const char delim[2] = ",";
  const char ch = ':';

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    // ---- topic list: "topic1, topic2, ..." ----
    char* token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
    while (token != NULL) {
      strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
      ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
      g_stConfInfo.stThreads[i].numOfTopic++;

      token = strtok(NULL, delim);
    }

    // ---- key list: "key1:value1, key2:value2, ..." ----
    token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
    while (token != NULL) {
      char* pstr = token;
      ltrim(pstr);

      char* sep = strchr(pstr, ch);
      if (sep == NULL) {
        // An item without ':' cannot be split; skip it instead of
        // dereferencing the NULL returned by strchr().
        fprintf(stderr, "invalid key:value item in keylist, ignored: %s\n", pstr);
      } else {
        int32_t idx = g_stConfInfo.stThreads[i].numOfKey;
        size_t  keyLen = (size_t)(sep - pstr);

        memcpy(g_stConfInfo.stThreads[i].key[idx], pstr, keyLen);
        // memcpy() copies no terminator; add it explicitly rather than
        // relying on the destination having been zeroed beforehand.
        g_stConfInfo.stThreads[i].key[idx][keyLen] = '\0';
        strcpy(g_stConfInfo.stThreads[i].value[idx], sep + 1);
        g_stConfInfo.stThreads[i].numOfKey++;
      }

      token = strtok(NULL, delim);
    }
  }
}

int32_t getConsumeInfo() {
  char sqlStr[1024] = {0};

  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
P
plum-lihui 已提交
804
    pError("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
P
plum-lihui 已提交
805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822
    taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
    taosCloseFile(&g_fp);
    taos_free_result(pRes);
    exit(-1);
  }

  TAOS_ROW    row = NULL;
  int         num_fields = taos_num_fields(pRes);
  TAOS_FIELD* fields = taos_fetch_fields(pRes);

  // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
  // ifcheckdata int

  int32_t numOfThread = 0;
  while ((row = taos_fetch_row(pRes))) {
    int32_t* lengths = taos_fetch_lengths(pRes);

    // set default value
L
Liu Jicong 已提交
823 824 825
    // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
P
plum-lihui 已提交
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841

    for (int i = 0; i < num_fields; ++i) {
      if (row[i] == NULL || 0 == i) {
        continue;
      }

      if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
      } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
      } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
      } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
        g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
      } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
P
plum-lihui 已提交
842 843
      } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]);
P
plum-lihui 已提交
844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861
      }
    }
    numOfThread++;
  }
  g_stConfInfo.numOfThread = numOfThread;

  taos_free_result(pRes);

  parseConsumeInfo();

  return 0;
}

int main(int32_t argc, char* argv[]) {
  parseArgument(argc, argv);
  getConsumeInfo();
  saveConfigToLogFile();

P
plum-lihui 已提交
862 863
  tmqSetSignalHandle();

P
plum-lihui 已提交
864 865 866 867 868 869 870 871 872 873 874
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);

  // pthread_create one thread to consume
  taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
                     (void*)(&(g_stConfInfo.stThreads[i])));
  }

P
plum-lihui 已提交
875 876
  int64_t start = taosGetTimestampUs();

P
plum-lihui 已提交
877 878
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
879
    taosThreadClear(&g_stConfInfo.stThreads[i].thread);
P
plum-lihui 已提交
880 881
  }

P
plum-lihui 已提交
882 883 884 885 886 887 888 889 890
  int64_t end = taosGetTimestampUs();

  int64_t totalMsgs = 0;
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
  }

  int64_t t = end - start;
  if (0 == t) t = 1;
891

P
plum-lihui 已提交
892 893
  double tInMs = (double)t / 1000000.0;
  taosFprintfFile(g_fp,
894 895
                  "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/second\n\n",
                  tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
P
plum-lihui 已提交
896 897 898 899 900 901

  taosFprintfFile(g_fp, "==== close tmqlog ====\n");
  taosCloseFile(&g_fp);

  return 0;
}