tmqSim.c 29.8 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>

#include "taos.h"
P
plum-lihui 已提交
25
#include "taosdef.h"
P
plum-lihui 已提交
26 27
#include "taoserror.h"
#include "tlog.h"
P
plum-lihui 已提交
28
#include "types.h"
P
plum-lihui 已提交
29 30 31 32 33 34 35 36

#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b))

#define MAX_SQL_STR_LEN         (1024 * 1024)
#define MAX_ROW_STR_LEN         (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
P
plum-lihui 已提交
37
#define MAX_VGROUP_CNT          (32)
P
plum-lihui 已提交
38

39
int64_t now;
40 41 42 43 44
typedef enum {
  NOTIFY_CMD_START_CONSUM,
  NOTIFY_CMD_START_COMMIT,
  NOTIFY_CMD_ID_BUTT,
} NOTIFY_CMD_ID;
45

P
plum-lihui 已提交
46 47 48 49
typedef struct {
  TdThread thread;
  int32_t  consumerId;

L
Liu Jicong 已提交
50 51 52 53
  int32_t ifManualCommit;
  // int32_t  autoCommitIntervalMs;  // 1000 ms
  // char     autoCommit[8];         // true, false
  // char     autoOffsetRest[16];    // none, earliest, latest
P
plum-lihui 已提交
54

P
plum-lihui 已提交
55
  TdFilePtr pConsumeRowsFile;
P
plum-lihui 已提交
56 57
  int32_t   ifCheckData;
  int64_t   expectMsgCnt;
P
plum-lihui 已提交
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74

  int64_t consumeMsgCnt;
  int64_t consumeRowCnt;
  int32_t checkresult;

  char topicString[1024];
  char keyString[1024];

  int32_t numOfTopic;
  char    topics[32][64];

  int32_t numOfKey;
  char    key[32][64];
  char    value[32][64];

  tmq_t*      tmq;
  tmq_list_t* topicList;
L
Liu Jicong 已提交
75

P
plum-lihui 已提交
76 77 78
  int32_t numOfVgroups;
  int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2];  // [i][0]: vgroup id, [i][1]: rows of consume
  int64_t ts;
P
plum-lihui 已提交
79

80 81
  TAOS* taos;

P
plum-lihui 已提交
82 83 84 85 86 87 88
} SThreadInfo;

typedef struct {
  // input from argvs
  char        cdbName[32];
  char        dbName[32];
  int32_t     showMsgFlag;
L
Liu Jicong 已提交
89
  int32_t     showRowFlag;
P
plum-lihui 已提交
90
  int32_t     saveRowFlag;
P
plum-lihui 已提交
91 92
  int32_t     consumeDelay;  // unit s
  int32_t     numOfThread;
P
plum-lihui 已提交
93
  int32_t     useSnapshot;
P
plum-lihui 已提交
94 95 96 97 98
  SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo;

static SConfInfo g_stConfInfo;
TdFilePtr        g_fp = NULL;
P
plum-lihui 已提交
99
static int       running = 1;
P
plum-lihui 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115

// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;

static void printHelp() {
  char indent[10] = "        ";
  printf("Used to test the tmq feature with sim cases\n");

  printf("%s%s\n", indent, "-c");
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
  printf("%s%s\n", indent, "-d");
  printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
  printf("%s%s\n", indent, "-g");
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
  printf("%s%s\n", indent, "-r");
  printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
P
plum-lihui 已提交
116 117
  printf("%s%s\n", indent, "-s");
  printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
P
plum-lihui 已提交
118 119 120 121 122
  printf("%s%s\n", indent, "-y");
  printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
  exit(EXIT_SUCCESS);
}

P
plum-lihui 已提交
123
char* getCurrentTimeString(char* timeString) {
L
Liu Jicong 已提交
124
  time_t    tTime = taosGetTimestampSec();
P
plum-lihui 已提交
125
  struct tm tm = *taosLocalTime(&tTime, NULL);
L
Liu Jicong 已提交
126 127
  sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
          tm.tm_min, tm.tm_sec);
P
plum-lihui 已提交
128 129 130 131

  return timeString;
}

