tmqSim.c 27.2 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>

#include "taos.h"
#include "taoserror.h"
#include "tlog.h"
P
plum-lihui 已提交
27 28
#include "taosdef.h"
#include "types.h"
P
plum-lihui 已提交
29 30 31 32 33 34 35 36

#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b))

#define MAX_SQL_STR_LEN         (1024 * 1024)
#define MAX_ROW_STR_LEN         (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
P
plum-lihui 已提交
37
#define MAX_VGROUP_CNT          (32)
P
plum-lihui 已提交
38

39 40 41 42 43 44
typedef enum {
    NOTIFY_CMD_START_CONSUM,
	NOTIFY_CMD_START_COMMIT,
	NOTIFY_CMD_ID_BUTT
}NOTIFY_CMD_ID;

P
plum-lihui 已提交
45 46 47 48
typedef struct {
  TdThread thread;
  int32_t  consumerId;

L
Liu Jicong 已提交
49 50 51 52
  int32_t ifManualCommit;
  // int32_t  autoCommitIntervalMs;  // 1000 ms
  // char     autoCommit[8];         // true, false
  // char     autoOffsetRest[16];    // none, earliest, latest
P
plum-lihui 已提交
53

P
plum-lihui 已提交
54
  TdFilePtr pConsumeRowsFile;
P
plum-lihui 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
  int32_t ifCheckData;
  int64_t expectMsgCnt;

  int64_t consumeMsgCnt;
  int64_t consumeRowCnt;
  int32_t checkresult;

  char topicString[1024];
  char keyString[1024];

  int32_t numOfTopic;
  char    topics[32][64];

  int32_t numOfKey;
  char    key[32][64];
  char    value[32][64];

  tmq_t*      tmq;
  tmq_list_t* topicList;
L
Liu Jicong 已提交
74

P
plum-lihui 已提交
75 76 77
  int32_t numOfVgroups;
  int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2];  // [i][0]: vgroup id, [i][1]: rows of consume
  int64_t ts;
P
plum-lihui 已提交
78

79 80
  TAOS* taos;

P
plum-lihui 已提交
81 82 83 84 85 86 87
} SThreadInfo;

typedef struct {
  // input from argvs
  char        cdbName[32];
  char        dbName[32];
  int32_t     showMsgFlag;
L
Liu Jicong 已提交
88
  int32_t     showRowFlag;
P
plum-lihui 已提交
89
  int32_t     saveRowFlag;
P
plum-lihui 已提交
90 91 92 93 94 95 96
  int32_t     consumeDelay;  // unit s
  int32_t     numOfThread;
  SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo;

static SConfInfo g_stConfInfo;
TdFilePtr        g_fp = NULL;
P
plum-lihui 已提交
97
static int       running = 1;
P
plum-lihui 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113

// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;

static void printHelp() {
  char indent[10] = "        ";
  printf("Used to test the tmq feature with sim cases\n");

  printf("%s%s\n", indent, "-c");
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
  printf("%s%s\n", indent, "-d");
  printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
  printf("%s%s\n", indent, "-g");
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
  printf("%s%s\n", indent, "-r");
  printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
P
plum-lihui 已提交
114 115
  printf("%s%s\n", indent, "-s");
  printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
P
plum-lihui 已提交
116 117 118 119 120
  printf("%s%s\n", indent, "-y");
  printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
  exit(EXIT_SUCCESS);
}

P
plum-lihui 已提交
121
char* getCurrentTimeString(char* timeString) {
L
Liu Jicong 已提交
122
  time_t    tTime = taosGetTimestampSec();
P
plum-lihui 已提交
123
  struct tm tm = *taosLocalTime(&tTime, NULL);
L
Liu Jicong 已提交
124 125
  sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
          tm.tm_min, tm.tm_sec);
P
plum-lihui 已提交
126 127 128 129

  return timeString;
}

