tmqSim.c 29.7 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>

#include "taos.h"
P
plum-lihui 已提交
25
#include "taosdef.h"
P
plum-lihui 已提交
26 27
#include "taoserror.h"
#include "tlog.h"
P
plum-lihui 已提交
28
#include "types.h"
P
plum-lihui 已提交
29 30 31 32 33 34 35 36

#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b))

#define MAX_SQL_STR_LEN         (1024 * 1024)
#define MAX_ROW_STR_LEN         (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
P
plum-lihui 已提交
37
#define MAX_VGROUP_CNT          (32)
P
plum-lihui 已提交
38

39 40 41 42 43
typedef enum {
  NOTIFY_CMD_START_CONSUM,
  NOTIFY_CMD_START_COMMIT,
  NOTIFY_CMD_ID_BUTT,
} NOTIFY_CMD_ID;
44

P
plum-lihui 已提交
45 46 47 48
typedef struct {
  TdThread thread;
  int32_t  consumerId;

L
Liu Jicong 已提交
49 50 51 52
  int32_t ifManualCommit;
  // int32_t  autoCommitIntervalMs;  // 1000 ms
  // char     autoCommit[8];         // true, false
  // char     autoOffsetRest[16];    // none, earliest, latest
P
plum-lihui 已提交
53

P
plum-lihui 已提交
54
  TdFilePtr pConsumeRowsFile;
P
plum-lihui 已提交
55 56
  int32_t   ifCheckData;
  int64_t   expectMsgCnt;
P
plum-lihui 已提交
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73

  int64_t consumeMsgCnt;
  int64_t consumeRowCnt;
  int32_t checkresult;

  char topicString[1024];
  char keyString[1024];

  int32_t numOfTopic;
  char    topics[32][64];

  int32_t numOfKey;
  char    key[32][64];
  char    value[32][64];

  tmq_t*      tmq;
  tmq_list_t* topicList;
L
Liu Jicong 已提交
74

P
plum-lihui 已提交
75 76 77
  int32_t numOfVgroups;
  int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2];  // [i][0]: vgroup id, [i][1]: rows of consume
  int64_t ts;
P
plum-lihui 已提交
78

79 80
  TAOS* taos;

P
plum-lihui 已提交
81 82 83 84 85 86 87
} SThreadInfo;

typedef struct {
  // input from argvs
  char        cdbName[32];
  char        dbName[32];
  int32_t     showMsgFlag;
L
Liu Jicong 已提交
88
  int32_t     showRowFlag;
P
plum-lihui 已提交
89
  int32_t     saveRowFlag;
P
plum-lihui 已提交
90 91
  int32_t     consumeDelay;  // unit s
  int32_t     numOfThread;
P
plum-lihui 已提交
92
  int32_t     useSnapshot;
P
plum-lihui 已提交
93 94 95 96 97
  SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo;

static SConfInfo g_stConfInfo;
TdFilePtr        g_fp = NULL;
P
plum-lihui 已提交
98
static int       running = 1;
P
plum-lihui 已提交
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114

// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;

static void printHelp() {
  char indent[10] = "        ";
  printf("Used to test the tmq feature with sim cases\n");

  printf("%s%s\n", indent, "-c");
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
  printf("%s%s\n", indent, "-d");
  printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
  printf("%s%s\n", indent, "-g");
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
  printf("%s%s\n", indent, "-r");
  printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
P
plum-lihui 已提交
115 116
  printf("%s%s\n", indent, "-s");
  printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
P
plum-lihui 已提交
117 118 119 120 121
  printf("%s%s\n", indent, "-y");
  printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
  exit(EXIT_SUCCESS);
}

P
plum-lihui 已提交
122
char* getCurrentTimeString(char* timeString) {
L
Liu Jicong 已提交
123
  time_t    tTime = taosGetTimestampSec();
P
plum-lihui 已提交
124
  struct tm tm = *taosLocalTime(&tTime, NULL);
L
Liu Jicong 已提交
125 126
  sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
          tm.tm_min, tm.tm_sec);
P
plum-lihui 已提交
127 128 129 130

  return timeString;
}