132
static void tmqStop(int signum, void* info, void* ctx) {
P
plum-lihui 已提交
133 134
  running = 0;
  char tmpString[128];
135
  taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum);
P
plum-lihui 已提交
136 137
}

138
static void tmqSetSignalHandle() { taosSetSignal(SIGINT, tmqStop); }
P
plum-lihui 已提交
139

P
plum-lihui 已提交
140
void initLogFile() {
P
plum-lihui 已提交
141
  char filename[256];
L
Liu Jicong 已提交
142
  char tmpString[128];
P
plum-lihui 已提交
143

P
plum-lihui 已提交
144 145 146
  pid_t process_id = getpid();

  sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
wafwerar's avatar
wafwerar 已提交
147 148 149 150 151 152
#ifdef WINDOWS
  for (int i = 2; i < sizeof(filename); i++) {
    if (filename[i] == ':') filename[i] = '-';
    if (filename[i] == '\0') break;
  }
#endif
153
  TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
P
plum-lihui 已提交
154
  if (NULL == pFile) {
155
    fprintf(stderr, "Failed to open %s for save result\n", filename);
P
plum-lihui 已提交
156 157 158 159 160 161 162 163 164 165 166 167
    exit(-1);
  }
  g_fp = pFile;
}

void saveConfigToLogFile() {
  taosFprintfFile(g_fp, "###################################################################\n");
  taosFprintfFile(g_fp, "# configDir:           %s\n", configDir);
  taosFprintfFile(g_fp, "# dbName:              %s\n", g_stConfInfo.dbName);
  taosFprintfFile(g_fp, "# cdbName:             %s\n", g_stConfInfo.cdbName);
  taosFprintfFile(g_fp, "# showMsgFlag:         %d\n", g_stConfInfo.showMsgFlag);
  taosFprintfFile(g_fp, "# showRowFlag:         %d\n", g_stConfInfo.showRowFlag);
P
plum-lihui 已提交
168
  taosFprintfFile(g_fp, "# saveRowFlag:         %d\n", g_stConfInfo.saveRowFlag);
P
plum-lihui 已提交
169 170 171 172 173
  taosFprintfFile(g_fp, "# consumeDelay:        %d\n", g_stConfInfo.consumeDelay);
  taosFprintfFile(g_fp, "# numOfThread:         %d\n", g_stConfInfo.numOfThread);

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
L
Liu Jicong 已提交
174 175 176
    // taosFprintfFile(g_fp, "  auto commit:              %s\n", g_stConfInfo.stThreads[i].autoCommit);
    // taosFprintfFile(g_fp, "  auto commit interval ms:  %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
    // taosFprintfFile(g_fp, "  auto offset rest:         %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
P
plum-lihui 已提交
177 178 179 180 181 182 183 184 185 186
    taosFprintfFile(g_fp, "  Topics: ");
    for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
      taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
    }
    taosFprintfFile(g_fp, "\n");
    taosFprintfFile(g_fp, "  Key: ");
    for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) {
      taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
    }
    taosFprintfFile(g_fp, "\n");
P
plum-lihui 已提交
187
    taosFprintfFile(g_fp, "  expect rows: %d\n", g_stConfInfo.stThreads[i].expectMsgCnt);
P
plum-lihui 已提交
188 189
  }

P
plum-lihui 已提交
190 191
  char tmpString[128];
  taosFprintfFile(g_fp, "# Test time:                %s\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
192 193 194 195 196 197 198
  taosFprintfFile(g_fp, "###################################################################\n");
}