P
plum-lihui 已提交
130
void initLogFile() {
P
plum-lihui 已提交
131
  char filename[256];
L
Liu Jicong 已提交
132
  char tmpString[128];
P
plum-lihui 已提交
133

L
Liu Jicong 已提交
134
  sprintf(filename, "%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
wafwerar's avatar
wafwerar 已提交
135 136 137 138 139 140
#ifdef WINDOWS
  for (int i = 2; i < sizeof(filename); i++) {
    if (filename[i] == ':') filename[i] = '-';
    if (filename[i] == '\0') break;
  }
#endif
141
  TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
P
plum-lihui 已提交
142
  if (NULL == pFile) {
143
    fprintf(stderr, "Failed to open %s for save result\n", filename);
P
plum-lihui 已提交
144 145 146 147 148 149 150 151 152 153 154 155
    exit(-1);
  }
  g_fp = pFile;
}

void saveConfigToLogFile() {
  taosFprintfFile(g_fp, "###################################################################\n");
  taosFprintfFile(g_fp, "# configDir:           %s\n", configDir);
  taosFprintfFile(g_fp, "# dbName:              %s\n", g_stConfInfo.dbName);
  taosFprintfFile(g_fp, "# cdbName:             %s\n", g_stConfInfo.cdbName);
  taosFprintfFile(g_fp, "# showMsgFlag:         %d\n", g_stConfInfo.showMsgFlag);
  taosFprintfFile(g_fp, "# showRowFlag:         %d\n", g_stConfInfo.showRowFlag);
P
plum-lihui 已提交
156
  taosFprintfFile(g_fp, "# saveRowFlag:         %d\n", g_stConfInfo.saveRowFlag);
P
plum-lihui 已提交
157 158 159 160 161
  taosFprintfFile(g_fp, "# consumeDelay:        %d\n", g_stConfInfo.consumeDelay);
  taosFprintfFile(g_fp, "# numOfThread:         %d\n", g_stConfInfo.numOfThread);

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
L
Liu Jicong 已提交
162 163 164
    // taosFprintfFile(g_fp, "  auto commit:              %s\n", g_stConfInfo.stThreads[i].autoCommit);
    // taosFprintfFile(g_fp, "  auto commit interval ms:  %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
    // taosFprintfFile(g_fp, "  auto offset rest:         %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
P
plum-lihui 已提交
165 166 167 168 169 170 171 172 173 174
    taosFprintfFile(g_fp, "  Topics: ");
    for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
      taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
    }
    taosFprintfFile(g_fp, "\n");
    taosFprintfFile(g_fp, "  Key: ");
    for (int k = 0; k < g_stConfInfo.stThreads[i].numOfKey; k++) {
      taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
    }
    taosFprintfFile(g_fp, "\n");
P
plum-lihui 已提交
175
    taosFprintfFile(g_fp, "  expect rows: %d\n", g_stConfInfo.stThreads[i].expectMsgCnt);
P
plum-lihui 已提交
176 177
  }

P
plum-lihui 已提交
178 179
  char tmpString[128];
  taosFprintfFile(g_fp, "# Test time:                %s\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
180 181 182 183 184 185 186
  taosFprintfFile(g_fp, "###################################################################\n");
}

void parseArgument(int32_t argc, char* argv[]) {
  memset(&g_stConfInfo, 0, sizeof(SConfInfo));
  g_stConfInfo.showMsgFlag = 0;
  g_stConfInfo.showRowFlag = 0;
P
plum-lihui 已提交
187
  g_stConfInfo.saveRowFlag = 0;
P
plum-lihui 已提交
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
  g_stConfInfo.consumeDelay = 5;

  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      strcpy(g_stConfInfo.dbName, argv[++i]);
    } else if (strcmp(argv[i], "-w") == 0) {
      strcpy(g_stConfInfo.cdbName, argv[++i]);
    } else if (strcmp(argv[i], "-c") == 0) {
      strcpy(configDir, argv[++i]);
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
    } else if (strcmp(argv[i], "-r") == 0) {
      g_stConfInfo.showRowFlag = atol(argv[++i]);
P
plum-lihui 已提交
204 205
    } else if (strcmp(argv[i], "-s") == 0) {
      g_stConfInfo.saveRowFlag = atol(argv[++i]);
P
plum-lihui 已提交
206 207 208
    } else if (strcmp(argv[i], "-y") == 0) {
      g_stConfInfo.consumeDelay = atol(argv[++i]);
    } else {
P
plum-lihui 已提交
209
      pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
P
plum-lihui 已提交
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
      exit(-1);
    }
  }

  initLogFile();

  taosFprintfFile(g_fp, "====parseArgument() success\n");

