diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index ff7ed8e4e71518e8003bce713b4d81244d55f4d2..e7d678db26df965acdc0f8dbf4bdcf214c5c9130 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -26,6 +26,7 @@ #include "tutil.h" #include "tnote.h" +extern void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql); static void tscProcessFetchRow(SSchedMsg *pMsg); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); @@ -553,13 +554,22 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { } if (pSql->pStream) { - tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); /* * NOTE: * transfer the sql function for super table query before get meter/metric meta, * since in callback functions, only tscProcessSql(pStream->pSql) is executed! */ SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + if (0 == pMeterMetaInfo->pMetricMeta->numOfVnodes || 0 == pMeterMetaInfo->pMetricMeta->numOfMeters) { + tscTrace("%p stream:%p meta is updated, but no table, clear meter meta ans set next launch new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); + tscClearMeterMetaInfo(pMeterMetaInfo, false); + tscSetNextLaunchTimer(pSql->pStream, pSql); + return; + } + + tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); tscTansformSQLFunctionForSTableQuery(pQueryInfo); tscIncStreamExecutionCount(pSql->pStream); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index eaeb4e1976f6d848acd88f3d1fab9f2ea96c3bd2..be698e374eddf3a4fd6a3336fecf30f6e572ac90 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4657,6 +4657,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* if (pMeterMetaInfo->pMeterMeta == NULL || pMetricMeta == NULL || pMetricMeta->numOfMeters == 0) { tscTrace("%p no table in metricmeta, no output result", pSql); pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; + pSql->res.qhandle = 0x1; // to pass the qhandle check; } // keep original limitation value in globalLimit diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 59e2f4f85c4c8cffff1dcc85ff2469d9d523eb20..a9b7512a376c7a1b7a68371424488be09db616f0 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -28,7 +28,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows); -static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql); +void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql); static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer); static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t launchDelay) { @@ -97,6 +97,13 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { return; } + if (0 == pMeterMetaInfo->pMetricMeta->numOfVnodes || 0 == pMeterMetaInfo->pMetricMeta->numOfMeters) { + tscTrace("%p no table in metricmeta, no launch query", pSql); + tscClearMeterMetaInfo(pMeterMetaInfo, false); + tscSetNextLaunchTimer(pStream, pSql); + return; + } + tscTrace("%p stream:%p start stream query on:%s", pSql, pStream, pMeterMetaInfo->name); tscProcessSql(pStream->pSql); @@ -323,7 +330,7 @@ static int64_t getLaunchTimeDelay(const SSqlStream* pStream) { } -static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { +void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { int64_t timer = 0; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);