void parseArgument(int32_t argc, char* argv[]) {
  memset(&g_stConfInfo, 0, sizeof(SConfInfo));
  g_stConfInfo.showMsgFlag = 0;
  g_stConfInfo.showRowFlag = 0;
P
plum-lihui 已提交
199
  g_stConfInfo.saveRowFlag = 0;
P
plum-lihui 已提交
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
  g_stConfInfo.consumeDelay = 5;

  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      strcpy(g_stConfInfo.dbName, argv[++i]);
    } else if (strcmp(argv[i], "-w") == 0) {
      strcpy(g_stConfInfo.cdbName, argv[++i]);
    } else if (strcmp(argv[i], "-c") == 0) {
      strcpy(configDir, argv[++i]);
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
    } else if (strcmp(argv[i], "-r") == 0) {
      g_stConfInfo.showRowFlag = atol(argv[++i]);
P
plum-lihui 已提交
216 217
    } else if (strcmp(argv[i], "-s") == 0) {
      g_stConfInfo.saveRowFlag = atol(argv[++i]);
P
plum-lihui 已提交
218 219
    } else if (strcmp(argv[i], "-y") == 0) {
      g_stConfInfo.consumeDelay = atol(argv[++i]);
P
plum-lihui 已提交
220 221
    } else if (strcmp(argv[i], "-e") == 0) {
      g_stConfInfo.useSnapshot = atol(argv[++i]);
P
plum-lihui 已提交
222
    } else {
P
plum-lihui 已提交
223
      pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
P
plum-lihui 已提交
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
      exit(-1);
    }
  }

  initLogFile();

  taosFprintfFile(g_fp, "====parseArgument() success\n");

#if 1
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
  pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
  pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
  pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
P
plum-lihui 已提交
239
  pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
P
plum-lihui 已提交
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
#endif
}

void splitStr(char** arr, char* str, const char* del) {
  char* s = strtok(str, del);
  while (s != NULL) {
    *arr++ = s;
    s = strtok(NULL, del);
  }
}

void ltrim(char* str) {
  if (str == NULL || *str == '\0') {
    return;
  }
  int   len = 0;
  char* p = str;
  while (*p != '\0' && isspace(*p)) {
    ++p;
    ++len;
  }
  memmove(str, p, strlen(str) - len + 1);
  // return str;
}

P
plum-lihui 已提交
265 266 267 268 269
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
  int32_t i;
  for (i = 0; i < pInfo->numOfVgroups; i++) {
    if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
      pInfo->rowsOfPerVgroups[i][1] += rows;
L
Liu Jicong 已提交
270 271
      return;
    }
P
plum-lihui 已提交
272 273 274 275 276
  }

  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
  pInfo->numOfVgroups++;
L
Liu Jicong 已提交
277

P
plum-lihui 已提交
278 279
  taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
  if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
L
Liu Jicong 已提交
280 281
    taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
                    pInfo->numOfVgroups, vgroupId);
P
plum-lihui 已提交
282 283 284 285 286 287
    taosCloseFile(&g_fp);
    exit(-1);
  }
}

int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
P
plum-lihui 已提交
288 289 290 291 292 293 294
  char sqlStr[1100] = {0};

  if (strlen(buf) > 1024) {
    taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", strlen(buf));
    taosCloseFile(&g_fp);
    exit(-1);
  }
P
plum-lihui 已提交
295 296 297 298

  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

L
Liu Jicong 已提交
299 300
  sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId,
          pInfo->ts++, buf);
P
plum-lihui 已提交
301 302 303 304 305 306 307 308 309 310 311 312 313 314
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
    taosFprintfFile(g_fp, "error in insert consume result, reason:%s\n", taos_errstr(pRes));
    taosCloseFile(&g_fp);
    taos_free_result(pRes);
    exit(-1);
  }

  taos_free_result(pRes);

  return 0;
}

P
plum-lihui 已提交
315 316 317 318 319
static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
  // if (shell.args.is_raw_time) {
  //   sprintf(buf, "%" PRId64, val);
  //   return buf;
  // }
P
plum-lihui 已提交
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356

  time_t  tt;
  int32_t ms = 0;
  if (precision == TSDB_TIME_PRECISION_NANO) {
    tt = (time_t)(val / 1000000000);
    ms = val % 1000000000;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    tt = (time_t)(val / 1000000);
    ms = val % 1000000;
  } else {
    tt = (time_t)(val / 1000);
    ms = val % 1000;
  }

  /*
    comment out as it make testcases like select_with_tags.sim fail.
    but in windows, this may cause the call to localtime crash if tt < 0,
    need to find a better solution.
    if (tt < 0) {
      tt = 0;
    }
  */

