tmqSim.c 15.7 KB
Newer Older
P
plum-lihui 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <assert.h>
P
plum-lihui 已提交
17
#include <dirent.h>
P
plum-lihui 已提交
18
#include <stdio.h>
L
Liu Jicong 已提交
19
#include <stdlib.h>
P
plum-lihui 已提交
20 21 22
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
L
Liu Jicong 已提交
23
#include <time.h>
P
plum-lihui 已提交
24
#include <unistd.h>
P
plum-lihui 已提交
25 26 27 28 29

#include "taos.h"
#include "taoserror.h"
#include "tlog.h"

L
Liu Jicong 已提交
30 31
#define GREEN     "\033[1;32m"
#define NC        "\033[0m"
P
plum-lihui 已提交
32 33
#define min(a, b) (((a) < (b)) ? (a) : (b))

P
plum-lihui 已提交
34 35 36
#define MAX_SQL_STR_LEN            (1024 * 1024)
#define MAX_ROW_STR_LEN            (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT    (16)
P
plum-lihui 已提交
37

P
plum-lihui 已提交
38
typedef struct {
P
plum-lihui 已提交
39 40
  TdThread    thread;
  int32_t     consumerId;
P
plum-lihui 已提交
41

P
plum-lihui 已提交
42 43 44 45 46 47
  int32_t     ifCheckData;
  int64_t     expectMsgCnt; 
  
  int64_t     consumeMsgCnt;
  int64_t     consumeRowCnt;
  int32_t     checkresult;
P
plum-lihui 已提交
48

P
plum-lihui 已提交
49 50
  char        topicString[1024];
  char        keyString[1024];
P
plum-lihui 已提交
51

P
plum-lihui 已提交
52 53
  int32_t     numOfTopic;
  char        topics[32][64];
P
plum-lihui 已提交
54

P
plum-lihui 已提交
55 56 57
  int32_t     numOfKey;
  char        key[32][64];
  char        value[32][64];  
P
plum-lihui 已提交
58 59 60

  tmq_t*      tmq;
  tmq_list_t* topicList;
P
plum-lihui 已提交
61
  
P
plum-lihui 已提交
62 63
} SThreadInfo;

P
plum-lihui 已提交
64
typedef struct {
L
Liu Jicong 已提交
65
  // input from argvs
P
plum-lihui 已提交
66 67 68 69 70 71 72
  char           cdbName[32];
  char           dbName[32];
  int32_t        showMsgFlag;
  int32_t        showRowFlag;
  int32_t        consumeDelay;  // unit s
  int32_t        numOfThread;  
  SThreadInfo    stThreads[MAX_CONSUMER_THREAD_CNT];
P
plum-lihui 已提交
73 74 75
} SConfInfo;

static SConfInfo g_stConfInfo;
P
plum-lihui 已提交
76
TdFilePtr g_fp = NULL;
P
plum-lihui 已提交
77

L
Liu Jicong 已提交
78 79
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
P
plum-lihui 已提交
80 81 82 83 84 85 86 87 88 89 90

static void printHelp() {
  char indent[10] = "        ";
  printf("Used to test the tmq feature with sim cases\n");

  printf("%s%s\n", indent, "-c");
  printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
  printf("%s%s\n", indent, "-d");
  printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
  printf("%s%s\n", indent, "-g");
  printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
P
plum-lihui 已提交
91 92
  printf("%s%s\n", indent, "-r");
  printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
P
plum-lihui 已提交
93 94
  printf("%s%s\n", indent, "-y");
  printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
P
plum-lihui 已提交
95 96 97
  exit(EXIT_SUCCESS);
}

P
plum-lihui 已提交
98

P
plum-lihui 已提交
99 100
void initLogFile() {
  // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
P
plum-lihui 已提交
101 102
  char file[256];
  sprintf(file, "%s/../log/tmqlog.txt", configDir);
P
plum-lihui 已提交
103
  TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
P
plum-lihui 已提交
104 105
  if (NULL == pFile) {
    fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
P
plum-lihui 已提交
106 107
    exit -1;
  };
P
plum-lihui 已提交
108
  g_fp = pFile;
P
plum-lihui 已提交
109
}
P
plum-lihui 已提交
110

P
plum-lihui 已提交
111 112

