tmqSim.c 15.7 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
L
Liu Jicong 已提交
17
#include <dirent.h>
P
plum-lihui 已提交
18
#include <stdio.h>
L
Liu Jicong 已提交
19
#include <stdlib.h>
P
plum-lihui 已提交
20 21 22
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
L
Liu Jicong 已提交
23
#include <time.h>
P
plum-lihui 已提交
24 25 26 27 28 29
#include <unistd.h>

#include "taos.h"
#include "taoserror.h"
#include "tlog.h"

L
Liu Jicong 已提交
30 31
#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
P
plum-lihui 已提交
32 33
#define min(a, b) (((a) < (b)) ? (a) : (b))

L
Liu Jicong 已提交
34 35
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
P
plum-lihui 已提交
36

P
plum-lihui 已提交
37
typedef struct {
L
Liu Jicong 已提交
38 39
  int32_t  expectMsgCnt;
  int32_t  consumeMsgCnt;
P
plum-lihui 已提交
40 41 42
  TdThread thread;
} SThreadInfo;

P
plum-lihui 已提交
43
typedef struct {
L
Liu Jicong 已提交
44 45 46 47
  // input from argvs
  char    dbName[32];
  char    topicString[256];
  char    keyString[1024];
L
Liu Jicong 已提交
48 49
  char    topicString1[256];
  char    keyString1[1024];
L
Liu Jicong 已提交
50 51 52
  int32_t showMsgFlag;
  int32_t consumeDelay;  // unit s
  int32_t consumeMsgCnt;
L
Liu Jicong 已提交
53
  int32_t checkMode;
L
Liu Jicong 已提交
54 55 56 57 58 59 60 61

  // save result after parse agrvs
  int32_t numOfTopic;
  char    topics[32][64];

  int32_t numOfKey;
  char    key[32][64];
  char    value[32][64];
L
Liu Jicong 已提交
62 63 64 65 66 67 68

  int32_t numOfTopic1;
  char    topics1[32][64];

  int32_t numOfKey1;
  char    key1[32][64];
  char    value1[32][64];
P
plum-lihui 已提交
69 70 71 72
} SConfInfo;

static SConfInfo g_stConfInfo;

L
Liu Jicong 已提交
73 74
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
P
plum-lihui 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87

static void printHelp() {
  char indent[10] = "        ";
  printf("Used to test the tmq feature with sim cases\n");

  printf("%s%s\n", indent, "-c");
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
  printf("%s%s\n", indent, "-d");
  printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
  printf("%s%s\n", indent, "-t");
  printf("%s%s%s\n", indent, indent, "The topic string for cosumer, no default ");
  printf("%s%s\n", indent, "-k");
  printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default ");
P
plum-lihui 已提交
88 89 90 91
  printf("%s%s\n", indent, "-t1");
  printf("%s%s%s\n", indent, indent, "The topic1 string for cosumer, no default ");
  printf("%s%s\n", indent, "-k1");
  printf("%s%s%s\n", indent, indent, "The key1-value1 string for cosumer, no default ");
P
plum-lihui 已提交
92 93
  printf("%s%s\n", indent, "-g");
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
P
plum-lihui 已提交
94 95 96 97
  printf("%s%s\n", indent, "-y");
  printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
  printf("%s%s\n", indent, "-m");
  printf("%s%s%s%d\n", indent, indent, "consume msg count, default is s", g_stConfInfo.consumeMsgCnt);
P
plum-lihui 已提交
98 99
  printf("%s%s\n", indent, "-j");
  printf("%s%s%s%d\n", indent, indent, "check mode, default is s", g_stConfInfo.checkMode);
P
plum-lihui 已提交
100 101 102
  exit(EXIT_SUCCESS);
}

