diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 11e00590172794fefb58580d904a2820b04d08cc..7b3c590f07469009511987abf8f5075973657961 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -83,8 +83,6 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } else if (pJoinNode->inputTsOrder == ORDER_DESC) { pInfo->inputTsOrder = TSDB_ORDER_DESC; } - //TODO: remove this when JoinNode inputTsOrder is ready - pInfo->inputTsOrder = TSDB_ORDER_ASC; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); @@ -116,7 +114,9 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow) { + +static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, + SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) { SJoinOperatorInfo* pJoinInfo = pOperator->info; for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { @@ -130,11 +130,11 @@ static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int SColumnInfoData* pSrc = NULL; if (pJoinInfo->pLeft->info.blockId == blockId) { - pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); - rowIndex = pJoinInfo->leftPos; + pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); + rowIndex = leftPos; } else { - pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); - rowIndex = pJoinInfo->rightPos; + pSrc = taosArrayGet(pRightBlock->pDataBlock, slotId); + rowIndex = rightPos; } if (colDataIsNull_s(pSrc, rowIndex)) { @@ -146,60 +146,74 @@ static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int } } -static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { + +static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { SJoinOperatorInfo* pJoinInfo = pOperator->info; - int32_t nrows = pRes->info.rows; + if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { + SOperatorInfo* ds1 = pOperator->pDownstream[0]; + pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1); - bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false; - - while (1) { - // todo extract method - if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { - SOperatorInfo* ds1 = pOperator->pDownstream[0]; - pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1); - - pJoinInfo->leftPos = 0; - if (pJoinInfo->pLeft == NULL) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - break; - } + pJoinInfo->leftPos = 0; + if (pJoinInfo->pLeft == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + return false; } + } - if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { - SOperatorInfo* ds2 = pOperator->pDownstream[1]; - pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2); + if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { + SOperatorInfo* ds2 = pOperator->pDownstream[1]; + pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2); - pJoinInfo->rightPos = 0; - if (pJoinInfo->pRight == NULL) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - break; - } + pJoinInfo->rightPos = 0; + if (pJoinInfo->pRight == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + return false; } + } + // only the timestamp match support for ordinary table + SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); + char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); + *pLeftTs = *(int64_t*)pLeftVal; + + SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); + char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); + *pRightTs = *(int64_t*)pRightVal; + + ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); + ASSERT(pRightCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); + return true; +} + +static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; - SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); - char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); + int32_t nrows = pRes->info.rows; - SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); - char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); + bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false; + + while (1) { + int64_t leftTs = 0; + int64_t rightTs = 0; + bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); + if (!hasNextTs) { + break; + } - // only the timestamp match support for ordinary table - ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); - if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) { - doJoinOneRow(pOperator, pRes, nrows); + if (leftTs == rightTs) { + mergeJoinJoinLeftRight(pOperator, pRes, nrows, + pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos); pJoinInfo->leftPos += 1; pJoinInfo->rightPos += 1; nrows += 1; - } else if (asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal || - !asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal) { + } else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) { pJoinInfo->leftPos += 1; if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { continue; } - } else if (asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal || - !asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal) { + } else if (asc && leftTs > rightTs || !asc && leftTs < rightTs) { pJoinInfo->rightPos += 1; if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { continue; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 186a51f0001911ef1b0144e404c6dbdc6b7af89c..c499d6e7cca7010328e38f61d2f5888602914d50 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1712,6 +1712,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) { } static const char* jkJoinPhysiPlanJoinType = "JoinType"; +static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder"; static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition"; static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; @@ -1723,6 +1724,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputTsOrder, pNode->inputTsOrder); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition); } @@ -1742,7 +1746,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code); - ; + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputTsOrder, pNode->inputTsOrder, code); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index b6df2fe162e538cf4cbb253e35feee08de1b3796..5dc95a0f344e11df8fa955c72bb8728e7afee529 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -633,6 +633,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren int32_t code = TSDB_CODE_SUCCESS; pJoin->joinType = pJoinLogicNode->joinType; + pJoin->inputTsOrder = pJoinLogicNode->inputTsOrder; setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition, &pJoin->pMergeCondition); if (TSDB_CODE_SUCCESS == code) {