diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 5a156704c28e82b1d7461e7200b3b011af318cff..01e03a983d6e13f07af04ec857479b00036d5b1d 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -355,8 +355,11 @@ typedef struct SQueryPlan { int32_t numOfSubplans; SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0. SExplainInfo explainInfo; + SNodeList* pPlaceholderValues; } SQueryPlan; +void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext); + #ifdef __cplusplus } #endif diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 6d805a322671c9afafd98d8e211c0b010492d86d..d2f73e40715911ff456f39dfad3aa6953ed7b45e 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -82,6 +82,7 @@ typedef struct SValueNode { bool isDuration; bool translate; bool genByCalc; + int16_t placeholderNo; union { bool b; int64_t i; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 0b164bf43f57109fa91adb4547baaed91956452b..f343295c563669756da0ae5e809b1c0832820f23 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -21,6 +21,7 @@ extern "C" { #endif #include "plannodes.h" +#include "taos.h" typedef struct SPlanContext { uint64_t queryId; @@ -32,6 +33,7 @@ typedef struct SPlanContext { bool showRewrite; int8_t triggerType; int64_t watermark; + bool isStmtQuery; } SPlanContext; // Create the physical plan for the query, according to the AST. @@ -43,6 +45,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo // @pSource one execution location of this group of datasource subplans int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource); +typedef TAOS_MULTI_BIND TAOS_BIND_v2; // todo remove +int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_BIND_v2* pParams); + // Convert to subplan to string for the scheduler to send to the executor int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen); int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan); diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 1830d15f691f3be545a8fe5163c2ee84abfd9f6c..e74ecfd0f4563f904ca73cba7b2681b41c45e548 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -14,6 +14,7 @@ */ #include "querynodes.h" +#include "plannodes.h" typedef enum ETraversalOrder { TRAVERSAL_PREORDER = 1, @@ -21,9 +22,14 @@ typedef enum ETraversalOrder { TRAVERSAL_POSTORDER, } ETraversalOrder; -static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext); +typedef EDealRes (*FNodeDispatcher)(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext); -static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) { +static EDealRes walkExpr(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext); +static EDealRes walkExprs(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext); +static EDealRes walkPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext); +static EDealRes walkPhysiPlans(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext); + +static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext, FNodeDispatcher dispatcher) { if (NULL == pNode) { return DEAL_RES_CONTINUE; } @@ -37,6 +43,18 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker } } + res = dispatcher(pNode, order, walker, pContext); + + if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) { + res = walker(pNode, pContext); + } + + return res; +} + +static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) { + EDealRes res = DEAL_RES_CONTINUE; + switch (nodeType(pNode)) { case QUERY_NODE_COLUMN: case QUERY_NODE_VALUE: @@ -45,98 +63,98 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker break; case QUERY_NODE_OPERATOR: { SOperatorNode* pOpNode = (SOperatorNode*)pNode; - res = walkNode(pOpNode->pLeft, order, walker, pContext); + res = walkExpr(pOpNode->pLeft, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = walkNode(pOpNode->pRight, order, walker, pContext); + res = walkExpr(pOpNode->pRight, order, walker, pContext); } break; } case QUERY_NODE_LOGIC_CONDITION: - res = walkList(((SLogicConditionNode*)pNode)->pParameterList, order, walker, pContext); + res = walkExprs(((SLogicConditionNode*)pNode)->pParameterList, order, walker, pContext); break; case QUERY_NODE_FUNCTION: - res = walkList(((SFunctionNode*)pNode)->pParameterList, order, walker, pContext); + res = walkExprs(((SFunctionNode*)pNode)->pParameterList, order, walker, pContext); break; case QUERY_NODE_REAL_TABLE: case QUERY_NODE_TEMP_TABLE: break; // todo case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; - res = walkNode(pJoinTableNode->pLeft, order, walker, pContext); + res = walkExpr(pJoinTableNode->pLeft, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = walkNode(pJoinTableNode->pRight, order, walker, pContext); + res = walkExpr(pJoinTableNode->pRight, order, walker, pContext); } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = walkNode(pJoinTableNode->pOnCond, order, walker, pContext); + res = walkExpr(pJoinTableNode->pOnCond, order, walker, pContext); } break; } case QUERY_NODE_GROUPING_SET: - res = walkList(((SGroupingSetNode*)pNode)->pParameterList, order, walker, pContext); + res = walkExprs(((SGroupingSetNode*)pNode)->pParameterList, order, walker, pContext); break; case QUERY_NODE_ORDER_BY_EXPR: - res = walkNode(((SOrderByExprNode*)pNode)->pExpr, order, walker, pContext); + res = walkExpr(((SOrderByExprNode*)pNode)->pExpr, order, walker, pContext); break; case QUERY_NODE_STATE_WINDOW: { SStateWindowNode* pState = (SStateWindowNode*)pNode; - res = walkNode(pState->pExpr, order, walker, pContext); + res = walkExpr(pState->pExpr, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = walkNode(pState->pCol, order, walker, pContext); + res = walkExpr(pState->pCol, order, walker, pContext); } break; } case QUERY_NODE_SESSION_WINDOW: { SSessionWindowNode* pSession = (SSessionWindowNode*)pNode; - res = walkNode((SNode*)pSession->pCol, order, walker, pContext); + res = walkExpr((SNode*)pSession->pCol, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = walkNode((SNode*)pSession->pGap, order, walker, pContext); + res = walkExpr((SNode*)pSession->pGap, order, walker, pContext); } break; } case QUERY_NODE_INTERVAL_WINDOW: { SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode; - res = walkNode(pInterval->pInterval, order, walker, pContext); + res = walkExpr(pInterval->pInterval, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = walkNode(pInterval->pOffset, order, walker, pContext); + res = walkExpr(pInterval->pOffset, order, walker, pContext); } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = walkNode(pInterval->pSliding, order, walker, pContext); + res = walkExpr(pInterval->pSliding, order, walker, pContext); } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = walkNode(pInterval->pFill, order, walker, pContext); + res = walkExpr(pInterval->pFill, order, walker, pContext); } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = walkNode(pInterval->pCol, order, walker, pContext); + res = walkExpr(pInterval->pCol, order, walker, pContext); } break; } case QUERY_NODE_NODE_LIST: - res = walkList(((SNodeListNode*)pNode)->pNodeList, order, walker, pContext); + res = walkExprs(((SNodeListNode*)pNode)->pNodeList, order, walker, pContext); break; case QUERY_NODE_FILL: - res = walkNode(((SFillNode*)pNode)->pValues, order, walker, pContext); + res = walkExpr(((SFillNode*)pNode)->pValues, order, walker, pContext); break; case QUERY_NODE_RAW_EXPR: - res = walkNode(((SRawExprNode*)pNode)->pNode, order, walker, pContext); + res = walkExpr(((SRawExprNode*)pNode)->pNode, order, walker, pContext); break; case QUERY_NODE_TARGET: - res = walkNode(((STargetNode*)pNode)->pExpr, order, walker, pContext); + res = walkExpr(((STargetNode*)pNode)->pExpr, order, walker, pContext); break; default: break; } - if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) { - res = walker(pNode, pContext); - } - return res; } -static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) { +static EDealRes walkExpr(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) { + return walkNode(pNode, order, walker, pContext, dispatchExpr); +} + +static EDealRes walkExprs(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) { SNode* node; FOREACH(node, pNodeList) { - EDealRes res = walkNode(node, order, walker, pContext); + EDealRes res = walkExpr(node, order, walker, pContext); if (DEAL_RES_ERROR == res || DEAL_RES_END == res) { return res; } @@ -145,24 +163,24 @@ static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalke } void nodesWalkExpr(SNodeptr pNode, FNodeWalker walker, void* pContext) { - (void)walkNode(pNode, TRAVERSAL_PREORDER, walker, pContext); + (void)walkExpr(pNode, TRAVERSAL_PREORDER, walker, pContext); } void nodesWalkExprs(SNodeList* pNodeList, FNodeWalker walker, void* pContext) { - (void)walkList(pNodeList, TRAVERSAL_PREORDER, walker, pContext); + (void)walkExprs(pNodeList, TRAVERSAL_PREORDER, walker, pContext); } void nodesWalkExprPostOrder(SNodeptr pNode, FNodeWalker walker, void* pContext) { - (void)walkNode(pNode, TRAVERSAL_POSTORDER, walker, pContext); + (void)walkExpr(pNode, TRAVERSAL_POSTORDER, walker, pContext); } void nodesWalkExprsPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext) { - (void)walkList(pList, TRAVERSAL_POSTORDER, walker, pContext); + (void)walkExprs(pList, TRAVERSAL_POSTORDER, walker, pContext); } -static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext); +static EDealRes rewriteExprs(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext); -static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) { +static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) { if (NULL == pRawNode || NULL == *pRawNode) { return DEAL_RES_CONTINUE; } @@ -185,82 +203,82 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit break; case QUERY_NODE_OPERATOR: { SOperatorNode* pOpNode = (SOperatorNode*)pNode; - res = rewriteNode(&(pOpNode->pLeft), order, rewriter, pContext); + res = rewriteExpr(&(pOpNode->pLeft), order, rewriter, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = rewriteNode(&(pOpNode->pRight), order, rewriter, pContext); + res = rewriteExpr(&(pOpNode->pRight), order, rewriter, pContext); } break; } case QUERY_NODE_LOGIC_CONDITION: - res = rewriteList(((SLogicConditionNode*)pNode)->pParameterList, order, rewriter, pContext); + res = rewriteExprs(((SLogicConditionNode*)pNode)->pParameterList, order, rewriter, pContext); break; case QUERY_NODE_FUNCTION: - res = rewriteList(((SFunctionNode*)pNode)->pParameterList, order, rewriter, pContext); + res = rewriteExprs(((SFunctionNode*)pNode)->pParameterList, order, rewriter, pContext); break; case QUERY_NODE_REAL_TABLE: case QUERY_NODE_TEMP_TABLE: break; // todo case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; - res = rewriteNode(&(pJoinTableNode->pLeft), order, rewriter, pContext); + res = rewriteExpr(&(pJoinTableNode->pLeft), order, rewriter, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = rewriteNode(&(pJoinTableNode->pRight), order, rewriter, pContext); + res = rewriteExpr(&(pJoinTableNode->pRight), order, rewriter, pContext); } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = rewriteNode(&(pJoinTableNode->pOnCond), order, rewriter, pContext); + res = rewriteExpr(&(pJoinTableNode->pOnCond), order, rewriter, pContext); } break; } case QUERY_NODE_GROUPING_SET: - res = rewriteList(((SGroupingSetNode*)pNode)->pParameterList, order, rewriter, pContext); + res = rewriteExprs(((SGroupingSetNode*)pNode)->pParameterList, order, rewriter, pContext); break; case QUERY_NODE_ORDER_BY_EXPR: - res = rewriteNode(&(((SOrderByExprNode*)pNode)->pExpr), order, rewriter, pContext); + res = rewriteExpr(&(((SOrderByExprNode*)pNode)->pExpr), order, rewriter, pContext); break; case QUERY_NODE_STATE_WINDOW: { SStateWindowNode* pState = (SStateWindowNode*)pNode; - res = rewriteNode(&pState->pExpr, order, rewriter, pContext); + res = rewriteExpr(&pState->pExpr, order, rewriter, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = rewriteNode(&pState->pCol, order, rewriter, pContext); + res = rewriteExpr(&pState->pCol, order, rewriter, pContext); } break; } case QUERY_NODE_SESSION_WINDOW: { SSessionWindowNode* pSession = (SSessionWindowNode*)pNode; - res = rewriteNode((SNode**)&pSession->pCol, order, rewriter, pContext); + res = rewriteExpr((SNode**)&pSession->pCol, order, rewriter, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = rewriteNode((SNode**)&pSession->pGap, order, rewriter, pContext); + res = rewriteExpr((SNode**)&pSession->pGap, order, rewriter, pContext); } break; } case QUERY_NODE_INTERVAL_WINDOW: { SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode; - res = rewriteNode(&(pInterval->pInterval), order, rewriter, pContext); + res = rewriteExpr(&(pInterval->pInterval), order, rewriter, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = rewriteNode(&(pInterval->pOffset), order, rewriter, pContext); + res = rewriteExpr(&(pInterval->pOffset), order, rewriter, pContext); } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = rewriteNode(&(pInterval->pSliding), order, rewriter, pContext); + res = rewriteExpr(&(pInterval->pSliding), order, rewriter, pContext); } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext); + res = rewriteExpr(&(pInterval->pFill), order, rewriter, pContext); } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { - res = rewriteNode(&(pInterval->pCol), order, rewriter, pContext); + res = rewriteExpr(&(pInterval->pCol), order, rewriter, pContext); } break; } case QUERY_NODE_NODE_LIST: - res = rewriteList(((SNodeListNode*)pNode)->pNodeList, order, rewriter, pContext); + res = rewriteExprs(((SNodeListNode*)pNode)->pNodeList, order, rewriter, pContext); break; case QUERY_NODE_FILL: - res = rewriteNode(&(((SFillNode*)pNode)->pValues), order, rewriter, pContext); + res = rewriteExpr(&(((SFillNode*)pNode)->pValues), order, rewriter, pContext); break; case QUERY_NODE_RAW_EXPR: - res = rewriteNode(&(((SRawExprNode*)pNode)->pNode), order, rewriter, pContext); + res = rewriteExpr(&(((SRawExprNode*)pNode)->pNode), order, rewriter, pContext); break; case QUERY_NODE_TARGET: - res = rewriteNode(&(((STargetNode*)pNode)->pExpr), order, rewriter, pContext); + res = rewriteExpr(&(((STargetNode*)pNode)->pExpr), order, rewriter, pContext); break; default: break; @@ -273,10 +291,10 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit return res; } -static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) { +static EDealRes rewriteExprs(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) { SNode** pNode; FOREACH_FOR_REWRITE(pNode, pNodeList) { - EDealRes res = rewriteNode(pNode, order, rewriter, pContext); + EDealRes res = rewriteExpr(pNode, order, rewriter, pContext); if (DEAL_RES_ERROR == res || DEAL_RES_END == res) { return res; } @@ -285,19 +303,19 @@ static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRe } void nodesRewriteExpr(SNode** pNode, FNodeRewriter rewriter, void* pContext) { - (void)rewriteNode(pNode, TRAVERSAL_PREORDER, rewriter, pContext); + (void)rewriteExpr(pNode, TRAVERSAL_PREORDER, rewriter, pContext); } void nodesRewriteExprs(SNodeList* pList, FNodeRewriter rewriter, void* pContext) { - (void)rewriteList(pList, TRAVERSAL_PREORDER, rewriter, pContext); + (void)rewriteExprs(pList, TRAVERSAL_PREORDER, rewriter, pContext); } void nodesRewriteExprPostOrder(SNode** pNode, FNodeRewriter rewriter, void* pContext) { - (void)rewriteNode(pNode, TRAVERSAL_POSTORDER, rewriter, pContext); + (void)rewriteExpr(pNode, TRAVERSAL_POSTORDER, rewriter, pContext); } void nodesRewriteExprsPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext) { - (void)rewriteList(pList, TRAVERSAL_POSTORDER, rewriter, pContext); + (void)rewriteExprs(pList, TRAVERSAL_POSTORDER, rewriter, pContext); } void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext) { @@ -357,3 +375,206 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit return; } + +static EDealRes walkPhysiNode(SPhysiNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) { + EDealRes res = walkPhysiPlan((SNode*)pNode->pOutputDataBlockDesc, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlan(pNode->pConditions, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pNode->pChildren, order, walker, pContext); + } + return res; +} + +static EDealRes walkScanPhysi(SScanPhysiNode* pScan, ETraversalOrder order, FNodeWalker walker, void* pContext) { + EDealRes res = walkPhysiNode((SPhysiNode*)pScan, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pScan->pScanCols, order, walker, pContext); + } + return res; +} + +static EDealRes walkTableScanPhysi(STableScanPhysiNode* pScan, ETraversalOrder order, FNodeWalker walker, void* pContext) { + EDealRes res = walkScanPhysi((SScanPhysiNode*)pScan, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pScan->pDynamicScanFuncs, order, walker, pContext); + } + return res; +} + +static EDealRes walkWindowPhysi(SWinodwPhysiNode* pWindow, ETraversalOrder order, FNodeWalker walker, void* pContext) { + EDealRes res = walkPhysiNode((SPhysiNode*)pWindow, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pWindow->pExprs, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pWindow->pFuncs, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlan(pWindow->pTspk, order, walker, pContext); + } + return res; +} + +static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) { + EDealRes res = DEAL_RES_CONTINUE; + + switch (nodeType(pNode)) { + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + res = walkTableScanPhysi((STableScanPhysiNode*)pNode, order, walker, pContext); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: + res = walkTableScanPhysi((STableScanPhysiNode*)pNode, order, walker, pContext); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: + res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext); + break; + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext); + break; + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: { + SProjectPhysiNode* pProject = (SProjectPhysiNode*)pNode; + res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pProject->pProjections, order, walker, pContext); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_JOIN: { + SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode; + res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlan(pJoin->pOnConditions, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pJoin->pTargets, order, walker, pContext); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_AGG: { + SAggPhysiNode* pAgg = (SAggPhysiNode*)pNode; + res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pAgg->pExprs, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pAgg->pGroupKeys, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pAgg->pAggFuncs, order, walker, pContext); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: { + SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode; + res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pExchange->pSrcEndPoints, order, walker, pContext); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_SORT: { + SSortPhysiNode* pSort = (SSortPhysiNode*)pNode; + res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pSort->pExprs, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pSort->pSortKeys, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pSort->pTargets, order, walker, pContext); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: { + SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)pNode; + res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlan((SNode*)pInterval->pFill, order, walker, pContext); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: + res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext); + break; + case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: { + SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)pNode; + res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlan(pState->pStateKey, order, walker, pContext); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: { + SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)pNode; + res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pPart->pExprs, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pPart->pPartitionKeys, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pPart->pTargets, order, walker, pContext); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + res = walkPhysiPlan((SNode*)(((SDataSinkNode*)pNode)->pInputDataBlockDesc), order, walker, pContext); + break; + case QUERY_NODE_PHYSICAL_PLAN_INSERT: + res = walkPhysiPlan((SNode*)(((SDataSinkNode*)pNode)->pInputDataBlockDesc), order, walker, pContext); + break; + case QUERY_NODE_PHYSICAL_SUBPLAN: { + SSubplan* pSubplan = (SSubplan*)pNode; + res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlans(pSubplan->pChildren, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlan((SNode*)pSubplan->pNode, order, walker, pContext); + } + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlan((SNode*)pSubplan->pDataSink, order, walker, pContext); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN: { + SQueryPlan* pPlan = (SQueryPlan*)pNode; + if (NULL != pPlan->pSubplans) { + // only need to walk the top-level subplans, because they will recurse to all the subplans below + walkPhysiPlan(nodesListGetNode(pPlan->pSubplans, 0), order, walker, pContext); + } + break; + } + default: + res = dispatchExpr(pNode, order, walker, pContext); + break; + } + + return res; +} + +static EDealRes walkPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) { + return walkNode(pNode, order, walker, pContext, dispatchPhysiPlan); +} + +static EDealRes walkPhysiPlans(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) { + SNode* node; + FOREACH(node, pNodeList) { + EDealRes res = walkPhysiPlan(node, order, walker, pContext); + if (DEAL_RES_ERROR == res || DEAL_RES_END == res) { + return res; + } + } + return DEAL_RES_CONTINUE; +} + +void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext) { + (void)walkPhysiPlan(pNode, TRAVERSAL_PREORDER, walker, pContext); +} diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 2c4fba50592342d2f24131d589ec82b82df0b75c..85f9b3399c0ff51d564264d92473bc3bcafdd4d3 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -32,6 +32,7 @@ typedef struct SAstCreateContext { bool notSupport; bool valid; SNode* pRootNode; + int16_t placeholderNo; } SAstCreateContext; typedef enum EDatabaseOptionType { @@ -86,7 +87,7 @@ SNode* createColumnNode(SAstCreateContext* pCxt, SToken* pTableAlias, SToken* pC SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral); SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral); SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt); -SNode* createPlaceholderValueNode(SAstCreateContext* pCxt); +SNode* createPlaceholderValueNode(SAstCreateContext* pCxt, const SToken* pLiteral); SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* pAlias); SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2); SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index a995ec9035c3714bf8c0ed544d1a16df9d9f9e23..33b525cf3af08e7950c6d9427a625a068aa10a2d 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -465,8 +465,8 @@ literal(A) ::= NK_STRING(B). literal(A) ::= NK_BOOL(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_BOOL, &B)); } literal(A) ::= TIMESTAMP(B) NK_STRING(C). { A = createRawExprNodeExt(pCxt, &B, &C, createValueNode(pCxt, TSDB_DATA_TYPE_TIMESTAMP, &C)); } literal(A) ::= duration_literal(B). { A = B; } -literal(A) ::= NULL(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_NULL, NULL)); } -literal(A) ::= NK_QUESTION(B). { A = createRawExprNode(pCxt, &B, createPlaceholderValueNode(pCxt)); } +literal(A) ::= NULL(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_NULL, &B)); } +literal(A) ::= NK_QUESTION(B). { A = createRawExprNode(pCxt, &B, createPlaceholderValueNode(pCxt, &B)); } duration_literal(A) ::= NK_VARIABLE(B). { A = createRawExprNode(pCxt, &B, createDurationValueNode(pCxt, &B)); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 790aa92a10a578f22fcf6fd8e8720c941705c1f6..0bc28beef9b510a3e02de53dd1ffd8986b197f02 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -44,6 +44,7 @@ void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) { pCxt->notSupport = false; pCxt->valid = true; pCxt->pRootNode = NULL; + pCxt->placeholderNo = 1; } static void trimEscape(SToken* pName) { @@ -258,14 +259,12 @@ SNode* createColumnNode(SAstCreateContext* pCxt, SToken* pTableAlias, SToken* pC SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* pLiteral) { SValueNode* val = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); CHECK_OUT_OF_MEM(val); - if (NULL != pLiteral) { - val->literal = strndup(pLiteral->z, pLiteral->n); - if (TK_NK_ID != pLiteral->type && TK_TIMEZONE != pLiteral->type && - (IS_VAR_DATA_TYPE(dataType) || TSDB_DATA_TYPE_TIMESTAMP == dataType)) { - trimString(pLiteral->z, pLiteral->n, val->literal, pLiteral->n); - } - CHECK_OUT_OF_MEM(val->literal); + val->literal = strndup(pLiteral->z, pLiteral->n); + if (TK_NK_ID != pLiteral->type && TK_TIMEZONE != pLiteral->type && + (IS_VAR_DATA_TYPE(dataType) || TSDB_DATA_TYPE_TIMESTAMP == dataType)) { + trimString(pLiteral->z, pLiteral->n, val->literal, pLiteral->n); } + CHECK_OUT_OF_MEM(val->literal); val->node.resType.type = dataType; val->node.resType.bytes = IS_VAR_DATA_TYPE(dataType) ? strlen(val->literal) : tDataTypes[dataType].bytes; if (TSDB_DATA_TYPE_TIMESTAMP == dataType) { @@ -306,10 +305,12 @@ SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt) { return (SNode*)val; } -SNode* createPlaceholderValueNode(SAstCreateContext* pCxt) { +SNode* createPlaceholderValueNode(SAstCreateContext* pCxt, const SToken* pLiteral) { SValueNode* val = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); CHECK_OUT_OF_MEM(val); - // todo + val->literal = strndup(pLiteral->z, pLiteral->n); + CHECK_OUT_OF_MEM(val->literal); + val->placeholderNo = pCxt->placeholderNo++; return (SNode*)val; } diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 76af6b7ac76c8674305d71f31b2a3f82ecfd9513..2eeff04ea2ff18ece6b34bc1609eb8a78d9ceb72 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -50,7 +50,6 @@ int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) { Parse(pParser, 0, t0, &cxt); goto abort_parse; } - case TK_NK_QUESTION: case TK_NK_ILLEGAL: { snprintf(cxt.pQueryCxt->pMsg, cxt.pQueryCxt->msgLen, "unrecognized token: \"%s\"", t0.z); cxt.valid = false; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 45e8cd3acc79ce7b5eaa38118d82ad0a1e440da1..7e894eba17afa80310b62bee13c3653026b8d7f4 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -403,6 +403,9 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) { static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) { uint8_t precision = (NULL != pCxt->pCurrStmt ? pCxt->pCurrStmt->precision : pVal->node.resType.precision); pVal->node.resType.precision = precision; + if (pVal->placeholderNo > 0) { + return DEAL_RES_CONTINUE; + } if (pVal->isDuration) { if (parseNatualDuration(pVal->literal, strlen(pVal->literal), &pVal->datum.i, &pVal->unit, precision) != TSDB_CODE_SUCCESS) { diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index 6969a5a0f939f5e15f887cbcd1cbdc4eaca96f3e..224d64701736fac4cf0e91eeb1baf3da46854cf0 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -134,7 +134,6 @@ typedef union { #define ParseCTX_STORE #define YYNSTATE 568 #define YYNRULE 433 -#define YYNRULE_WITH_ACTION 433 #define YYNTOKEN 220 #define YY_MAX_SHIFT 567 #define YY_MIN_SHIFTREDUCE 843 @@ -588,28 +587,7 @@ static const YYCODETYPE yy_lookahead[] = { /* 1840 */ 334, 334, 334, 334, 334, 334, 334, 334, 334, 334, /* 1850 */ 334, 334, 334, 334, 334, 286, 287, 288, 289, 290, /* 1860 */ 291, 334, 293, 334, 334, 334, 334, 334, 334, 334, - /* 1870 */ 334, 334, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1880 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1890 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1900 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1910 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1920 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1930 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1940 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1950 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1960 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1970 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1980 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 1990 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 2000 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 2010 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 2020 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 2030 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 2040 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 2050 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 2060 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 2070 */ 220, 220, 220, 220, 220, 220, 220, 220, 220, 220, - /* 2080 */ 220, 220, 220, + /* 1870 */ 334, 334, }; #define YY_SHIFT_COUNT (567) #define YY_SHIFT_MIN (0) @@ -2050,18 +2028,15 @@ static YYACTIONTYPE yy_find_shift_action( do{ i = yy_shift_ofst[stateno]; assert( i>=0 ); - assert( i<=YY_ACTTAB_COUNT ); - assert( i+YYNTOKEN<=(int)YY_NLOOKAHEAD ); + /* assert( i+YYNTOKEN<=(int)YY_NLOOKAHEAD ); */ assert( iLookAhead!=YYNOCODE ); assert( iLookAhead < YYNTOKEN ); i += iLookAhead; - assert( i<(int)YY_NLOOKAHEAD ); - if( yy_lookahead[i]!=iLookAhead ){ + if( i>=YY_NLOOKAHEAD || yy_lookahead[i]!=iLookAhead ){ #ifdef YYFALLBACK YYCODETYPE iFallback; /* Fallback token */ - assert( iLookAhead %s\n", @@ -2076,8 +2051,16 @@ static YYACTIONTYPE yy_find_shift_action( #ifdef YYWILDCARD { int j = i - iLookAhead + YYWILDCARD; - assert( j<(int)(sizeof(yy_lookahead)/sizeof(yy_lookahead[0])) ); - if( yy_lookahead[j]==YYWILDCARD && iLookAhead>0 ){ + if( +#if YY_SHIFT_MIN+YYWILDCARD<0 + j>=0 && +#endif +#if YY_SHIFT_MAX+YYWILDCARD>=YY_ACTTAB_COUNT + j0 + ){ #ifndef NDEBUG if( yyTraceFILE ){ fprintf(yyTraceFILE, "%sWILDCARD %s => %s\n", @@ -2091,7 +2074,6 @@ static YYACTIONTYPE yy_find_shift_action( #endif /* YYWILDCARD */ return yy_default[stateno]; }else{ - assert( i>=0 && iyytos; #ifndef NDEBUG if( yyTraceFILE && yyruleno<(int)(sizeof(yyRuleName)/sizeof(yyRuleName[0])) ){ - yysize = yyRuleInfoNRhs[yyruleno]; + yysize = yyRuleInfo[yyruleno].nrhs; if( yysize ){ - fprintf(yyTraceFILE, "%sReduce %d [%s]%s, pop back to state %d.\n", + fprintf(yyTraceFILE, "%sReduce %d [%s], go to state %d.\n", yyTracePrompt, - yyruleno, yyRuleName[yyruleno], - yyrulenoyytos - yypParser->yystack)>yypParser->yyhwm ){ yypParser->yyhwm++; @@ -3988,11 +3533,11 @@ static YYACTIONTYPE yy_reduce( yymsp[0].minor.yy456 = yylhsminor.yy456; break; case 257: /* literal ::= NULL */ -{ yylhsminor.yy456 = createRawExprNode(pCxt, &yymsp[0].minor.yy0, createValueNode(pCxt, TSDB_DATA_TYPE_NULL, NULL)); } +{ yylhsminor.yy456 = createRawExprNode(pCxt, &yymsp[0].minor.yy0, createValueNode(pCxt, TSDB_DATA_TYPE_NULL, &yymsp[0].minor.yy0)); } yymsp[0].minor.yy456 = yylhsminor.yy456; break; case 258: /* literal ::= NK_QUESTION */ -{ yylhsminor.yy456 = createRawExprNode(pCxt, &yymsp[0].minor.yy0, createPlaceholderValueNode(pCxt)); } +{ yylhsminor.yy456 = createRawExprNode(pCxt, &yymsp[0].minor.yy0, createPlaceholderValueNode(pCxt, &yymsp[0].minor.yy0)); } yymsp[0].minor.yy456 = yylhsminor.yy456; break; case 259: /* duration_literal ::= NK_VARIABLE */ @@ -4436,9 +3981,9 @@ static YYACTIONTYPE yy_reduce( break; /********** End reduce actions ************************************************/ }; - assert( yyruleno. */ -extern bool g_isDump; \ No newline at end of file +#ifndef PARSER_TEST_UTIL_H +#define PARSER_TEST_UTIL_H + +extern bool g_isDump; + +#endif // PARSER_TEST_UTIL_H diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index bcea94278ea7c686e44045ea9470152e5052556c..004f0b18fd2e1796ab932d1c062645772f5c664e 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -17,6 +17,31 @@ #include "planInt.h" +typedef struct SCollectPlaceholderValuesCxt { + int32_t errCode; + SNodeList* pValues; +} SCollectPlaceholderValuesCxt; + +static EDealRes collectPlaceholderValuesImpl(SNode* pNode, void* pContext) { + if (QUERY_NODE_VALUE == nodeType(pNode) && ((SValueNode*)pNode)->placeholderNo > 0) { + SCollectPlaceholderValuesCxt* pCxt = pContext; + pCxt->errCode = nodesListMakeAppend(&pCxt->pValues, pNode); + return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR; + } + return DEAL_RES_CONTINUE; +} + +static int32_t collectPlaceholderValues(SPlanContext* pCxt, SQueryPlan* pPlan) { + SCollectPlaceholderValuesCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pValues = NULL }; + nodesWalkPhysiPlan((SNode*)pPlan, collectPlaceholderValuesImpl, &cxt); + if (TSDB_CODE_SUCCESS == cxt.errCode) { + pPlan->pPlaceholderValues = cxt.pValues; + } else { + nodesDestroyList(cxt.pValues); + } + return cxt.errCode; +} + int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) { SLogicNode* pLogicNode = NULL; SLogicSubplan* pLogicSubplan = NULL; @@ -35,6 +60,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo if (TSDB_CODE_SUCCESS == code) { code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList); } + if (TSDB_CODE_SUCCESS == code && pCxt->isStmtQuery) { + code = collectPlaceholderValues(pCxt, *pPlan); + } nodesDestroyNode(pLogicNode); nodesDestroyNode(pLogicSubplan); @@ -73,6 +101,82 @@ int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstream return setSubplanExecutionNode(subplan->pNode, groupId, pSource); } +static int32_t setValueByBindParam(SValueNode* pVal, TAOS_BIND_v2* pParam) { + if (1 == *(pParam->is_null)) { + pVal->node.resType.type = TSDB_DATA_TYPE_NULL; + pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; + return TSDB_CODE_SUCCESS; + } + pVal->node.resType.type = pParam->buffer_type; + pVal->node.resType.bytes = *(pParam->length); + switch (pParam->buffer_type) { + case TSDB_DATA_TYPE_BOOL: + pVal->datum.b = *((bool*)pParam->buffer); + break; + case TSDB_DATA_TYPE_TINYINT: + pVal->datum.i = *((int8_t*)pParam->buffer); + break; + case TSDB_DATA_TYPE_SMALLINT: + pVal->datum.i = *((int16_t*)pParam->buffer); + break; + case TSDB_DATA_TYPE_INT: + pVal->datum.i = *((int32_t*)pParam->buffer); + break; + case TSDB_DATA_TYPE_BIGINT: + pVal->datum.i = *((int64_t*)pParam->buffer); + break; + case TSDB_DATA_TYPE_FLOAT: + pVal->datum.d = *((float*)pParam->buffer); + break; + case TSDB_DATA_TYPE_DOUBLE: + pVal->datum.d = *((double*)pParam->buffer); + break; + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: + pVal->datum.p = taosMemoryCalloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1); + if (NULL == pVal->datum.p) { + return TSDB_CODE_OUT_OF_MEMORY; + } + varDataSetLen(pVal->datum.p, pVal->node.resType.bytes); + strncpy(varDataVal(pVal->datum.p), (const char*)pParam->buffer, pVal->node.resType.bytes); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + pVal->datum.i = *((int64_t*)pParam->buffer); + break; + case TSDB_DATA_TYPE_UTINYINT: + pVal->datum.u = *((uint8_t*)pParam->buffer); + break; + case TSDB_DATA_TYPE_USMALLINT: + pVal->datum.u = *((uint16_t*)pParam->buffer); + break; + case TSDB_DATA_TYPE_UINT: + pVal->datum.u = *((uint32_t*)pParam->buffer); + break; + case TSDB_DATA_TYPE_UBIGINT: + pVal->datum.u = *((uint64_t*)pParam->buffer); + break; + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_JSON: + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + case TSDB_DATA_TYPE_MEDIUMBLOB: + // todo + default: + break; + } + pVal->translate = true; + return TSDB_CODE_SUCCESS; +} + +int32_t qStmtBindParam(SQueryPlan* pPlan, TAOS_BIND_v2* pParams) { + int32_t index = 0; + SNode* pNode = NULL; + FOREACH(pNode, pPlan->pPlaceholderValues) { + setValueByBindParam((SValueNode*)pNode, pParams + index); + } + return TSDB_CODE_SUCCESS; +} + int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) { SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink; diff --git a/source/libs/planner/test/planStmtTest.cpp b/source/libs/planner/test/planStmtTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ca206c784356eccfec485d428c5eb104fb46b7bf --- /dev/null +++ b/source/libs/planner/test/planStmtTest.cpp @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "planTestUtil.h" +#include "planner.h" + +using namespace std; + +class PlanStmtTest : public PlannerTestBase { +public: + void prepare(const string& sql) { + run(sql); + // todo calloc pBindParams_ + } + + void bindParam(int32_t val) { + TAOS_BIND_v2* pBind = pBindParams_ + paramNo_++; + pBind->buffer_type = TSDB_DATA_TYPE_INT; + pBind->num = 1; + pBind->buffer_length = sizeof(int32_t); + pBind->buffer = taosMemoryCalloc(1, pBind->buffer_length); + pBind->length = (int32_t*)taosMemoryCalloc(1, sizeof(int32_t)); + pBind->is_null = (char*)taosMemoryCalloc(1, sizeof(char)); + *((int32_t*)pBind->buffer) = val; + *(pBind->length) = sizeof(int32_t); + *(pBind->is_null) = 0; + } + + void exec() { + // todo + } + +private: + TAOS_BIND_v2* pBindParams_; + int32_t paramNo_; +}; + +TEST_F(PlanStmtTest, stmt) { + useDb("root", "test"); + + run("SELECT * FROM t1 where c1 = ?"); +} diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e4c1d4100082468c69645ebb25a9d73da718b490 --- /dev/null +++ b/source/libs/planner/test/planTestUtil.cpp @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "planTestUtil.h" + +#include + +#include "cmdnodes.h" +#include "parser.h" +#include "planInt.h" + +using namespace std; +using namespace testing; + +#define DO_WITH_THROW(func, ...) \ + do { \ + int32_t code__ = func(__VA_ARGS__); \ + if (TSDB_CODE_SUCCESS != code__) { \ + throw runtime_error("sql:[" + stmtEnv_.sql_ + "] " #func " code:" + to_string(code__) + ", strerror:" + string(tstrerror(code__)) + ", msg:" + string(stmtEnv_.msgBuf_.data())); \ + } \ + } while(0); + +class PlannerTestBaseImpl { +public: + void useDb(const string& acctId, const string& db) { + caseEnv_.acctId_ = acctId; + caseEnv_.db_ = db; + } + + void run(const string& sql) { + reset(); + try { + SQuery* pQuery = nullptr; + doParseSql(sql, &pQuery); + + SPlanContext cxt = {0}; + setPlanContext(pQuery, &cxt); + + SLogicNode* pLogicNode = nullptr; + doCreateLogicPlan(&cxt, &pLogicNode); + + doOptimizeLogicPlan(&cxt, pLogicNode); + + SLogicSubplan* pLogicSubplan = nullptr; + doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan); + + SQueryLogicPlan* pLogicPlan = nullptr; + doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan); + + SQueryPlan* pPlan = nullptr; + doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan, NULL); + } catch (...) { + dump(); + throw; + } + } + +private: + struct caseEnv { + string acctId_; + string db_; + }; + + struct stmtEnv { + string sql_; + array msgBuf_; + }; + + struct stmtRes { + string ast_; + string rawLogicPlan_; + string optimizedLogicPlan_; + string splitLogicPlan_; + string scaledLogicPlan_; + string physiPlan_; + }; + + void reset() { + stmtEnv_.sql_.clear(); + stmtEnv_.msgBuf_.fill(0); + + res_.ast_.clear(); + res_.rawLogicPlan_.clear(); + res_.optimizedLogicPlan_.clear(); + res_.splitLogicPlan_.clear(); + res_.scaledLogicPlan_.clear(); + res_.physiPlan_.clear(); + } + + void dump() { + cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl; + cout << "syntax tree : " << endl; + cout << res_.ast_ << endl; + cout << "raw logic plan : " << endl; + cout << res_.rawLogicPlan_ << endl; + cout << "optimized logic plan : " << endl; + cout << res_.optimizedLogicPlan_ << endl; + cout << "split logic plan : " << endl; + cout << res_.splitLogicPlan_ << endl; + cout << "scaled logic plan : " << endl; + cout << res_.scaledLogicPlan_ << endl; + cout << "physical plan : " << endl; + cout << res_.physiPlan_ << endl; + } + + void doParseSql(const string& sql, SQuery** pQuery) { + stmtEnv_.sql_ = sql; + transform(stmtEnv_.sql_.begin(), stmtEnv_.sql_.end(), stmtEnv_.sql_.begin(), ::tolower); + + SParseContext cxt = {0}; + cxt.acctId = atoi(caseEnv_.acctId_.c_str()); + cxt.db = caseEnv_.db_.c_str(); + cxt.pSql = stmtEnv_.sql_.c_str(); + cxt.sqlLen = stmtEnv_.sql_.length(); + cxt.pMsg = stmtEnv_.msgBuf_.data(); + cxt.msgLen = stmtEnv_.msgBuf_.max_size(); + + DO_WITH_THROW(qParseQuerySql, &cxt, pQuery); + res_.ast_ = toString((*pQuery)->pRoot); + } + + void doCreateLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) { + DO_WITH_THROW(createLogicPlan, pCxt, pLogicNode); + res_.rawLogicPlan_ = toString((SNode*)(*pLogicNode)); + } + + void doOptimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { + DO_WITH_THROW(optimizeLogicPlan, pCxt, pLogicNode); + res_.optimizedLogicPlan_ = toString((SNode*)pLogicNode); + } + + void doSplitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) { + DO_WITH_THROW(splitLogicPlan, pCxt, pLogicNode, pLogicSubplan); + res_.splitLogicPlan_ = toString((SNode*)(*pLogicSubplan)); + } + + void doScaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) { + DO_WITH_THROW(scaleOutLogicPlan, pCxt, pLogicSubplan, pLogicPlan); + res_.scaledLogicPlan_ = toString((SNode*)(*pLogicPlan)); + } + + void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) { + DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList); + res_.physiPlan_ = toString((SNode*)(*pPlan)); + } + + void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) { + if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) { + pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery; + pCxt->topicQuery = true; + } else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) { + SMCreateSmaReq req = {0}; + tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req); + nodesStringToNode(req.ast, &pCxt->pAstRoot); + pCxt->streamQuery = true; + } else if (QUERY_NODE_CREATE_STREAM_STMT == nodeType(pQuery->pRoot)) { + SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pQuery->pRoot; + pCxt->pAstRoot = pStmt->pQuery; + pCxt->streamQuery = true; + pCxt->triggerType = pStmt->pOptions->triggerType; + pCxt->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); + } else { + pCxt->pAstRoot = pQuery->pRoot; + } + } + + string toString(const SNode* pRoot) { + char* pStr = NULL; + int32_t len = 0; + DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len) + string str(pStr); + taosMemoryFreeClear(pStr); + return str; + } + + caseEnv caseEnv_; + stmtEnv stmtEnv_; + stmtRes res_; +}; + +PlannerTestBase::PlannerTestBase() : impl_(new PlannerTestBaseImpl()) { +} + +PlannerTestBase::~PlannerTestBase() { +} + +void PlannerTestBase::useDb(const std::string& acctId, const std::string& db) { + impl_->useDb(acctId, db); +} + +void PlannerTestBase::run(const std::string& sql) { + return impl_->run(sql); +} diff --git a/source/libs/planner/test/planTestUtil.h b/source/libs/planner/test/planTestUtil.h new file mode 100644 index 0000000000000000000000000000000000000000..71039082f9b148936c7b44058ef8b78c93563f55 --- /dev/null +++ b/source/libs/planner/test/planTestUtil.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef PLAN_TEST_UTIL_H +#define PLAN_TEST_UTIL_H + +#include + +class PlannerTestBaseImpl; + +class PlannerTestBase : public testing::Test { +public: + PlannerTestBase(); + virtual ~PlannerTestBase(); + + void useDb(const std::string& acctId, const std::string& db); + void run(const std::string& sql); + +private: + std::unique_ptr impl_; +}; + +#endif // PLAN_TEST_UTIL_H diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 291b4c9cc70880d1328afbe2fe2197344dd2459c..7ab61a8daaab0b6e66dfd453e3afc780a26a571a 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -90,17 +90,16 @@ protected: return false; } - SQueryPlan* pPlan = nullptr; - code = createPhysiPlan(&cxt, pLogicPlan, &pPlan, NULL); + code = createPhysiPlan(&cxt, pLogicPlan, &plan_, NULL); if (code != TSDB_CODE_SUCCESS) { cout << "sql:[" << cxt_.pSql << "] createPhysiPlan code:" << code << ", strerror:" << tstrerror(code) << endl; return false; } cout << "unformatted physical plan : " << endl; - cout << toString((const SNode*)pPlan, false) << endl; + cout << toString((const SNode*)plan_, false) << endl; SNode* pNode; - FOREACH(pNode, pPlan->pSubplans) { + FOREACH(pNode, plan_->pSubplans) { SNode* pSubplan; FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { cout << "unformatted physical subplan : " << endl; @@ -160,6 +159,7 @@ private: string sqlBuf_; SParseContext cxt_; SQuery* query_; + SQueryPlan* plan_; }; TEST_F(PlannerTest, selectBasic) {