#ifdef WINDOWS
  if (tt < 0) tt = 0;
#endif
  if (tt <= 0 && ms < 0) {
    tt--;
    if (precision == TSDB_TIME_PRECISION_NANO) {
      ms += 1000000000;
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
      ms += 1000000;
    } else {
      ms += 1000;
    }
  }

P
plum-lihui 已提交
357
  struct tm* ptm = taosLocalTime(&tt, NULL);
P
plum-lihui 已提交
358 359 360 361 362 363 364 365 366 367 368 369 370
  size_t     pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);

  if (precision == TSDB_TIME_PRECISION_NANO) {
    sprintf(buf + pos, ".%09d", ms);
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    sprintf(buf + pos, ".%06d", ms);
  } else {
    sprintf(buf + pos, ".%03d", ms);
  }

  return buf;
}

P
plum-lihui 已提交
371 372
static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length,
                                 int32_t precision) {
P
plum-lihui 已提交
373 374 375 376 377 378 379 380 381
  if (val == NULL) {
    taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR);
    return;
  }

  int  n;
  char buf[TSDB_MAX_BYTES_PER_ROW];
  switch (field->type) {
    case TSDB_DATA_TYPE_BOOL:
P
plum-lihui 已提交
382
      taosFprintfFile(pFile, "%d", ((((int32_t)(*((char*)val))) == 1) ? 1 : 0));
P
plum-lihui 已提交
383 384
      break;
    case TSDB_DATA_TYPE_TINYINT:
P
plum-lihui 已提交
385
      taosFprintfFile(pFile, "%d", *((int8_t*)val));
P
plum-lihui 已提交
386 387
      break;
    case TSDB_DATA_TYPE_UTINYINT:
P
plum-lihui 已提交
388
      taosFprintfFile(pFile, "%u", *((uint8_t*)val));
P
plum-lihui 已提交
389 390
      break;
    case TSDB_DATA_TYPE_SMALLINT:
P
plum-lihui 已提交
391
      taosFprintfFile(pFile, "%d", *((int16_t*)val));
P
plum-lihui 已提交
392 393
      break;
    case TSDB_DATA_TYPE_USMALLINT:
P
plum-lihui 已提交
394
      taosFprintfFile(pFile, "%u", *((uint16_t*)val));
P
plum-lihui 已提交
395 396
      break;
    case TSDB_DATA_TYPE_INT:
P
plum-lihui 已提交
397
      taosFprintfFile(pFile, "%d", *((int32_t*)val));
P
plum-lihui 已提交
398 399
      break;
    case TSDB_DATA_TYPE_UINT:
P
plum-lihui 已提交
400
      taosFprintfFile(pFile, "%u", *((uint32_t*)val));
P
plum-lihui 已提交
401 402
      break;
    case TSDB_DATA_TYPE_BIGINT:
P
plum-lihui 已提交
403
      taosFprintfFile(pFile, "%" PRId64, *((int64_t*)val));
P
plum-lihui 已提交
404 405
      break;
    case TSDB_DATA_TYPE_UBIGINT:
P
plum-lihui 已提交
406
      taosFprintfFile(pFile, "%" PRIu64, *((uint64_t*)val));
P
plum-lihui 已提交
407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
      break;
    case TSDB_DATA_TYPE_FLOAT:
      taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val));
      if (n > TMAX(25, length)) {
        taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
      } else {
        taosFprintfFile(pFile, "%s", buf);
      }
      break;
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
    case TSDB_DATA_TYPE_JSON:
      memcpy(buf, val, length);
      buf[length] = 0;
      taosFprintfFile(pFile, "\'%s\'", buf);
      break;
    case TSDB_DATA_TYPE_TIMESTAMP:
P
plum-lihui 已提交
427
      shellFormatTimestamp(buf, *(int64_t*)val, precision);
P
plum-lihui 已提交
428 429 430 431 432 433 434
      taosFprintfFile(pFile, "'%s'", buf);
      break;
    default:
      break;
  }
}

