From a44725f178d04a847ca5bd4e4fb8f083664b1695 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 20 Apr 2022 14:14:26 +0800 Subject: [PATCH] [test: refactor tmq test script] --- tests/script/jenkins/basic.txt | 2 +- tests/script/tsim/tmq/consume.sh | 103 ++++++ tests/test/c/tmqSim.c | 566 +++++++++++++------------------ 3 files changed, 345 insertions(+), 326 deletions(-) create mode 100755 tests/script/tsim/tmq/consume.sh diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 8dc7fb920e..64ea437648 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -55,7 +55,7 @@ # ---- tmq ./test.sh -f tsim/tmq/basic.sim -./test.sh -f tsim/tmq/basic1.sim +#./test.sh -f tsim/tmq/basic1.sim #./test.sh -f tsim/tmq/oneTopic.sim #./test.sh -f tsim/tmq/multiTopic.sim diff --git a/tests/script/tsim/tmq/consume.sh b/tests/script/tsim/tmq/consume.sh new file mode 100755 index 0000000000..ac500e6704 --- /dev/null +++ b/tests/script/tsim/tmq/consume.sh @@ -0,0 +1,103 @@ +#!/bin/bash + +################################################## +# +# Do tmq test +# +################################################## + +set +e + +# set default value for parameters +EXEC_OPTON=start +DB_NAME=db +POLL_DELAY=5 +VALGRIND=0 +SIGNAL=SIGINT + +while getopts "d:s:v:y:x:" arg +do + case $arg in + d) + DB_NAME=$OPTARG + ;; + s) + EXEC_OPTON=$OPTARG + ;; + v) + VALGRIND=1 + ;; + y) + POLL_DELAY=$OPTARG + ;; + x) + SIGNAL=$OPTARG + ;; + ?) + echo "unkown argument" + ;; + esac +done + +SCRIPT_DIR=`pwd` + +IN_TDINTERNAL="community" +if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then + cd ../../.. +else + cd ../../ +fi + +TOP_DIR=`pwd` + +if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then + BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2,3` +else + BIN_DIR=`find . -name "tmq_sim"|grep bin|head -n1|cut -d '/' -f 2` +fi + +declare -x BUILD_DIR=$TOP_DIR/$BIN_DIR + +declare -x SIM_DIR=$TOP_DIR/sim + +PROGRAM=$BUILD_DIR/build/bin/tmq_sim + +PRG_DIR=$SIM_DIR/tsim +CFG_DIR=$PRG_DIR/cfg +LOG_DIR=$PRG_DIR/log + +echo "------------------------------------------------------------------------" +echo "BUILD_DIR: $BUILD_DIR" +echo "SIM_DIR : $SIM_DIR" +echo "CFG_DIR : $CFG_DIR" + + +echo "PROGRAM: $PROGRAM +echo "CFG_DIR: $CFG_DIR +echo "POLL_DELAY: $POLL_DELAY +echo "DB_NAME: $DB_NAME + +echo "------------------------------------------------------------------------" +if [ "$EXEC_OPTON" = "start" ]; then + if [ $VALGRIND -eq 1 ]; then + echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 & + nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 & + else + echo "nohup $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &" + nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME > /dev/null 2>&1 & + fi +else + PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'` + while [ -n "$PID" ] + do + if [ "$SIGNAL" = "SIGKILL" ]; then + echo try to kill by signal SIGKILL + kill -9 $PID + else + echo try to kill by signal SIGINT + kill -SIGINT $PID + fi + sleep 1 + PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'` + done +fi diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index eaca8f151e..5546e514cc 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -31,44 +31,46 @@ #define NC "\033[0m" #define min(a, b) (((a) < (b)) ? (a) : (b)) -#define MAX_SQL_STR_LEN (1024 * 1024) -#define MAX_ROW_STR_LEN (16 * 1024) +#define MAX_SQL_STR_LEN (1024 * 1024) +#define MAX_ROW_STR_LEN (16 * 1024) +#define MAX_CONSUMER_THREAD_CNT (16) typedef struct { - int32_t expectMsgCnt; - int32_t consumeMsgCnt; - TdThread thread; + TdThread thread; + int32_t consumerId; + + int32_t ifCheckData; + int64_t expectMsgCnt; + + int64_t consumeMsgCnt; + int32_t checkresult; + + char topicString[1024]; + char keyString[1024]; + + int32_t numOfTopic; + char topics[32][64]; + + int32_t numOfKey; + char key[32][64]; + char value[32][64]; + + tmq_t* tmq; + tmq_list_t* topicList; + } SThreadInfo; typedef struct { // input from argvs - char dbName[32]; - char topicString[256]; - char keyString[1024]; - char topicString1[256]; - char keyString1[1024]; - int32_t showMsgFlag; - int32_t consumeDelay; // unit s - int32_t consumeMsgCnt; - int32_t checkMode; - - // save result after parse agrvs - int32_t numOfTopic; - char topics[32][64]; - - int32_t numOfKey; - char key[32][64]; - char value[32][64]; - - int32_t numOfTopic1; - char topics1[32][64]; - - int32_t numOfKey1; - char key1[32][64]; - char value1[32][64]; + char dbName[32]; + int32_t showMsgFlag; + int32_t consumeDelay; // unit s + int32_t numOfThread; + SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; } SConfInfo; static SConfInfo g_stConfInfo; +TdFilePtr g_fp = NULL; // char* g_pRowValue = NULL; // TdFilePtr g_fp = NULL; @@ -81,30 +83,54 @@ static void printHelp() { printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); printf("%s%s\n", indent, "-d"); printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default "); - printf("%s%s\n", indent, "-t"); - printf("%s%s%s\n", indent, indent, "The topic string for cosumer, no default "); - printf("%s%s\n", indent, "-k"); - printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default "); - printf("%s%s\n", indent, "-t1"); - printf("%s%s%s\n", indent, indent, "The topic1 string for cosumer, no default "); - printf("%s%s\n", indent, "-k1"); - printf("%s%s%s\n", indent, indent, "The key1-value1 string for cosumer, no default "); printf("%s%s\n", indent, "-g"); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); printf("%s%s\n", indent, "-y"); printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); - printf("%s%s\n", indent, "-m"); - printf("%s%s%s%d\n", indent, indent, "consume msg count, default is s", g_stConfInfo.consumeMsgCnt); - printf("%s%s\n", indent, "-j"); - printf("%s%s%s%d\n", indent, indent, "check mode, default is s", g_stConfInfo.checkMode); exit(EXIT_SUCCESS); } +void initLogFile() { + // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); + TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); + if (NULL == pFile) { + fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); + exit -1; + }; + g_fp = pFile; + + time_t tTime = taosGetTimestampSec(); + struct tm tm = *taosLocalTime(&tTime, NULL); + + taosFprintfFile(pFile, "###################################################################\n"); + taosFprintfFile(pFile, "# configDir: %s\n", configDir); + taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName); + taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); + taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); + + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); + taosFprintfFile(pFile, " Topics: "); + for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) { + taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]); + } + taosFprintfFile(pFile, "\n"); + taosFprintfFile(pFile, " Key: "); + for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) { + taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]); + } + taosFprintfFile(pFile, "\n"); + } + + taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + taosFprintfFile(pFile, "###################################################################\n"); +} + void parseArgument(int32_t argc, char* argv[]) { memset(&g_stConfInfo, 0, sizeof(SConfInfo)); g_stConfInfo.showMsgFlag = 0; - g_stConfInfo.consumeDelay = 8000; - g_stConfInfo.consumeMsgCnt = 0; + g_stConfInfo.consumeDelay = 5; for (int32_t i = 1; i < argc; i++) { if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { @@ -114,37 +140,20 @@ void parseArgument(int32_t argc, char* argv[]) { strcpy(g_stConfInfo.dbName, argv[++i]); } else if (strcmp(argv[i], "-c") == 0) { strcpy(configDir, argv[++i]); - } else if (strcmp(argv[i], "-t") == 0) { - strcpy(g_stConfInfo.topicString, argv[++i]); - } else if (strcmp(argv[i], "-k") == 0) { - strcpy(g_stConfInfo.keyString, argv[++i]); - } else if (strcmp(argv[i], "-t1") == 0) { - strcpy(g_stConfInfo.topicString1, argv[++i]); - } else if (strcmp(argv[i], "-k1") == 0) { - strcpy(g_stConfInfo.keyString1, argv[++i]); } else if (strcmp(argv[i], "-g") == 0) { g_stConfInfo.showMsgFlag = atol(argv[++i]); } else if (strcmp(argv[i], "-y") == 0) { g_stConfInfo.consumeDelay = atol(argv[++i]); - } else if (strcmp(argv[i], "-m") == 0) { - g_stConfInfo.consumeMsgCnt = atol(argv[++i]); - } else if (strcmp(argv[i], "-j") == 0) { - g_stConfInfo.checkMode = atol(argv[++i]); } else { printf("%s unknow para: %s %s", GREEN, argv[++i], NC); exit(-1); } } - if (0 == g_stConfInfo.consumeMsgCnt) { - g_stConfInfo.consumeMsgCnt = 0x7fffffff; - } - -#if 0 +#if 1 pPrint("%s configDir:%s %s", GREEN, configDir, NC); pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); - pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC); - pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC); + pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); #endif } @@ -171,74 +180,26 @@ void ltrim(char* str) { // return str; } -void parseInputString() { - // printf("topicString: %s\n", g_stConfInfo.topicString); - // printf("keyString: %s\n\n", g_stConfInfo.keyString); - - char* token; - const char delim[2] = ","; - const char ch = ':'; - - token = strtok(g_stConfInfo.topicString, delim); - while (token != NULL) { - // printf("%s\n", token ); - strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token); - ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]); - // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); - g_stConfInfo.numOfTopic++; - - token = strtok(NULL, delim); - } - - token = strtok(g_stConfInfo.topicString1, delim); - while (token != NULL) { - // printf("%s\n", token ); - strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token); - ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]); - // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); - g_stConfInfo.numOfTopic1++; - - token = strtok(NULL, delim); - } - - token = strtok(g_stConfInfo.keyString, delim); - while (token != NULL) { - // printf("%s\n", token ); - { - char* pstr = token; - ltrim(pstr); - char* ret = strchr(pstr, ch); - memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret - pstr); - strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret + 1); - // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], - // g_stConfInfo.value[g_stConfInfo.numOfKey]); - g_stConfInfo.numOfKey++; - } - - token = strtok(NULL, delim); - } - - token = strtok(g_stConfInfo.keyString1, delim); - while (token != NULL) { - // printf("%s\n", token ); - { - char* pstr = token; - ltrim(pstr); - char* ret = strchr(pstr, ch); - memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret - pstr); - strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret + 1); - // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], - // g_stConfInfo.value[g_stConfInfo.numOfKey]); - g_stConfInfo.numOfKey1++; - } - - token = strtok(NULL, delim); +static int running = 1; +static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) { + char buf[1024]; + + //printf("topic: %s\n", tmq_get_topic_name(msg)); + //printf("vg:%d\n", tmq_get_vgroup_id(msg)); + taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable); + taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); + + while (1) { + TAOS_ROW row = taos_fetch_row(msg); + if (row == NULL) break; + TAOS_FIELD* fields = taos_fetch_fields(msg); + int32_t numOfFields = taos_field_count(msg); + //taos_print_row(buf, row, fields, numOfFields); + //printf("%s\n", buf); + //taosFprintfFile(g_fp, "%s\n", buf); } } -static int running = 1; -/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ - int queryDB(TAOS* taos, char* command) { TAOS_RES* pRes = taos_query(taos, command); int code = taos_errno(pRes); @@ -252,8 +213,7 @@ int queryDB(TAOS* taos, char* command) { return 0; } -tmq_t* build_consumer() { -#if 0 +void build_consumer(SThreadInfo *pInfo) { char sqlStr[1024] = {0}; TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); @@ -267,273 +227,229 @@ tmq_t* build_consumer() { exit(-1); } taos_free_result(pRes); -#endif tmq_conf_t* conf = tmq_conf_new(); // tmq_conf_set(conf, "group.id", "tg2"); - for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) { - tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]); + for (int32_t i = 0; i < pInfo->numOfKey; i++) { + tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); } - tmq_conf_set(conf, "td.connect.user", "root"); - tmq_conf_set(conf, "td.connect.pass", "taosdata"); - tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); - tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0); - assert(tmq); - tmq_conf_destroy(conf); - return tmq; + pInfo->tmq = tmq_consumer_new(pConn, conf, NULL, 0); + return; } -tmq_list_t* build_topic_list() { - tmq_list_t* topic_list = tmq_list_new(); +void build_topic_list(SThreadInfo *pInfo) { + pInfo->topicList = tmq_list_new(); // tmq_list_append(topic_list, "test_stb_topic_1"); - for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) { - tmq_list_append(topic_list, g_stConfInfo.topics[i]); + for (int32_t i = 0; i < pInfo->numOfTopic; i++) { + tmq_list_append(pInfo->topicList, pInfo->topics[i]); } - return topic_list; + return; } -tmq_t* build_consumer_x() { -#if 0 +int32_t saveConsumeResult(SThreadInfo *pInfo) { char sqlStr[1024] = {0}; - + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); assert(pConn != NULL); - - sprintf(sqlStr, "use %s", g_stConfInfo.dbName); + + // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int + sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)", + g_stConfInfo.dbName, + pInfo->consumerId, + pInfo->consumeMsgCnt, + pInfo->checkresult); + TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); + printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); exit(-1); } + taos_free_result(pRes); -#endif - - tmq_conf_t* conf = tmq_conf_new(); - // tmq_conf_set(conf, "group.id", "tg2"); - for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) { - tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]); - } - tmq_conf_set(conf, "td.connect.user", "root"); - tmq_conf_set(conf, "td.connect.pass", "taosdata"); - tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); - tmq_t* tmq = tmq_consumer_new1(conf, NULL, 0); - assert(tmq); - tmq_conf_destroy(conf); - return tmq; -} -tmq_list_t* build_topic_list_x() { - tmq_list_t* topic_list = tmq_list_new(); - // tmq_list_append(topic_list, "test_stb_topic_1"); - for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) { - tmq_list_append(topic_list, g_stConfInfo.topics1[i]); - } - return topic_list; + return 0; } -void loop_consume(tmq_t* tmq) { +void loop_consume(SThreadInfo *pInfo) { tmq_resp_err_t err; + + int64_t totalMsgs = 0; + //int64_t totalRows = 0; - int32_t totalMsgs = 0; - int32_t totalRows = 0; - int32_t skipLogNum = 0; while (running) { - TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000); - if (tmqMsg) { - totalMsgs++; - -#if 0 - TAOS_ROW row; - while (NULL != (row = tmq_get_row(tmqMsg))) { - totalRows++; - } -#endif - - /*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/ + TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); + if (tmqMsg) { if (0 != g_stConfInfo.showMsgFlag) { - /*msg_process(tmqMsg);*/ + msg_process(tmqMsg, totalMsgs, 0); } + tmq_message_destroy(tmqMsg); - } else { - break; - } - } - - err = tmq_consumer_close(tmq); - if (err) { - printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); - } - - printf("{consume success: %d, %d}", totalMsgs, totalRows); -} - -int32_t parallel_consume(tmq_t* tmq, int threadLable) { - tmq_resp_err_t err; - - int32_t totalMsgs = 0; - int32_t totalRows = 0; - int32_t skipLogNum = 0; - while (running) { - TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000); - if (tmqMsg) { + totalMsgs++; - - // printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs); - -#if 0 - TAOS_ROW row; - while (NULL != (row = tmq_get_row(tmqMsg))) { - totalRows++; - } -#endif - - /*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/ - if (0 != g_stConfInfo.showMsgFlag) { - /*msg_process(tmqMsg);*/ - } - tmq_message_destroy(tmqMsg); - - if (totalMsgs >= g_stConfInfo.consumeMsgCnt) { + + if (totalMsgs >= pInfo->expectMsgCnt) { break; } } else { break; } } - - err = tmq_consumer_close(tmq); + + err = tmq_consumer_close(pInfo->tmq); if (err) { printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - // printf("%d", totalMsgs); // output to sim for check result - return totalMsgs; + pInfo->consumeMsgCnt = totalMsgs; + } -void* threadFunc(void* param) { +void *consumeThreadFunc(void *param) { int32_t totalMsgs = 0; - SThreadInfo* pInfo = (SThreadInfo*)param; + SThreadInfo *pInfo = (SThreadInfo *)param; - tmq_t* tmq = build_consumer_x(); - tmq_list_t* topic_list = build_topic_list_x(); - if ((NULL == tmq) || (NULL == topic_list)) { + build_consumer(pInfo); + build_topic_list(pInfo); + if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){ return NULL; } - - tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); + + tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); if (err) { printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } + + loop_consume(pInfo); - // if (0 == g_stConfInfo.consumeMsgCnt) { - // loop_consume(tmq); - // } else { - pInfo->consumeMsgCnt = parallel_consume(tmq, 1); - //} - - err = tmq_unsubscribe(tmq); + err = tmq_unsubscribe(pInfo->tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); - pInfo->consumeMsgCnt = -1; + pInfo->consumeMsgCnt = -1; return NULL; - } + } + + // save consume result into consumeresult table + saveConsumeResult(pInfo); return NULL; } -int main(int32_t argc, char* argv[]) { - parseArgument(argc, argv); - parseInputString(); +void parseConsumeInfo() { + char* token; + const char delim[2] = ","; + const char ch = ':'; - int32_t numOfThreads = 1; - TdThreadAttr thattr; - taosThreadAttrInit(&thattr); - taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); - SThreadInfo* pInfo = (SThreadInfo*)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo)); - - if (g_stConfInfo.numOfTopic1) { - // pthread_create one thread to consume - for (int32_t i = 0; i < numOfThreads; ++i) { - pInfo[i].expectMsgCnt = 0; - pInfo[i].consumeMsgCnt = 0; - taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void*)(pInfo + i)); + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + token = strtok(g_stConfInfo.stThreads[i].topicString, delim); + while (token != NULL) { + // printf("%s\n", token ); + strcpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token); + ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); + // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); + g_stConfInfo.stThreads[i].numOfTopic++; + + token = strtok(NULL, delim); + } + + token = strtok(g_stConfInfo.stThreads[i].keyString, delim); + while (token != NULL) { + // printf("%s\n", token ); + { + char* pstr = token; + ltrim(pstr); + char* ret = strchr(pstr, ch); + memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr); + strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1); + // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], + // g_stConfInfo.value[g_stConfInfo.numOfKey]); + g_stConfInfo.stThreads[i].numOfKey++; + } + + token = strtok(NULL, delim); } } +} - int32_t totalMsgs = 0; - tmq_t* tmq = build_consumer(); - tmq_list_t* topic_list = build_topic_list(); - if ((NULL == tmq) || (NULL == topic_list)) { - return -1; - } - - tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); - if (err) { - printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); +int32_t getConsumeInfo() { + char sqlStr[1024] = {0}; + + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName); + TAOS_RES* pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); exit(-1); + } + + TAOS_ROW row = NULL; + int num_fields = taos_num_fields(pRes); + TAOS_FIELD* fields = taos_fetch_fields(pRes); + + // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int + + int32_t numOfThread = 0; + while ((row = taos_fetch_row(pRes))) { + int32_t* lengths = taos_fetch_lengths(pRes); + + for (int i = 0; i < num_fields; ++i) { + if (row[i] == NULL || 0 == i) { + continue; + } + + if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { + g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]); + } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { + memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); + } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { + memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]); + } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { + g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]); + } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { + g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]); + } + } + numOfThread ++; } + g_stConfInfo.numOfThread = numOfThread; - if (0 == g_stConfInfo.numOfTopic1) { - loop_consume(tmq); - } else { - totalMsgs = parallel_consume(tmq, 0); - } + taos_free_result(pRes); - err = tmq_unsubscribe(tmq); - if (err) { - printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); - } + parseConsumeInfo(); - if (g_stConfInfo.numOfTopic1) { - for (int32_t i = 0; i < numOfThreads; i++) { - taosThreadJoin(pInfo[i].thread, NULL); - } + return 0; +} - // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); - if (0 == g_stConfInfo.checkMode) { - if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) { - printf("success"); - } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else if (1 == g_stConfInfo.checkMode) { - if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) { - printf("success"); - } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else if (2 == g_stConfInfo.checkMode) { - if ((totalMsgs + pInfo->consumeMsgCnt) == 3 * g_stConfInfo.consumeMsgCnt) { - printf("success"); - } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else if (3 == g_stConfInfo.checkMode) { - if ((totalMsgs == 2 * g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt)) { - printf("success"); - } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else if (4 == g_stConfInfo.checkMode) { - if (((totalMsgs == 0) && (pInfo->consumeMsgCnt == 3 * g_stConfInfo.consumeMsgCnt)) || - ((pInfo->consumeMsgCnt == 0) && (totalMsgs == 3 * g_stConfInfo.consumeMsgCnt)) || - ((pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt) && (totalMsgs == 2 * g_stConfInfo.consumeMsgCnt)) || - ((pInfo->consumeMsgCnt == 2 * g_stConfInfo.consumeMsgCnt) && (totalMsgs == g_stConfInfo.consumeMsgCnt))) { - printf("success"); - } else { - printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } - } else { - printf("fail, check mode unknow. consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); - } + +int main(int32_t argc, char* argv[]) { + parseArgument(argc, argv); + getConsumeInfo(); + initLogFile(); + + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + // pthread_create one thread to consume + for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { + taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i]))); + } + + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); } + //printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); + + taosFprintfFile(g_fp, "\n"); + taosCloseFile(&g_fp); + return 0; } -- GitLab