L
Liu Jicong 已提交
103
void parseArgument(int32_t argc, char* argv[]) {
P
plum-lihui 已提交
104
  memset(&g_stConfInfo, 0, sizeof(SConfInfo));
L
Liu Jicong 已提交
105 106
  g_stConfInfo.showMsgFlag = 0;
  g_stConfInfo.consumeDelay = 8000;
P
plum-lihui 已提交
107
  g_stConfInfo.consumeMsgCnt = 0;
L
Liu Jicong 已提交
108

P
plum-lihui 已提交
109 110 111 112 113 114 115 116 117 118 119 120
  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      strcpy(g_stConfInfo.dbName, argv[++i]);
    } else if (strcmp(argv[i], "-c") == 0) {
      strcpy(configDir, argv[++i]);
    } else if (strcmp(argv[i], "-t") == 0) {
      strcpy(g_stConfInfo.topicString, argv[++i]);
    } else if (strcmp(argv[i], "-k") == 0) {
      strcpy(g_stConfInfo.keyString, argv[++i]);
P
plum-lihui 已提交
121 122 123 124
    } else if (strcmp(argv[i], "-t1") == 0) {
      strcpy(g_stConfInfo.topicString1, argv[++i]);
    } else if (strcmp(argv[i], "-k1") == 0) {
      strcpy(g_stConfInfo.keyString1, argv[++i]);
P
plum-lihui 已提交
125 126
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
P
plum-lihui 已提交
127 128 129 130
    } else if (strcmp(argv[i], "-y") == 0) {
      g_stConfInfo.consumeDelay = atol(argv[++i]);
    } else if (strcmp(argv[i], "-m") == 0) {
      g_stConfInfo.consumeMsgCnt = atol(argv[++i]);
P
plum-lihui 已提交
131 132
    } else if (strcmp(argv[i], "-j") == 0) {
      g_stConfInfo.checkMode = atol(argv[++i]);
P
plum-lihui 已提交
133 134
    } else {
      printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
L
Liu Jicong 已提交
135
      exit(-1);
P
plum-lihui 已提交
136 137 138
    }
  }

P
plum-lihui 已提交
139 140 141 142
  if (0 == g_stConfInfo.consumeMsgCnt) {
    g_stConfInfo.consumeMsgCnt = 0x7fffffff;
  }

P
plum-lihui 已提交
143
#if 0
P
plum-lihui 已提交
144 145 146 147 148
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
  pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC);
  pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC);  
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
L
Liu Jicong 已提交
149
#endif
P
plum-lihui 已提交
150 151
}

L
Liu Jicong 已提交
152 153 154
void splitStr(char** arr, char* str, const char* del) {
  char* s = strtok(str, del);
  while (s != NULL) {
P
plum-lihui 已提交
155 156 157 158 159
    *arr++ = s;
    s = strtok(NULL, del);
  }
}

L
Liu Jicong 已提交
160 161 162 163 164 165 166 167 168 169 170 171
void ltrim(char* str) {
  if (str == NULL || *str == '\0') {
    return;
  }
  int   len = 0;
  char* p = str;
  while (*p != '\0' && isspace(*p)) {
    ++p;
    ++len;
  }
  memmove(str, p, strlen(str) - len + 1);
  // return str;
P
plum-lihui 已提交
172 173 174
}

void parseInputString() {
L
Liu Jicong 已提交
175 176
  // printf("topicString: %s\n", g_stConfInfo.topicString);
  // printf("keyString: %s\n\n", g_stConfInfo.keyString);
P
plum-lihui 已提交
177

L
Liu Jicong 已提交
178
  char*      token;
P
plum-lihui 已提交
179 180 181 182
  const char delim[2] = ",";
  const char ch = ':';

  token = strtok(g_stConfInfo.topicString, delim);
L
Liu Jicong 已提交
183 184 185
  while (token != NULL) {
    // printf("%s\n", token );
    strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token);
P
plum-lihui 已提交
186
    ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
L
Liu Jicong 已提交
187 188
    // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
    g_stConfInfo.numOfTopic++;
L
Liu Jicong 已提交
189

P
plum-lihui 已提交
190
    token = strtok(NULL, delim);
L
Liu Jicong 已提交
191
  }
P
plum-lihui 已提交
192 193

  token = strtok(g_stConfInfo.topicString1, delim);
L
Liu Jicong 已提交
194 195 196
  while (token != NULL) {
    // printf("%s\n", token );
    strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token);
P
plum-lihui 已提交
197
    ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]);
L
Liu Jicong 已提交
198 199 200
    // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
    g_stConfInfo.numOfTopic1++;

P
plum-lihui 已提交
201 202 203 204
    token = strtok(NULL, delim);
  }

  token = strtok(g_stConfInfo.keyString, delim);
L
Liu Jicong 已提交
205 206 207 208 209 210 211 212 213 214 215
  while (token != NULL) {
    // printf("%s\n", token );
    {
      char* pstr = token;
      ltrim(pstr);
      char* ret = strchr(pstr, ch);
      memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret - pstr);
      strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret + 1);
      // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
      // g_stConfInfo.value[g_stConfInfo.numOfKey]);
      g_stConfInfo.numOfKey++;
