diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 0831555d2270a11dabcc7b189e8168b4c170db06..717b0575866f3a46be0092897288456850fdaff2 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -114,12 +114,18 @@ typedef enum TALBE_EXISTS_EN { TBL_EXISTS_BUTT } TALBE_EXISTS_EN; -enum MODE { +enum enumSYNC_MODE { SYNC_MODE, ASYNC_MODE, MODE_BUT }; +typedef enum enumQUERY_CLASS { + SPECIFIED_CLASS, + STABLE_CLASS, + CLASS_BUT +} QUERY_CLASS; + typedef enum enum_INSERT_MODE { PROGRESSIVE_INSERT_MODE, INTERLACE_INSERT_MODE, @@ -6557,18 +6563,21 @@ static void specified_sub_callback( } static TAOS_SUB* subscribeImpl( + QUERY_CLASS class, threadInfo *pThreadInfo, - char *sql, char* topic, bool restart) + char *sql, char* topic, bool restart, uint64_t interval) { TAOS_SUB* tsub = NULL; - if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { + if ((SPECIFIED_CLASS == class) + && (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) { tsub = taos_subscribe( pThreadInfo->taos, restart, topic, sql, specified_sub_callback, (void*)pThreadInfo, g_queryInfo.specifiedQueryInfo.subscribeInterval); - } else if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { + } else if ((STABLE_CLASS == class) + && (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) { tsub = taos_subscribe( pThreadInfo->taos, restart, @@ -6578,7 +6587,7 @@ static TAOS_SUB* subscribeImpl( tsub = taos_subscribe( pThreadInfo->taos, restart, - topic, sql, NULL, NULL, 0); + topic, sql, NULL, NULL, interval); } if (tsub == NULL) { @@ -6644,8 +6653,10 @@ static void *superSubscribe(void *sarg) { debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", __func__, __LINE__, pThreadInfo->threadID, subSqlstr); tsub[i] = subscribeImpl( + STABLE_CLASS, pThreadInfo, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart); + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval); if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); return NULL; @@ -6687,8 +6698,10 @@ static void *superSubscribe(void *sarg) { g_queryInfo.superQueryInfo.subscribeKeepProgress); consumed[i]= 0; tsub[i] = subscribeImpl( + STABLE_CLASS, pThreadInfo, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval ); if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); @@ -6744,10 +6757,12 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); } - tsub = subscribeImpl(pThreadInfo, + tsub = subscribeImpl( + SPECIFIED_CLASS, pThreadInfo, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], topic, - g_queryInfo.specifiedQueryInfo.subscribeRestart); + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval); if (NULL == tsub) { taos_close(pThreadInfo->taos); return NULL; @@ -6777,10 +6792,12 @@ static void *specifiedSubscribe(void *sarg) { taos_unsubscribe(tsub, g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); tsub = subscribeImpl( + SPECIFIED_CLASS, pThreadInfo, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], topic, - g_queryInfo.specifiedQueryInfo.subscribeRestart + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval ); if (NULL == tsub) { taos_close(pThreadInfo->taos);