tmqSim.c 13.3 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
L
Liu Jicong 已提交
17
#include <dirent.h>
P
plum-lihui 已提交
18
#include <stdio.h>
L
Liu Jicong 已提交
19
#include <stdlib.h>
P
plum-lihui 已提交
20 21 22
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
L
Liu Jicong 已提交
23
#include <time.h>
P
plum-lihui 已提交
24 25 26 27 28 29
#include <unistd.h>

#include "taos.h"
#include "taoserror.h"
#include "tlog.h"

L
Liu Jicong 已提交
30 31
#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
P
plum-lihui 已提交
32 33
#define min(a, b) (((a) < (b)) ? (a) : (b))

L
Liu Jicong 已提交
34 35 36
#define MAX_SQL_STR_LEN         (1024 * 1024)
#define MAX_ROW_STR_LEN         (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16)
P
plum-lihui 已提交
37

P
plum-lihui 已提交
38
typedef struct {
L
Liu Jicong 已提交
39 40
  TdThread thread;
  int32_t  consumerId;
P
plum-lihui 已提交
41

L
Liu Jicong 已提交
42 43
  int32_t ifCheckData;
  int64_t expectMsgCnt;
P
plum-lihui 已提交
44

L
Liu Jicong 已提交
45 46
  int64_t consumeMsgCnt;
  int32_t checkresult;
P
plum-lihui 已提交
47

L
Liu Jicong 已提交
48 49
  char topicString[1024];
  char keyString[1024];
P
plum-lihui 已提交
50

L
Liu Jicong 已提交
51 52 53 54 55 56
  int32_t numOfTopic;
  char    topics[32][64];

  int32_t numOfKey;
  char    key[32][64];
  char    value[32][64];
P
plum-lihui 已提交
57 58 59

  tmq_t*      tmq;
  tmq_list_t* topicList;
L
Liu Jicong 已提交
60

P
plum-lihui 已提交
61 62
} SThreadInfo;

P
plum-lihui 已提交
63
typedef struct {
L
Liu Jicong 已提交
64
  // input from argvs
L
Liu Jicong 已提交
65 66 67 68 69
  char        dbName[32];
  int32_t     showMsgFlag;
  int32_t     consumeDelay;  // unit s
  int32_t     numOfThread;
  SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
P
plum-lihui 已提交
70 71 72
} SConfInfo;

static SConfInfo g_stConfInfo;
L
Liu Jicong 已提交
73
TdFilePtr        g_fp = NULL;
P
plum-lihui 已提交
74

L
Liu Jicong 已提交
75 76
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
P
plum-lihui 已提交
77 78 79 80 81 82 83 84 85 86 87

static void printHelp() {
  char indent[10] = "        ";
  printf("Used to test the tmq feature with sim cases\n");

  printf("%s%s\n", indent, "-c");
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
  printf("%s%s\n", indent, "-d");
  printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
  printf("%s%s\n", indent, "-g");
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
P
plum-lihui 已提交
88 89
  printf("%s%s\n", indent, "-y");
  printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
P
plum-lihui 已提交
90 91 92
  exit(EXIT_SUCCESS);
}

P
plum-lihui 已提交
93 94 95 96 97
void initLogFile() {
  // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
  TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
  if (NULL == pFile) {
    fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
L
Liu Jicong 已提交
98
    exit - 1;
P
plum-lihui 已提交
99 100 101 102 103 104 105
  };
  g_fp = pFile;

  time_t    tTime = taosGetTimestampSec();
  struct tm tm = *taosLocalTime(&tTime, NULL);

  taosFprintfFile(pFile, "###################################################################\n");
L
Liu Jicong 已提交
106 107 108 109
  taosFprintfFile(pFile, "# configDir:           %s\n", configDir);
  taosFprintfFile(pFile, "# dbName:              %s\n", g_stConfInfo.dbName);
  taosFprintfFile(pFile, "# showMsgFlag:         %d\n", g_stConfInfo.showMsgFlag);
  taosFprintfFile(pFile, "# consumeDelay:        %d\n", g_stConfInfo.consumeDelay);
P
plum-lihui 已提交
110

L
Liu Jicong 已提交
111
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
P
plum-lihui 已提交
112
    taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
L
Liu Jicong 已提交
113 114 115
    taosFprintfFile(pFile, "  Topics: ");
    for (int i = 0; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
      taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
P
plum-lihui 已提交
116
    }
L
Liu Jicong 已提交
117
    taosFprintfFile(pFile, "\n");
P
plum-lihui 已提交
118
    taosFprintfFile(pFile, "  Key: ");
L
Liu Jicong 已提交
119 120
    for (int i = 0; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
      taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
P
plum-lihui 已提交
121 122 123
    }
    taosFprintfFile(pFile, "\n");
  }
