diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 7b3c590f07469009511987abf8f5075973657961..7f7bd4a3efa0abb3a20a905ca9efac890a506091 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -146,6 +146,39 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* } } +typedef struct SRowLocation { + SSDataBlock* pDataBlock; + int32_t pos; +} SRowLocation; + +static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slotId, int32_t startPos, + SArray* pPosArray) { + int32_t numRows = pBlock->info.rows; + ASSERT(startPos < numRows); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + int32_t i = startPos; + char* pVal = colDataGetData(pCol, i); + for (i = startPos + 1; i < numRows; ++i) { + char* pNextVal = colDataGetData(pCol, i); + if (*(int64_t*)pVal != *(int64_t*)pNextVal) { + break; + } + } + int32_t endPos = i; + + SSDataBlock* block = pBlock; + if (endPos - startPos > 1) { + block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); + } + SRowLocation location = {0}; + for (int32_t j = startPos; j < endPos; ++j) { + location.pDataBlock = block; + location.pos = j; + taosArrayPush(pPosArray, &location); + } + return 0; +} static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { SJoinOperatorInfo* pJoinInfo = pOperator->info;