#if 1
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
  pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
  pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
  pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
P
plum-lihui 已提交
225
  pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
P
plum-lihui 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
#endif
}

void splitStr(char** arr, char* str, const char* del) {
  char* s = strtok(str, del);
  while (s != NULL) {
    *arr++ = s;
    s = strtok(NULL, del);
  }
}

void ltrim(char* str) {
  if (str == NULL || *str == '\0') {
    return;
  }
  int   len = 0;
  char* p = str;
  while (*p != '\0' && isspace(*p)) {
    ++p;
    ++len;
  }
  memmove(str, p, strlen(str) - len + 1);
  // return str;
}

P
plum-lihui 已提交
251 252 253 254 255
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
  int32_t i;
  for (i = 0; i < pInfo->numOfVgroups; i++) {
    if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
      pInfo->rowsOfPerVgroups[i][1] += rows;
L
Liu Jicong 已提交
256 257
      return;
    }
P
plum-lihui 已提交
258 259 260 261 262
  }

  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
  pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
  pInfo->numOfVgroups++;
L
Liu Jicong 已提交
263

P
plum-lihui 已提交
264 265
  taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
  if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
L
Liu Jicong 已提交
266 267
    taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
                    pInfo->numOfVgroups, vgroupId);
P
plum-lihui 已提交
268 269 270 271 272 273
    taosCloseFile(&g_fp);
    exit(-1);
  }
}

int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
P
plum-lihui 已提交
274 275 276 277 278 279 280
  char sqlStr[1100] = {0};

  if (strlen(buf) > 1024) {
    taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", strlen(buf));
    taosCloseFile(&g_fp);
    exit(-1);
  }
P
plum-lihui 已提交
281 282 283 284

  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

L
Liu Jicong 已提交
285 286
  sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId,
          pInfo->ts++, buf);
P
plum-lihui 已提交
287 288 289 290 291 292 293 294 295 296 297 298 299 300
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
    taosFprintfFile(g_fp, "error in insert consume result, reason:%s\n", taos_errstr(pRes));
    taosCloseFile(&g_fp);
    taos_free_result(pRes);
    exit(-1);
  }

  taos_free_result(pRes);

  return 0;
}

P
plum-lihui 已提交
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
  //if (shell.args.is_raw_time) {
  //  sprintf(buf, "%" PRId64, val);
  //  return buf;
  //}

  time_t  tt;
  int32_t ms = 0;
  if (precision == TSDB_TIME_PRECISION_NANO) {
    tt = (time_t)(val / 1000000000);
    ms = val % 1000000000;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    tt = (time_t)(val / 1000000);
    ms = val % 1000000;
  } else {
    tt = (time_t)(val / 1000);
    ms = val % 1000;
  }

  /*
    comment out as it make testcases like select_with_tags.sim fail.
    but in windows, this may cause the call to localtime crash if tt < 0,
    need to find a better solution.
    if (tt < 0) {
      tt = 0;
    }
  */

#ifdef WINDOWS
  if (tt < 0) tt = 0;
#endif
  if (tt <= 0 && ms < 0) {
    tt--;
    if (precision == TSDB_TIME_PRECISION_NANO) {
      ms += 1000000000;
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
      ms += 1000000;
    } else {
      ms += 1000;
    }
  }

  struct tm *ptm = taosLocalTime(&tt, NULL);
  size_t     pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);

  if (precision == TSDB_TIME_PRECISION_NANO) {
    sprintf(buf + pos, ".%09d", ms);
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    sprintf(buf + pos, ".%06d", ms);
  } else {
    sprintf(buf + pos, ".%03d", ms);
  }

  return buf;
}

