diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 7b3c590f07469009511987abf8f5075973657961..8902804fab478e906484be5d54d0cd636d18b814 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -116,7 +116,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { } static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, - SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) { + SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, + int32_t rightPos) { SJoinOperatorInfo* pJoinInfo = pOperator->info; for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { @@ -129,7 +130,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* int32_t rowIndex = -1; SColumnInfoData* pSrc = NULL; - if (pJoinInfo->pLeft->info.blockId == blockId) { + if (pLeftBlock->info.blockId == blockId) { pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); rowIndex = leftPos; } else { @@ -144,7 +145,128 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* colDataAppend(pDst, currRow, p, false); } } +} +typedef struct SRowLocation { + SSDataBlock* pDataBlock; + int32_t pos; +} SRowLocation; + +// pBlock[tsSlotId][startPos, endPos) == timestamp, +static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, + int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) { + int32_t numRows = pBlock->info.rows; + ASSERT(startPos < numRows); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId); + + int32_t i = startPos; + for (; i < numRows; ++i) { + char* pNextVal = colDataGetData(pCol, i); + if (timestamp != *(int64_t*)pNextVal) { + break; + } + } + int32_t endPos = i; + *pEndPos = endPos; + + if (endPos - startPos == 0) { + return 0; + } + + SSDataBlock* block = pBlock; + bool createdNewBlock = false; + if (endPos == numRows) { + block = blockDataExtractBlock(pBlock, startPos, endPos-startPos); + taosArrayPush(createdBlocks, &block); + createdNewBlock = true; + } + SRowLocation location = {0}; + for (int32_t j = startPos; j < endPos; ++j) { + location.pDataBlock = block; + location.pos = ( createdNewBlock ? j - startPos : j); + taosArrayPush(rowLocations, &location); + } + return 0; +} + +// whichChild == 0, left child of join; whichChild ==1, right child of join +static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId, + SSDataBlock* startDataBlock, int32_t startPos, + int64_t timestamp, SArray* rowLocations, + SArray* createdBlocks) { + ASSERT(whichChild == 0 || whichChild == 1); + + SJoinOperatorInfo* pJoinInfo = pOperator->info; + int32_t endPos = -1; + SSDataBlock* dataBlock = startDataBlock; + mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks); + while (endPos == dataBlock->info.rows) { + SOperatorInfo* ds = pOperator->pDownstream[whichChild]; + dataBlock = ds->fpSet.getNextFn(ds); + if (whichChild == 0) { + pJoinInfo->leftPos = 0; + pJoinInfo->pLeft = dataBlock; + } else if (whichChild == 1) { + pJoinInfo->rightPos = 0; + pJoinInfo->pRight = dataBlock; + } + + if (dataBlock == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + endPos = -1; + break; + } + + mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks); + } + if (endPos != -1) { + if (whichChild == 0) { + pJoinInfo->leftPos = endPos; + } else if (whichChild == 1) { + pJoinInfo->rightPos = endPos; + } + } + return 0; +} + +static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, + int32_t* nRows) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; + SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + + SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, + pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, + pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); + + size_t leftNumJoin = taosArrayGetSize(leftRowLocations); + size_t rightNumJoin = taosArrayGetSize(rightRowLocations); + for (int32_t i = 0; i < leftNumJoin; ++i) { + for (int32_t j = 0; j < rightNumJoin; ++j) { + SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); + SRowLocation* rightRow = taosArrayGet(rightRowLocations, j); + mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, + rightRow->pos); + ++*nRows; + } + } + for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(rightCreatedBlocks); + taosArrayDestroy(rightRowLocations); + for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(leftCreatedBlocks); + taosArrayDestroy(leftRowLocations); + return TSDB_CODE_SUCCESS; } static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { @@ -195,18 +317,15 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) while (1) { int64_t leftTs = 0; int64_t rightTs = 0; - bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); + bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); if (!hasNextTs) { break; } if (leftTs == rightTs) { - mergeJoinJoinLeftRight(pOperator, pRes, nrows, - pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos); - pJoinInfo->leftPos += 1; - pJoinInfo->rightPos += 1; - - nrows += 1; + mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, + pJoinInfo->rightPos); + mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); } else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) { pJoinInfo->leftPos += 1; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9a82b194a983b7d3ea188046924298cf1563738e..ed1580ed911e107dc4a8c8dcdb6179c8b1d466e5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2098,9 +2098,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); switch (pSliceInfo->fillType) { - case TSDB_FILL_NULL: + case TSDB_FILL_NULL: { colDataAppendNULL(pDst, rows); + pResBlock->info.rows += 1; break; + } case TSDB_FILL_SET_VALUE: { SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; @@ -2118,9 +2120,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); colDataAppend(pDst, rows, (char*)&v, false); } - } break; + pResBlock->info.rows += 1; + break; + } - case TSDB_FILL_LINEAR: + case TSDB_FILL_LINEAR: { #if 0 if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { @@ -2151,17 +2155,22 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp } } #endif + // TODO: pResBlock->info.rows += 1; break; - + } case TSDB_FILL_PREV: { SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); colDataAppend(pDst, rows, pkey->pData, false); - } break; + pResBlock->info.rows += 1; + break; + } case TSDB_FILL_NEXT: { char* p = colDataGetData(pSrc, rowIndex); colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex)); - } break; + pResBlock->info.rows += 1; + break; + } case TSDB_FILL_NONE: default: @@ -2169,7 +2178,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp } } - pResBlock->info.rows += 1; } static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { @@ -2221,6 +2229,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { SInterval* pInterval = &pSliceInfo->interval; SOperatorInfo* downstream = pOperator->pDownstream[0]; + blockDataCleanup(pResBlock); + int32_t numOfRows = 0; while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); diff --git a/tests/script/tsim/parser/join_multivnode.sim b/tests/script/tsim/parser/join_multivnode.sim index c33fa85fa255c732e7b358e2d9014d520a6beaac..f1204326d3c9de769b1fa68b4ce6c725478a18bf 100644 --- a/tests/script/tsim/parser/join_multivnode.sim +++ b/tests/script/tsim/parser/join_multivnode.sim @@ -98,6 +98,11 @@ while $i < $tbNum endw print ===============multivnode projection join.sim +sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts; +print ===> rows $row +if $row != 9000 then + print expect 9000, actual: $row +endi sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1; print ===> rows $row if $row != 3000 then diff --git a/tests/system-test/2-query/join.py b/tests/system-test/2-query/join.py index 2348873a34283572116e6eb97760733d400c6914..9d30e1946a16c487f22e43fe03461d559dc7c945 100644 --- a/tests/system-test/2-query/join.py +++ b/tests/system-test/2-query/join.py @@ -377,11 +377,11 @@ class TDTestCase: tdSql.query("select ct1.c_int from db.ct1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts") tdSql.checkRows(self.rows) tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts") - tdSql.checkRows(self.rows) + tdSql.checkRows(self.rows + int(self.rows * 0.6 //3)+ int(self.rows * 0.8 // 4)) tdSql.query("select ct1.c_int from db.nt1 as ct1 join db1.nt1 as cy1 on ct1.ts=cy1.ts") tdSql.checkRows(self.rows + 3) tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.stb1 as cy1 on ct1.ts=cy1.ts") - tdSql.checkRows(self.rows * 3 + 6) + tdSql.checkRows(50) tdSql.query("select count(*) from db.ct1") tdSql.checkData(0, 0, self.rows) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index a136d0a1a2137ebfd9311157afc4b394f0f92829..00b0aed5ee6277c6dc5e79b3c0b112135f4c7ac6 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -43,9 +43,9 @@ class TDTestCase: tdLog.exit("compare error: %s != %s"%src, dst) else: break - + tdSql.execute('use db_taosx') - tdSql.query("select * from ct3") + tdSql.query("select * from ct3 order by c1 desc") tdSql.checkRows(2) tdSql.checkData(0, 1, 51) tdSql.checkData(0, 4, 940) @@ -58,17 +58,17 @@ class TDTestCase: tdSql.query("select * from ct2") tdSql.checkRows(0) - tdSql.query("select * from ct0") + tdSql.query("select * from ct0 order by c1") tdSql.checkRows(2) tdSql.checkData(0, 3, "a") tdSql.checkData(1, 4, None) - tdSql.query("select * from n1") + tdSql.query("select * from n1 order by cc3 desc") tdSql.checkRows(2) tdSql.checkData(0, 1, "eeee") tdSql.checkData(1, 2, 940) - tdSql.query("select * from jt") + tdSql.query("select * from jt order by i desc") tdSql.checkRows(2) tdSql.checkData(0, 1, 11) tdSql.checkData(0, 2, None)