提交 968c538b 编写于 作者: D dapan1121

fix: add merge join operator result limitation

上级 16bc8cb5
......@@ -94,6 +94,9 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
while (newSize < pAttr->length + dataLen) {
newSize = newSize * 1.5;
if (newSize > UINT32_MAX) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
char* buf = taosMemoryRealloc(pColumnInfoData->pData, newSize);
......
......@@ -24,6 +24,17 @@
#include "tmsg.h"
#include "ttypes.h"
typedef struct SJoinRowCtx {
bool rowRemains;
int64_t ts;
SArray* leftRowLocations;
SArray* rightRowLocations;
SArray* leftCreatedBlocks;
SArray* rightCreatedBlocks;
int32_t leftRowIdx;
int32_t rightRowIdx;
} SJoinRowCtx;
typedef struct SJoinOperatorInfo {
SSDataBlock* pRes;
int32_t joinType;
......@@ -37,6 +48,8 @@ typedef struct SJoinOperatorInfo {
int32_t rightPos;
SColumnInfo rightCol;
SNode* pCondAfterMerge;
SJoinRowCtx rowCtx;
} SJoinOperatorInfo;
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
......@@ -287,49 +300,107 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
int32_t* nRows) {
int32_t code = TSDB_CODE_SUCCESS;
SJoinOperatorInfo* pJoinInfo = pOperator->info;
SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
SArray* leftRowLocations = NULL;
SArray* leftCreatedBlocks = NULL;
SArray* rightRowLocations = NULL;
SArray* rightCreatedBlocks = NULL;
int32_t leftRowIdx = 0;
int32_t rightRowIdx = 0;
int32_t i, j;
if (pJoinInfo->rowCtx.rowRemains) {
leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
rightRowIdx = pJoinInfo->rowCtx.rightRowIdx;
} else {
leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
int32_t code = TSDB_CODE_SUCCESS;
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
}
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
code = blockDataEnsureCapacity(pRes, *nRows + leftNumJoin * rightNumJoin);
uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx;
uint32_t limitRowNum = maxRowNum;
if (maxRowNum > pOperator->resultInfo.threshold) {
limitRowNum = pOperator->resultInfo.threshold;
if (!pJoinInfo->rowCtx.rowRemains) {
pJoinInfo->rowCtx.rowRemains = true;
pJoinInfo->rowCtx.ts = timestamp;
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
}
}
code = blockDataEnsureCapacity(pRes, limitRowNum);
if (code != TSDB_CODE_SUCCESS) {
qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo),
leftNumJoin, rightNumJoin);
}
if (code == TSDB_CODE_SUCCESS) {
for (int32_t i = 0; i < leftNumJoin; ++i) {
for (int32_t j = 0; j < rightNumJoin; ++j) {
bool done = false;
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
for (j = rightRowIdx; j < rightNumJoin; ++j) {
if (*nRows >= limitRowNum) {
done = true;
break;
}
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
rightRow->pos);
++*nRows;
}
if (done) {
break;
}
}
}
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
if (maxRowNum > pOperator->resultInfo.threshold) {
pJoinInfo->rowCtx.leftRowIdx = i;
pJoinInfo->rowCtx.rightRowIdx = j;
}
}
taosArrayDestroy(rightCreatedBlocks);
taosArrayDestroy(rightRowLocations);
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
if (maxRowNum <= pOperator->resultInfo.threshold) {
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(rightCreatedBlocks);
taosArrayDestroy(rightRowLocations);
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
}
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
if (pJoinInfo->rowCtx.rowRemains) {
pJoinInfo->rowCtx.rowRemains = false;
pJoinInfo->rowCtx.leftRowLocations = NULL;
pJoinInfo->rowCtx.rightRowLocations = NULL;
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
}
}
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
return TSDB_CODE_SUCCESS;
}
......@@ -379,9 +450,14 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
while (1) {
int64_t leftTs = 0;
int64_t rightTs = 0;
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
if (!hasNextTs) {
break;
if (pJoinInfo->rowCtx.rowRemains) {
leftTs = pJoinInfo->rowCtx.ts;
rightTs = pJoinInfo->rowCtx.ts;
} else {
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
if (!hasNextTs) {
break;
}
}
if (leftTs == rightTs) {
......@@ -389,12 +465,12 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
} else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
pJoinInfo->leftPos += 1;
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
} else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
}
......
......@@ -179,6 +179,7 @@
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
,,y,script,./test.sh -f tsim/query/groupby.sim
,,y,script,./test.sh -f tsim/query/forceFill.sim
,,y,script,./test.sh -f tsim/query/join.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
$dbPrefix = db
$tbPrefix1 = tba
$tbPrefix2 = tbb
$mtPrefix = stb
$tbNum = 10000
$rowNum = 2
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt1 = $mtPrefix . $i
$i = 1
$mt2 = $mtPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db
sql use $db
sql create table $mt1 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
sql create table $mt2 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
print ====== start create child tables and insert data
$i = 0
while $i < $tbNum
$tb = $tbPrefix1 . $i
sql create table $tb using $mt1 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa')
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
$i = $i + 1
endw
print =============== step2
$i = 0
while $i < $tbNum
$tb = $tbPrefix2 . $i
sql create table $tb using $mt2 tags( $i , 'aaaaaaaaaaaaaaaaaaaaaaaaaaa')
$x = 0
while $x < $rowNum
$cc = $x * 60000
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x )
$x = $x + 1
endw
$i = $i + 1
endw
sql select * from tba0 t1, tbb0 t2 where t1.ts=t2.ts;
if $rows != 2 then
return -1
endi
sql select * from stb0 t1, stb1 t2 where t1.ts=t2.ts and t1.tag2=t2.tag2;
if $rows != 200000000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册