未验证 提交 036673cb 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #15836 from taosdata/feature/3.0_wxy

fix: add checks for stream query
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
#include "systable.h" #include "systable.h"
#include "taos.h" #include "taos.h"
#include "tdef.h" #include "tdef.h"
#include "types.h"
#include "tgrant.h" #include "tgrant.h"
#include "types.h"
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
...@@ -97,7 +97,7 @@ static const SSysDbTableSchema userDBSchema[] = { ...@@ -97,7 +97,7 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "wal_retention_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "wal_retention_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "wal_seg_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
}; };
static const SSysDbTableSchema userFuncSchema[] = { static const SSysDbTableSchema userFuncSchema[] = {
...@@ -243,8 +243,8 @@ static const SSysTableMeta infosMeta[] = { ...@@ -243,8 +243,8 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)}, {TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)}, {TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)}, {TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
// {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)}, // {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)},
// {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)}, // {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)},
{TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)}, {TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)},
{TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)}, {TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)},
{TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)}, {TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
......
...@@ -2515,7 +2515,7 @@ static bool isPartitionByTbname(SNodeList* pPartitionByList) { ...@@ -2515,7 +2515,7 @@ static bool isPartitionByTbname(SNodeList* pPartitionByList) {
return false; return false;
} }
SNode* pPartKey = nodesListGetNode(pPartitionByList, 0); SNode* pPartKey = nodesListGetNode(pPartitionByList, 0);
return QUERY_NODE_FUNCTION != nodeType(pPartKey) || FUNCTION_TYPE_TBNAME != ((SFunctionNode*)pPartKey)->funcType; return QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType;
} }
static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
...@@ -2566,7 +2566,6 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { ...@@ -2566,7 +2566,6 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) { if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pSelect->isTimeLineResult = true;
pCxt->currClause = SQL_CLAUSE_WINDOW; pCxt->currClause = SQL_CLAUSE_WINDOW;
int32_t code = translateExpr(pCxt, &pSelect->pWindow); int32_t code = translateExpr(pCxt, &pSelect->pWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -2637,7 +2636,6 @@ static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelec ...@@ -2637,7 +2636,6 @@ static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelec
if (NULL == pSelect->pPartitionByList) { if (NULL == pSelect->pPartitionByList) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pSelect->isTimeLineResult = false;
pCxt->currClause = SQL_CLAUSE_PARTITION_BY; pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
return translateExprList(pCxt, pSelect->pPartitionByList); return translateExprList(pCxt, pSelect->pPartitionByList);
} }
...@@ -4708,6 +4706,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm ...@@ -4708,6 +4706,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm
return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq); return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq);
} }
static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc &&
!pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!isPartitionByTbname(pSelect->pPartitionByList);
}
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
if (NULL != pStmt->pOptions->pWatermark && if (NULL != pStmt->pOptions->pWatermark &&
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) { (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
...@@ -4723,14 +4727,12 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt ...@@ -4723,14 +4727,12 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) { if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) ||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) {
if (QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
return TSDB_CODE_SUCCESS;
}
} }
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); return TSDB_CODE_SUCCESS;
} }
static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) { static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
...@@ -4759,12 +4761,23 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) { ...@@ -4759,12 +4761,23 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
return code; return code;
} }
static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) { static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) {
pCxt->createStream = true; pCxt->createStream = true;
int32_t code = addWstartTsToCreateStreamQuery(pStmt); int32_t code = addWstartTsToCreateStreamQuery(pStmt);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pStmt); code = translateQuery(pCxt, pStmt);
} }
if (TSDB_CODE_SUCCESS == code) {
code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB); getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt, false, &pReq->ast, NULL); code = nodesNodeToString(pStmt, false, &pReq->ast, NULL);
......
...@@ -268,7 +268,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { ...@@ -268,7 +268,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode); return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_PARTITION: case QUERY_NODE_LOGIC_PLAN_PARTITION:
return stbSplIsMultiTbScanChild(streamQuery, pNode); return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_WINDOW:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册