Review revision of the merge-join condition patch.

The planner stores the primary-key equality of a join in pMergeCondition, and the merge join
operator filters its merged rows with pOnConditions and then pOtherConditions.
Changes against the submitted patch:
  * joinoperator.c: clone pConditions into pOtherConditions (the operator destroys it) and
    send a NULL merge condition to the assertion branch instead of dereferencing it.
  * nodesUtilFuncs.c: nodesCollectColumnsFromNode releases its list/hash when allocation fails.
  * planOptimizer.c: release a previously extracted merge condition before replacing it.
  * planPhysiCreater.c: keep the setNodeSlotId status for the merge condition and stop
    leaking condCols on error paths.
Index hashes are those of the submitted patch; hunk line counts are updated.

diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h
index 9d84f2c32faeef632f771c503350449f4eaf9ffe..4a798c2dfa3c8b240284586b8ea6ac2169ca03fb 100644
--- a/include/libs/nodes/querynodes.h
+++ b/include/libs/nodes/querynodes.h
@@ -375,6 +375,7 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit
 typedef enum ECollectColType { COLLECT_COL_TYPE_COL = 1, COLLECT_COL_TYPE_TAG, COLLECT_COL_TYPE_ALL } ECollectColType;
 int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char* pTableAlias, ECollectColType type,
                             SNodeList** pCols);
+int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, ECollectColType type, SNodeList** pCols);
 
 typedef bool (*FFuncClassifier)(int32_t funcId);
 int32_t nodesCollectFuncs(SSelectStmt* pSelect, ESqlClause clause, FFuncClassifier classifier, SNodeList** pFuncs);
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 9872f26b036712758f3358bf45f249a603a3190b..1318b6e4c18f89c7e840715473ce47da7b30ab11 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -678,7 +678,8 @@ typedef struct SJoinOperatorInfo {
   SSDataBlock *pRight;
   int32_t rightPos;
   SColumnInfo rightCol;
-  SNode *pOnCondition;
+  SNode *pOnConditions;     // owned clone of the join ON conditions; filters merged rows
+  SNode *pOtherConditions;  // owned clone of the join node's pConditions (NULL when absent); applied afterwards
 } SJoinOperatorInfo;
 
 #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c
index 33954d551702034328a450bb7e1f8f658c91ecbb..71f04815dc2a12196896bd74b0e2e3c57241f135 100644
--- a/source/libs/executor/src/joinoperator.c
+++ b/source/libs/executor/src/joinoperator.c
@@ -53,13 +53,21 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
   pOperator->info = pInfo;
   pOperator->pTaskInfo = pTaskInfo;
 
-  SNode* pOnCondition = pJoinNode->pOnConditions;
-  if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) {
-    SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
+  SNode* pMergeCondition = pJoinNode->pMergeCondition;
+  if (NULL != pMergeCondition && nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) {
+    SOperatorNode* pNode = (SOperatorNode*)pMergeCondition;
     setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
     setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
-  } else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) {
-    extractTimeCondition(pInfo, (SLogicConditionNode*)pOnCondition);
+  } else {
+    ASSERT(false);
+  }
+
+  // TODO: merge these two conditions
+  ASSERT(pJoinNode->pOnConditions);
+  pInfo->pOnConditions = nodesCloneNode(pJoinNode->pOnConditions);
+  if (pJoinNode->node.pConditions != NULL) {
+    // Clone: destroyMergeJoinOperator() destroys pOtherConditions, so it must not alias the plan node's tree.
+    pInfo->pOtherConditions = nodesCloneNode(pJoinNode->node.pConditions);
   }
 
   pOperator->fpSet =
@@ -88,6 +96,8 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
 
 void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
   SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
+  nodesDestroyNode(pJoinOperator->pOnConditions);
+  nodesDestroyNode(pJoinOperator->pOtherConditions);
 }
 
 static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
@@ -192,7 +202,8 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
     if (numOfNewRows == 0) {
       break;
     }
-    doFilter(pJoinInfo->pOnCondition, pRes);
+    doFilter(pJoinInfo->pOnConditions, pRes);
+    doFilter(pJoinInfo->pOtherConditions, pRes);
     if (pRes->info.rows >= pOperator->resultInfo.threshold) {
       break;
     }
diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c
index 68654b21a684dcb7cfe3a32ac26db59729e0e203..99216f59017b226c76328429514164dbb8453bfb 100644
--- a/source/libs/nodes/src/nodesCloneFuncs.c
+++ b/source/libs/nodes/src/nodesCloneFuncs.c
@@ -359,6 +359,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
 static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
   COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
   COPY_SCALAR_FIELD(joinType);