static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision) {
  if (val == NULL) {
    taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR);
    return;
  }

  int  n;
  char buf[TSDB_MAX_BYTES_PER_ROW];
  switch (field->type) {
    case TSDB_DATA_TYPE_BOOL:
      taosFprintfFile(pFile, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0));
      break;
    case TSDB_DATA_TYPE_TINYINT:
      taosFprintfFile(pFile, "%d", *((int8_t *)val));
      break;
    case TSDB_DATA_TYPE_UTINYINT:
      taosFprintfFile(pFile, "%u", *((uint8_t *)val));
      break;
    case TSDB_DATA_TYPE_SMALLINT:
      taosFprintfFile(pFile, "%d", *((int16_t *)val));
      break;
    case TSDB_DATA_TYPE_USMALLINT:
      taosFprintfFile(pFile, "%u", *((uint16_t *)val));
      break;
    case TSDB_DATA_TYPE_INT:
      taosFprintfFile(pFile, "%d", *((int32_t *)val));
      break;
    case TSDB_DATA_TYPE_UINT:
      taosFprintfFile(pFile, "%u", *((uint32_t *)val));
      break;
    case TSDB_DATA_TYPE_BIGINT:
      taosFprintfFile(pFile, "%" PRId64, *((int64_t *)val));
      break;
    case TSDB_DATA_TYPE_UBIGINT:
      taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val));
      break;
    case TSDB_DATA_TYPE_FLOAT:
      taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
      break;
    case TSDB_DATA_TYPE_DOUBLE:
      n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val));
      if (n > TMAX(25, length)) {
        taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
      } else {
        taosFprintfFile(pFile, "%s", buf);
      }
      break;
    case TSDB_DATA_TYPE_BINARY:
    case TSDB_DATA_TYPE_NCHAR:
    case TSDB_DATA_TYPE_JSON:
      memcpy(buf, val, length);
      buf[length] = 0;
      taosFprintfFile(pFile, "\'%s\'", buf);
      break;
    case TSDB_DATA_TYPE_TIMESTAMP:
      shellFormatTimestamp(buf, *(int64_t *)val, precision);
      taosFprintfFile(pFile, "'%s'", buf);
      break;
    default:
      break;
  }
}

static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields, int32_t precision) {
  for (int32_t i = 0; i < num_fields; i++) {
    if (i > 0) {
      taosFprintfFile(pFile, "\n");
    }
    shellDumpFieldToFile(pFile, (const char *)row[i], fields + i, length[i], precision);
  }
  taosFprintfFile(pFile, "\n");
}

P
plum-lihui 已提交
430
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
P
plum-lihui 已提交
431 432
  char    buf[1024];
  int32_t totalRows = 0;
L
Liu Jicong 已提交
433

P
plum-lihui 已提交
434
  // printf("topic: %s\n", tmq_get_topic_name(msg));
P
plum-lihui 已提交
435
  int32_t vgroupId = tmq_get_vgroup_id(msg);
P
plum-lihui 已提交
436
  const char*   dbName = tmq_get_db_name(msg);
L
Liu Jicong 已提交
437

P
plum-lihui 已提交
438 439
  taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
  taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", tmq_get_topic_name(msg), vgroupId);
P
plum-lihui 已提交
440 441 442

  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
P
plum-lihui 已提交
443

L
Liu Jicong 已提交
444
    if (row == NULL) break;
P
plum-lihui 已提交
445

P
plum-lihui 已提交
446
    TAOS_FIELD* fields      = taos_fetch_fields(msg);
P
plum-lihui 已提交
447
    int32_t     numOfFields = taos_field_count(msg);
P
plum-lihui 已提交
448 449 450
	int32_t*    length      = taos_fetch_lengths(msg);
	int32_t     precision   = taos_result_precision(msg);
    const char* tbName      = tmq_get_table_name(msg);
L
Liu Jicong 已提交
451

P
plum-lihui 已提交
452
	dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