void saveConfigToLogFile() {
P
plum-lihui 已提交
113 114 115
  time_t    tTime = taosGetTimestampSec();
  struct tm tm = *taosLocalTime(&tTime, NULL);

P
plum-lihui 已提交
116 117 118 119 120 121 122
  taosFprintfFile(g_fp, "###################################################################\n");
  taosFprintfFile(g_fp, "# configDir:           %s\n",  configDir);
  taosFprintfFile(g_fp, "# dbName:              %s\n",  g_stConfInfo.dbName);
  taosFprintfFile(g_fp, "# cdbName:             %s\n",  g_stConfInfo.cdbName);
  taosFprintfFile(g_fp, "# showMsgFlag:         %d\n",  g_stConfInfo.showMsgFlag);
  taosFprintfFile(g_fp, "# showRowFlag:         %d\n",  g_stConfInfo.showRowFlag);
  taosFprintfFile(g_fp, "# consumeDelay:        %d\n",  g_stConfInfo.consumeDelay);
P
plum-lihui 已提交
123
  taosFprintfFile(g_fp, "# numOfThread:         %d\n",  g_stConfInfo.numOfThread);
P
plum-lihui 已提交
124 125 126 127 128 129

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {	  
    taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
	taosFprintfFile(g_fp, "  Topics: ");
    for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
    taosFprintfFile(g_fp, "%s, ",  g_stConfInfo.stThreads[i].topics[i]);
P
plum-lihui 已提交
130
    }
P
plum-lihui 已提交
131 132 133 134
    taosFprintfFile(g_fp, "\n");  
    taosFprintfFile(g_fp, "  Key: ");
    for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
      taosFprintfFile(g_fp, "%s:%s, ",  g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
P
plum-lihui 已提交
135
    }
P
plum-lihui 已提交
136
    taosFprintfFile(g_fp, "\n");
P
plum-lihui 已提交
137
  }
P
plum-lihui 已提交
138 139 140 141
  
  taosFprintfFile(g_fp, "# Test time:                %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
                                                                           tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
  taosFprintfFile(g_fp, "###################################################################\n");
P
plum-lihui 已提交
142 143
}

L
Liu Jicong 已提交
144
void parseArgument(int32_t argc, char* argv[]) {
P
plum-lihui 已提交
145
  memset(&g_stConfInfo, 0, sizeof(SConfInfo));
L
Liu Jicong 已提交
146
  g_stConfInfo.showMsgFlag = 0;
P
plum-lihui 已提交
147
  g_stConfInfo.showRowFlag = 0;
P
plum-lihui 已提交
148
  g_stConfInfo.consumeDelay = 5;
L
Liu Jicong 已提交
149

P
plum-lihui 已提交
150 151 152 153 154 155
  for (int32_t i = 1; i < argc; i++) {
    if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
      printHelp();
      exit(0);
    } else if (strcmp(argv[i], "-d") == 0) {
      strcpy(g_stConfInfo.dbName, argv[++i]);
P
plum-lihui 已提交
156 157
    } else if (strcmp(argv[i], "-w") == 0) {
      strcpy(g_stConfInfo.cdbName, argv[++i]);
P
plum-lihui 已提交
158 159 160 161
    } else if (strcmp(argv[i], "-c") == 0) {
      strcpy(configDir, argv[++i]);
    } else if (strcmp(argv[i], "-g") == 0) {
      g_stConfInfo.showMsgFlag = atol(argv[++i]);
P
plum-lihui 已提交
162 163
    } else if (strcmp(argv[i], "-r") == 0) {
      g_stConfInfo.showRowFlag = atol(argv[++i]);
P
plum-lihui 已提交
164 165
    } else if (strcmp(argv[i], "-y") == 0) {
      g_stConfInfo.consumeDelay = atol(argv[++i]);
P
plum-lihui 已提交
166 167
    } else {
      printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
L
Liu Jicong 已提交
168
      exit(-1);
P
plum-lihui 已提交
169 170 171
    }
  }

P
plum-lihui 已提交
172 173 174 175
  initLogFile();
  
  taosFprintfFile(g_fp, "====parseArgument() success\n");

P
plum-lihui 已提交
176
#if 1
P
plum-lihui 已提交
177 178
  pPrint("%s configDir:%s %s", GREEN, configDir, NC);
  pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
P
plum-lihui 已提交
179
  pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
P
plum-lihui 已提交
180
  pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
P
plum-lihui 已提交
181
  pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
P
plum-lihui 已提交
182
  pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
L
Liu Jicong 已提交
183
#endif
P
plum-lihui 已提交
184 185
}

L
Liu Jicong 已提交
186 187 188
void splitStr(char** arr, char* str, const char* del) {
  char* s = strtok(str, del);
  while (s != NULL) {
P
plum-lihui 已提交
189 190 191 192 193
    *arr++ = s;
    s = strtok(NULL, del);
  }
}