+  CLONE_NODE_FIELD(pMergeCondition);
   CLONE_NODE_FIELD(pOnConditions);
   COPY_SCALAR_FIELD(isSingleTableJoin);
   return TSDB_CODE_SUCCESS;
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index df5a401eb14f2d42351d1ad7e71ef97d43cbc5b2..f1d19af4acb233fa262ca5225836842187c3e7e1 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -1253,6 +1253,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) {
 
 static const char* jkJoinLogicPlanJoinType = "JoinType";
 static const char* jkJoinLogicPlanOnConditions = "OnConditions";
+static const char* jkJoinLogicPlanMergeCondition = "MergeConditions";
 
 static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
   const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj;
@@ -1261,6 +1262,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
   if (TSDB_CODE_SUCCESS == code) {
     code = tjsonAddIntegerToObject(pJson, jkJoinLogicPlanJoinType, pNode->joinType);
   }
+  if (TSDB_CODE_SUCCESS == code) {
+    code = tjsonAddObject(pJson, jkJoinLogicPlanMergeCondition, nodeToJson, pNode->pMergeCondition);
+  }
   if (TSDB_CODE_SUCCESS == code) {
     code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions);
   }
@@ -1616,6 +1620,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
 }
 
 static const char* jkJoinPhysiPlanJoinType = "JoinType";
+static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition";
 static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
 static const char* jkJoinPhysiPlanTargets = "Targets";
 
@@ -1626,6 +1631,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
   if (TSDB_CODE_SUCCESS == code) {
     code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType);
   }
+  if (TSDB_CODE_SUCCESS == code) {
+    code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition);
+  }
   if (TSDB_CODE_SUCCESS == code) {
     code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pOnConditions);
   }
@@ -1647,6 +1655,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
   if (TSDB_CODE_SUCCESS == code) {
     code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions);
   }
+  if (TSDB_CODE_SUCCESS == code) {
+    code = jsonToNodeObject(pJson, jkJoinPhysiPlanMergeCondition, &pNode->pMergeCondition);
+  }
   if (TSDB_CODE_SUCCESS == code) {
     code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets);
   }
diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c
index 3747dde9ed48d248e153affd8487333a8432afbf..b12e3b14c70d0ba471baf2172c16c6c7b5a617bc 100644
--- a/source/libs/nodes/src/nodesTraverseFuncs.c
+++ b/source/libs/nodes/src/nodesTraverseFuncs.c
@@ -470,6 +470,9 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk
     case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
       SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode;
       res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
+      if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
+        res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext);
+      }
       if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
         res = walkPhysiPlan(pJoin->pOnConditions, order, walker, pContext);
       }
diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c
index 1972010e2591ba29d15f3cf6360b0669afa0ffe7..10081f07e3b3f3315e7ff86190c488b4caa748e5 100644
--- a/source/libs/nodes/src/nodesUtilFuncs.c
+++ b/source/libs/nodes/src/nodesUtilFuncs.c
@@ -717,6 +717,7 @@ void nodesDestroyNode(SNode* pNode) {
     case QUERY_NODE_LOGIC_PLAN_JOIN: {
       SJoinLogicNode* pLogicNode = (SJoinLogicNode*)pNode;
       destroyLogicNode((SLogicNode*)pLogicNode);
+      nodesDestroyNode(pLogicNode->pMergeCondition);
       nodesDestroyNode(pLogicNode->pOnConditions);
       break;
     }
@@ -827,6 +828,7 @@ void nodesDestroyNode(SNode* pNode) {
     case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
       SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode;
       destroyPhysiNode((SPhysiNode*)pPhyNode);
+      nodesDestroyNode(pPhyNode->pMergeCondition);
       nodesDestroyNode(pPhyNode->pOnConditions);
       nodesDestroyList(pPhyNode->pTargets);
       break;
@@ -1492,6 +1494,57 @@ int32_t nodesCollectColumns(SSelectStmt* pSelect, ESqlClause clause, const char*
   return TSDB_CODE_SUCCESS;
 }
 
+/**
+ * Walk the expression tree rooted at `node` and gather its columns through the collectColumns walker.
+ *
+ * @param node        Root of the expression to inspect.
+ * @param pTableAlias Table alias forwarded to the walker context.
+ * @param type        Collection mode forwarded to the walker context.
+ * @param pCols       In/out list. A non-NULL *pCols is reused as the target list, otherwise a new list is created.
+ *                    On success *pCols holds the list when it is non-empty and NULL otherwise; on a walker error
+ *                    the list is destroyed and *pCols is NULL.
+ * @return TSDB_CODE_SUCCESS, TSDB_CODE_FAILED when pCols is NULL, TSDB_CODE_OUT_OF_MEMORY when the list or the
+ *         hash cannot be allocated, or the error code recorded by the walker.
+ */
+int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, ECollectColType type, SNodeList** pCols) {
+  if (NULL == pCols) {
+    return TSDB_CODE_FAILED;
+  }
+  SNodeList* pInputCols = *pCols;
+  SCollectColumnsCxt cxt = {
+      .errCode = TSDB_CODE_SUCCESS,
+      .pTableAlias = pTableAlias,
+      .collectType = type,
+      .pCols = (NULL == pInputCols ? nodesMakeList() : pInputCols),
+      .pColHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK)};
+  if (NULL == cxt.pCols || NULL == cxt.pColHash) {
+    // Release whichever part was allocated here; a caller-supplied list is left untouched.
+    if (NULL != cxt.pColHash) {
+      taosHashCleanup(cxt.pColHash);
+    }
+    if (NULL == pInputCols && NULL != cxt.pCols) {
+      nodesDestroyList(cxt.pCols);
+    }
+    return TSDB_CODE_OUT_OF_MEMORY;
+  }
+  *pCols = NULL;
+
+  nodesWalkExpr(node, collectColumns, &cxt);
+
+  taosHashCleanup(cxt.pColHash);
+  if (TSDB_CODE_SUCCESS != cxt.errCode) {
+    nodesDestroyList(cxt.pCols);
+    return cxt.errCode;
+  }
+  if (LIST_LENGTH(cxt.pCols) > 0) {
+    *pCols = cxt.pCols;
+  } else {
+    nodesDestroyList(cxt.pCols);
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
 typedef struct SCollectFuncsCxt {
   int32_t errCode;
   FFuncClassifier classifier;
diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c
index 87a5096d267859837a632ff3a4451a51e0c87435..90e35fec2001e0812245981a653fc9fd2e2cf897 100644
--- a/source/libs/planner/src/planOptimizer.c
+++ b/source/libs/planner/src/planOptimizer.c
@@ -540,11 +540,18 @@ static bool pushDownCondOptContainPriKeyEqualCond(SJoinLogicNode* pJoin, SNode*
     }
     return hasPrimaryKeyEqualCond;
   } else {
-    return pushDownCondOptIsPriKeyEqualCond(pJoin, pCond);
+    bool isPriKeyEqualCond = pushDownCondOptIsPriKeyEqualCond(pJoin, pCond);
+    if (isPriKeyEqualCond) {
+      // Clone first, then release any condition kept from an earlier pass so repeated runs do not leak it.
+      SNode* pMergeCond = nodesCloneNode(pCond);
+      nodesDestroyNode(pJoin->pMergeCondition);
+      pJoin->pMergeCondition = pMergeCond;
+    }
+    return isPriKeyEqualCond;
   }
 }
 
-static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
+static int32_t pushDownCondOptExtractJoinMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
   if (NULL == pJoin->pOnConditions) {
     return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
   }
@@ -560,7 +567,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
   }
 
   if (NULL == pJoin->node.pConditions) {
-    return pushDownCondOptCheckJoinOnCond(pCxt, pJoin);
+    return pushDownCondOptExtractJoinMergeCond(pCxt, pJoin);
   }
 
   SNode* pOnCond = NULL;
@@ -582,7 +589,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
   if (TSDB_CODE_SUCCESS == code) {
     OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
     pCxt->optimized = true;
-    code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin);
+    code = pushDownCondOptExtractJoinMergeCond(pCxt, pJoin);
   } else {
     nodesDestroyNode(pOnCond);
     nodesDestroyNode(pLeftChildCond);
diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c
index aac9c25f779e864a86c300be10ce5756f99ab676..8a0139dfb522ffd680829c579396359ae159d1e1 100644
--- a/source/libs/planner/src/planPhysiCreater.c
+++ b/source/libs/planner/src/planPhysiCreater.c
@@ -609,10 +609,11 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
   int32_t code = TSDB_CODE_SUCCESS;
 
   pJoin->joinType = pJoinLogicNode->joinType;
-  if (NULL != pJoinLogicNode->pOnConditions) {
-    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pOnConditions,
-                         &pJoin->pOnConditions);
-  }
+  // Resolve slot ids only when a merge condition exists, and keep the status so a failure aborts node creation.
+  if (NULL != pJoinLogicNode->pMergeCondition) {
+    code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition,
+                         &pJoin->pMergeCondition);
+  }
   if (TSDB_CODE_SUCCESS == code) {
     code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
                          &pJoin->pTargets);
@@ -620,6 +621,24 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
   if (TSDB_CODE_SUCCESS == code) {
     code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
   }
+
+  // Start from NULL: nodesCollectColumnsFromNode() creates the list on demand, so earlier errors leak nothing.
+  SNodeList* condCols = NULL;
+  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
+    code = nodesCollectColumnsFromNode(pJoinLogicNode->pOnConditions, NULL, COLLECT_COL_TYPE_ALL, &condCols);
+  }
+  if (TSDB_CODE_SUCCESS == code && NULL != condCols) {
+    code = addDataBlockSlots(pCxt, condCols, pJoin->node.pOutputDataBlockDesc);
+  }
+  if (NULL != condCols) {
+    nodesDestroyList(condCols);
+  }
+
+  if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
+    code = setNodeSlotId(pCxt, pJoin->node.pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOnConditions,
+                         &pJoin->pOnConditions);
+  }
+
   if (TSDB_CODE_SUCCESS == code) {
     code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
   }