提交 2df67d91 编写于 作者: H Haojun Liao

fix(query): extract correct join condition from physical plan.

上级 18fcb334
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
static void extractTimeCondition(SJoinOperatorInfo *Info, SLogicConditionNode* pLogicConditionNode);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo,
int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition,
...@@ -38,22 +39,22 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t ...@@ -38,22 +39,22 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
pInfo->pRes = pResBlock; pInfo->pRes = pResBlock;
pOperator->name = "MergeJoinOperator"; pOperator->name = "MergeJoinOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols; pOperator->numOfExprs = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) { if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) {
SOperatorNode* pNode = (SOperatorNode*)pOnCondition; SOperatorNode* pNode = (SOperatorNode*)pOnCondition;
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
} else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) { } else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) {
ASSERT(0); extractTimeCondition(pInfo, (SLogicConditionNode*) pOnCondition);
} }
pOperator->fpSet = pOperator->fpSet =
...@@ -182,3 +183,17 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { ...@@ -182,3 +183,17 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
return (pRes->info.rows > 0) ? pRes : NULL; return (pRes->info.rows > 0) ? pRes : NULL;
} }
static void extractTimeCondition(SJoinOperatorInfo *pInfo, SLogicConditionNode* pLogicConditionNode) {
int32_t len = LIST_LENGTH(pLogicConditionNode->pParameterList);
for(int32_t i = 0; i < len; ++i) {
SNode* pNode = nodesListGetNode(pLogicConditionNode->pParameterList, i);
if (nodeType(pNode) == QUERY_NODE_OPERATOR) {
SOperatorNode* pn1 = (SOperatorNode*)pNode;
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pn1->pLeft);
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pn1->pRight);
break;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册