131
static void tmqStop(int signum, void* info, void* ctx) {
P
plum-lihui 已提交
132 133
  running = 0;
  char tmpString[128];
134
  taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum);
P
plum-lihui 已提交
135 136
}

137
static void tmqSetSignalHandle() { taosSetSignal(SIGINT, tmqStop); }
P
plum-lihui 已提交
138

P
plum-lihui 已提交
139
void initLogFile() {
P
plum-lihui 已提交
140
  char filename[256];
L
Liu Jicong 已提交
141
  char tmpString[128];
P
plum-lihui 已提交
142

P
plum-lihui 已提交
143 144 145
  pid_t process_id = getpid();

  sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
wafwerar's avatar
wafwerar 已提交
146 147 148 149 150 151
#ifdef WINDOWS
  for (int i = 2; i < sizeof(filename); i++) {
    if (filename[i] == ':') filename[i] = '-';
    if (filename[i] == '\0') break;
  }
#endif
152
  TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
P
plum-lihui 已提交
153
  if (NULL == pFile) {
154
    fprintf(stderr, "Failed to open %s for save result\n", filename);
P
plum-lihui 已提交
155 156 157 158 159 160 161 162 163 164 165 166
    exit(-1);
  }
  g_fp = pFile;
}

void saveConfigToLogFile() {
  taosFprintfFile(g_fp, "###################################################################\n");
  taosFprintfFile(g_fp, "# configDir:           %s\n", configDir);
  taosFprintfFile(g_fp, "# dbName:              %s\n", g_stConfInfo.dbName);
  taosFprintfFile(g_fp, "# cdbName:             %s\n", g_stConfInfo.cdbName);
  taosFprintfFile(g_fp, "# showMsgFlag:         %d\n", g_stConfInfo.showMsgFlag);
  taosFprintfFile(g_fp, "# showRowFlag:         %d\n", g_stConfInfo.showRowFlag);
P
plum-lihui 已提交
167
  taosFprintfFile(g_fp, "# saveRowFlag:         %d\n", g_stConfInfo.saveRowFlag);
P
plum-lihui 已提交
168 169 170 171 172
  taosFprintfFile(g_fp, "# consumeDelay:        %d\n", g_stConfInfo.consumeDelay);
  taosFprintfFile(g_fp, "# numOfThread:         %d\n", g_stConfInfo.numOfThread);

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
L
Liu Jicong 已提交
173 174 175
    // taosFprintfFile(g_fp, "  auto commit:              %s\n", g_stConfInfo.stThreads[i].autoCommit);
    // taosFprintfFile(g_fp, "  auto commit interval ms:  %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
    // taosFprintfFile(g_fp, "  auto offset rest:         %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
P
plum-lihui 已提交
176 177 178 179 180 181 182 183 184 185
    taosFprintfFile(g_fp, "  Topics: ");
    for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
      taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
    }
    taosFprintfFile(g_fp, "\n");
    taosFprintfFile(g_fp, "  Key: ");
    for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) {
      taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
    }
    taosFprintfFile(g_fp, "\n");
P
plum-lihui 已提交
186
    taosFprintfFile(g_fp, "  expect rows: %d\n", g_stConfInfo.stThreads[i].expectMsgCnt);
P
plum-lihui 已提交
187 188
  }

P
plum-lihui 已提交
189 190
  char tmpString[128];
  taosFprintfFile(g_fp, "# Test time:                %s\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
191 192 193 194 195 196 197
  taosFprintfFile(g_fp, "###################################################################\n");
}