P
plum-lihui 已提交
216
    }
L
Liu Jicong 已提交
217

P
plum-lihui 已提交
218 219
    token = strtok(NULL, delim);
  }
L
Liu Jicong 已提交
220

P
plum-lihui 已提交
221
  token = strtok(g_stConfInfo.keyString1, delim);
L
Liu Jicong 已提交
222 223 224 225 226 227 228 229 230 231 232
  while (token != NULL) {
    // printf("%s\n", token );
    {
      char* pstr = token;
      ltrim(pstr);
      char* ret = strchr(pstr, ch);
      memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret - pstr);
      strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret + 1);
      // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
      // g_stConfInfo.value[g_stConfInfo.numOfKey]);
      g_stConfInfo.numOfKey1++;
P
plum-lihui 已提交
233
    }
L
Liu Jicong 已提交
234

P
plum-lihui 已提交
235 236 237 238
    token = strtok(NULL, delim);
  }
}

L
Liu Jicong 已提交
239
static int running = 1;
L
Liu Jicong 已提交
240
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
P
plum-lihui 已提交
241

L
Liu Jicong 已提交
242 243 244 245 246 247 248 249 250 251 252
int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = taos_query(taos, command);
  int       code = taos_errno(pRes);
  // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
  if (code != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(code), command);
    taos_free_result(pRes);
    return -1;
  }
  taos_free_result(pRes);
  return 0;
P
plum-lihui 已提交
253 254 255
}

tmq_t* build_consumer() {
L
Liu Jicong 已提交
256
#if 0
P
plum-lihui 已提交
257
  char sqlStr[1024] = {0};
L
Liu Jicong 已提交
258

P
plum-lihui 已提交
259 260 261 262 263 264 265
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
L
Liu Jicong 已提交
266 267
    taos_free_result(pRes);
    exit(-1);
P
plum-lihui 已提交
268 269
  }
  taos_free_result(pRes);
L
Liu Jicong 已提交
270
#endif
P
plum-lihui 已提交
271 272

  tmq_conf_t* conf = tmq_conf_new();
L
Liu Jicong 已提交
273
  // tmq_conf_set(conf, "group.id", "tg2");
P
plum-lihui 已提交
274 275 276
  for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) {
    tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]);
  }
L
Liu Jicong 已提交
277 278 279 280 281 282
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
  tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
  assert(tmq);
  tmq_conf_destroy(conf);
P
plum-lihui 已提交
283 284 285 286 287
  return tmq;
}

tmq_list_t* build_topic_list() {
  tmq_list_t* topic_list = tmq_list_new();
L
Liu Jicong 已提交
288
  // tmq_list_append(topic_list, "test_stb_topic_1");
P
plum-lihui 已提交
289 290 291 292 293 294
  for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) {
    tmq_list_append(topic_list, g_stConfInfo.topics[i]);
  }
  return topic_list;
}

P
plum-lihui 已提交
295
tmq_t* build_consumer_x() {
L
Liu Jicong 已提交
296
#if 0
P
plum-lihui 已提交
297
  char sqlStr[1024] = {0};
L
Liu Jicong 已提交
298

P
plum-lihui 已提交
299 300 301 302 303 304 305
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
L
Liu Jicong 已提交
306 307
    taos_free_result(pRes);
    exit(-1);
P
plum-lihui 已提交
308 309
  }
  taos_free_result(pRes);
L
Liu Jicong 已提交
310
#endif
P
plum-lihui 已提交
311 312

  tmq_conf_t* conf = tmq_conf_new();
L
Liu Jicong 已提交
313
  // tmq_conf_set(conf, "group.id", "tg2");
P
plum-lihui 已提交
314 315 316
  for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) {
    tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]);
  }
L
Liu Jicong 已提交
317 318 319 320 321 322
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
  tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
  tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0);
  assert(tmq);
  tmq_conf_destroy(conf);
P
plum-lihui 已提交
323 324 325 326 327
  return tmq;
}

tmq_list_t* build_topic_list_x() {
  tmq_list_t* topic_list = tmq_list_new();
L
Liu Jicong 已提交
328
  // tmq_list_append(topic_list, "test_stb_topic_1");
P
plum-lihui 已提交
329 330 331 332 333 334
  for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) {
    tmq_list_append(topic_list, g_stConfInfo.topics1[i]);
  }
  return topic_list;
}

