From baede29bd5a223b66c702ead803d680adc549782 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Mon, 28 Mar 2022 19:56:19 +0800 Subject: [PATCH] [add tmq sim] --- .../script/sh/massiveTable/compileVersion.sh | 2 + tests/test/c/CMakeLists.txt | 8 + tests/test/c/tmqSim.c | 301 ++++++++++++++++++ 3 files changed, 311 insertions(+) create mode 100644 tests/test/c/tmqSim.c diff --git a/tests/script/sh/massiveTable/compileVersion.sh b/tests/script/sh/massiveTable/compileVersion.sh index dd6382992a..13b2baae32 100755 --- a/tests/script/sh/massiveTable/compileVersion.sh +++ b/tests/script/sh/massiveTable/compileVersion.sh @@ -75,10 +75,12 @@ rm -f /usr/bin/taos rm -f /usr/bin/taosd rm -f /usr/bin/create_table rm -f /usr/bin/tmq_demo +rm -f /usr/bin/tmq_sim ln -s $taos_dir/taos /usr/bin/taos ln -s $taosd_dir/taosd /usr/bin/taosd ln -s $exec_process_dir/create_table /usr/bin/create_table ln -s $exec_process_dir/tmq_demo /usr/bin/tmq_demo +ln -s $exec_process_dir/tmq_sim /usr/bin/tmq_sim diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index e186491bd4..804894a69d 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -1,5 +1,6 @@ add_executable(create_table create_table.c) add_executable(tmq_demo tmqDemo.c) +add_executable(tmq_sim tmqSim.c) target_link_libraries( create_table PUBLIC taos @@ -14,3 +15,10 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_sim + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c new file mode 100644 index 0000000000..57fc6eb6f9 --- /dev/null +++ b/tests/test/c/tmqSim.c @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// clang-format off + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "taos.h" +#include "taoserror.h" +#include "tlog.h" + +#define GREEN "\033[1;32m" +#define NC "\033[0m" +#define min(a, b) (((a) < (b)) ? (a) : (b)) + +#define MAX_SQL_STR_LEN (1024 * 1024) +#define MAX_ROW_STR_LEN (16 * 1024) + +typedef struct { + // input from argvs + char dbName[32]; + char topicString[256]; + char keyString[1024]; + int32_t showMsgFlag; + + // save result after parse agrvs + int32_t numOfTopic; + char topics[32][64]; + + int32_t numOfKey; + char key[32][64]; + char value[32][64]; +} SConfInfo; + +static SConfInfo g_stConfInfo; + +//char* g_pRowValue = NULL; +//TdFilePtr g_fp = NULL; + +static void printHelp() { + char indent[10] = " "; + printf("Used to test the tmq feature with sim cases\n"); + + printf("%s%s\n", indent, "-c"); + printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); + printf("%s%s\n", indent, "-d"); + printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default "); + printf("%s%s\n", indent, "-t"); + printf("%s%s%s\n", indent, indent, "The topic string for cosumer, no default "); + printf("%s%s\n", indent, "-k"); + printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default "); + printf("%s%s\n", indent, "-g"); + printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); + exit(EXIT_SUCCESS); +} + +void parseArgument(int32_t argc, char *argv[]) { + + memset(&g_stConfInfo, 0, sizeof(SConfInfo)); + + for (int32_t i = 1; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else if (strcmp(argv[i], "-d") == 0) { + strcpy(g_stConfInfo.dbName, argv[++i]); + } else if (strcmp(argv[i], "-c") == 0) { + strcpy(configDir, argv[++i]); + } else if (strcmp(argv[i], "-t") == 0) { + strcpy(g_stConfInfo.topicString, argv[++i]); + } else if (strcmp(argv[i], "-k") == 0) { + strcpy(g_stConfInfo.keyString, argv[++i]); + } else if (strcmp(argv[i], "-g") == 0) { + g_stConfInfo.showMsgFlag = atol(argv[++i]); + } else { + printf("%s unknow para: %s %s", GREEN, argv[++i], NC); + exit(-1); + } + } + +#if 1 + pPrint("%s configDir:%s %s", GREEN, configDir, NC); + pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); + pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC); + pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC); + pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); +#endif +} + +void splitStr(char **arr, char *str, const char *del) { + char *s = strtok(str, del); + while(s != NULL) { + *arr++ = s; + s = strtok(NULL, del); + } +} + +void ltrim(char *str) +{ + if (str == NULL || *str == '\0') + { + return; + } + int len = 0; + char *p = str; + while (*p != '\0' && isspace(*p)) + { + ++p; ++len; + } + memmove(str, p, strlen(str) - len + 1); + //return str; +} + + +void parseInputString() { + //printf("topicString: %s\n", g_stConfInfo.topicString); + //printf("keyString: %s\n\n", g_stConfInfo.keyString); + + char *token; + const char delim[2] = ","; + const char ch = ':'; + + token = strtok(g_stConfInfo.topicString, delim); + while(token != NULL) { + //printf("%s\n", token ); + strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token); + ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]); + //printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); + g_stConfInfo.numOfTopic++; + + token = strtok(NULL, delim); + } + + printf("\n\n"); + + token = strtok(g_stConfInfo.keyString, delim); + while(token != NULL) { + //printf("%s\n", token ); + { + char* pstr = token; + ltrim(pstr); + char *ret = strchr(pstr, ch); + memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret-pstr); + strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret+1); + //printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]); + g_stConfInfo.numOfKey++; + } + + token = strtok(NULL, delim); + } +} + + +static int running = 1; +static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } + + +int queryDB(TAOS *taos, char *command) { + TAOS_RES *pRes = taos_query(taos, command); + int code = taos_errno(pRes); + //if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { + if (code != 0) { + pError("failed to reason:%s, sql: %s", tstrerror(code), command); + taos_free_result(pRes); + return -1; + } + taos_free_result(pRes); + return 0 ; +} + +tmq_t* build_consumer() { + char sqlStr[1024] = {0}; + + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + sprintf(sqlStr, "use %s", g_stConfInfo.dbName); + TAOS_RES* pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + //tmq_conf_set(conf, "group.id", "tg2"); + for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) { + tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]); + } + tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + //tmq_list_append(topic_list, "test_stb_topic_1"); + for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) { + tmq_list_append(topic_list, g_stConfInfo.topics[i]); + } + return topic_list; +} + +void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + static const int MIN_COMMIT_COUNT = 1000; + + int msg_count = 0; + tmq_resp_err_t err; + + if ((err = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); + return; + } + + while (running) { + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); + if (tmqmessage) { + msg_process(tmqmessage); + tmq_message_destroy(tmqmessage); + + if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); + } + } + + err = tmq_consumer_close(tmq); + if (err) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +void perf_loop(tmq_t* tmq, tmq_list_t* topics) { + tmq_resp_err_t err; + + if ((err = tmq_subscribe(tmq, topics))) { + printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); + return; + } + + int32_t totalMsgs = 0; + int32_t skipLogNum = 0; + //int64_t startTime = taosGetTimestampUs(); + while (running) { + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); + if (tmqmessage) { + totalMsgs++; + skipLogNum += tmqGetSkipLogNum(tmqmessage); + if (0 != g_stConfInfo.showMsgFlag) { + msg_process(tmqmessage); + } + tmq_message_destroy(tmqmessage); + } else { + break; + } + } + //int64_t endTime = taosGetTimestampUs(); + //double consumeTime = (double)(endTime - startTime) / 1000000; + + + err = tmq_consumer_close(tmq); + if (err) { + printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + + printf("{consume success: %d}", totalMsgs); +} + +int main(int32_t argc, char *argv[]) { + parseArgument(argc, argv); + parseInputString(); + + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + if ((NULL == tmq) || (NULL == topic_list)){ + return -1; + } + + perf_loop(tmq, topic_list); + + return 0; +} + -- GitLab