From cdcb1a368d249a72739d67b2025921a7f9633990 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 26 Jul 2022 16:05:42 +0800 Subject: [PATCH] feat: add input ts order for join operator --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/joinoperator.c | 75 +++++++++++++++---------- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4a57819eba..f1a1011ff0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -804,6 +804,7 @@ typedef struct STagFilterOperatorInfo { typedef struct SJoinOperatorInfo { SSDataBlock *pRes; int32_t joinType; + int32_t inputTsOrder; SSDataBlock *pLeft; int32_t leftPos; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index f26b2f4f0a..11e0059017 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -77,6 +77,15 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->pCondAfterMerge = NULL; } + pInfo->inputTsOrder = TSDB_ORDER_ASC; + if (pJoinNode->inputTsOrder == ORDER_ASC) { + pInfo->inputTsOrder = TSDB_ORDER_ASC; + } else if (pJoinNode->inputTsOrder == ORDER_DESC) { + pInfo->inputTsOrder = TSDB_ORDER_DESC; + } + //TODO: remove this when JoinNode inputTsOrder is ready + pInfo->inputTsOrder = TSDB_ORDER_ASC; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); @@ -107,11 +116,42 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } +static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; + + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); + + SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i]; + + int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; + int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; + int32_t rowIndex = -1; + + SColumnInfoData* pSrc = NULL; + if (pJoinInfo->pLeft->info.blockId == blockId) { + pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); + rowIndex = pJoinInfo->leftPos; + } else { + pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); + rowIndex = pJoinInfo->rightPos; + } + if (colDataIsNull_s(pSrc, rowIndex)) { + colDataAppendNULL(pDst, currRow); + } else { + char* p = colDataGetData(pSrc, rowIndex); + colDataAppend(pDst, currRow, p, false); + } + } + +} static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { SJoinOperatorInfo* pJoinInfo = pOperator->info; - int32_t nrows = 0; + int32_t nrows = pRes->info.rows; + + bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false; while (1) { // todo extract method @@ -146,43 +186,20 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) // only the timestamp match support for ordinary table ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) { - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); - - SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i]; - - int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; - int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; - int32_t rowIndex = -1; - - SColumnInfoData* pSrc = NULL; - if (pJoinInfo->pLeft->info.blockId == blockId) { - pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); - rowIndex = pJoinInfo->leftPos; - } else { - pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); - rowIndex = pJoinInfo->rightPos; - } - - if (colDataIsNull_s(pSrc, rowIndex)) { - colDataAppendNULL(pDst, nrows); - } else { - char* p = colDataGetData(pSrc, rowIndex); - colDataAppend(pDst, nrows, p, false); - } - } - + doJoinOneRow(pOperator, pRes, nrows); pJoinInfo->leftPos += 1; pJoinInfo->rightPos += 1; nrows += 1; - } else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) { + } else if (asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal || + !asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal) { pJoinInfo->leftPos += 1; if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { continue; } - } else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) { + } else if (asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal || + !asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal) { pJoinInfo->rightPos += 1; if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { continue; -- GitLab