P
plum-lihui 已提交
435 436
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields,
                               int32_t precision) {
P
plum-lihui 已提交
437 438
  for (int32_t i = 0; i < num_fields; i++) {
    if (i > 0) {
439
      taosFprintfFile(pFile, ",");
P
plum-lihui 已提交
440
    }
P
plum-lihui 已提交
441
    shellDumpFieldToFile(pFile, (const char*)row[i], fields + i, length[i], precision);
P
plum-lihui 已提交
442 443 444 445
  }
  taosFprintfFile(pFile, "\n");
}

P
plum-lihui 已提交
446
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
P
plum-lihui 已提交
447 448
  char    buf[1024];
  int32_t totalRows = 0;
L
Liu Jicong 已提交
449

P
plum-lihui 已提交
450
  // printf("topic: %s\n", tmq_get_topic_name(msg));
P
plum-lihui 已提交
451 452
  int32_t     vgroupId = tmq_get_vgroup_id(msg);
  const char* dbName = tmq_get_db_name(msg);
L
Liu Jicong 已提交
453

P
plum-lihui 已提交
454
  taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
P
plum-lihui 已提交
455 456
  taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
                  tmq_get_topic_name(msg), vgroupId);
P
plum-lihui 已提交
457 458 459

  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
P
plum-lihui 已提交
460

L
Liu Jicong 已提交
461
    if (row == NULL) break;
P
plum-lihui 已提交
462

P
plum-lihui 已提交
463
    TAOS_FIELD* fields = taos_fetch_fields(msg);
P
plum-lihui 已提交
464
    int32_t     numOfFields = taos_field_count(msg);
P
plum-lihui 已提交
465 466 467
    int32_t*    length = taos_fetch_lengths(msg);
    int32_t     precision = taos_result_precision(msg);
    const char* tbName = tmq_get_table_name(msg);
L
Liu Jicong 已提交
468

469
#if 0
P
plum-lihui 已提交
470 471 472 473 474 475
	// get schema
	//============================== stub =================================================//
	for (int32_t i = 0; i < numOfFields; i++) {
	  taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
	}
	//============================== stub =================================================//
476
#endif
P
plum-lihui 已提交
477

478
    dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
P
plum-lihui 已提交
479

P
plum-lihui 已提交
480
    taos_print_row(buf, row, fields, numOfFields);
L
Liu Jicong 已提交
481 482

    if (0 != g_stConfInfo.showRowFlag) {
L
Liu Jicong 已提交
483
      taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
P
plum-lihui 已提交
484 485 486
      // if (0 != g_stConfInfo.saveRowFlag) {
      //   saveConsumeContentToTbl(pInfo, buf);
      // }
P
plum-lihui 已提交
487
    }
L
Liu Jicong 已提交
488

P
plum-lihui 已提交
489 490 491
    totalRows++;
  }

P
plum-lihui 已提交
492
  addRowsToVgroupId(pInfo, vgroupId, totalRows);
L
Liu Jicong 已提交
493

P
plum-lihui 已提交
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508
  return totalRows;
}

int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = taos_query(taos, command);
  int       code = taos_errno(pRes);
  if (code != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(code), command);
    taos_free_result(pRes);
    return -1;
  }
  taos_free_result(pRes);
  return 0;
}

P
plum-lihui 已提交
509
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
510 511 512 513 514 515 516

int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
  char sqlStr[1024] = {0};

  int64_t now = taosGetTimestampMs();

  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
P
plum-lihui 已提交
517 518
  sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, now, cmdId,
          pInfo->consumerId);
519 520 521 522 523 524 525 526 527

  taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);

  taosFprintfFile(g_fp, "notifyMainScript success, sql: %s\n", sqlStr);

  return 0;
}

static int32_t g_once_commit_flag = 0;
P
plum-lihui 已提交
528
static void    tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
529
  pError("tmq_commit_cb_print() commit %d\n", code);
530

531 532 533
  if (0 == g_once_commit_flag) {
    g_once_commit_flag = 1;
    notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
534
  }
535

536 537
  char tmpString[128];
  taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
538 539 540 541 542 543 544 545 546 547
}