void parseArgument(int32_t argc, char* argv[]) {
  memset(&g_stConfInfo, 0, sizeof(SConfInfo));
  g_stConfInfo.showMsgFlag = 0;
  g_stConfInfo.showRowFlag = 0;
P
plum-lihui 已提交
198
  g_stConfInfo.saveRowFlag = 0;
P
plum-lihui 已提交
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
  g_stConfInfo.consumeDelay = 5;

  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      strcpy(g_stConfInfo.dbName, argv[++i]);
    } else if (strcmp(argv[i], "-w") == 0) {
      strcpy(g_stConfInfo.cdbName, argv[++i]);
    } else if (strcmp(argv[i], "-c") == 0) {
      strcpy(configDir, argv[++i]);
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
    } else if (strcmp(argv[i], "-r") == 0) {
      g_stConfInfo.showRowFlag = atol(argv[++i]);
P
plum-lihui 已提交
215 216
    } else if (strcmp(argv[i], "-s") == 0) {
      g_stConfInfo.saveRowFlag = atol(argv[++i]);
P
plum-lihui 已提交
217 218
    } else if (strcmp(argv[i], "-y") == 0) {
      g_stConfInfo.consumeDelay = atol(argv[++i]);
P
plum-lihui 已提交
219 220
    } else if (strcmp(argv[i], "-e") == 0) {
      g_stConfInfo.useSnapshot = atol(argv[++i]);
P
plum-lihui 已提交
221
    } else {
P
plum-lihui 已提交
222
      pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
P
plum-lihui 已提交
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
      exit(-1);
    }
  }

  initLogFile();

  taosFprintfFile(g_fp, "====parseArgument() success\n");

#if 1
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
  pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
  pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
  pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
P
plum-lihui 已提交
238
  pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
P
plum-lihui 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
#endif
}

void splitStr(char** arr, char* str, const char* del) {
  char* s = strtok(str, del);
  while (s != NULL) {
    *arr++ = s;
    s = strtok(NULL, del);
  }
}

void ltrim(char* str) {
  if (str == NULL || *str == '\0') {
    return;
  }
  int   len = 0;
  char* p = str;
  while (*p != '\0' && isspace(*p)) {
    ++p;
    ++len;
  }
  memmove(str, p, strlen(str) - len + 1);
  // return str;
}

P
plum-lihui 已提交
264 265 266 267 268
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
  int32_t i;
  for (i = 0; i < pInfo->numOfVgroups; i++) {
    if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
      pInfo->rowsOfPerVgroups[i][1] += rows;
L
Liu Jicong 已提交
269 270
      return;
    }
P
plum-lihui 已提交
271 272 273 274 275
  }

  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
  pInfo->numOfVgroups++;
L
Liu Jicong 已提交
276

P
plum-lihui 已提交
277 278
  taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
  if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
L
Liu Jicong 已提交
279 280
    taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
                    pInfo->numOfVgroups, vgroupId);
P
plum-lihui 已提交
281 282 283 284 285 286
    taosCloseFile(&g_fp);
    exit(-1);
  }
}

int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
P
plum-lihui 已提交
287 288 289 290 291 292 293
  char sqlStr[1100] = {0};

  if (strlen(buf) > 1024) {
    taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", strlen(buf));
    taosCloseFile(&g_fp);
    exit(-1);
  }
P
plum-lihui 已提交
294 295 296 297

  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

L
Liu Jicong 已提交
298 299
  sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId,
          pInfo->ts++, buf);
P
plum-lihui 已提交
300 301 302 303 304 305 306 307 308 309 310 311 312 313
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
    taosFprintfFile(g_fp, "error in insert consume result, reason:%s\n", taos_errstr(pRes));
    taosCloseFile(&g_fp);
    taos_free_result(pRes);
    exit(-1);
  }

  taos_free_result(pRes);

  return 0;
}

P
plum-lihui 已提交
314 315 316 317 318
static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
  // if (shell.args.is_raw_time) {
  //   sprintf(buf, "%" PRId64, val);
  //   return buf;
  // }
P
plum-lihui 已提交
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355

  time_t  tt;
  int32_t ms = 0;
  if (precision == TSDB_TIME_PRECISION_NANO) {
    tt = (time_t)(val / 1000000000);
    ms = val % 1000000000;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    tt = (time_t)(val / 1000000);
    ms = val % 1000000;
  } else {
    tt = (time_t)(val / 1000);
    ms = val % 1000;
  }

  /*
    comment out as it make testcases like select_with_tags.sim fail.
    but in windows, this may cause the call to localtime crash if tt < 0,
    need to find a better solution.
    if (tt < 0) {
      tt = 0;
    }
  */

