From a380e0cd7cc40ca3cd2ca060e1492e6f62bb5877 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 4 Jul 2022 19:49:16 +0800 Subject: [PATCH] feat: merge condition, on condition and other conditions --- include/libs/nodes/querynodes.h | 1 + source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/joinoperator.c | 22 ++++++++++---- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 11 +++++++ source/libs/nodes/src/nodesTraverseFuncs.c | 3 ++ source/libs/nodes/src/nodesUtilFuncs.c | 34 ++++++++++++++++++++++ source/libs/planner/src/planOptimizer.c | 12 +++++--- source/libs/planner/src/planPhysiCreater.c | 21 ++++++++++--- 9 files changed, 93 insertions(+), 15 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 9d84f2c32f..4a798c2dfa 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -375,6 +375,7 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit typedef enum ECollectColType { COLLECT_COL_TYPE_COL = 1, COLLECT_COL_TYPE_TAG, COLLECT_COL_TYPE_ALL } ECollectColType; int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type, SNodeList** pCols); +int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, ECollectColType type, SNodeList** pCols); typedef bool (*FFuncClassifier)(int32_t funcId); int32_t nodesCollectFuncs(SSelectStmt* pSelect, ESqlClause clause, FFuncClassifier classifier, SNodeList** pFuncs); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9872f26b03..1318b6e4c1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -678,7 +678,8 @@ typedef struct SJoinOperatorInfo { SSDataBlock *pRight; int32_t rightPos; SColumnInfo rightCol; - SNode *pOnCondition; + SNode *pOnConditions; + SNode *pOtherConditions; } SJoinOperatorInfo; #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 33954d5517..71f04815dc 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -53,13 +53,20 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - SNode* pOnCondition = pJoinNode->pOnConditions; - if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) { - SOperatorNode* pNode = (SOperatorNode*)pOnCondition; + SNode* pMergeCondition = pJoinNode->pMergeCondition; + if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { + SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); - } else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) { - extractTimeCondition(pInfo, (SLogicConditionNode*)pOnCondition); + } else { + ASSERT(false); + } + + //TODO: merge these two conditions + ASSERT(pJoinNode->pOnConditions); + pInfo->pOnConditions = nodesCloneNode(pJoinNode->pOnConditions); + if (pJoinNode->node.pConditions != NULL) { + pInfo->pOtherConditions = pJoinNode->node.pConditions; } pOperator->fpSet = @@ -88,6 +95,8 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; + nodesDestroyNode(pJoinOperator->pOnConditions); + nodesDestroyNode(pJoinOperator->pOtherConditions); } static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { @@ -192,7 +201,8 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { if (numOfNewRows == 0) { break; } - doFilter(pJoinInfo->pOnCondition, pRes); + doFilter(pJoinInfo->pOnConditions, pRes); + doFilter(pJoinInfo->pOtherConditions, pRes); if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 68654b21a6..99216f5901 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -359,6 +359,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(joinType); + CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pOnConditions); COPY_SCALAR_FIELD(isSingleTableJoin); return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index df5a401eb1..f1d19af4ac 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1253,6 +1253,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) { static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanOnConditions = "OnConditions"; +static const char* jkJoinLogicPlanMergeCondition = "MergeConditions"; static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj; @@ -1261,6 +1262,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinType, pNode->joinType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinLogicPlanMergeCondition, nodeToJson, pNode->pMergeCondition); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions); } @@ -1616,6 +1620,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { } static const char* jkJoinPhysiPlanJoinType = "JoinType"; +static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; @@ -1626,6 +1631,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOnConditions); } @@ -1647,6 +1655,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkJoinPhysiPlanMergeCondition, &pNode->pMergeCondition); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); } diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 3747dde9ed..b12e3b14c7 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -470,6 +470,9 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode; res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { + res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext); + } if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkPhysiPlan(pJoin->pOnConditions, order, walker, pContext); } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 1972010e25..10081f07e3 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -717,6 +717,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_LOGIC_PLAN_JOIN: { SJoinLogicNode* pLogicNode = (SJoinLogicNode*)pNode; destroyLogicNode((SLogicNode*)pLogicNode); + nodesDestroyNode(pLogicNode->pMergeCondition); nodesDestroyNode(pLogicNode->pOnConditions); break; } @@ -827,6 +828,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); + nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyList(pPhyNode->pTargets); break; @@ -1492,6 +1494,38 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* return TSDB_CODE_SUCCESS; } +int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, ECollectColType type, SNodeList** pCols) { + if (NULL == pCols) { + return TSDB_CODE_FAILED; + } + SCollectColumnsCxt cxt = { + .errCode = TSDB_CODE_SUCCESS, + .pTableAlias = pTableAlias, + .collectType = type, + .pCols = (NULL == *pCols ? nodesMakeList() : *pCols), + .pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)}; + if (NULL == cxt.pCols || NULL == cxt.pColHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + *pCols = NULL; + + nodesWalkExpr(node, collectColumns, &cxt); + + taosHashCleanup(cxt.pColHash); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyList(cxt.pCols); + return cxt.errCode; + } + if (LIST_LENGTH(cxt.pCols) > 0) { + *pCols = cxt.pCols; + } else { + nodesDestroyList(cxt.pCols); + } + + return TSDB_CODE_SUCCESS; + +} + typedef struct SCollectFuncsCxt { int32_t errCode; FFuncClassifier classifier; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 87a5096d26..90e35fec20 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -540,11 +540,15 @@ static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode* } return hasPrimaryKeyEqualCond; } else { - return pushDownCondOptIsPriKeyEqualCond(pJoin, pCond); + bool isPriKeyEqualCond = pushDownCondOptIsPriKeyEqualCond(pJoin, pCond); + if (isPriKeyEqualCond) { + pJoin->pMergeCondition = nodesCloneNode(pCond); + } + return isPriKeyEqualCond; } } -static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { +static int32_t pushDownCondOptExtractJoinMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (NULL == pJoin->pOnConditions) { return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN); } @@ -560,7 +564,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p } if (NULL == pJoin->node.pConditions) { - return pushDownCondOptCheckJoinOnCond(pCxt, pJoin); + return pushDownCondOptExtractJoinMergeCond(pCxt, pJoin); } SNode* pOnCond = NULL; @@ -582,7 +586,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p if (TSDB_CODE_SUCCESS == code) { OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); pCxt->optimized = true; - code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin); + code = pushDownCondOptExtractJoinMergeCond(pCxt, pJoin); } else { nodesDestroyNode(pOnCond); nodesDestroyNode(pLeftChildCond); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index aac9c25f77..8a0139dfb5 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -609,10 +609,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren int32_t code = TSDB_CODE_SUCCESS; pJoin->joinType = pJoinLogicNode->joinType; - if (NULL != pJoinLogicNode->pOnConditions) { - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions, - &pJoin->pOnConditions); - } + setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition, + &pJoin->pMergeCondition); if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets); @@ -620,6 +618,21 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren if (TSDB_CODE_SUCCESS == code) { code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); } + + SNodeList* condCols = nodesMakeList(); + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) { + code = nodesCollectColumnsFromNode(pJoinLogicNode->pOnConditions, NULL, COLLECT_COL_TYPE_ALL, &condCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, condCols, pJoin->node.pOutputDataBlockDesc); + nodesDestroyList(condCols); + } + + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) { + code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOnConditions, + &pJoin->pOnConditions); + } + if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); } -- GitLab