From 3d7650d12c454944c4030cad24a3b8bd10e739f5 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 7 May 2021 18:18:40 +0800 Subject: [PATCH] [TD-3902]: taosdemo subscribe. (#6028) Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 139 +++++++++++++++++++----------------- 1 file changed, 75 insertions(+), 64 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 20d7f67138..847a8045af 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -1630,64 +1630,68 @@ static void printfQueryMeta() { printf("database name: \033[33m%s\033[0m\n", g_queryInfo.dbName); printf("\n"); - printf("specified table query info: \n"); - printf("query interval: \033[33m%"PRIu64" ms\033[0m\n", - g_queryInfo.specifiedQueryInfo.queryInterval); - printf("top query times:\033[33m%"PRIu64"\033[0m\n", g_args.query_times); - printf("concurrent: \033[33m%"PRIu64"\033[0m\n", - g_queryInfo.specifiedQueryInfo.concurrent); - printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", + + if ((SUBSCRIBE_TEST == g_args.test_mode) || (QUERY_TEST == g_args.test_mode)) { + printf("specified table query info: \n"); + printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", g_queryInfo.specifiedQueryInfo.sqlCount); - printf("specified tbl query times:\n"); - printf(" \033[33m%"PRIu64"\033[0m\n", + if (g_queryInfo.specifiedQueryInfo.sqlCount > 0) { + printf("specified tbl query times:\n"); + printf(" \033[33m%"PRIu64"\033[0m\n", g_queryInfo.specifiedQueryInfo.queryTimes); - - if (SUBSCRIBE_TEST == g_args.test_mode) { - printf("mod: \033[33m%d\033[0m\n", - g_queryInfo.specifiedQueryInfo.mode); - printf("interval: \033[33m%"PRIu64"\033[0m\n", + printf("query interval: \033[33m%"PRIu64" ms\033[0m\n", + g_queryInfo.specifiedQueryInfo.queryInterval); + printf("top query times:\033[33m%"PRIu64"\033[0m\n", g_args.query_times); + printf("concurrent: \033[33m%"PRIu64"\033[0m\n", + g_queryInfo.specifiedQueryInfo.concurrent); + printf("mod: \033[33m%s\033[0m\n", + (g_queryInfo.specifiedQueryInfo.mode)?"async":"sync"); + printf("interval: \033[33m%"PRIu64"\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeInterval); - printf("restart: \033[33m%d\033[0m\n", + printf("restart: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeRestart); - printf("keepProgress: \033[33m%d\033[0m\n", + printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); - } - for (uint64_t i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - printf(" sql[%"PRIu64"]: \033[33m%s\033[0m\n", - i, g_queryInfo.specifiedQueryInfo.sql[i]); - } - printf("\n"); - printf("super table query info:\n"); - printf("query interval: \033[33m%"PRIu64"\033[0m\n", - g_queryInfo.superQueryInfo.queryInterval); - printf("threadCnt: \033[33m%d\033[0m\n", - g_queryInfo.superQueryInfo.threadCnt); - printf("childTblCount: \033[33m%"PRIu64"\033[0m\n", - g_queryInfo.superQueryInfo.childTblCount); - printf("stable name: \033[33m%s\033[0m\n", - g_queryInfo.superQueryInfo.sTblName); - printf("stb query times:\033[33m%"PRIu64"\033[0m\n", - g_queryInfo.superQueryInfo.queryTimes); - - if (SUBSCRIBE_TEST == g_args.test_mode) { - printf("mod: \033[33m%d\033[0m\n", - g_queryInfo.superQueryInfo.mode); - printf("interval: \033[33m%"PRIu64"\033[0m\n", + for (uint64_t i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + printf(" sql[%"PRIu64"]: \033[33m%s\033[0m\n", + i, g_queryInfo.specifiedQueryInfo.sql[i]); + } + printf("\n"); + } + + printf("super table query info:\n"); + printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", + g_queryInfo.superQueryInfo.sqlCount); + + if (g_queryInfo.superQueryInfo.sqlCount > 0) { + printf("query interval: \033[33m%"PRIu64"\033[0m\n", + g_queryInfo.superQueryInfo.queryInterval); + printf("threadCnt: \033[33m%d\033[0m\n", + g_queryInfo.superQueryInfo.threadCnt); + printf("childTblCount: \033[33m%"PRIu64"\033[0m\n", + g_queryInfo.superQueryInfo.childTblCount); + printf("stable name: \033[33m%s\033[0m\n", + g_queryInfo.superQueryInfo.sTblName); + printf("stb query times:\033[33m%"PRIu64"\033[0m\n", + g_queryInfo.superQueryInfo.queryTimes); + + printf("mod: \033[33m%s\033[0m\n", + (g_queryInfo.superQueryInfo.mode)?"async":"sync"); + printf("interval: \033[33m%"PRIu64"\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval); - printf("restart: \033[33m%d\033[0m\n", + printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart); - printf("keepProgress: \033[33m%d\033[0m\n", + printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress); - } - printf("sqlCount: \033[33m%"PRIu64"\033[0m\n", - g_queryInfo.superQueryInfo.sqlCount); - for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - printf(" sql[%d]: \033[33m%s\033[0m\n", - i, g_queryInfo.superQueryInfo.sql[i]); + for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + printf(" sql[%d]: \033[33m%s\033[0m\n", + i, g_queryInfo.superQueryInfo.sql[i]); + } + printf("\n"); + } } - printf("\n"); SHOW_PARSE_RESULT_END(); } @@ -2847,7 +2851,7 @@ static void* createTable(void *sarg) } static int startMultiThreadCreateChildTable( - char* cols, int threads, int64_t startFrom, int64_t ntables, + char* cols, int threads, uint64_t startFrom, uint64_t ntables, char* db_name, SSuperTable* superTblInfo) { pthread_t *pids = malloc(threads * sizeof(pthread_t)); @@ -2862,13 +2866,13 @@ static int startMultiThreadCreateChildTable( threads = 1; } - int64_t a = ntables / threads; + uint64_t a = ntables / threads; if (a < 1) { threads = ntables; a = 1; } - int64_t b = 0; + uint64_t b = 0; b = ntables % threads; for (int64_t i = 0; i < threads; i++) { @@ -4212,7 +4216,7 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } } - // sub_table_query + // super_table_query cJSON *superQuery = cJSON_GetObjectItem(root, "super_table_query"); if (!superQuery) { g_queryInfo.superQueryInfo.threadCnt = 1; @@ -5679,13 +5683,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, taos_close(taos); - int a = ntables / threads; + uint64_t a = ntables / threads; if (a < 1) { threads = ntables; a = 1; } - int b = 0; + uint64_t b = 0; if (threads != 0) { b = ntables % threads; } @@ -6380,7 +6384,7 @@ static int queryTestProcess() { b = ntables % threads; } - int startFrom = 0; + uint64_t startFrom = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; t_info->threadID = i; @@ -6436,13 +6440,14 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c } getResult(res, (char*)param); - taos_free_result(res); + // tao_unscribe() will free result. } -static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) { +static TAOS_SUB* subscribeImpl( + TAOS *taos, char *sql, char* topic, char* resultFileName) { TAOS_SUB* tsub = NULL; - if (g_queryInfo.specifiedQueryInfo.mode) { + if (ASYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) { tsub = taos_subscribe(taos, g_queryInfo.specifiedQueryInfo.subscribeRestart, topic, sql, subscribe_callback, (void*)resultFileName, @@ -6466,6 +6471,9 @@ static void *superSubscribe(void *sarg) { char subSqlstr[1024]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; + if (g_queryInfo.superQueryInfo.sqlCount == 0) + return NULL; + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, @@ -6524,7 +6532,7 @@ static void *superSubscribe(void *sarg) { TAOS_RES* res = NULL; while(1) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { - if (1 == g_queryInfo.superQueryInfo.mode) { + if (ASYNC_QUERY_MODE == g_queryInfo.superQueryInfo.mode) { continue; } @@ -6554,6 +6562,9 @@ static void *specifiedSubscribe(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; + if (g_queryInfo.specifiedQueryInfo.sqlCount == 0) + return NULL; + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, @@ -6591,7 +6602,7 @@ static void *specifiedSubscribe(void *sarg) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { sprintf(topic, "taosdemo-subscribe-%d", i); char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.superQueryInfo.result[i][0] != 0) { + if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); } @@ -6610,7 +6621,7 @@ static void *specifiedSubscribe(void *sarg) { TAOS_RES* res = NULL; while(1) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - if (SYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) { + if (ASYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) { continue; } @@ -6710,21 +6721,21 @@ static int subscribeTestProcess() { exit(-1); } - int ntables = g_queryInfo.superQueryInfo.childTblCount; + uint64_t ntables = g_queryInfo.superQueryInfo.childTblCount; int threads = g_queryInfo.superQueryInfo.threadCnt; - int a = ntables / threads; + uint64_t a = ntables / threads; if (a < 1) { threads = ntables; a = 1; } - int b = 0; + uint64_t b = 0; if (threads != 0) { b = ntables % threads; } - int startFrom = 0; + uint64_t startFrom = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; t_info->threadID = i; -- GitLab