diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 6062e33599bfe4917ae15e63b928b44cf35f6e5d..8b80b6fda0548de9649992da1fb0ede5cbc434bd 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -114,7 +114,7 @@ typedef enum TALBE_EXISTS_EN { TBL_EXISTS_BUTT } TALBE_EXISTS_EN; -enum MODE { +enum enumSYNC_MODE { SYNC_MODE, ASYNC_MODE, MODE_BUT @@ -127,6 +127,12 @@ enum enum_TAOS_INTERFACE { INTERFACE_BUT }; +typedef enum enumQUERY_CLASS { + SPECIFIED_CLASS, + STABLE_CLASS, + CLASS_BUT +} QUERY_CLASS; + typedef enum enum_PROGRESSIVE_OR_INTERLACE { PROGRESSIVE_INSERT_MODE, INTERLACE_INSERT_MODE, @@ -6587,18 +6593,21 @@ static void specified_sub_callback( } static TAOS_SUB* subscribeImpl( + QUERY_CLASS class, threadInfo *pThreadInfo, - char *sql, char* topic, bool restart) + char *sql, char* topic, bool restart, uint64_t interval) { TAOS_SUB* tsub = NULL; - if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { + if ((SPECIFIED_CLASS == class) + && (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) { tsub = taos_subscribe( pThreadInfo->taos, restart, topic, sql, specified_sub_callback, (void*)pThreadInfo, g_queryInfo.specifiedQueryInfo.subscribeInterval); - } else if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { + } else if ((STABLE_CLASS == class) + && (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) { tsub = taos_subscribe( pThreadInfo->taos, restart, @@ -6608,7 +6617,7 @@ static TAOS_SUB* subscribeImpl( tsub = taos_subscribe( pThreadInfo->taos, restart, - topic, sql, NULL, NULL, 0); + topic, sql, NULL, NULL, interval); } if (tsub == NULL) { @@ -6674,8 +6683,10 @@ static void *superSubscribe(void *sarg) { debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", __func__, __LINE__, pThreadInfo->threadID, subSqlstr); tsub[i] = subscribeImpl( + STABLE_CLASS, pThreadInfo, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart); + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval); if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); return NULL; @@ -6717,8 +6728,10 @@ static void *superSubscribe(void *sarg) { g_queryInfo.superQueryInfo.subscribeKeepProgress); consumed[i]= 0; tsub[i] = subscribeImpl( + STABLE_CLASS, pThreadInfo, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart + g_queryInfo.superQueryInfo.subscribeRestart, + g_queryInfo.superQueryInfo.subscribeInterval ); if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); @@ -6774,10 +6787,12 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); } - tsub = subscribeImpl(pThreadInfo, + tsub = subscribeImpl( + SPECIFIED_CLASS, pThreadInfo, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], topic, - g_queryInfo.specifiedQueryInfo.subscribeRestart); + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval); if (NULL == tsub) { taos_close(pThreadInfo->taos); return NULL; @@ -6807,10 +6822,12 @@ static void *specifiedSubscribe(void *sarg) { taos_unsubscribe(tsub, g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); tsub = subscribeImpl( + SPECIFIED_CLASS, pThreadInfo, g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], topic, - g_queryInfo.specifiedQueryInfo.subscribeRestart + g_queryInfo.specifiedQueryInfo.subscribeRestart, + g_queryInfo.specifiedQueryInfo.subscribeInterval ); if (NULL == tsub) { taos_close(pThreadInfo->taos);