From 3469117732e7bc8781b45a13968ed418545ee0c2 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Wed, 27 Jul 2022 15:12:23 +0800 Subject: [PATCH] fix: add input ts order to physical plan join node --- source/libs/executor/src/joinoperator.c | 10 +++------- source/libs/nodes/src/nodesCodeFuncs.c | 8 +++++++- source/libs/planner/src/planPhysiCreater.c | 1 + 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 895899d068..7b3c590f07 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -83,8 +83,6 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } else if (pJoinNode->inputTsOrder == ORDER_DESC) { pInfo->inputTsOrder = TSDB_ORDER_DESC; } - //TODO: remove this when JoinNode inputTsOrder is ready - pInfo->inputTsOrder = TSDB_ORDER_ASC; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); @@ -201,7 +199,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) if (!hasNextTs) { break; } - + if (leftTs == rightTs) { mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos); @@ -209,15 +207,13 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) pJoinInfo->rightPos += 1; nrows += 1; - } else if (asc && leftTs < rightTs || - !asc && leftTs > rightTs) { + } else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) { pJoinInfo->leftPos += 1; if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { continue; } - } else if (asc && leftTs > rightTs || - !asc && leftTs < rightTs) { + } else if (asc && leftTs > rightTs || !asc && leftTs < rightTs) { pJoinInfo->rightPos += 1; if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { continue; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 186a51f000..c499d6e7cc 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1712,6 +1712,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { } static const char* jkJoinPhysiPlanJoinType = "JoinType"; +static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; @@ -1723,6 +1724,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputTsOrder, pNode->inputTsOrder); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition); } @@ -1742,7 +1746,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); - ; + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputTsOrder, pNode->inputTsOrder, code); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 587e566939..1c57859f1a 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -632,6 +632,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren int32_t code = TSDB_CODE_SUCCESS; pJoin->joinType = pJoinLogicNode->joinType; + pJoin->inputTsOrder = pJoinLogicNode->inputTsOrder; setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition, &pJoin->pMergeCondition); if (TSDB_CODE_SUCCESS == code) { -- GitLab