#ifdef WINDOWS
  if (tt < 0) tt = 0;
#endif
  if (tt <= 0 && ms < 0) {
    tt--;
    if (precision == TSDB_TIME_PRECISION_NANO) {
      ms += 1000000000;
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
      ms += 1000000;
    } else {
      ms += 1000;
    }
  }

P
plum-lihui 已提交
356
  struct tm* ptm = taosLocalTime(&tt, NULL);
P
plum-lihui 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369
  size_t     pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);

  if (precision == TSDB_TIME_PRECISION_NANO) {
    sprintf(buf + pos, ".%09d", ms);
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    sprintf(buf + pos, ".%06d", ms);
  } else {
    sprintf(buf + pos, ".%03d", ms);
  }

  return buf;
}

P
plum-lihui 已提交
370 371
static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length,
                                 int32_t precision) {
P
plum-lihui 已提交
372 373 374 375 376 377 378 379 380
  if (val == NULL) {
    taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR);
    return;
  }

  int  n;
  char buf[TSDB_MAX_BYTES_PER_ROW];
  switch (field->type) {
    case TSDB_DATA_TYPE_BOOL:
P
plum-lihui 已提交
381
      taosFprintfFile(pFile, "%d", ((((int32_t)(*((char*)val))) == 1) ? 1 : 0));
P
plum-lihui 已提交
382 383
      break;
    case TSDB_DATA_TYPE_TINYINT:
P
plum-lihui 已提交
384
      taosFprintfFile(pFile, "%d", *((int8_t*)val));
P
plum-lihui 已提交
385 386
      break;
    case TSDB_DATA_TYPE_UTINYINT:
P
plum-lihui 已提交
387
      taosFprintfFile(pFile, "%u", *((uint8_t*)val));
P
plum-lihui 已提交
388 389
      break;
    case TSDB_DATA_TYPE_SMALLINT:
P
plum-lihui 已提交
390
      taosFprintfFile(pFile, "%d", *((int16_t*)val));
P
plum-lihui 已提交
391 392
      break;
    case TSDB_DATA_TYPE_USMALLINT:
P
plum-lihui 已提交
393
      taosFprintfFile(pFile, "%u", *((uint16_t*)val));
P
plum-lihui 已提交
394 395
      break;
    case TSDB_DATA_TYPE_INT:
P
plum-lihui 已提交
396
      taosFprintfFile(pFile, "%d", *((int32_t*)val));
P
plum-lihui 已提交
397 398
      break;
    case TSDB_DATA_TYPE_UINT:
P
plum-lihui 已提交
399
      taosFprintfFile(pFile, "%u", *((uint32_t*)val));
P
plum-lihui 已提交
400 401
      break;
    case TSDB_DATA_TYPE_BIGINT:
P
plum-lihui 已提交
402
      taosFprintfFile(pFile, "%" PRId64, *((int64_t*)val));
P
plum-lihui 已提交
403 404
      break;
    case TSDB_DATA_TYPE_UBIGINT:
P
plum-lihui 已提交
405
      taosFprintfFile(pFile, "%" PRIu64, *((uint64_t*)val));
P
plum-lihui 已提交
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
      break;
    case TSDB_DATA_TYPE_FLOAT:
      taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val));
      if (n > TMAX(25, length)) {
        taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
      } else {
        taosFprintfFile(pFile, "%s", buf);
      }
      break;
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
    case TSDB_DATA_TYPE_JSON:
      memcpy(buf, val, length);
      buf[length] = 0;
      taosFprintfFile(pFile, "\'%s\'", buf);
      break;
    case TSDB_DATA_TYPE_TIMESTAMP:
P
plum-lihui 已提交
426
      shellFormatTimestamp(buf, *(int64_t*)val, precision);
P
plum-lihui 已提交
427 428 429 430 431 432 433
      taosFprintfFile(pFile, "'%s'", buf);
      break;
    default:
      break;
  }
}

