diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b598fffbc6f00cbafe5a1529c094fb108d30ab81..e753c08f9de9fbbd0ef5993c957e17b6b4e95987 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6118,17 +6118,50 @@ static bool isEventWindowQuery(SSelectStmt* pSelect) { return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow); } +static bool hasJsonTypeProjection(SSelectStmt* pSelect) { + SNode* pProj = NULL; + FOREACH(pProj, pSelect->pProjectionList) { + if (TSDB_DATA_TYPE_JSON == ((SExprNode*)pProj)->resType.type) { + return true; + } + } + return false; +} + +static EDealRes hasColumnOrPseudoColumn(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + return DEAL_RES_CONTINUE; +} + +static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) { + bool hasColumn = false; + nodesWalkExprPostOrder(pNode, hasColumnOrPseudoColumn, &hasColumn); + return hasColumn; +} + static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || - crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) { + crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect) || hasJsonTypeProjection(pSelect)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "SUBTABLE expression must be of VARCHAR type"); } + if (NULL != pSelect->pSubtable && 0 == LIST_LENGTH(pSelect->pPartitionByList) && subtableExprHasColumnOrPseudoColumn(pSelect->pSubtable)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "SUBTABLE expression must not has column when no partition by clause"); + } + if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "The trigger mode of non window query can only be AT_ONCE"); diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index b7ca944ebb22770b848951f833b00b0e37618244..6a08193a39a1e1da7f321ae111445e5d714ac1ac 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -920,6 +920,10 @@ TEST_F(ParserInitialCTest, createStreamSemanticCheck) { run("CREATE STREAM s1 INTO st1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)", TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC); + run("CREATE STREAM s2 INTO st1 AS SELECT ts, to_json('{c1:1}') FROM st1 PARTITION BY TBNAME", + TSDB_CODE_PAR_INVALID_STREAM_QUERY); + run("CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tbname)) " + "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 INTERVAL(10S)", TSDB_CODE_PAR_INVALID_STREAM_QUERY); } /*