L
Liu Jicong 已提交
194 195 196 197 198 199 200 201 202 203 204 205
void ltrim(char* str) {
  if (str == NULL || *str == '\0') {
    return;
  }
  int   len = 0;
  char* p = str;
  while (*p != '\0' && isspace(*p)) {
    ++p;
    ++len;
  }
  memmove(str, p, strlen(str) - len + 1);
  // return str;
P
plum-lihui 已提交
206 207
}

P
plum-lihui 已提交
208 209
static int running = 1;
static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) {
P
plum-lihui 已提交
210
  char buf[1024];
P
plum-lihui 已提交
211
  int32_t totalRows = 0;
P
plum-lihui 已提交
212

P
plum-lihui 已提交
213 214 215 216 217
  //printf("topic: %s\n", tmq_get_topic_name(msg));
  //printf("vg:%d\n", tmq_get_vgroup_id(msg));
  taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n",  msgIndex, threadLable);
  taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n",  tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
  
P
plum-lihui 已提交
218 219 220
  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
    if (row == NULL) break;
P
plum-lihui 已提交
221 222 223 224 225 226 227
	if (0 != g_stConfInfo.showRowFlag) {
      TAOS_FIELD* fields      = taos_fetch_fields(msg);
      int32_t     numOfFields = taos_field_count(msg);
      taos_print_row(buf, row, fields, numOfFields);
  	  taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
	}
	totalRows++;
P
plum-lihui 已提交
228
  }
P
plum-lihui 已提交
229 230

  return totalRows;
P
plum-lihui 已提交
231 232
}

L
Liu Jicong 已提交
233 234 235 236 237 238 239 240 241 242 243
int queryDB(TAOS* taos, char* command) {
  TAOS_RES* pRes = taos_query(taos, command);
  int       code = taos_errno(pRes);
  // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
  if (code != 0) {
    pError("failed to reason:%s, sql: %s", tstrerror(code), command);
    taos_free_result(pRes);
    return -1;
  }
  taos_free_result(pRes);
  return 0;
P
plum-lihui 已提交
244 245
}

P
plum-lihui 已提交
246 247 248
static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) {
  printf("tmq_commit_cb_print() commit %d\n", resp);
}
L
Liu Jicong 已提交
249

P
plum-lihui 已提交
250 251
void build_consumer(SThreadInfo *pInfo) {
  tmq_conf_t* conf = tmq_conf_new();
P
plum-lihui 已提交
252

P
plum-lihui 已提交
253 254 255 256
  //tmq_conf_set(conf, "td.connect.ip", "localhost");
  //tmq_conf_set(conf, "td.connect.port", "6030");
  tmq_conf_set(conf, "td.connect.user", "root");
  tmq_conf_set(conf, "td.connect.pass", "taosdata");
P
plum-lihui 已提交
257

P
plum-lihui 已提交
258 259 260 261 262
  tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);

  tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);

  // tmq_conf_set(conf, "group.id", "cgrp1");
P
plum-lihui 已提交
263 264
  for (int32_t i = 0; i < pInfo->numOfKey; i++) {
    tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
P
plum-lihui 已提交
265
  }
P
plum-lihui 已提交
266 267 268 269 270 271 272 273 274 275 276 277

  //tmq_conf_set(conf, "client.id", "c-001");

  //tmq_conf_set(conf, "enable.auto.commit", "true");
  //tmq_conf_set(conf, "enable.auto.commit", "false");

  //tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
  
  //tmq_conf_set(conf, "auto.offset.reset", "none");
  //tmq_conf_set(conf, "auto.offset.reset", "earliest");
  //tmq_conf_set(conf, "auto.offset.reset", "latest");
  
L
Liu Jicong 已提交
278
  pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
P
plum-lihui 已提交
279
  return;
P
plum-lihui 已提交
280 281
}

P
plum-lihui 已提交
282
void build_topic_list(SThreadInfo *pInfo) {
P
plum-lihui 已提交
283
  pInfo->topicList = tmq_list_new();
L
Liu Jicong 已提交
284
  // tmq_list_append(topic_list, "test_stb_topic_1");
P
plum-lihui 已提交
285 286
  for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
    tmq_list_append(pInfo->topicList, pInfo->topics[i]);
P
plum-lihui 已提交
287
  }
P
plum-lihui 已提交
288
  return;
P
plum-lihui 已提交
289 290
}

P
plum-lihui 已提交
291
int32_t saveConsumeResult(SThreadInfo *pInfo) {
P
plum-lihui 已提交
292
  char sqlStr[1024] = {0};
P
plum-lihui 已提交
293
  
P
plum-lihui 已提交
294 295
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);
P
plum-lihui 已提交
296
  