void build_consumer(SThreadInfo* pInfo) {
  tmq_conf_t* conf = tmq_conf_new();

  // tmq_conf_set(conf, "td.connect.ip", "localhost");
  // tmq_conf_set(conf, "td.connect.port", "6030");
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");

L
Liu Jicong 已提交
548
  // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
P
plum-lihui 已提交
549

550
  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo);
P
plum-lihui 已提交
551 552 553 554 555 556

  // tmq_conf_set(conf, "group.id", "cgrp1");
  for (int32_t i = 0; i < pInfo->numOfKey; i++) {
    tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
  }

557 558
  tmq_conf_set(conf, "msg.with.table.name", "true");

P
plum-lihui 已提交
559 560 561 562 563 564 565 566 567 568
  // tmq_conf_set(conf, "client.id", "c-001");

  // tmq_conf_set(conf, "enable.auto.commit", "true");
  // tmq_conf_set(conf, "enable.auto.commit", "false");

  // tmq_conf_set(conf, "auto.commit.interval.ms", "1000");

  // tmq_conf_set(conf, "auto.offset.reset", "none");
  // tmq_conf_set(conf, "auto.offset.reset", "earliest");
  // tmq_conf_set(conf, "auto.offset.reset", "latest");
P
plum-lihui 已提交
569 570
  //
  if (g_stConfInfo.useSnapshot) {
L
Liu Jicong 已提交
571
    tmq_conf_set(conf, "experimental.snapshot.enable", "true");
P
plum-lihui 已提交
572
  }
P
plum-lihui 已提交
573 574 575 576

  pInfo->tmq = tmq_consumer_new(conf, NULL, 0);

  tmq_conf_destroy(conf);
L
Liu Jicong 已提交
577

P
plum-lihui 已提交
578 579 580 581 582 583 584 585 586 587 588 589 590 591 592
  return;
}

void build_topic_list(SThreadInfo* pInfo) {
  pInfo->topicList = tmq_list_new();
  // tmq_list_append(topic_list, "test_stb_topic_1");
  for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
    tmq_list_append(pInfo->topicList, pInfo->topics[i]);
  }
  return;
}

int32_t saveConsumeResult(SThreadInfo* pInfo) {
  char sqlStr[1024] = {0};
  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
L
Liu Jicong 已提交
593
  sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
594 595
          g_stConfInfo.cdbName, atomic_fetch_add_64(&now, 1), pInfo->consumerId, pInfo->consumeMsgCnt,
          pInfo->consumeRowCnt, pInfo->checkresult);
P
plum-lihui 已提交
596

P
plum-lihui 已提交
597
  char tmpString[128];
L
Liu Jicong 已提交
598
  taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
L
Liu Jicong 已提交
599

600
  TAOS_RES* pRes = taos_query(pInfo->taos, sqlStr);
P
plum-lihui 已提交
601
  if (taos_errno(pRes) != 0) {
P
plum-lihui 已提交
602
    pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
P
plum-lihui 已提交
603 604 605 606 607 608 609 610 611 612
    taos_free_result(pRes);
    exit(-1);
  }

  taos_free_result(pRes);

  return 0;
}

void loop_consume(SThreadInfo* pInfo) {
L
Liu Jicong 已提交
613
  int32_t code;
P
plum-lihui 已提交
614

615 616
  int32_t once_flag = 0;

P
plum-lihui 已提交
617 618 619
  int64_t totalMsgs = 0;
  int64_t totalRows = 0;

P
plum-lihui 已提交
620
  char tmpString[128];
L
Liu Jicong 已提交
621 622
  taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
                  pInfo->consumerId);
P
plum-lihui 已提交
623

P
plum-lihui 已提交
624
  pInfo->ts = taosGetTimestampMs();
P
plum-lihui 已提交
625

P
plum-lihui 已提交
626
  if (pInfo->ifCheckData) {
P
plum-lihui 已提交
627
    char filename[256] = {0};
P
plum-lihui 已提交
628
    char tmpString[128];
P
plum-lihui 已提交
629 630 631
    // sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId,
    // getCurrentTimeString(tmpString));
    sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
P
plum-lihui 已提交
632 633 634 635 636 637
    pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
    if (pInfo->pConsumeRowsFile == NULL) {
      taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString));
      return;
    }
  }
