diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 86dcd1eceb9e26dbf0da4ed02481287e851ceb79..b8bd6c09bc455b2c5ea535c7eb3902f3e6df984c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -94,6 +94,9 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con while (newSize < pAttr->length + dataLen) { newSize = newSize * 1.5; + if (newSize > UINT32_MAX) { + return TSDB_CODE_OUT_OF_MEMORY; + } } char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 88ed9eccb323561718a3a7cc1bc8dfd737bef53c..7a2277ed9ce81d43d9c6f51393c4b84770e0b9e5 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -24,6 +24,17 @@ #include "tmsg.h" #include "ttypes.h" +typedef struct SJoinRowCtx { + bool rowRemains; + int64_t ts; + SArray* leftRowLocations; + SArray* rightRowLocations; + SArray* leftCreatedBlocks; + SArray* rightCreatedBlocks; + int32_t leftRowIdx; + int32_t rightRowIdx; +} SJoinRowCtx; + typedef struct SJoinOperatorInfo { SSDataBlock* pRes; int32_t joinType; @@ -37,6 +48,8 @@ typedef struct SJoinOperatorInfo { int32_t rightPos; SColumnInfo rightCol; SNode* pCondAfterMerge; + + SJoinRowCtx rowCtx; } SJoinOperatorInfo; static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); @@ -287,49 +300,107 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, int32_t* nRows) { + int32_t code = TSDB_CODE_SUCCESS; SJoinOperatorInfo* pJoinInfo = pOperator->info; - SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); - SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + SArray* leftRowLocations = NULL; + SArray* leftCreatedBlocks = NULL; + SArray* rightRowLocations = NULL; + SArray* rightCreatedBlocks = NULL; + int32_t leftRowIdx = 0; + int32_t rightRowIdx = 0; + int32_t i, j; + + if (pJoinInfo->rowCtx.rowRemains) { + leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; + leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; + rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; + rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; + leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; + rightRowIdx = pJoinInfo->rowCtx.rightRowIdx; + } else { + leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); - SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); - SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); - int32_t code = TSDB_CODE_SUCCESS; - mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, - pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); - mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, - pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); + rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, + pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, + pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); + } + size_t leftNumJoin = taosArrayGetSize(leftRowLocations); size_t rightNumJoin = taosArrayGetSize(rightRowLocations); - code = blockDataEnsureCapacity(pRes, *nRows + leftNumJoin * rightNumJoin); + uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx; + uint32_t limitRowNum = maxRowNum; + if (maxRowNum > pOperator->resultInfo.threshold) { + limitRowNum = pOperator->resultInfo.threshold; + if (!pJoinInfo->rowCtx.rowRemains) { + pJoinInfo->rowCtx.rowRemains = true; + pJoinInfo->rowCtx.ts = timestamp; + pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; + pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; + pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; + pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; + } + } + + code = blockDataEnsureCapacity(pRes, limitRowNum); if (code != TSDB_CODE_SUCCESS) { qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo), leftNumJoin, rightNumJoin); } + + if (code == TSDB_CODE_SUCCESS) { - for (int32_t i = 0; i < leftNumJoin; ++i) { - for (int32_t j = 0; j < rightNumJoin; ++j) { + bool done = false; + for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { + for (j = rightRowIdx; j < rightNumJoin; ++j) { + if (*nRows >= limitRowNum) { + done = true; + break; + } + SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); SRowLocation* rightRow = taosArrayGet(rightRowLocations, j); mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, rightRow->pos); ++*nRows; } + if (done) { + break; + } } - } - for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); - blockDataDestroy(pBlock); + if (maxRowNum > pOperator->resultInfo.threshold) { + pJoinInfo->rowCtx.leftRowIdx = i; + pJoinInfo->rowCtx.rightRowIdx = j; + } } - taosArrayDestroy(rightCreatedBlocks); - taosArrayDestroy(rightRowLocations); - for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); - blockDataDestroy(pBlock); + + if (maxRowNum <= pOperator->resultInfo.threshold) { + for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(rightCreatedBlocks); + taosArrayDestroy(rightRowLocations); + for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(leftCreatedBlocks); + taosArrayDestroy(leftRowLocations); + + if (pJoinInfo->rowCtx.rowRemains) { + pJoinInfo->rowCtx.rowRemains = false; + pJoinInfo->rowCtx.leftRowLocations = NULL; + pJoinInfo->rowCtx.rightRowLocations = NULL; + pJoinInfo->rowCtx.leftCreatedBlocks = NULL; + pJoinInfo->rowCtx.rightCreatedBlocks = NULL; + } } - taosArrayDestroy(leftCreatedBlocks); - taosArrayDestroy(leftRowLocations); return TSDB_CODE_SUCCESS; } @@ -379,9 +450,14 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) while (1) { int64_t leftTs = 0; int64_t rightTs = 0; - bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); - if (!hasNextTs) { - break; + if (pJoinInfo->rowCtx.rowRemains) { + leftTs = pJoinInfo->rowCtx.ts; + rightTs = pJoinInfo->rowCtx.ts; + } else { + bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); + if (!hasNextTs) { + break; + } } if (leftTs == rightTs) { @@ -389,12 +465,12 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) { pJoinInfo->leftPos += 1; - if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { + if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { continue; } } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) { pJoinInfo->rightPos += 1; - if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { + if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { continue; } } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 16751423b173b82601f4f490cead5eb62b94623a..af3cd6206b47e917bafab1ae9fcd94c65f8d2cd0 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -179,6 +179,7 @@ ,,y,script,./test.sh -f tsim/query/sys_tbname.sim ,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/forceFill.sim +,,y,script,./test.sh -f tsim/query/join.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/script/tsim/query/join.sim b/tests/script/tsim/query/join.sim new file mode 100644 index 0000000000000000000000000000000000000000..adb0338ef704ebf257a176f5c9772c5aecac08c4 --- /dev/null +++ b/tests/script/tsim/query/join.sim @@ -0,0 +1,72 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +$dbPrefix = db +$tbPrefix1 = tba +$tbPrefix2 = tbb +$mtPrefix = stb +$tbNum = 10000 +$rowNum = 2 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i +$mt1 = $mtPrefix . $i +$i = 1 +$mt2 = $mtPrefix . $i + +sql drop database $db -x step1 +step1: +sql create database $db +sql use $db +sql create table $mt1 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500)) +sql create table $mt2 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500)) + +print ====== start create child tables and insert data +$i = 0 +while $i < $tbNum + $tb = $tbPrefix1 . $i + sql create table $tb using $mt1 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa') + + $x = 0 + while $x < $rowNum + $cc = $x * 60000 + $ms = 1601481600000 + $cc + + sql insert into $tb values ($ms , $x ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +print =============== step2 +$i = 0 +while $i < $tbNum + $tb = $tbPrefix2 . $i + sql create table $tb using $mt2 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa') + + $x = 0 + while $x < $rowNum + $cc = $x * 60000 + $ms = 1601481600000 + $cc + + sql insert into $tb values ($ms , $x ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +sql select * from tba0 t1, tbb0 t2 where t1.ts=t2.ts; +if $rows != 2 then + return -1 +endi +sql select * from stb0 t1, stb1 t2 where t1.ts=t2.ts and t1.tag2=t2.tag2; +if $rows != 200000000 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT