From f15517b1d7f4decd06be411466346c685da39258 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 8 Aug 2022 13:28:49 +0800 Subject: [PATCH] fix: add checks for stream query --- source/libs/parser/src/parTranslater.c | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index be16289595..c12c4aa9c6 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4729,15 +4729,8 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt return TSDB_CODE_SUCCESS; } - if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); - } - - SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; - - if (QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) || - TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || - !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) { + if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) || + QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } @@ -4770,12 +4763,23 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) { return code; } +static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || + !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + } + return TSDB_CODE_SUCCESS; +} + static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) { pCxt->createStream = true; int32_t code = addWstartTsToCreateStreamQuery(pStmt); if (TSDB_CODE_SUCCESS == code) { code = translateQuery(pCxt, pStmt); } + if (TSDB_CODE_SUCCESS == code) { + code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt); + } if (TSDB_CODE_SUCCESS == code) { getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB); code = nodesNodeToString(pStmt, false, &pReq->ast, NULL); -- GitLab