P
plum-lihui 已提交
335
void loop_consume(tmq_t* tmq) {
P
plum-lihui 已提交
336 337 338
  tmq_resp_err_t err;

  int32_t totalMsgs = 0;
P
plum-lihui 已提交
339
  int32_t totalRows = 0;
P
plum-lihui 已提交
340 341
  int32_t skipLogNum = 0;
  while (running) {
L
Liu Jicong 已提交
342
    TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000);
P
plum-lihui 已提交
343
    if (tmqMsg) {
L
Liu Jicong 已提交
344
      totalMsgs++;
P
plum-lihui 已提交
345

L
Liu Jicong 已提交
346
#if 0
P
plum-lihui 已提交
347 348 349 350
	  TAOS_ROW row;
	  while (NULL != (row = tmq_get_row(tmqMsg))) {
        totalRows++;
	  }
L
Liu Jicong 已提交
351 352
#endif

L
Liu Jicong 已提交
353
      /*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
L
Liu Jicong 已提交
354
      if (0 != g_stConfInfo.showMsgFlag) {
L
Liu Jicong 已提交
355
        /*msg_process(tmqMsg);*/
L
Liu Jicong 已提交
356
      }
P
plum-lihui 已提交
357
      tmq_message_destroy(tmqMsg);
P
plum-lihui 已提交
358 359 360 361 362 363 364 365
    } else {
      break;
    }
  }

  err = tmq_consumer_close(tmq);
  if (err) {
    printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
366
    exit(-1);
P
plum-lihui 已提交
367
  }
L
Liu Jicong 已提交
368

P
plum-lihui 已提交
369
  printf("{consume success: %d, %d}", totalMsgs, totalRows);
P
plum-lihui 已提交
370 371
}

P
plum-lihui 已提交
372
int32_t parallel_consume(tmq_t* tmq, int threadLable) {
P
plum-lihui 已提交
373 374 375 376 377 378
  tmq_resp_err_t err;

  int32_t totalMsgs = 0;
  int32_t totalRows = 0;
  int32_t skipLogNum = 0;
  while (running) {
L
Liu Jicong 已提交
379
    TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000);
P
plum-lihui 已提交
380
    if (tmqMsg) {
L
Liu Jicong 已提交
381
      totalMsgs++;
P
plum-lihui 已提交
382

L
Liu Jicong 已提交
383
      // printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs);
P
plum-lihui 已提交
384

L
Liu Jicong 已提交
385
#if 0
P
plum-lihui 已提交
386 387 388 389
	  TAOS_ROW row;
	  while (NULL != (row = tmq_get_row(tmqMsg))) {
        totalRows++;
	  }
L
Liu Jicong 已提交
390 391 392 393 394 395
#endif

      /*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/
      if (0 != g_stConfInfo.showMsgFlag) {
        /*msg_process(tmqMsg);*/
      }
P
plum-lihui 已提交
396 397
      tmq_message_destroy(tmqMsg);

L
Liu Jicong 已提交
398
      if (totalMsgs >= g_stConfInfo.consumeMsgCnt) {
P
plum-lihui 已提交
399
        break;
L
Liu Jicong 已提交
400
      }
P
plum-lihui 已提交
401 402 403 404 405 406 407 408
    } else {
      break;
    }
  }

  err = tmq_consumer_close(tmq);
  if (err) {
    printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
409
    exit(-1);
P
plum-lihui 已提交
410
  }
L
Liu Jicong 已提交
411

L
Liu Jicong 已提交
412
  // printf("%d", totalMsgs); // output to sim for check result
P
plum-lihui 已提交
413 414 415
  return totalMsgs;
}

L
Liu Jicong 已提交
416
void* threadFunc(void* param) {
P
plum-lihui 已提交
417 418
  int32_t totalMsgs = 0;

L
Liu Jicong 已提交
419
  SThreadInfo* pInfo = (SThreadInfo*)param;
P
plum-lihui 已提交
420

L
Liu Jicong 已提交
421
  tmq_t*      tmq = build_consumer_x();
P
plum-lihui 已提交
422
  tmq_list_t* topic_list = build_topic_list_x();
L
Liu Jicong 已提交
423
  if ((NULL == tmq) || (NULL == topic_list)) {
P
plum-lihui 已提交
424 425
    return NULL;
  }
L
Liu Jicong 已提交
426

P
plum-lihui 已提交
427 428 429 430 431 432
  tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
  if (err) {
    printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
    exit(-1);
  }

L
Liu Jicong 已提交
433 434 435 436
  // if (0 == g_stConfInfo.consumeMsgCnt) {
  //   loop_consume(tmq);
  // } else {
  pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
P
plum-lihui 已提交
437 438 439 440 441
  //}

  err = tmq_unsubscribe(tmq);
  if (err) {
    printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
442
    pInfo->consumeMsgCnt = -1;
P
plum-lihui 已提交
443 444 445 446
    return NULL;
  }

  return NULL;
P
plum-lihui 已提交
447 448
}