L
Liu Jicong 已提交
124

P
plum-lihui 已提交
125
  taosFprintfFile(pFile, "# Test time:                %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
L
Liu Jicong 已提交
126
                  tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
P
plum-lihui 已提交
127 128 129
  taosFprintfFile(pFile, "###################################################################\n");
}

L
Liu Jicong 已提交
130
void parseArgument(int32_t argc, char* argv[]) {
P
plum-lihui 已提交
131
  memset(&g_stConfInfo, 0, sizeof(SConfInfo));
L
Liu Jicong 已提交
132
  g_stConfInfo.showMsgFlag = 0;
P
plum-lihui 已提交
133
  g_stConfInfo.consumeDelay = 5;
L
Liu Jicong 已提交
134

P
plum-lihui 已提交
135 136 137 138 139 140 141 142 143 144
  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      strcpy(g_stConfInfo.dbName, argv[++i]);
    } else if (strcmp(argv[i], "-c") == 0) {
      strcpy(configDir, argv[++i]);
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
P
plum-lihui 已提交
145 146
    } else if (strcmp(argv[i], "-y") == 0) {
      g_stConfInfo.consumeDelay = atol(argv[++i]);
P
plum-lihui 已提交
147 148
    } else {
      printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
L
Liu Jicong 已提交
149
      exit(-1);
P
plum-lihui 已提交
150 151 152
    }
  }

P
plum-lihui 已提交
153
#if 1
P
plum-lihui 已提交
154 155
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
P
plum-lihui 已提交
156
  pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
P
plum-lihui 已提交
157
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
L
Liu Jicong 已提交
158
#endif
P
plum-lihui 已提交
159 160
}

L
Liu Jicong 已提交
161 162 163
void splitStr(char** arr, char* str, const char* del) {
  char* s = strtok(str, del);
  while (s != NULL) {
P
plum-lihui 已提交
164 165 166 167 168
    *arr++ = s;
    s = strtok(NULL, del);
  }
}

L
Liu Jicong 已提交
169 170 171 172 173 174 175 176 177 178 179 180
void ltrim(char* str) {
  if (str == NULL || *str == '\0') {
    return;
  }
  int   len = 0;
  char* p = str;
  while (*p != '\0' && isspace(*p)) {
    ++p;
    ++len;
  }
  memmove(str, p, strlen(str) - len + 1);
  // return str;
P
plum-lihui 已提交
181 182
}

L
Liu Jicong 已提交
183
static int  running = 1;
P
plum-lihui 已提交
184 185 186
static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) {
  char buf[1024];

L
Liu Jicong 已提交
187 188 189 190 191
  // printf("topic: %s\n", tmq_get_topic_name(msg));
  // printf("vg:%d\n", tmq_get_vgroup_id(msg));
  taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable);
  taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));

P
plum-lihui 已提交
192 193 194 195 196
  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
    if (row == NULL) break;
    TAOS_FIELD* fields = taos_fetch_fields(msg);
    int32_t     numOfFields = taos_field_count(msg);
L
Liu Jicong 已提交
197 198 199
    // taos_print_row(buf, row, fields, numOfFields);
    // printf("%s\n", buf);
    // taosFprintfFile(g_fp, "%s\n",  buf);
P
plum-lihui 已提交
200 201 202
  }
}

L
Liu Jicong 已提交
203 204 205 206 207 208 209 210 211 212 213
int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = taos_query(taos, command);
  int       code = taos_errno(pRes);
  // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
  if (code != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(code), command);
    taos_free_result(pRes);
    return -1;
  }
  taos_free_result(pRes);
  return 0;
P
plum-lihui 已提交
214 215
}

L
Liu Jicong 已提交
216
void build_consumer(SThreadInfo* pInfo) {
P
plum-lihui 已提交
217
  char sqlStr[1024] = {0};
L
Liu Jicong 已提交
218

P
plum-lihui 已提交
219 220 221 222 223 224 225
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);

  sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in use db, reason:%s\n", taos_errstr(pRes));
L
Liu Jicong 已提交
226 227
    taos_free_result(pRes);
    exit(-1);
P
plum-lihui 已提交
228 229 230 231
  }
  taos_free_result(pRes);

  tmq_conf_t* conf = tmq_conf_new();
L
Liu Jicong 已提交
232
  // tmq_conf_set(conf, "group.id", "tg2");