P
plum-lihui 已提交
453
    taos_print_row(buf, row, fields, numOfFields);
L
Liu Jicong 已提交
454 455

    if (0 != g_stConfInfo.showRowFlag) {
L
Liu Jicong 已提交
456
      taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
P
plum-lihui 已提交
457 458 459
      //if (0 != g_stConfInfo.saveRowFlag) {
      //  saveConsumeContentToTbl(pInfo, buf);
      //}
P
plum-lihui 已提交
460
    }
L
Liu Jicong 已提交
461

P
plum-lihui 已提交
462 463 464
    totalRows++;
  }

P
plum-lihui 已提交
465
  addRowsToVgroupId(pInfo, vgroupId, totalRows);
L
Liu Jicong 已提交
466

P
plum-lihui 已提交
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481
  return totalRows;
}

int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = taos_query(taos, command);
  int       code = taos_errno(pRes);
  if (code != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(code), command);
    taos_free_result(pRes);
    return -1;
  }
  taos_free_result(pRes);
  return 0;
}

482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {  
}

int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
  char sqlStr[1024] = {0};

  int64_t now = taosGetTimestampMs();

  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
  sprintf(sqlStr, "insert into %s.notifyinfo values (%"PRId64", %d, %d)", 
                   g_stConfInfo.cdbName, 
                   now,
                   cmdId,
                   pInfo->consumerId);

  taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);

  taosFprintfFile(g_fp, "notifyMainScript success, sql: %s\n", sqlStr);

  return 0;
}

static int32_t g_once_commit_flag = 0;
L
Liu Jicong 已提交
505
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
506 507 508 509 510 511 512
  pError("tmq_commit_cb_print() commit %d\n", code);  

  if (0 == g_once_commit_flag) {
    g_once_commit_flag = 1;
	notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
  }
  taosFprintfFile(g_fp, "tmq_commit_cb_print() be called\n");
P
plum-lihui 已提交
513 514 515 516 517 518 519 520 521 522
}

void build_consumer(SThreadInfo* pInfo) {
  tmq_conf_t* conf = tmq_conf_new();

  // tmq_conf_set(conf, "td.connect.ip", "localhost");
  // tmq_conf_set(conf, "td.connect.port", "6030");
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");

L
Liu Jicong 已提交
523
  // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
P
plum-lihui 已提交
524

525
  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo);
P
plum-lihui 已提交
526 527 528 529 530 531

  // tmq_conf_set(conf, "group.id", "cgrp1");
  for (int32_t i = 0; i < pInfo->numOfKey; i++) {
    tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
  }

532 533
  tmq_conf_set(conf, "msg.with.table.name", "true");

P
plum-lihui 已提交
534 535 536 537 538 539 540 541 542 543 544 545 546 547
  // tmq_conf_set(conf, "client.id", "c-001");

  // tmq_conf_set(conf, "enable.auto.commit", "true");
  // tmq_conf_set(conf, "enable.auto.commit", "false");

  // tmq_conf_set(conf, "auto.commit.interval.ms", "1000");

  // tmq_conf_set(conf, "auto.offset.reset", "none");
  // tmq_conf_set(conf, "auto.offset.reset", "earliest");
  // tmq_conf_set(conf, "auto.offset.reset", "latest");

  pInfo->tmq = tmq_consumer_new(conf, NULL, 0);

  tmq_conf_destroy(conf);
L
Liu Jicong 已提交
548

P
plum-lihui 已提交
549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
  return;
}

void build_topic_list(SThreadInfo* pInfo) {
  pInfo->topicList = tmq_list_new();
  // tmq_list_append(topic_list, "test_stb_topic_1");
  for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
    tmq_list_append(pInfo->topicList, pInfo->topics[i]);
  }
  return;
}