P
plum-lihui 已提交
297
  // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
P
plum-lihui 已提交
298 299 300 301 302 303 304
  sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", 
                               g_stConfInfo.cdbName, 
                               pInfo->consumerId, 
                               pInfo->consumeMsgCnt, 
                               pInfo->consumeRowCnt, 
                               pInfo->checkresult);
  
P
plum-lihui 已提交
305 306
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
P
plum-lihui 已提交
307
    printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
L
Liu Jicong 已提交
308 309
    taos_free_result(pRes);
    exit(-1);
P
plum-lihui 已提交
310
  }
P
plum-lihui 已提交
311
  
P
plum-lihui 已提交
312 313
  taos_free_result(pRes);

P
plum-lihui 已提交
314
  return 0;
P
plum-lihui 已提交
315 316
}

P
plum-lihui 已提交
317
void loop_consume(SThreadInfo *pInfo) {
P
plum-lihui 已提交
318
  tmq_resp_err_t err;
P
plum-lihui 已提交
319
  
P
plum-lihui 已提交
320
  int64_t totalMsgs = 0;
P
plum-lihui 已提交
321
  int64_t totalRows = 0;
P
plum-lihui 已提交
322 323

  while (running) {
P
plum-lihui 已提交
324
    TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
P
plum-lihui 已提交
325
    if (tmqMsg) {     
L
Liu Jicong 已提交
326
      if (0 != g_stConfInfo.showMsgFlag) {
P
plum-lihui 已提交
327
        totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId);
L
Liu Jicong 已提交
328
      }
L
Liu Jicong 已提交
329 330 331

      taos_free_result(tmqMsg);

L
Liu Jicong 已提交
332
      totalMsgs++;
P
plum-lihui 已提交
333
      
P
plum-lihui 已提交
334
      if (totalMsgs >= pInfo->expectMsgCnt) {
P
plum-lihui 已提交
335
        break;
L
Liu Jicong 已提交
336
      }
P
plum-lihui 已提交
337 338 339 340
    } else {
      break;
    }
  }
P
plum-lihui 已提交
341
  
P
plum-lihui 已提交
342
  err = tmq_consumer_close(pInfo->tmq);
P
plum-lihui 已提交
343 344
  if (err) {
    printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
L
Liu Jicong 已提交
345
    exit(-1);
P
plum-lihui 已提交
346
  }
L
Liu Jicong 已提交
347

P
plum-lihui 已提交
348
  pInfo->consumeMsgCnt = totalMsgs;
P
plum-lihui 已提交
349 350 351 352
  pInfo->consumeRowCnt = totalRows;

  taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %"PRId64", consumeRowCnt: %"PRId64"\n", pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
  
P
plum-lihui 已提交
353 354
}

P
plum-lihui 已提交
355
void *consumeThreadFunc(void *param) {
P
plum-lihui 已提交
356 357
  int32_t totalMsgs = 0;

P
plum-lihui 已提交
358
  SThreadInfo *pInfo = (SThreadInfo *)param;
P
plum-lihui 已提交
359

P
plum-lihui 已提交
360 361
  build_consumer(pInfo);
  build_topic_list(pInfo);
P
plum-lihui 已提交
362
  if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){
P
plum-lihui 已提交
363 364
    return NULL;
  }
P
plum-lihui 已提交
365
  
P
plum-lihui 已提交
366
  tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
P
plum-lihui 已提交
367 368 369 370
  if (err) {
    printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
    exit(-1);
  }
P
plum-lihui 已提交
371
  
P
plum-lihui 已提交
372
  loop_consume(pInfo);
P
plum-lihui 已提交
373

P
plum-lihui 已提交
374 375
  tmq_commit(pInfo->tmq, NULL, 0);

P
plum-lihui 已提交
376
  err = tmq_unsubscribe(pInfo->tmq);
P
plum-lihui 已提交
377 378
  if (err) {
    printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
P
plum-lihui 已提交
379
	pInfo->consumeMsgCnt = -1;
P
plum-lihui 已提交
380
    return NULL;
P
plum-lihui 已提交
381 382
  }  
  
P
plum-lihui 已提交
383 384
  // save consume result into consumeresult table
  saveConsumeResult(pInfo);
P
plum-lihui 已提交
385 386

  return NULL;
P
plum-lihui 已提交
387 388
}