P
plum-lihui 已提交
434 435
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields,
                               int32_t precision) {
P
plum-lihui 已提交
436 437
  for (int32_t i = 0; i < num_fields; i++) {
    if (i > 0) {
438
      taosFprintfFile(pFile, ",");
P
plum-lihui 已提交
439
    }
P
plum-lihui 已提交
440
    shellDumpFieldToFile(pFile, (const char*)row[i], fields + i, length[i], precision);
P
plum-lihui 已提交
441 442 443 444
  }
  taosFprintfFile(pFile, "\n");
}

P
plum-lihui 已提交
445
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
P
plum-lihui 已提交
446 447
  char    buf[1024];
  int32_t totalRows = 0;
L
Liu Jicong 已提交
448

P
plum-lihui 已提交
449
  // printf("topic: %s\n", tmq_get_topic_name(msg));
P
plum-lihui 已提交
450 451
  int32_t     vgroupId = tmq_get_vgroup_id(msg);
  const char* dbName = tmq_get_db_name(msg);
L
Liu Jicong 已提交
452

P
plum-lihui 已提交
453
  taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
P
plum-lihui 已提交
454 455
  taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
                  tmq_get_topic_name(msg), vgroupId);
P
plum-lihui 已提交
456 457 458

  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
P
plum-lihui 已提交
459

L
Liu Jicong 已提交
460
    if (row == NULL) break;
P
plum-lihui 已提交
461

P
plum-lihui 已提交
462
    TAOS_FIELD* fields = taos_fetch_fields(msg);
P
plum-lihui 已提交
463
    int32_t     numOfFields = taos_field_count(msg);
P
plum-lihui 已提交
464 465 466
    int32_t*    length = taos_fetch_lengths(msg);
    int32_t     precision = taos_result_precision(msg);
    const char* tbName = tmq_get_table_name(msg);
L
Liu Jicong 已提交
467

468
#if 0
P
plum-lihui 已提交
469 470 471 472 473 474
	// get schema
	//============================== stub =================================================//
	for (int32_t i = 0; i < numOfFields; i++) {
	  taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
	}
	//============================== stub =================================================//
475
#endif
P
plum-lihui 已提交
476

477
    dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
P
plum-lihui 已提交
478

P
plum-lihui 已提交
479
    taos_print_row(buf, row, fields, numOfFields);
L
Liu Jicong 已提交
480 481

    if (0 != g_stConfInfo.showRowFlag) {
L
Liu Jicong 已提交
482
      taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
P
plum-lihui 已提交
483 484 485
      // if (0 != g_stConfInfo.saveRowFlag) {
      //   saveConsumeContentToTbl(pInfo, buf);
      // }
P
plum-lihui 已提交
486
    }
L
Liu Jicong 已提交
487

P
plum-lihui 已提交
488 489 490
    totalRows++;
  }

P
plum-lihui 已提交
491
  addRowsToVgroupId(pInfo, vgroupId, totalRows);
L
Liu Jicong 已提交
492

P
plum-lihui 已提交
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
  return totalRows;
}

int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = taos_query(taos, command);
  int       code = taos_errno(pRes);
  if (code != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(code), command);
    taos_free_result(pRes);
    return -1;
  }
  taos_free_result(pRes);
  return 0;
}

P
plum-lihui 已提交
508
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
509 510 511 512 513 514 515

int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
  char sqlStr[1024] = {0};

  int64_t now = taosGetTimestampMs();

  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
P
plum-lihui 已提交
516 517
  sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, now, cmdId,
          pInfo->consumerId);
518 519 520 521 522 523 524 525 526

  taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);

  taosFprintfFile(g_fp, "notifyMainScript success, sql: %s\n", sqlStr);

  return 0;
}

static int32_t g_once_commit_flag = 0;
P
plum-lihui 已提交
527
static void    tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
528
     pError("tmq_commit_cb_print() commit %d\n", code);
529

530 531 532
     if (0 == g_once_commit_flag) {
       g_once_commit_flag = 1;
       notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
533
  }
534 535 536

     char tmpString[128];
     taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
537 538 539 540 541 542 543 544 545 546
}