int32_t saveConsumeResult(SThreadInfo* pInfo) {
  char sqlStr[1024] = {0};

564 565
  int64_t now = taosGetTimestampMs();

P
plum-lihui 已提交
566
  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
L
Liu Jicong 已提交
567 568
  sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
          g_stConfInfo.cdbName, now, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
P
plum-lihui 已提交
569

P
plum-lihui 已提交
570
  char tmpString[128];
L
Liu Jicong 已提交
571
  taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
L
Liu Jicong 已提交
572

573
  TAOS_RES* pRes = taos_query(pInfo->taos, sqlStr);
P
plum-lihui 已提交
574
  if (taos_errno(pRes) != 0) {
P
plum-lihui 已提交
575
    pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
P
plum-lihui 已提交
576 577 578 579 580 581 582 583 584 585
    taos_free_result(pRes);
    exit(-1);
  }

  taos_free_result(pRes);

  return 0;
}

void loop_consume(SThreadInfo* pInfo) {
L
Liu Jicong 已提交
586
  int32_t code;
P
plum-lihui 已提交
587

588 589
  int32_t once_flag = 0;

P
plum-lihui 已提交
590 591 592
  int64_t totalMsgs = 0;
  int64_t totalRows = 0;

P
plum-lihui 已提交
593
  char tmpString[128];
L
Liu Jicong 已提交
594 595
  taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
                  pInfo->consumerId);
P
plum-lihui 已提交
596

P
plum-lihui 已提交
597
  pInfo->ts = taosGetTimestampMs();
P
plum-lihui 已提交
598 599 600 601 602 603 604 605 606 607 608 609
  
  if (pInfo->ifCheckData) {
  	char filename[256] = {0};
    char tmpString[128];
	//sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, getCurrentTimeString(tmpString));
	sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
    pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
    if (pInfo->pConsumeRowsFile == NULL) {
      taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString));
      return;
    }
  }
P
plum-lihui 已提交
610

P
plum-lihui 已提交
611 612 613 614
  while (running) {
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
    if (tmqMsg) {
      if (0 != g_stConfInfo.showMsgFlag) {
P
plum-lihui 已提交
615
        totalRows += msg_process(tmqMsg, pInfo, totalMsgs);
P
plum-lihui 已提交
616 617 618 619 620 621
      }

      taos_free_result(tmqMsg);

      totalMsgs++;

622 623 624 625 626
	  if (0 == once_flag) {
        once_flag = 1;
		notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
	  }

P
plum-lihui 已提交
627
      if (totalRows >= pInfo->expectMsgCnt) {
L
Liu Jicong 已提交
628
        char tmpString[128];
P
plum-lihui 已提交
629
        taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
630 631
        break;
      }
L
Liu Jicong 已提交
632
    } else {
P
plum-lihui 已提交
633 634
      char tmpString[128];
      taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
P
plum-lihui 已提交
635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650
      break;
    }
  }

  pInfo->consumeMsgCnt = totalMsgs;
  pInfo->consumeRowCnt = totalRows;

  taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
                  pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
}

void* consumeThreadFunc(void* param) {
  int32_t totalMsgs = 0;

  SThreadInfo* pInfo = (SThreadInfo*)param;

651 652 653 654 655 656
  pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
  if (pInfo->taos == NULL) {
    taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
	exit(-1);
  }

P
plum-lihui 已提交
657 658 659
  build_consumer(pInfo);
  build_topic_list(pInfo);
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
D
dapan1121 已提交
660
    assert(0);
P
plum-lihui 已提交
661 662 663
    return NULL;
  }

L
Liu Jicong 已提交
664 665
  int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
  if (err != 0) {
P
plum-lihui 已提交
666
    pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
667 668
    exit(-1);
  }
L
Liu Jicong 已提交
669

P
plum-lihui 已提交
670 671 672 673 674
  tmq_list_destroy(pInfo->topicList);
  pInfo->topicList = NULL;

  loop_consume(pInfo);

P
plum-lihui 已提交
675
  if (pInfo->ifManualCommit) {
L
Liu Jicong 已提交
676 677 678
    pPrint("tmq_commit() manual commit when consume end.\n");
    /*tmq_commit(pInfo->tmq, NULL, 0);*/
    tmq_commit_sync(pInfo->tmq, NULL);
L
Liu Jicong 已提交
679
    taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n");
L
Liu Jicong 已提交
680
    pPrint("tmq_commit() manual commit over.\n");
P
plum-lihui 已提交
681
  }
