提交 93211d7e 编写于 作者: H Haojun Liao

[TD-2567]<fix>: fix bug in continuous query.

上级 ba96c081
...@@ -65,44 +65,51 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in ...@@ -65,44 +65,51 @@ static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, in
return retryDelta; return retryDelta;
} }
static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
SSqlStream *pStream = (SSqlStream *)pMsg->ahandle; SSqlStream *pStream = (SSqlStream *)param;
SSqlObj * pSql = pStream->pSql; assert(pStream->pSql == tres && code == TSDB_CODE_SUCCESS);
pSql->fp = tscProcessStreamQueryCallback; SSqlObj* pSql = (SSqlObj*) tres;
pSql->fetchFp = tscProcessStreamQueryCallback; pSql->fp = doLaunchQuery;
pSql->param = pStream; pSql->fetchFp = doLaunchQuery;
pSql->res.completed = false; pSql->res.completed = false;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
pSql->res.code = code;
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0); code = tscGetSTableVgroupInfo(pSql, 0);
pSql->res.code = code;
} }
// failed to get meter/metric meta, retry in 10sec. // failed to get table Meta or vgroup list, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscDebug("%p stream:%p,get metermeta failed, retry in %" PRId64 "ms", pStream->pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
} else {
tscTansformSQLFuncForSTableQuery(pQueryInfo); tscTansformSQLFuncForSTableQuery(pQueryInfo);
tscDebug("%p stream:%p start stream query on:%s", pSql, pStream, pTableMetaInfo->name); tscDebug("%p stream:%p, start stream query on:%s", pSql, pStream, pTableMetaInfo->name);
tscDoQuery(pStream->pSql);
pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
tscDoQuery(pSql);
tscIncStreamExecutionCount(pStream); tscIncStreamExecutionCount(pStream);
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
pSql->res.code = code;
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscDebug("%p stream:%p, get table Meta failed, retry in %" PRId64 "ms", pSql, pStream, retryDelayTime);
tscSetRetryTimer(pStream, pSql, retryDelayTime);
} }
} }
static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SSqlStream *pStream = (SSqlStream *)pMsg->ahandle;
doLaunchQuery(pStream, pStream->pSql, 0);
}
static void tscProcessStreamTimer(void *handle, void *tmrId) { static void tscProcessStreamTimer(void *handle, void *tmrId) {
SSqlStream *pStream = (SSqlStream *)handle; SSqlStream *pStream = (SSqlStream *)handle;
if (pStream == NULL) return; if (pStream == NULL || pStream->pTimer != tmrId) {
if (pStream->pTimer != tmrId) return; return;
}
pStream->pTimer = NULL; pStream->pTimer = NULL;
pStream->numOfRes = 0; // reset the numOfRes. pStream->numOfRes = 0; // reset the numOfRes.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册