From 6fedb60ec276d143d4417aeac262a7fe5e8bd8dc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Nov 2021 18:53:22 +0800 Subject: [PATCH] [td-10564] Add test case and fix bug in generated log query plan. --- include/common/common.h | 9 +++ include/common/taosmsg.h | 11 +--- source/libs/function/src/tfunction.c | 6 +- source/libs/parser/src/astValidate.c | 28 ++++----- source/libs/parser/test/plannerTest.cpp | 6 +- source/libs/planner/src/planner.c | 79 +++++++++++++++++++++---- 6 files changed, 96 insertions(+), 43 deletions(-) diff --git a/include/common/common.h b/include/common/common.h index be1a85e0e8..0913c12597 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -107,6 +107,15 @@ typedef struct SExprInfo { struct tExprNode *pExpr; } SExprInfo; +typedef struct SStateWindow { + SColumn col; +} SStateWindow; + +typedef struct SSessionWindow { + int64_t gap; // gap between two session window(in microseconds) + SColumn col; +} SSessionWindow; + #define QUERY_ASC_FORWARD_STEP 1 #define QUERY_DESC_FORWARD_STEP -1 diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 2e0f59df04..4718d0e4b3 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -491,15 +491,6 @@ typedef struct SInterval { int64_t offset; } SInterval; -typedef struct SSessionWindow { - int64_t gap; // gap between two session window(in microseconds) - int32_t primaryColId; // primary timestamp column -} SSessionWindow; - -typedef struct SStateWindow { - int32_t columnId; -} SStateWindow; - typedef struct { SMsgHead head; char version[TSDB_VERSION_LEN]; @@ -524,7 +515,7 @@ typedef struct { int16_t orderColId; int16_t numOfCols; // the number of columns will be load from vnode SInterval interval; - SSessionWindow sw; // session window +// SSessionWindow sw; // session window uint16_t tagCondLen; // tag length in current query uint16_t colCondLen; // column length in current query int16_t numOfGroupCols; // num of group by columns diff --git a/source/libs/function/src/tfunction.c b/source/libs/function/src/tfunction.c index 2e4a4a058e..d3fc19a47f 100644 --- a/source/libs/function/src/tfunction.c +++ b/source/libs/function/src/tfunction.c @@ -48,10 +48,10 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct bool qIsAggregateFunction(const char* functionName) { assert(functionName != NULL); - bool scalefunc = false; - qIsBuiltinFunction(functionName, strlen(functionName), &scalefunc); + bool scalarfunc = false; + qIsBuiltinFunction(functionName, strlen(functionName), &scalarfunc); - return !scalefunc; + return !scalarfunc; } diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 0aa7282e5c..46821a6e2b 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -763,6 +763,7 @@ int32_t validateSessionNode(SQueryStmtInfo *pQueryInfo, SSessionWindowVal* pSess const char* msg2 = "only one type time window allowed"; const char* msg3 = "invalid column name"; const char* msg4 = "invalid time window"; + const char* msg5 = "only the primary time stamp column can be used in session window"; // no session window if (!TPARSER_HAS_TOKEN(pSession->gap)) { @@ -795,18 +796,22 @@ int32_t validateSessionNode(SQueryStmtInfo *pQueryInfo, SSessionWindowVal* pSess } if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_ID) { - return buildInvalidOperationMsg(pMsgBuf, msg3); + return buildInvalidOperationMsg(pMsgBuf, msg5); } - pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_ID; + STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, index.tableIndex); + STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; + + SSchema* pSchema = getOneColumnSchema(pTableMeta, index.columnIndex); + pQueryInfo->sessionWindow.col = createColumn(pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->aliasName, index.type, pSchema); return TSDB_CODE_SUCCESS; } // parse the window_state int32_t validateStateWindowNode(SQueryStmtInfo *pQueryInfo, SWindowStateVal* pWindowState, SMsgBuf* pMsgBuf) { const char* msg1 = "invalid column name"; - const char* msg2 = "invalid column type"; - const char* msg3 = "not support state_window with group by "; + const char* msg2 = "invalid column type to create state window"; + const char* msg3 = "not support state_window with group by"; const char* msg4 = "function not support for super table query"; const char* msg5 = "not support state_window on tag column"; @@ -836,22 +841,15 @@ int32_t validateStateWindowNode(SQueryStmtInfo *pQueryInfo, SWindowStateVal* pWi return buildInvalidOperationMsg(pMsgBuf, msg5); } - if (pGroupExpr->columnInfo == NULL) { - pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex)); - } - SSchema* pSchema = getOneColumnSchema(pTableMeta, index.columnIndex); if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || IS_FLOAT_TYPE(pSchema->type)) { return buildInvalidOperationMsg(pMsgBuf, msg2); } - columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, TSDB_COL_NORMAL); - SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId }; - - //TODO use group by routine? state window query not support stable query. - taosArrayPush(pGroupExpr->columnInfo, &colIndex); + pQueryInfo->stateWindow.col = createColumn(pTableMeta->uid, pTableMetaInfo->aliasName, index.type, pSchema); pQueryInfo->info.stateWindow = true; + columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, index.type); return TSDB_CODE_SUCCESS; } @@ -3047,10 +3045,10 @@ int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SM if (pLeft->type == SQL_NODE_SQLFUNCTION && pRight->type == SQL_NODE_SQLFUNCTION) { char token[FUNCTIONS_NAME_MAX_LENGTH] = {0}; - tstrncpy(token, pLeft->Expr.operand.z, pLeft->Expr.operand.n); + strncpy(token, pLeft->Expr.operand.z, pLeft->Expr.operand.n); bool agg1 = qIsAggregateFunction(token); - tstrncpy(token, pRight->Expr.operand.z, pRight->Expr.operand.n); + strncpy(token, pRight->Expr.operand.z, pRight->Expr.operand.n); bool agg2 = qIsAggregateFunction(token); if (agg1 != agg2) { diff --git a/source/libs/parser/test/plannerTest.cpp b/source/libs/parser/test/plannerTest.cpp index 34d3639cc5..ded7d73e85 100644 --- a/source/libs/parser/test/plannerTest.cpp +++ b/source/libs/parser/test/plannerTest.cpp @@ -167,7 +167,7 @@ TEST(testCase, planner_test) { TEST(testCase, displayPlan) { // generateLogicplan("select count(*) from `t.1abc`"); // generateLogicplan("select count(*)+ 22 from `t.1abc`"); -// generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h)"); +// generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30"); // generateLogicplan("select count(*) from `t.1abc` group by a"); // generateLogicplan("select count(A+B) from `t.1abc` group by a"); // generateLogicplan("select count(length(a)+b) from `t.1abc` group by a"); @@ -175,7 +175,9 @@ TEST(testCase, displayPlan) { // generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`"); // generateLogicplan("select count(*), min(a) + 99 from `t.1abc`"); // generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`"); - generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc`"); +// generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20"); +// generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)"); + generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)"); // order by + group by column + limit offset + fill diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index ffbae310f7..e70b4a1280 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -95,8 +95,7 @@ int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQuery //====================================================================================================================== static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, - SExprInfo** pExpr, int32_t numOfOutput, SQueryTableInfo* pTableInfo, - void* pExtInfo) { + SExprInfo** pExpr, int32_t numOfOutput, SQueryTableInfo* pTableInfo, void* pExtInfo) { SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode)); pNode->info.type = type; @@ -134,6 +133,20 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla break; } + case QNODE_STATEWINDOW: { + SColumn* psw = calloc(1, sizeof(SColumn)); + pNode->pExtInfo = psw; + memcpy(psw, pExtInfo, sizeof(SColumn)); + break; + } + + case QNODE_SESSIONWINDOW: { + SSessionWindow *pSessionWindow = calloc(1, sizeof(SSessionWindow)); + pNode->pExtInfo = pSessionWindow; + memcpy(pSessionWindow, pExtInfo, sizeof(struct SSessionWindow)); + break; + } + case QNODE_GROUPBY: { SGroupbyExpr* p = (SGroupbyExpr*) pExtInfo; @@ -262,9 +275,9 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQuer if (pQueryInfo->interval.interval > 0) { pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->interval); } else if (pQueryInfo->sessionWindow.gap > 0) { - pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, NULL, 0, info, NULL); - } else if (pQueryInfo->stateWindow.columnId > 0) { - pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, NULL, 0, info, NULL); + pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->sessionWindow); + } else if (pQueryInfo->stateWindow.col.info.colId > 0) { + pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->stateWindow); } else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) { pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, info, &pQueryInfo->groupbyExpr); } else { @@ -343,8 +356,8 @@ static void exprInfoPushDown(SQueryStmtInfo* pQueryInfo) { for (int32_t j = 0; j < taosArrayGetSize(p); ++j) { SExprInfo* pExpr = taosArrayGetP(p, j); - bool canPushDown = true; if (pExpr->pExpr->nodeType == TEXPR_FUNCTION_NODE && qIsAggregateFunction(pExpr->pExpr->_function.functionName)) { + bool canPushDown = true; for (int32_t k = 0; k < taosArrayGetSize(pNext); ++k) { SExprInfo* pNextLevelExpr = taosArrayGetP(pNext, k); if (pExpr->base.pColumns->info.colId == pNextLevelExpr->base.resSchema.colId) { @@ -353,14 +366,14 @@ static void exprInfoPushDown(SQueryStmtInfo* pQueryInfo) { break; } } - } - if (canPushDown) { - taosArrayInsert(pNext, j, &pExpr); - taosArrayRemove(p, j); + if (canPushDown) { + taosArrayInsert(pNext, j, &pExpr); + taosArrayRemove(p, j); + + // todo add the project function in level of "i" - // add the project function in level of "i" - + } } } } @@ -390,7 +403,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) { uint64_t uid = pTableMetaInfo->pTableMeta->uid; SArray* exprList = taosArrayInit(4, POINTER_BYTES); - if (copyExprInfoList(exprList, pQueryInfo->exprList, uid, true) != 0) { + if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // dropAllExprInfo(exprList); exit(-1); @@ -558,6 +571,46 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, break; } + case QNODE_STATEWINDOW: { + for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { + SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i); + SSqlExpr* pExpr = &pExprInfo->base; + len += sprintf(buf + len,"%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId); + if (i < pQueryNode->numOfExpr - 1) { + len1 = sprintf(buf + len,", "); + len += len1; + } + } + + len1 = sprintf(buf + len,") "); + len += len1; + + SColumn* pCol = pQueryNode->pExtInfo; + len1 = sprintf(buf + len, "col:%s #%d\n", pCol->name, pCol->info.colId); + len += len1; + break; + } + + case QNODE_SESSIONWINDOW: { + for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { + SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i); + SSqlExpr* pExpr = &pExprInfo->base; + len += sprintf(buf + len,"%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId); + if (i < pQueryNode->numOfExpr - 1) { + len1 = sprintf(buf + len,", "); + len += len1; + } + } + + len1 = sprintf(buf + len,") "); + len += len1; + + struct SSessionWindow* ps = pQueryNode->pExtInfo; + len1 = sprintf(buf + len, "col:[%s #%d], gap:%"PRId64" (ms) \n", ps->col.name, ps->col.info.colId, ps->gap); + len += len1; + break; + } + case QNODE_GROUPBY: { // todo hide the invisible column for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) { SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i); -- GitLab