From 78b8473e03670ec434fe7fe30fbeaf7730beace2 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Wed, 19 May 2021 20:12:19 +0800 Subject: [PATCH] Hotfix/sangshuduo/td 4238 taosdemo async subscribe (#6162) * [TD-4238]: taosdemo async subscribe. * [TD-4238]: taosdemo async subscribe subsribe sql command do not use aggregation functions. * [TD-4238]: taosdemo async subscribe. interval. * [TD-4238]: taosdemo async subscribe. fix super table sub result file. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 61 +++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 0831555d22..f8d3f83d47 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -114,12 +114,18 @@ typedef enum TALBE_EXISTS_EN { TBL_EXISTS_BUTT } TALBE_EXISTS_EN; -enum MODE { +enum enumSYNC_MODE { SYNC_MODE, ASYNC_MODE, MODE_BUT }; +typedef enum enumQUERY_CLASS { + SPECIFIED_CLASS, + STABLE_CLASS, + CLASS_BUT +} QUERY_CLASS; + typedef enum enum_INSERT_MODE { PROGRESSIVE_INSERT_MODE, INTERLACE_INSERT_MODE, @@ -6557,18 +6563,21 @@ static void specified_sub_callback( } static TAOS_SUB* subscribeImpl( + QUERY_CLASS class, threadInfo *pThreadInfo, - char *sql, char* topic, bool restart) + char *sql, char* topic, bool restart, uint64_t interval) { TAOS_SUB* tsub = NULL; - if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { + if ((SPECIFIED_CLASS == class) + && (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) { tsub = taos_subscribe( pThreadInfo->taos, restart, topic, sql, specified_sub_callback, (void*)pThreadInfo, g_queryInfo.specifiedQueryInfo.subscribeInterval); - } else if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { + } else if ((STABLE_CLASS == class) + && (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) { tsub = taos_subscribe( pThreadInfo->taos, restart, @@ -6578,7 +6587,7 @@ static TAOS_SUB* subscribeImpl( tsub = taos_subscribe( pThreadInfo->taos, restart, - topic, sql, NULL, NULL, 0); + topic, sql, NULL, NULL, interval); } if (tsub == NULL) { @@ -6629,9 +6638,14 @@ static void *superSubscribe(void *sarg) { char topic[32] = {0}; for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { + verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n", + __func__, __LINE__, + pThreadInfo->threadID, + pThreadInfo->start_table_from, + pThreadInfo->end_table_to, i); sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"", i, pThreadInfo->querySeq); - memset(subSqlstr,0,sizeof(subSqlstr)); + memset(subSqlstr, 0, sizeof(subSqlstr)); replaceChildTblName( g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq], subSqlstr, i); @@ -6644,8 +6658,10 @@ static void *superSubscribe(void *sarg) { debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", __func__, __LINE__, pThreadInfo->threadID, subSqlstr); tsub[i] = subscribeImpl( + STABLE_CLASS, pThreadInfo, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart); + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval); if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); return NULL; @@ -6687,8 +6703,10 @@ static void *superSubscribe(void *sarg) { g_queryInfo.superQueryInfo.subscribeKeepProgress); consumed[i]= 0; tsub[i] = subscribeImpl( + STABLE_CLASS, pThreadInfo, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval ); if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); @@ -6744,10 +6762,12 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); } - tsub = subscribeImpl(pThreadInfo, + tsub = subscribeImpl( + SPECIFIED_CLASS, pThreadInfo, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], topic, - g_queryInfo.specifiedQueryInfo.subscribeRestart); + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval); if (NULL == tsub) { taos_close(pThreadInfo->taos); return NULL; @@ -6777,11 +6797,12 @@ static void *specifiedSubscribe(void *sarg) { taos_unsubscribe(tsub, g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); tsub = subscribeImpl( - pThreadInfo, - g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], - topic, - g_queryInfo.specifiedQueryInfo.subscribeRestart - ); + SPECIFIED_CLASS, + pThreadInfo, + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], + topic, + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval); if (NULL == tsub) { taos_close(pThreadInfo->taos); return NULL; @@ -6833,7 +6854,7 @@ static int subscribeTestProcess() { //==== create threads for query for specified table if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { - printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", + debugPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount); } else { @@ -6870,10 +6891,10 @@ static int subscribeTestProcess() { } //==== create threads for super table query - if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { - printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", + if (g_queryInfo.superQueryInfo.sqlCount <= 0) { + printf("%s() LN%d, super table query sqlCount %"PRIu64".\n", __func__, __LINE__, - g_queryInfo.specifiedQueryInfo.sqlCount); + g_queryInfo.superQueryInfo.sqlCount); } else { if ((g_queryInfo.superQueryInfo.sqlCount > 0) && (g_queryInfo.superQueryInfo.threadCnt > 0)) { @@ -6906,8 +6927,8 @@ static int subscribeTestProcess() { b = ntables % threads; } - uint64_t startFrom = 0; for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + uint64_t startFrom = 0; for (int j = 0; j < threads; j++) { uint64_t seq = i * threads + j; threadInfo *t_info = infosOfStable + seq; -- GitLab