void build_consumer(SThreadInfo* pInfo) {
  tmq_conf_t* conf = tmq_conf_new();

  // tmq_conf_set(conf, "td.connect.ip", "localhost");
  // tmq_conf_set(conf, "td.connect.port", "6030");
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");

L
Liu Jicong 已提交
547
  // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
P
plum-lihui 已提交
548

549
  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo);
P
plum-lihui 已提交
550 551 552 553 554 555

  // tmq_conf_set(conf, "group.id", "cgrp1");
  for (int32_t i = 0; i < pInfo->numOfKey; i++) {
    tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
  }

556 557
  tmq_conf_set(conf, "msg.with.table.name", "true");

P
plum-lihui 已提交
558 559 560 561 562 563 564 565 566 567
  // tmq_conf_set(conf, "client.id", "c-001");

  // tmq_conf_set(conf, "enable.auto.commit", "true");
  // tmq_conf_set(conf, "enable.auto.commit", "false");

  // tmq_conf_set(conf, "auto.commit.interval.ms", "1000");

  // tmq_conf_set(conf, "auto.offset.reset", "none");
  // tmq_conf_set(conf, "auto.offset.reset", "earliest");
  // tmq_conf_set(conf, "auto.offset.reset", "latest");
P
plum-lihui 已提交
568 569
  //
  if (g_stConfInfo.useSnapshot) {
L
Liu Jicong 已提交
570
    tmq_conf_set(conf, "experimental.snapshot.enable", "true");
P
plum-lihui 已提交
571
  }
P
plum-lihui 已提交
572 573 574 575

  pInfo->tmq = tmq_consumer_new(conf, NULL, 0);

  tmq_conf_destroy(conf);
L
Liu Jicong 已提交
576

P
plum-lihui 已提交
577 578 579 580 581 582 583 584 585 586 587 588 589 590 591
  return;
}

void build_topic_list(SThreadInfo* pInfo) {
  pInfo->topicList = tmq_list_new();
  // tmq_list_append(topic_list, "test_stb_topic_1");
  for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
    tmq_list_append(pInfo->topicList, pInfo->topics[i]);
  }
  return;
}

int32_t saveConsumeResult(SThreadInfo* pInfo) {
  char sqlStr[1024] = {0};

592 593
  int64_t now = taosGetTimestampMs();

P
plum-lihui 已提交
594
  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
L
Liu Jicong 已提交
595 596
  sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
          g_stConfInfo.cdbName, now, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
P
plum-lihui 已提交
597

P
plum-lihui 已提交
598
  char tmpString[128];
L
Liu Jicong 已提交
599
  taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
L
Liu Jicong 已提交
600

601
  TAOS_RES* pRes = taos_query(pInfo->taos, sqlStr);
P
plum-lihui 已提交
602
  if (taos_errno(pRes) != 0) {
P
plum-lihui 已提交
603
    pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
P
plum-lihui 已提交
604 605 606 607 608 609 610 611 612 613
    taos_free_result(pRes);
    exit(-1);
  }

  taos_free_result(pRes);

  return 0;
}

void loop_consume(SThreadInfo* pInfo) {
L
Liu Jicong 已提交
614
  int32_t code;
P
plum-lihui 已提交
615

616 617
  int32_t once_flag = 0;

P
plum-lihui 已提交
618 619 620
  int64_t totalMsgs = 0;
  int64_t totalRows = 0;

P
plum-lihui 已提交
621
  char tmpString[128];
L
Liu Jicong 已提交
622 623
  taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
                  pInfo->consumerId);
P
plum-lihui 已提交
624

P
plum-lihui 已提交
625
  pInfo->ts = taosGetTimestampMs();
P
plum-lihui 已提交
626

P
plum-lihui 已提交
627
  if (pInfo->ifCheckData) {
P
plum-lihui 已提交
628
    char filename[256] = {0};
P
plum-lihui 已提交
629
    char tmpString[128];
P
plum-lihui 已提交
630 631 632
    // sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId,
    // getCurrentTimeString(tmpString));
    sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
P
plum-lihui 已提交
633 634 635 636 637 638
    pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
    if (pInfo->pConsumeRowsFile == NULL) {
      taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString));
      return;
    }
  }
