未验证 提交 6aa05c43 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20067 from taosdata/fix/TS-2687

fix: add merge join operator result limitation
...@@ -98,6 +98,9 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const ...@@ -98,6 +98,9 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
while (newSize < pAttr->length + dataLen) { while (newSize < pAttr->length + dataLen) {
newSize = newSize * 1.5; newSize = newSize * 1.5;
if (newSize > UINT32_MAX) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize); char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
......
...@@ -24,6 +24,17 @@ ...@@ -24,6 +24,17 @@
#include "tmsg.h" #include "tmsg.h"
#include "ttypes.h" #include "ttypes.h"
typedef struct SJoinRowCtx {
bool rowRemains;
int64_t ts;
SArray* leftRowLocations;
SArray* rightRowLocations;
SArray* leftCreatedBlocks;
SArray* rightCreatedBlocks;
int32_t leftRowIdx;
int32_t rightRowIdx;
} SJoinRowCtx;
typedef struct SJoinOperatorInfo { typedef struct SJoinOperatorInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
int32_t joinType; int32_t joinType;
...@@ -37,6 +48,8 @@ typedef struct SJoinOperatorInfo { ...@@ -37,6 +48,8 @@ typedef struct SJoinOperatorInfo {
int32_t rightPos; int32_t rightPos;
SColumnInfo rightCol; SColumnInfo rightCol;
SNode* pCondAfterMerge; SNode* pCondAfterMerge;
SJoinRowCtx rowCtx;
} SJoinOperatorInfo; } SJoinOperatorInfo;
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
...@@ -287,37 +300,86 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator ...@@ -287,37 +300,86 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
int32_t* nRows) { int32_t* nRows) {
int32_t code = TSDB_CODE_SUCCESS;
SJoinOperatorInfo* pJoinInfo = pOperator->info; SJoinOperatorInfo* pJoinInfo = pOperator->info;
SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); SArray* leftRowLocations = NULL;
SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); SArray* leftCreatedBlocks = NULL;
SArray* rightRowLocations = NULL;
SArray* rightCreatedBlocks = NULL;
int32_t leftRowIdx = 0;
int32_t rightRowIdx = 0;
int32_t i, j;
if (pJoinInfo->rowCtx.rowRemains) {
leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
rightRowIdx = pJoinInfo->rowCtx.rightRowIdx;
} else {
leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
}
size_t leftNumJoin = taosArrayGetSize(leftRowLocations); size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
size_t rightNumJoin = taosArrayGetSize(rightRowLocations); size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
code = blockDataEnsureCapacity(pRes, *nRows + leftNumJoin * rightNumJoin); uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx;
uint32_t limitRowNum = maxRowNum;
if (maxRowNum > pOperator->resultInfo.threshold) {
limitRowNum = pOperator->resultInfo.threshold;
if (!pJoinInfo->rowCtx.rowRemains) {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.ts = timestamp;
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
}
}
code = blockDataEnsureCapacity(pRes, limitRowNum);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo), qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo),
leftNumJoin, rightNumJoin); leftNumJoin, rightNumJoin);
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
for (int32_t i = 0; i < leftNumJoin; ++i) { bool done = false;
for (int32_t j = 0; j < rightNumJoin; ++j) { for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
for (j = rightRowIdx; j < rightNumJoin; ++j) {
if (*nRows >= limitRowNum) {
done = true;
break;
}
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j); SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos); rightRow->pos);
++*nRows; ++*nRows;
} }
if (done) {
break;
}
}
if (maxRowNum > pOperator->resultInfo.threshold) {
pJoinInfo->rowCtx.leftRowIdx = i;
pJoinInfo->rowCtx.rightRowIdx = j;
} }
} }
if (maxRowNum <= pOperator->resultInfo.threshold) {
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
...@@ -330,6 +392,15 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t ...@@ -330,6 +392,15 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
} }
taosArrayDestroy(leftCreatedBlocks); taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations); taosArrayDestroy(leftRowLocations);
if (pJoinInfo->rowCtx.rowRemains) {
pJoinInfo->rowCtx.rowRemains = false;
pJoinInfo->rowCtx.leftRowLocations = NULL;
pJoinInfo->rowCtx.rightRowLocations = NULL;
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -379,22 +450,27 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) ...@@ -379,22 +450,27 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
while (1) { while (1) {
int64_t leftTs = 0; int64_t leftTs = 0;
int64_t rightTs = 0; int64_t rightTs = 0;
if (pJoinInfo->rowCtx.rowRemains) {
leftTs = pJoinInfo->rowCtx.ts;
rightTs = pJoinInfo->rowCtx.ts;
} else {
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
if (!hasNextTs) { if (!hasNextTs) {
break; break;
} }
}
if (leftTs == rightTs) { if (leftTs == rightTs) {
mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
} else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) { } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
pJoinInfo->leftPos += 1; pJoinInfo->leftPos += 1;
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue; continue;
} }
} else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) { } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
pJoinInfo->rightPos += 1; pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue; continue;
} }
} }
......
...@@ -179,6 +179,7 @@ ...@@ -179,6 +179,7 @@
,,y,script,./test.sh -f tsim/query/sys_tbname.sim ,,y,script,./test.sh -f tsim/query/sys_tbname.sim
,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/groupby.sim
,,y,script,./test.sh -f tsim/query/forceFill.sim ,,y,script,./test.sh -f tsim/query/forceFill.sim
,,n,script,./test.sh -f tsim/query/join.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
$dbPrefix = db
$tbPrefix1 = tba
$tbPrefix2 = tbb
$mtPrefix = stb
$tbNum = 10000
$rowNum = 2
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt1 = $mtPrefix . $i
$i = 1
$mt2 = $mtPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db
sql use $db
sql create table $mt1 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
sql create table $mt2 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
print ====== start create child tables and insert data
$i = 0
while $i < $tbNum
$tb = $tbPrefix1 . $i
sql create table $tb using $mt1 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa')
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
$i = $i + 1
endw
print =============== step2
$i = 0
while $i < $tbNum
$tb = $tbPrefix2 . $i
sql create table $tb using $mt2 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa')
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
$i = $i + 1
endw
sql select * from tba0 t1, tbb0 t2 where t1.ts=t2.ts;
if $rows != 2 then
return -1
endi
sql select * from stb0 t1, stb1 t2 where t1.ts=t2.ts and t1.tag2=t2.tag2;
if $rows != 200000000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册