P
plum-lihui 已提交
233 234
  for (int32_t i = 0; i < pInfo->numOfKey; i++) {
    tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
P
plum-lihui 已提交
235
  }
L
Liu Jicong 已提交
236
  pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
P
plum-lihui 已提交
237
  return;
P
plum-lihui 已提交
238 239
}

L
Liu Jicong 已提交
240
void build_topic_list(SThreadInfo* pInfo) {
P
plum-lihui 已提交
241
  pInfo->topicList = tmq_list_new();
L
Liu Jicong 已提交
242
  // tmq_list_append(topic_list, "test_stb_topic_1");
P
plum-lihui 已提交
243 244
  for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
    tmq_list_append(pInfo->topicList, pInfo->topics[i]);
P
plum-lihui 已提交
245
  }
P
plum-lihui 已提交
246
  return;
P
plum-lihui 已提交
247 248
}

L
Liu Jicong 已提交
249
int32_t saveConsumeResult(SThreadInfo* pInfo) {
P
plum-lihui 已提交
250
  char sqlStr[1024] = {0};
L
Liu Jicong 已提交
251

P
plum-lihui 已提交
252 253
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);
L
Liu Jicong 已提交
254

P
plum-lihui 已提交
255
  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
L
Liu Jicong 已提交
256 257 258
  sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)", g_stConfInfo.dbName,
          pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->checkresult);

P
plum-lihui 已提交
259 260
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
P
plum-lihui 已提交
261
    printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
L
Liu Jicong 已提交
262 263
    taos_free_result(pRes);
    exit(-1);
P
plum-lihui 已提交
264
  }
L
Liu Jicong 已提交
265

P
plum-lihui 已提交
266 267
  taos_free_result(pRes);

P
plum-lihui 已提交
268
  return 0;
P
plum-lihui 已提交
269 270
}

L
Liu Jicong 已提交
271
void loop_consume(SThreadInfo* pInfo) {
P
plum-lihui 已提交
272
  tmq_resp_err_t err;
L
Liu Jicong 已提交
273

P
plum-lihui 已提交
274
  int64_t totalMsgs = 0;
L
Liu Jicong 已提交
275
  // int64_t totalRows = 0;
P
plum-lihui 已提交
276 277

  while (running) {
P
plum-lihui 已提交
278
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
L
Liu Jicong 已提交
279
    if (tmqMsg) {
L
Liu Jicong 已提交
280
      if (0 != g_stConfInfo.showMsgFlag) {
P
plum-lihui 已提交
281
        msg_process(tmqMsg, totalMsgs, 0);
L
Liu Jicong 已提交
282
      }
L
Liu Jicong 已提交
283 284 285

      taos_free_result(tmqMsg);

L
Liu Jicong 已提交
286
      totalMsgs++;
L
Liu Jicong 已提交
287

P
plum-lihui 已提交
288
      if (totalMsgs >= pInfo->expectMsgCnt) {
P
plum-lihui 已提交
289
        break;
L
Liu Jicong 已提交
290
      }
P
plum-lihui 已提交
291 292 293 294
    } else {
      break;
    }
  }
L
Liu Jicong 已提交
295

P
plum-lihui 已提交
296
  err = tmq_consumer_close(pInfo->tmq);
P
plum-lihui 已提交
297 298
  if (err) {
    printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
299
    exit(-1);
P
plum-lihui 已提交
300
  }
L
Liu Jicong 已提交
301

P
plum-lihui 已提交
302
  pInfo->consumeMsgCnt = totalMsgs;
P
plum-lihui 已提交
303 304
}

L
Liu Jicong 已提交
305
void* consumeThreadFunc(void* param) {
P
plum-lihui 已提交
306 307
  int32_t totalMsgs = 0;

L
Liu Jicong 已提交
308
  SThreadInfo* pInfo = (SThreadInfo*)param;
P
plum-lihui 已提交
309

P
plum-lihui 已提交
310 311
  build_consumer(pInfo);
  build_topic_list(pInfo);
L
Liu Jicong 已提交
312
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
P
plum-lihui 已提交
313 314
    return NULL;
  }
L
Liu Jicong 已提交
315

P
plum-lihui 已提交
316
  tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
P
plum-lihui 已提交
317 318 319 320
  if (err) {
    printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
    exit(-1);
  }
L
Liu Jicong 已提交
321

P
plum-lihui 已提交
322
  loop_consume(pInfo);
P
plum-lihui 已提交
323

P
plum-lihui 已提交
324
  err = tmq_unsubscribe(pInfo->tmq);
