提交 629ab2b8 编写于 作者: S shenglian zhou

fix: extract tag equal condition

上级 51d70ca2
...@@ -112,6 +112,7 @@ typedef struct SJoinLogicNode { ...@@ -112,6 +112,7 @@ typedef struct SJoinLogicNode {
SNode* pOnConditions; SNode* pOnConditions;
bool isSingleTableJoin; bool isSingleTableJoin;
EOrder inputTsOrder; EOrder inputTsOrder;
SNode* pTagEqualConditions;
} SJoinLogicNode; } SJoinLogicNode;
typedef struct SAggLogicNode { typedef struct SAggLogicNode {
...@@ -405,6 +406,7 @@ typedef struct SSortMergeJoinPhysiNode { ...@@ -405,6 +406,7 @@ typedef struct SSortMergeJoinPhysiNode {
SNode* pOnConditions; SNode* pOnConditions;
SNodeList* pTargets; SNodeList* pTargets;
EOrder inputTsOrder; EOrder inputTsOrder;
SNode* pTagEqualCondtions;
} SSortMergeJoinPhysiNode; } SSortMergeJoinPhysiNode;
typedef struct SAggPhysiNode { typedef struct SAggPhysiNode {
......
...@@ -401,6 +401,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { ...@@ -401,6 +401,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
COPY_SCALAR_FIELD(joinType); COPY_SCALAR_FIELD(joinType);
CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pMergeCondition);
CLONE_NODE_FIELD(pOnConditions); CLONE_NODE_FIELD(pOnConditions);
CLONE_NODE_FIELD(pTagEqualConditions);
COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(isSingleTableJoin);
COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(inputTsOrder);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -1416,6 +1416,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) { ...@@ -1416,6 +1416,7 @@ static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) {
static const char* jkJoinLogicPlanJoinType = "JoinType"; static const char* jkJoinLogicPlanJoinType = "JoinType";
static const char* jkJoinLogicPlanOnConditions = "OnConditions"; static const char* jkJoinLogicPlanOnConditions = "OnConditions";
static const char* jkJoinLogicPlanMergeCondition = "MergeConditions"; static const char* jkJoinLogicPlanMergeCondition = "MergeConditions";
static const char* jkJoinLogicPlanTagEqualConditions = "TagEqualConditions";
static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj; const SJoinLogicNode* pNode = (const SJoinLogicNode*)pObj;
...@@ -1430,7 +1431,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1430,7 +1431,9 @@ static int32_t logicJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions); code = tjsonAddObject(pJson, jkJoinLogicPlanOnConditions, nodeToJson, pNode->pOnConditions);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinLogicPlanTagEqualConditions, nodeToJson, pNode->pTagEqualConditions);
}
return code; return code;
} }
...@@ -1447,7 +1450,9 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) { ...@@ -1447,7 +1450,9 @@ static int32_t jsonToLogicJoinNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions); code = jsonToNodeObject(pJson, jkJoinLogicPlanOnConditions, &pNode->pOnConditions);
} }
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinLogicPlanTagEqualConditions, &pNode->pTagEqualConditions);
}
return code; return code;
} }
...@@ -1878,6 +1883,7 @@ static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; ...@@ -1878,6 +1883,7 @@ static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder";
static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition";
static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
static const char* jkJoinPhysiPlanTargets = "Targets"; static const char* jkJoinPhysiPlanTargets = "Targets";
static const char* jkJoinPhysiPlanTagEqualConditions = "TagEqualConditions";
static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj;
...@@ -1898,7 +1904,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1898,7 +1904,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets); code = nodeListToJson(pJson, jkJoinPhysiPlanTargets, pNode->pTargets);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanTagEqualConditions, nodeToJson, pNode->pTagEqualCondtions);
}
return code; return code;
} }
...@@ -1921,7 +1929,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { ...@@ -1921,7 +1929,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets); code = jsonToNodeList(pJson, jkJoinPhysiPlanTargets, &pNode->pTargets);
} }
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanTagEqualConditions, &pNode->pTagEqualCondtions);
}
return code; return code;
} }
......
...@@ -2317,7 +2317,8 @@ enum { ...@@ -2317,7 +2317,8 @@ enum {
PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION, PHY_SORT_MERGE_JOIN_CODE_MERGE_CONDITION,
PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS, PHY_SORT_MERGE_JOIN_CODE_ON_CONDITIONS,
PHY_SORT_MERGE_JOIN_CODE_TARGETS, PHY_SORT_MERGE_JOIN_CODE_TARGETS,
PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER,
PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS
}; };
static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
...@@ -2339,7 +2340,9 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { ...@@ -2339,7 +2340,9 @@ static int32_t physiJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder); code = tlvEncodeEnum(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER, pNode->inputTsOrder);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS, nodeToMsg, pNode->pTagEqualCondtions);
}
return code; return code;
} }
...@@ -2368,6 +2371,9 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) { ...@@ -2368,6 +2371,9 @@ static int32_t msgToPhysiJoinNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER: case PHY_SORT_MERGE_JOIN_CODE_INPUT_TS_ORDER:
code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder)); code = tlvDecodeEnum(pTlv, &pNode->inputTsOrder, sizeof(pNode->inputTsOrder));
break; break;
case PHY_SORT_MERGE_JOIN_CODE_TAG_EQUAL_CONDITIONS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pTagEqualCondtions);
break;
default: default:
break; break;
} }
......
...@@ -1072,6 +1072,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -1072,6 +1072,7 @@ void nodesDestroyNode(SNode* pNode) {
destroyLogicNode((SLogicNode*)pLogicNode); destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyNode(pLogicNode->pMergeCondition); nodesDestroyNode(pLogicNode->pMergeCondition);
nodesDestroyNode(pLogicNode->pOnConditions); nodesDestroyNode(pLogicNode->pOnConditions);
nodesDestroyNode(pLogicNode->pTagEqualConditions);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_AGG: { case QUERY_NODE_LOGIC_PLAN_AGG: {
...@@ -1204,6 +1205,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -1204,6 +1205,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pMergeCondition);
nodesDestroyNode(pPhyNode->pOnConditions); nodesDestroyNode(pPhyNode->pOnConditions);
nodesDestroyList(pPhyNode->pTargets); nodesDestroyList(pPhyNode->pTargets);
nodesDestroyNode(pPhyNode->pTagEqualCondtions);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
......
...@@ -740,6 +740,75 @@ static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoin ...@@ -740,6 +740,75 @@ static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoin
return code; return code;
} }
static bool pushDownCondOptIsTag(SNode* pNode, SNodeList* pTableCols) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false;
}
SColumnNode* pCol = (SColumnNode*)pNode;
if (COLUMN_TYPE_TAG != pCol->colType) {
return false;
}
return pushDownCondOptBelongThisTable(pNode, pTableCols);
}
static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
if (QUERY_NODE_OPERATOR != nodeType(pCond)) {
return false;
}
SOperatorNode* pOper = (SOperatorNode*)pCond;
if (OP_TYPE_EQUAL != pOper->opType) {
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) {
return pushDownCondOptIsTag(pOper->pRight, pRightCols);
} else if (pushDownCondOptIsTag(pOper->pLeft, pRightCols)) {
return pushDownCondOptIsTag(pOper->pRight, pLeftCols);
}
return false;
}
static int32_t pushDownCondOptJoinExtractTagEqualLogicCond(SJoinLogicNode* pJoin) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions);
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pTagEqualConds = NULL;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
if (pushDownCondOptIsTagEqualCond(pJoin, pCond)) {
code = nodesListMakeAppend(&pTagEqualConds, nodesCloneNode(pCond));
}
}
SNode* pTempTagEqCond = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempTagEqCond, &pTagEqualConds);
}
if (TSDB_CODE_SUCCESS == code) {
pJoin->pTagEqualConditions = pTempTagEqCond;
return TSDB_CODE_SUCCESS;
} else {
nodesDestroyList(pTagEqualConds);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
static int32_t pushDownCondOptJoinExtractTagEqualCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions) &&
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pOnConditions))->condType) {
return pushDownCondOptJoinExtractTagEqualLogicCond(pJoin);
}
if (pushDownCondOptIsTagEqualCond(pJoin, pJoin->pOnConditions)) {
pJoin->pTagEqualConditions = nodesCloneNode(pJoin->pOnConditions);
}
return TSDB_CODE_SUCCESS;
}
static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -774,6 +843,10 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p ...@@ -774,6 +843,10 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin); code = pushDownCondOptJoinExtractMergeCond(pCxt, pJoin);
} }
if (TSDB_CODE_SUCCESS == code) {
code = pushDownCondOptJoinExtractTagEqualCond(pCxt, pJoin);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true; pCxt->optimized = true;
......
...@@ -705,6 +705,9 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren ...@@ -705,6 +705,9 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
pJoinLogicNode->pOnConditions, &pJoin->pOnConditions); pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pTagEqualConditions) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqualConditions, &pJoin->pTagEqualCondtions);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册