diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index a5591b22b30e8755b303277105a72d2a02a672e4..54ddab9e1e44e99051207c0da860fdba81cdee6a 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -34,7 +34,9 @@ typedef struct SJoinRowCtx { SArray* rightCreatedBlocks; int32_t leftRowIdx; int32_t rightRowIdx; + SSHashObj* buildTableTSRange; + SArray* rightRowLocations; } SJoinRowCtx; typedef struct SJoinOperatorInfo { @@ -306,12 +308,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions; - pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); - pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); - extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); - initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); - initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); - + if (pInfo->pTagEqualConditions != NULL) { + pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); + pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); + extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); + initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); + initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { @@ -345,15 +348,15 @@ static void freeGroupKeyData(void* param) { void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; + if (pJoinOperator->pTagEqualConditions != NULL) { + taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); + taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData); + taosArrayDestroy(pJoinOperator->rightTagCols); - taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); - taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData); - taosArrayDestroy(pJoinOperator->rightTagCols); - - taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); - taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData); - taosArrayDestroy(pJoinOperator->leftTagCols); - + taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); + taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData); + taosArrayDestroy(pJoinOperator->leftTagCols); + } nodesDestroyNode(pJoinOperator->pCondAfterMerge); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); @@ -496,7 +499,7 @@ static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* right static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, const SArray* leftRowLocations, int32_t leftRowIdx, - int32_t rightRowIdx, SSHashObj* rightTableHash, bool* pReachThreshold) { + int32_t rightRowIdx, SSHashObj* rightTableHash, SArray* rightRowLocations, bool* pReachThreshold) { *pReachThreshold = false; uint32_t limitRowNum = pOperator->resultInfo.threshold; SJoinOperatorInfo* pJoinInfo = pOperator->info; @@ -506,13 +509,18 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); - fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos); - int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys); - SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); - if (!ppRightRows) { - continue; + SArray* pRightRows = NULL; + if (rightTableHash != NULL) { + fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos); + int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys); + SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); + if (!ppRightRows) { + continue; + } + pRightRows = *ppRightRows; + } else { + pRightRows = rightRowLocations; } - SArray* pRightRows = *ppRightRows; size_t rightRowsSize = taosArrayGetSize(pRightRows); for (j = rightRowIdx; j < rightRowsSize; ++j) { if (*nRows >= limitRowNum) { @@ -551,7 +559,7 @@ static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { } static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, - SArray* rightCreatedBlocks, SSHashObj* rightTableHash) { + SArray* rightCreatedBlocks, SSHashObj* rightTableHash, SArray* rightRowLocations) { for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); blockDataDestroy(pBlock); @@ -561,7 +569,12 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); blockDataDestroy(pBlock); } - mergeJoinDestoryBuildTable(rightTableHash); + if (rightRowLocations != NULL) { + taosArrayDestroy(rightRowLocations); + } + if (rightTableHash != NULL) { + mergeJoinDestoryBuildTable(rightTableHash); + } taosArrayDestroy(leftCreatedBlocks); taosArrayDestroy(leftRowLocations); @@ -571,6 +584,7 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef pJoinInfo->rowCtx.leftCreatedBlocks = NULL; pJoinInfo->rowCtx.rightCreatedBlocks = NULL; pJoinInfo->rowCtx.buildTableTSRange = NULL; + pJoinInfo->rowCtx.rightRowLocations = NULL; } static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, @@ -578,6 +592,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t int32_t code = TSDB_CODE_SUCCESS; SJoinOperatorInfo* pJoinInfo = pOperator->info; SArray* leftRowLocations = NULL; + SArray* rightRowLocations = NULL; SArray* leftCreatedBlocks = NULL; SArray* rightCreatedBlocks = NULL; int32_t leftRowIdx = 0; @@ -588,6 +603,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; rightTableHash = pJoinInfo->rowCtx.buildTableTSRange; + rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; rightRowIdx = pJoinInfo->rowCtx.rightRowIdx; @@ -602,8 +618,11 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); - mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash); - taosArrayDestroy(rightRowLocations); + if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { + mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash); + taosArrayDestroy(rightRowLocations); + rightRowLocations = NULL; + } } size_t leftNumJoin = taosArrayGetSize(leftRowLocations); @@ -617,12 +636,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t if (code == TSDB_CODE_SUCCESS) { mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, - rightRowIdx, rightTableHash, &reachThreshold); + rightRowIdx, rightTableHash, rightRowLocations, &reachThreshold); } if (!reachThreshold) { mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, - rightTableHash); + rightTableHash, rightRowLocations); } else { pJoinInfo->rowCtx.rowRemains = true; @@ -631,6 +650,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; pJoinInfo->rowCtx.buildTableTSRange = rightTableHash; + pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; } return TSDB_CODE_SUCCESS; }