提交 968f21d4 编写于 作者: S Shuduo Sang

cherry pick f5e4cdd1

上级 da6ee8ee
......@@ -114,7 +114,7 @@ typedef enum TALBE_EXISTS_EN {
TBL_EXISTS_BUTT
} TALBE_EXISTS_EN;
enum MODE {
enum enumSYNC_MODE {
SYNC_MODE,
ASYNC_MODE,
MODE_BUT
......@@ -127,6 +127,12 @@ enum enum_TAOS_INTERFACE {
INTERFACE_BUT
};
typedef enum enumQUERY_CLASS {
SPECIFIED_CLASS,
STABLE_CLASS,
CLASS_BUT
} QUERY_CLASS;
typedef enum enum_PROGRESSIVE_OR_INTERLACE {
PROGRESSIVE_INSERT_MODE,
INTERLACE_INSERT_MODE,
......@@ -6587,18 +6593,21 @@ static void specified_sub_callback(
}
static TAOS_SUB* subscribeImpl(
QUERY_CLASS class,
threadInfo *pThreadInfo,
char *sql, char* topic, bool restart)
char *sql, char* topic, bool restart, uint64_t interval)
{
TAOS_SUB* tsub = NULL;
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
if ((SPECIFIED_CLASS == class)
&& (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, specified_sub_callback, (void*)pThreadInfo,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
} else if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
} else if ((STABLE_CLASS == class)
&& (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) {
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
......@@ -6608,7 +6617,7 @@ static TAOS_SUB* subscribeImpl(
tsub = taos_subscribe(
pThreadInfo->taos,
restart,
topic, sql, NULL, NULL, 0);
topic, sql, NULL, NULL, interval);
}
if (tsub == NULL) {
......@@ -6674,8 +6683,10 @@ static void *superSubscribe(void *sarg) {
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
__func__, __LINE__, pThreadInfo->threadID, subSqlstr);
tsub[i] = subscribeImpl(
STABLE_CLASS,
pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart);
g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos);
return NULL;
......@@ -6717,8 +6728,10 @@ static void *superSubscribe(void *sarg) {
g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[i]= 0;
tsub[i] = subscribeImpl(
STABLE_CLASS,
pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart
g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval
);
if (NULL == tsub[i]) {
taos_close(pThreadInfo->taos);
......@@ -6774,10 +6787,12 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID);
}
tsub = subscribeImpl(pThreadInfo,
tsub = subscribeImpl(
SPECIFIED_CLASS, pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart);
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == tsub) {
taos_close(pThreadInfo->taos);
return NULL;
......@@ -6807,10 +6822,12 @@ static void *specifiedSubscribe(void *sarg) {
taos_unsubscribe(tsub,
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
tsub = subscribeImpl(
SPECIFIED_CLASS,
pThreadInfo,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
topic,
g_queryInfo.specifiedQueryInfo.subscribeRestart
g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval
);
if (NULL == tsub) {
taos_close(pThreadInfo->taos);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册