L
Liu Jicong 已提交
682

P
plum-lihui 已提交
683
  err = tmq_unsubscribe(pInfo->tmq);
L
Liu Jicong 已提交
684
  if (err != 0) {
P
plum-lihui 已提交
685
    pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
686 687
    /*pInfo->consumeMsgCnt = -1;*/
    /*return NULL;*/
P
plum-lihui 已提交
688
  }
L
Liu Jicong 已提交
689

P
plum-lihui 已提交
690
  err = tmq_consumer_close(pInfo->tmq);
L
Liu Jicong 已提交
691
  if (err != 0) {
P
plum-lihui 已提交
692
    pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
693
    /*exit(-1);*/
P
plum-lihui 已提交
694 695 696 697 698 699
  }
  pInfo->tmq = NULL;

  // save consume result into consumeresult table
  saveConsumeResult(pInfo);

P
plum-lihui 已提交
700 701 702 703 704 705
  // save rows from per vgroup
  taosFprintfFile(g_fp, "======== consumerId: %d, consume rows from per vgroups ========\n", pInfo->consumerId);
  for (int32_t i = 0; i < pInfo->numOfVgroups; i++) {
    taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
  }

706 707 708
  taos_close(pInfo->taos);
  pInfo->taos = NULL;

P
plum-lihui 已提交
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756
  return NULL;
}

void parseConsumeInfo() {
  char*      token;
  const char delim[2] = ",";
  const char ch = ':';

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
      ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
      // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
      g_stConfInfo.stThreads[i].numOfTopic++;

      token = strtok(NULL, delim);
    }

    token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      {
        char* pstr = token;
        ltrim(pstr);
        char* ret = strchr(pstr, ch);
        memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
        strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
        // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
        // g_stConfInfo.value[g_stConfInfo.numOfKey]);
        g_stConfInfo.stThreads[i].numOfKey++;
      }

      token = strtok(NULL, delim);
    }
  }
}

int32_t getConsumeInfo() {
  char sqlStr[1024] = {0};

  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
P
plum-lihui 已提交
757
    pError("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
P
plum-lihui 已提交
758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775
    taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
    taosCloseFile(&g_fp);
    taos_free_result(pRes);
    exit(-1);
  }

  TAOS_ROW    row = NULL;
  int         num_fields = taos_num_fields(pRes);
  TAOS_FIELD* fields = taos_fetch_fields(pRes);

  // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
  // ifcheckdata int

  int32_t numOfThread = 0;
  while ((row = taos_fetch_row(pRes))) {
    int32_t* lengths = taos_fetch_lengths(pRes);

    // set default value
L
Liu Jicong 已提交
776 777 778
    // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
    // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
P
plum-lihui 已提交
779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794

    for (int i = 0; i < num_fields; ++i) {
      if (row[i] == NULL || 0 == i) {
        continue;
      }

      if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
      } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
      } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
      } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
        g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
      } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
P
plum-lihui 已提交
795 796
      } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
        g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]);
P
plum-lihui 已提交
797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827
      }
    }
    numOfThread++;
  }
  g_stConfInfo.numOfThread = numOfThread;

  taos_free_result(pRes);

  parseConsumeInfo();

  return 0;
}

int main(int32_t argc, char* argv[]) {
  parseArgument(argc, argv);
  getConsumeInfo();
  saveConfigToLogFile();

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);

  // pthread_create one thread to consume
  taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
                     (void*)(&(g_stConfInfo.stThreads[i])));
  }

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
828
    taosThreadClear(&g_stConfInfo.stThreads[i].thread);
P
plum-lihui 已提交
829 830 831 832 833 834 835 836 837 838
  }

  // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);

  taosFprintfFile(g_fp, "==== close tmqlog ====\n");
  taosCloseFile(&g_fp);

  return 0;
}