P
plum-lihui 已提交
638

639 640 641
  int64_t  lastTotalMsgs = 0;
  uint64_t lastPrintTime = taosGetTimestampMs();
  uint64_t startTs = taosGetTimestampMs();
P
plum-lihui 已提交
642

P
plum-lihui 已提交
643
  int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
P
plum-lihui 已提交
644
  while (running) {
P
plum-lihui 已提交
645
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
P
plum-lihui 已提交
646 647
    if (tmqMsg) {
      if (0 != g_stConfInfo.showMsgFlag) {
P
plum-lihui 已提交
648
        totalRows += msg_process(tmqMsg, pInfo, totalMsgs);
P
plum-lihui 已提交
649 650 651 652 653
      }

      taos_free_result(tmqMsg);

      totalMsgs++;
654 655 656 657 658 659 660 661 662 663

      int64_t currentPrintTime = taosGetTimestampMs();
      if (currentPrintTime - lastPrintTime > 10 * 1000) {
        taosFprintfFile(
            g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
            pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / (currentPrintTime - lastPrintTime));
        lastPrintTime = currentPrintTime;
        lastTotalMsgs = totalMsgs;
      }

P
plum-lihui 已提交
664
      if (0 == once_flag) {
665
        once_flag = 1;
P
plum-lihui 已提交
666 667
        notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
      }
668

P
plum-lihui 已提交
669
      if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) {
L
Liu Jicong 已提交
670
        char tmpString[128];
P
plum-lihui 已提交
671
        taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
672 673
        break;
      }
L
Liu Jicong 已提交
674
    } else {
P
plum-lihui 已提交
675 676
      char tmpString[128];
      taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
677 678 679
      break;
    }
  }
680

P
plum-lihui 已提交
681 682 683 684
  if (0 == running) {
    taosFprintfFile(g_fp, "receive stop signal and not continue consume\n");
  }

P
plum-lihui 已提交
685 686 687 688 689 690 691 692 693 694
  pInfo->consumeMsgCnt = totalMsgs;
  pInfo->consumeRowCnt = totalRows;

  taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
                  pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
}

void* consumeThreadFunc(void* param) {
  SThreadInfo* pInfo = (SThreadInfo*)param;

695 696 697
  pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pInfo->taos == NULL) {
    taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
698
    ASSERT(0);
699
    return NULL;
700 701
  }

P
plum-lihui 已提交
702 703 704
  build_consumer(pInfo);
  build_topic_list(pInfo);
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
705
    taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
D
dapan1121 已提交
706
    assert(0);
P
plum-lihui 已提交
707 708 709
    return NULL;
  }

L
Liu Jicong 已提交
710 711
  int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
  if (err != 0) {
P
plum-lihui 已提交
712
    pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
713
    taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
714 715
    assert(0);
    return NULL;
P
plum-lihui 已提交
716
  }
L
Liu Jicong 已提交
717

P
plum-lihui 已提交
718 719 720 721 722
  tmq_list_destroy(pInfo->topicList);
  pInfo->topicList = NULL;

  loop_consume(pInfo);

P
plum-lihui 已提交
723
  if (pInfo->ifManualCommit) {
L
Liu Jicong 已提交
724 725 726
    pPrint("tmq_commit() manual commit when consume end.\n");
    /*tmq_commit(pInfo->tmq, NULL, 0);*/
    tmq_commit_sync(pInfo->tmq, NULL);
L
Liu Jicong 已提交
727
    taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
L
Liu Jicong 已提交
728
    pPrint("tmq_commit() manual commit over.\n");
P
plum-lihui 已提交
729
  }
L
Liu Jicong 已提交
730

P
plum-lihui 已提交
731
  err = tmq_unsubscribe(pInfo->tmq);
L
Liu Jicong 已提交
732
  if (err != 0) {
P
plum-lihui 已提交
733
    pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
734
    taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
735
  }
L
Liu Jicong 已提交
736

P
plum-lihui 已提交
737
  err = tmq_consumer_close(pInfo->tmq);