P
plum-lihui 已提交
389 390 391 392
void parseConsumeInfo() {
  char*      token;
  const char delim[2] = ",";
  const char ch = ':';
L
Liu Jicong 已提交
393

P
plum-lihui 已提交
394
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {  	
P
plum-lihui 已提交
395 396 397 398 399 400 401
    token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token);
      ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
      // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
      g_stConfInfo.stThreads[i].numOfTopic++;
P
plum-lihui 已提交
402
  	
P
plum-lihui 已提交
403 404
      token = strtok(NULL, delim);
    }
P
plum-lihui 已提交
405
    
P
plum-lihui 已提交
406 407 408 409 410 411 412 413 414 415 416 417 418
    token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
    while (token != NULL) {
      // printf("%s\n", token );
      {
        char* pstr = token;
        ltrim(pstr);
        char* ret = strchr(pstr, ch);
        memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
        strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
        // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
        // g_stConfInfo.value[g_stConfInfo.numOfKey]);
        g_stConfInfo.stThreads[i].numOfKey++;
      }
P
plum-lihui 已提交
419
  	
P
plum-lihui 已提交
420
      token = strtok(NULL, delim);
P
plum-lihui 已提交
421 422
    }
  }
P
plum-lihui 已提交
423
}
P
plum-lihui 已提交
424

P
plum-lihui 已提交
425 426
int32_t getConsumeInfo() {
  char sqlStr[1024] = {0};
P
plum-lihui 已提交
427
  
P
plum-lihui 已提交
428 429
  TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
  assert(pConn != NULL);
P
plum-lihui 已提交
430 431
  
  sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
P
plum-lihui 已提交
432 433 434
  TAOS_RES* pRes = taos_query(pConn, sqlStr);
  if (taos_errno(pRes) != 0) {
    printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
P
plum-lihui 已提交
435 436
    taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
    taosCloseFile(&g_fp);  
P
plum-lihui 已提交
437
    taos_free_result(pRes);
P
plum-lihui 已提交
438
    exit(-1);
P
plum-lihui 已提交
439 440 441 442 443 444 445 446
  }  
  
  TAOS_ROW	   row        = NULL;
  int		   num_fields = taos_num_fields(pRes);
  TAOS_FIELD*  fields     = taos_fetch_fields(pRes);
  
  // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int
  
P
plum-lihui 已提交
447 448
  int32_t numOfThread = 0;
  while ((row = taos_fetch_row(pRes))) {
P
plum-lihui 已提交
449 450 451
	int32_t*  lengths = taos_fetch_lengths(pRes);
	
    for (int i = 0; i < num_fields; ++i) {      
P
plum-lihui 已提交
452 453 454
      if (row[i] == NULL || 0 == i) {
        continue;
      }
P
plum-lihui 已提交
455
      
P
plum-lihui 已提交
456
      if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
P
plum-lihui 已提交
457
        g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]);
P
plum-lihui 已提交
458 459 460 461 462
      } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
      } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
        memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
      } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
P
plum-lihui 已提交
463
        g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]);
P
plum-lihui 已提交
464
      } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
P
plum-lihui 已提交
465
        g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]);
P
plum-lihui 已提交
466 467
      }
    }
P
plum-lihui 已提交
468
    numOfThread ++;
P
plum-lihui 已提交
469
  }
P
plum-lihui 已提交
470
  g_stConfInfo.numOfThread = numOfThread;
P
plum-lihui 已提交
471

P
plum-lihui 已提交
472
  taos_free_result(pRes);
P
plum-lihui 已提交
473

P
plum-lihui 已提交
474
  parseConsumeInfo();
P
plum-lihui 已提交
475

P
plum-lihui 已提交
476 477
  return 0;
}
P
plum-lihui 已提交
478

P
plum-lihui 已提交
479

P
plum-lihui 已提交
480 481 482
int main(int32_t argc, char* argv[]) {
  parseArgument(argc, argv);
  getConsumeInfo();
P
plum-lihui 已提交
483
  saveConfigToLogFile();
P
plum-lihui 已提交
484 485 486 487 488 489

  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
  taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);

  // pthread_create one thread to consume
P
plum-lihui 已提交
490
  taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);	
P
plum-lihui 已提交
491
  for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
P
plum-lihui 已提交
492
    taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i])));
P
plum-lihui 已提交
493 494 495 496
  }

  for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
    taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
P
plum-lihui 已提交
497 498
  }

P
plum-lihui 已提交
499 500
  //printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);	
  
P
plum-lihui 已提交
501
  taosFprintfFile(g_fp, "==== close tmqlog ====\n");	
P
plum-lihui 已提交
502 503
  taosCloseFile(&g_fp);  
  
P
plum-lihui 已提交
504 505 506
  return 0;
}