P
plum-lihui 已提交
639

P
plum-lihui 已提交
640 641 642
  int64_t    lastTotalMsgs = 0;
  uint64_t   lastPrintTime = taosGetTimestampMs();
  uint64_t   startTs = taosGetTimestampMs();
P
plum-lihui 已提交
643

P
plum-lihui 已提交
644
  int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
P
plum-lihui 已提交
645
  while (running) {
P
plum-lihui 已提交
646
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
P
plum-lihui 已提交
647 648
    if (tmqMsg) {
      if (0 != g_stConfInfo.showMsgFlag) {
P
plum-lihui 已提交
649
        totalRows += msg_process(tmqMsg, pInfo, totalMsgs);
P
plum-lihui 已提交
650 651 652 653 654
      }

      taos_free_result(tmqMsg);

      totalMsgs++;
P
plum-lihui 已提交
655 656 657 658 659 660 661 662 663 664
	  
	  int64_t currentPrintTime = taosGetTimestampMs();
	  if (currentPrintTime - lastPrintTime > 10 * 1000) {
		  taosFprintfFile(g_fp,	
		  	              "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n", 
		  	              pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0/(currentPrintTime - lastPrintTime));
		  lastPrintTime = currentPrintTime;
		  lastTotalMsgs = totalMsgs;
	  }
	  
P
plum-lihui 已提交
665
      if (0 == once_flag) {
666
        once_flag = 1;
P
plum-lihui 已提交
667 668
        notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
      }
669

P
plum-lihui 已提交
670
      if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) {
L
Liu Jicong 已提交
671
        char tmpString[128];
P
plum-lihui 已提交
672
        taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
673 674
        break;
      }
L
Liu Jicong 已提交
675
    } else {
P
plum-lihui 已提交
676 677
      char tmpString[128];
      taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
678 679 680
      break;
    }
  }
P
plum-lihui 已提交
681
  
P
plum-lihui 已提交
682 683 684 685
  if (0 == running) {
    taosFprintfFile(g_fp, "receive stop signal and not continue consume\n");
  }

P
plum-lihui 已提交
686 687 688 689 690 691 692 693 694 695
  pInfo->consumeMsgCnt = totalMsgs;
  pInfo->consumeRowCnt = totalRows;

  taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
                  pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
}

void* consumeThreadFunc(void* param) {
  SThreadInfo* pInfo = (SThreadInfo*)param;

696 697 698
  pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pInfo->taos == NULL) {
    taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
699
    return NULL;
700 701
  }

P
plum-lihui 已提交
702 703 704
  build_consumer(pInfo);
  build_topic_list(pInfo);
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
705
    taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
D
dapan1121 已提交
706
    assert(0);
P
plum-lihui 已提交
707 708 709
    return NULL;
  }

L
Liu Jicong 已提交
710 711
  int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
  if (err != 0) {
P
plum-lihui 已提交
712
    pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
713
    taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
714 715
    assert(0);
    return NULL;
P
plum-lihui 已提交
716
  }
L
Liu Jicong 已提交
717

P
plum-lihui 已提交
718 719 720 721 722
  tmq_list_destroy(pInfo->topicList);
  pInfo->topicList = NULL;

  loop_consume(pInfo);

P
plum-lihui 已提交
723
  if (pInfo->ifManualCommit) {
L
Liu Jicong 已提交
724 725 726
    pPrint("tmq_commit() manual commit when consume end.\n");
    /*tmq_commit(pInfo->tmq, NULL, 0);*/
    tmq_commit_sync(pInfo->tmq, NULL);
L
Liu Jicong 已提交
727
    taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
L
Liu Jicong 已提交
728
    pPrint("tmq_commit() manual commit over.\n");
P
plum-lihui 已提交
729
  }
L
Liu Jicong 已提交
730

P
plum-lihui 已提交
731
  err = tmq_unsubscribe(pInfo->tmq);
L
Liu Jicong 已提交
732
  if (err != 0) {
P
plum-lihui 已提交
733
    pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
734
    taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
735
  }
L
Liu Jicong 已提交
736