P
plum-lihui 已提交
325 326
  if (err) {
    printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
327
    pInfo->consumeMsgCnt = -1;
P
plum-lihui 已提交
328
    return NULL;
L
Liu Jicong 已提交
329 330
  }

P
plum-lihui 已提交
331 332
  // save consume result into consumeresult table
  saveConsumeResult(pInfo);
P
plum-lihui 已提交
333 334

  return NULL;
P
plum-lihui 已提交
335 336
}

P
plum-lihui 已提交
337 338 339 340
void parseConsumeInfo() {
  char*      token;
  const char delim[2] = ",";
  const char ch = ':';
L
Liu Jicong 已提交
341

L
Liu Jicong 已提交
342
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
P
plum-lihui 已提交
343 344 345 346 347 348 349
    token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
      ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
      // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
      g_stConfInfo.stThreads[i].numOfTopic++;
L
Liu Jicong 已提交
350

P
plum-lihui 已提交
351 352
      token = strtok(NULL, delim);
    }
L
Liu Jicong 已提交
353

P
plum-lihui 已提交
354 355 356 357 358 359 360 361 362 363 364 365 366
    token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      {
        char* pstr = token;
        ltrim(pstr);
        char* ret = strchr(pstr, ch);
        memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
        strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
        // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
        // g_stConfInfo.value[g_stConfInfo.numOfKey]);
        g_stConfInfo.stThreads[i].numOfKey++;
      }
L
Liu Jicong 已提交
367

P
plum-lihui 已提交
368
      token = strtok(NULL, delim);
P
plum-lihui 已提交
369 370
    }
  }
P
plum-lihui 已提交
371
}
P
plum-lihui 已提交
372

P
plum-lihui 已提交
373 374
int32_t getConsumeInfo() {
  char sqlStr[1024] = {0};
L
Liu Jicong 已提交
375

P
plum-lihui 已提交
376 377
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);
L
Liu Jicong 已提交
378

P
plum-lihui 已提交
379 380 381 382 383
  sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName);
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
    taos_free_result(pRes);
P
plum-lihui 已提交
384
    exit(-1);
L
Liu Jicong 已提交
385 386 387 388 389 390 391 392 393
  }

  TAOS_ROW    row = NULL;
  int         num_fields = taos_num_fields(pRes);
  TAOS_FIELD* fields = taos_fetch_fields(pRes);

  // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint,
  // ifcheckdata int

P
plum-lihui 已提交
394 395
  int32_t numOfThread = 0;
  while ((row = taos_fetch_row(pRes))) {
L
Liu Jicong 已提交
396 397 398
    int32_t* lengths = taos_fetch_lengths(pRes);

    for (int i = 0; i < num_fields; ++i) {
P
plum-lihui 已提交
399 400 401
      if (row[i] == NULL || 0 == i) {
        continue;
      }
L
Liu Jicong 已提交
402

P
plum-lihui 已提交
403
      if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
L
Liu Jicong 已提交
404
        g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]);
P
plum-lihui 已提交
405 406 407 408 409
      } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
      } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
      } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
L
Liu Jicong 已提交
410
        g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
P
plum-lihui 已提交
411
      } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
L
Liu Jicong 已提交
412
        g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
P
plum-lihui 已提交
413 414
      }
    }
L
Liu Jicong 已提交
415
    numOfThread++;
P
plum-lihui 已提交
416
  }
P
plum-lihui 已提交
417
  g_stConfInfo.numOfThread = numOfThread;
P
plum-lihui 已提交
418

P
plum-lihui 已提交
419
  taos_free_result(pRes);
P
plum-lihui 已提交
420

P
plum-lihui 已提交
421
  parseConsumeInfo();
P
plum-lihui 已提交
422

P
plum-lihui 已提交
423 424
  return 0;
}
P
plum-lihui 已提交
425

P
plum-lihui 已提交
426 427 428 429 430 431 432 433 434 435 436
int main(int32_t argc, char* argv[]) {
  parseArgument(argc, argv);
  getConsumeInfo();
  initLogFile();

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);

  // pthread_create one thread to consume
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
L
Liu Jicong 已提交
437 438
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc,
                     (void*)(&(g_stConfInfo.stThreads[i])));
P
plum-lihui 已提交
439 440 441 442
  }

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
P
plum-lihui 已提交
443 444
  }

L
Liu Jicong 已提交
445 446 447 448 449
  // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);

  taosFprintfFile(g_fp, "\n");
  taosCloseFile(&g_fp);

P
plum-lihui 已提交
450 451 452
  return 0;
}