L
Liu Jicong 已提交
738
  if (err != 0) {
P
plum-lihui 已提交
739
    pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
740
    taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
741 742 743 744 745 746
  }
  pInfo->tmq = NULL;

  // save consume result into consumeresult table
  saveConsumeResult(pInfo);

P
plum-lihui 已提交
747 748 749 750 751 752
  // save rows from per vgroup
  taosFprintfFile(g_fp, "======== consumerId: %d, consume rows from per vgroups ========\n", pInfo->consumerId);
  for (int32_t i = 0; i < pInfo->numOfVgroups; i++) {
    taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
  }

753 754 755
  taos_close(pInfo->taos);
  pInfo->taos = NULL;

P
plum-lihui 已提交
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803
  return NULL;
}

void parseConsumeInfo() {
  char*      token;
  const char delim[2] = ",";
  const char ch = ':';

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
      ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
      // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
      g_stConfInfo.stThreads[i].numOfTopic++;

      token = strtok(NULL, delim);
    }

    token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      {
        char* pstr = token;
        ltrim(pstr);
        char* ret = strchr(pstr, ch);
        memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
        strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
        // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
        // g_stConfInfo.value[g_stConfInfo.numOfKey]);
        g_stConfInfo.stThreads[i].numOfKey++;
      }

      token = strtok(NULL, delim);
    }
  }
}

int32_t getConsumeInfo() {
  char sqlStr[1024] = {0};

  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
P
plum-lihui 已提交
804
    pError("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
P
plum-lihui 已提交
805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822
    taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
    taosCloseFile(&g_fp);
    taos_free_result(pRes);
    exit(-1);
  }

  TAOS_ROW    row = NULL;
  int         num_fields = taos_num_fields(pRes);
  TAOS_FIELD* fields = taos_fetch_fields(pRes);

  // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
  // ifcheckdata int

  int32_t numOfThread = 0;
  while ((row = taos_fetch_row(pRes))) {
    int32_t* lengths = taos_fetch_lengths(pRes);

    // set default value
L
Liu Jicong 已提交
823 824 825
    // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
P
plum-lihui 已提交
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841

    for (int i = 0; i < num_fields; ++i) {
      if (row[i] == NULL || 0 == i) {
        continue;
      }

      if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
      } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
      } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
      } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
        g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
      } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
P
plum-lihui 已提交
842 843
      } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]);
P
plum-lihui 已提交
844 845 846 847 848 849 850 851 852 853 854 855 856 857
      }
    }
    numOfThread++;
  }
  g_stConfInfo.numOfThread = numOfThread;

  taos_free_result(pRes);

  parseConsumeInfo();

  return 0;
}

int main(int32_t argc, char* argv[]) {
858 859
  now = taosGetTimestampMs();

P
plum-lihui 已提交
860 861 862 863
  parseArgument(argc, argv);
  getConsumeInfo();
  saveConfigToLogFile();

P
plum-lihui 已提交
864 865
  tmqSetSignalHandle();

P
plum-lihui 已提交
866 867 868 869 870 871 872 873 874 875 876
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);

  // pthread_create one thread to consume
  taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
                     (void*)(&(g_stConfInfo.stThreads[i])));
  }

P
plum-lihui 已提交
877 878
  int64_t start = taosGetTimestampUs();

P
plum-lihui 已提交
879 880
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
881
    taosThreadClear(&g_stConfInfo.stThreads[i].thread);
P
plum-lihui 已提交
882 883
  }

P
plum-lihui 已提交
884 885 886 887 888 889 890 891 892
  int64_t end = taosGetTimestampUs();

  int64_t totalMsgs = 0;
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
  }

  int64_t t = end - start;
  if (0 == t) t = 1;
893

P
plum-lihui 已提交
894 895
  double tInMs = (double)t / 1000000.0;
  taosFprintfFile(g_fp,
896 897
                  "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/second\n\n",
                  tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
P
plum-lihui 已提交
898 899 900 901 902 903

  taosFprintfFile(g_fp, "==== close tmqlog ====\n");
  taosCloseFile(&g_fp);

  return 0;
}