P
plum-lihui 已提交
737
  err = tmq_consumer_close(pInfo->tmq);
L
Liu Jicong 已提交
738
  if (err != 0) {
P
plum-lihui 已提交
739
    pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
740
    taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
741 742 743 744 745 746
  }
  pInfo->tmq = NULL;

  // save consume result into consumeresult table
  saveConsumeResult(pInfo);

P
plum-lihui 已提交
747 748 749 750 751 752
  // save rows from per vgroup
  taosFprintfFile(g_fp, "======== consumerId: %d, consume rows from per vgroups ========\n", pInfo->consumerId);
  for (int32_t i = 0; i < pInfo->numOfVgroups; i++) {
    taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
  }

753 754 755
  taos_close(pInfo->taos);
  pInfo->taos = NULL;

P
plum-lihui 已提交
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803
  return NULL;
}

void parseConsumeInfo() {
  char*      token;
  const char delim[2] = ",";
  const char ch = ':';

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
      ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
      // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
      g_stConfInfo.stThreads[i].numOfTopic++;

      token = strtok(NULL, delim);
    }

    token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      {
        char* pstr = token;
        ltrim(pstr);
        char* ret = strchr(pstr, ch);
        memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
        strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
        // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
        // g_stConfInfo.value[g_stConfInfo.numOfKey]);
        g_stConfInfo.stThreads[i].numOfKey++;
      }

      token = strtok(NULL, delim);
    }
  }
}

int32_t getConsumeInfo() {
  char sqlStr[1024] = {0};

  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
P
plum-lihui 已提交
804
    pError("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
P
plum-lihui 已提交
805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822
    taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
    taosCloseFile(&g_fp);
    taos_free_result(pRes);
    exit(-1);
  }

  TAOS_ROW    row = NULL;
  int         num_fields = taos_num_fields(pRes);
  TAOS_FIELD* fields = taos_fetch_fields(pRes);

  // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
  // ifcheckdata int

  int32_t numOfThread = 0;
  while ((row = taos_fetch_row(pRes))) {
    int32_t* lengths = taos_fetch_lengths(pRes);

    // set default value
L
Liu Jicong 已提交
823 824 825
    // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
P
plum-lihui 已提交
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841

    for (int i = 0; i < num_fields; ++i) {
      if (row[i] == NULL || 0 == i) {
        continue;
      }

      if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
      } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
      } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
      } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
        g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
      } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
P
plum-lihui 已提交
842 843
      } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]);
P
plum-lihui 已提交
844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861
      }
    }
    numOfThread++;
  }
  g_stConfInfo.numOfThread = numOfThread;

  taos_free_result(pRes);

  parseConsumeInfo();

  return 0;
}

int main(int32_t argc, char* argv[]) {
  parseArgument(argc, argv);
  getConsumeInfo();
  saveConfigToLogFile();

P
plum-lihui 已提交
862 863
  tmqSetSignalHandle();

P
plum-lihui 已提交
864 865 866 867 868 869 870 871 872 873 874
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);

  // pthread_create one thread to consume
  taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
                     (void*)(&(g_stConfInfo.stThreads[i])));
  }

P
plum-lihui 已提交
875 876
  int64_t start = taosGetTimestampUs();

P
plum-lihui 已提交
877 878
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
879
    taosThreadClear(&g_stConfInfo.stThreads[i].thread);
P
plum-lihui 已提交
880 881
  }

P
plum-lihui 已提交
882 883 884 885 886 887 888 889 890
  int64_t end = taosGetTimestampUs();

  int64_t totalMsgs = 0;
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
  }

  int64_t t = end - start;
  if (0 == t) t = 1;
P
plum-lihui 已提交
891
  
P
plum-lihui 已提交
892 893
  double tInMs = (double)t / 1000000.0;
  taosFprintfFile(g_fp,
P
plum-lihui 已提交
894 895
				"Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/second\n\n",
				tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
P
plum-lihui 已提交
896 897 898 899 900 901

  taosFprintfFile(g_fp, "==== close tmqlog ====\n");
  taosCloseFile(&g_fp);

  return 0;
}