L
Liu Jicong 已提交
449
int main(int32_t argc, char* argv[]) {
P
plum-lihui 已提交
450 451
  parseArgument(argc, argv);
  parseInputString();
L
Liu Jicong 已提交
452

L
Liu Jicong 已提交
453
  int32_t      numOfThreads = 1;
P
plum-lihui 已提交
454 455 456
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
L
Liu Jicong 已提交
457
  SThreadInfo* pInfo = (SThreadInfo*)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo));
P
plum-lihui 已提交
458 459 460 461 462 463

  if (g_stConfInfo.numOfTopic1) {
    // pthread_create one thread to consume
    for (int32_t i = 0; i < numOfThreads; ++i) {
      pInfo[i].expectMsgCnt = 0;
      pInfo[i].consumeMsgCnt = 0;
L
Liu Jicong 已提交
464
      taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void*)(pInfo + i));
P
plum-lihui 已提交
465 466 467
    }
  }

L
Liu Jicong 已提交
468
  int32_t     totalMsgs = 0;
L
Liu Jicong 已提交
469
  tmq_t*      tmq = build_consumer();
P
plum-lihui 已提交
470
  tmq_list_t* topic_list = build_topic_list();
L
Liu Jicong 已提交
471
  if ((NULL == tmq) || (NULL == topic_list)) {
P
plum-lihui 已提交
472 473
    return -1;
  }
L
Liu Jicong 已提交
474

P
plum-lihui 已提交
475 476 477 478 479
  tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
  if (err) {
    printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
    exit(-1);
  }
P
plum-lihui 已提交
480

P
plum-lihui 已提交
481
  if (0 == g_stConfInfo.numOfTopic1) {
P
plum-lihui 已提交
482 483
    loop_consume(tmq);
  } else {
P
plum-lihui 已提交
484
    totalMsgs = parallel_consume(tmq, 0);
P
plum-lihui 已提交
485
  }
P
plum-lihui 已提交
486 487 488 489 490 491

  err = tmq_unsubscribe(tmq);
  if (err) {
    printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
    exit(-1);
  }
P
plum-lihui 已提交
492

L
Liu Jicong 已提交
493
  if (g_stConfInfo.numOfTopic1) {
P
plum-lihui 已提交
494 495 496 497
    for (int32_t i = 0; i < numOfThreads; i++) {
      taosThreadJoin(pInfo[i].thread, NULL);
    }

L
Liu Jicong 已提交
498 499 500 501
    // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
    if (0 == g_stConfInfo.checkMode) {
      if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) {
        printf("success");
P
plum-lihui 已提交
502
      } else {
L
Liu Jicong 已提交
503 504 505
        printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
      }
    } else if (1 == g_stConfInfo.checkMode) {
P
plum-lihui 已提交
506
      if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) {
L
Liu Jicong 已提交
507
        printf("success");
P
plum-lihui 已提交
508
      } else {
L
Liu Jicong 已提交
509 510 511 512 513
        printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
      }
    } else if (2 == g_stConfInfo.checkMode) {
      if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) {
        printf("success");
P
plum-lihui 已提交
514
      } else {
L
Liu Jicong 已提交
515 516 517
        printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
      }
    } else if (3 == g_stConfInfo.checkMode) {
P
plum-lihui 已提交
518
      if ((totalMsgs == 2 * g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt)) {
L
Liu Jicong 已提交
519
        printf("success");
P
plum-lihui 已提交
520
      } else {
L
Liu Jicong 已提交
521 522 523 524 525 526 527 528
        printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
      }
    } else if (4 == g_stConfInfo.checkMode) {
      if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) ||
          ((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) ||
          ((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) ||
          ((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) {
        printf("success");
P
plum-lihui 已提交
529
      } else {
L
Liu Jicong 已提交
530 531 532 533 534
        printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
      }
    } else {
      printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt);
    }
P
plum-lihui 已提交
535 536
  }

P
plum-lihui 已提交
537 538 539
  return 0;
}