提交 568725a2 编写于 作者: S shenglian zhou

fix: using right rows to build hash table when the tag equal cond is not null...

fix: using right rows to build hash table when the tag equal cond is not null and same ts row row number is greater than 16
上级 a51de5cc
......@@ -34,7 +34,9 @@ typedef struct SJoinRowCtx {
SArray* rightCreatedBlocks;
int32_t leftRowIdx;
int32_t rightRowIdx;
SSHashObj* buildTableTSRange;
SArray* rightRowLocations;
} SJoinRowCtx;
typedef struct SJoinOperatorInfo {
......@@ -306,12 +308,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
}
pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions;
pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn));
pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn));
extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols);
initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols);
initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols);
if (pInfo->pTagEqualConditions != NULL) {
pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn));
pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn));
extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols);
initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols);
initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols);
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
......@@ -345,15 +348,15 @@ static void freeGroupKeyData(void* param) {
void destroyMergeJoinOperator(void* param) {
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
if (pJoinOperator->pTagEqualConditions != NULL) {
taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf);
taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData);
taosArrayDestroy(pJoinOperator->rightTagCols);
taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf);
taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData);
taosArrayDestroy(pJoinOperator->rightTagCols);
taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf);
taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData);
taosArrayDestroy(pJoinOperator->leftTagCols);
taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf);
taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData);
taosArrayDestroy(pJoinOperator->leftTagCols);
}
nodesDestroyNode(pJoinOperator->pCondAfterMerge);
pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes);
......@@ -496,7 +499,7 @@ static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* right
static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows,
const SArray* leftRowLocations, int32_t leftRowIdx,
int32_t rightRowIdx, SSHashObj* rightTableHash, bool* pReachThreshold) {
int32_t rightRowIdx, SSHashObj* rightTableHash, SArray* rightRowLocations, bool* pReachThreshold) {
*pReachThreshold = false;
uint32_t limitRowNum = pOperator->resultInfo.threshold;
SJoinOperatorInfo* pJoinInfo = pOperator->info;
......@@ -506,13 +509,18 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock*
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos);
int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys);
SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen);
if (!ppRightRows) {
continue;
SArray* pRightRows = NULL;
if (rightTableHash != NULL) {
fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos);
int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys);
SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen);
if (!ppRightRows) {
continue;
}
pRightRows = *ppRightRows;
} else {
pRightRows = rightRowLocations;
}
SArray* pRightRows = *ppRightRows;
size_t rightRowsSize = taosArrayGetSize(pRightRows);
for (j = rightRowIdx; j < rightRowsSize; ++j) {
if (*nRows >= limitRowNum) {
......@@ -551,7 +559,7 @@ static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) {
}
static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks,
SArray* rightCreatedBlocks, SSHashObj* rightTableHash) {
SArray* rightCreatedBlocks, SSHashObj* rightTableHash, SArray* rightRowLocations) {
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
blockDataDestroy(pBlock);
......@@ -561,7 +569,12 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
blockDataDestroy(pBlock);
}
mergeJoinDestoryBuildTable(rightTableHash);
if (rightRowLocations != NULL) {
taosArrayDestroy(rightRowLocations);
}
if (rightTableHash != NULL) {
mergeJoinDestoryBuildTable(rightTableHash);
}
taosArrayDestroy(leftCreatedBlocks);
taosArrayDestroy(leftRowLocations);
......@@ -571,6 +584,7 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
pJoinInfo->rowCtx.buildTableTSRange = NULL;
pJoinInfo->rowCtx.rightRowLocations = NULL;
}
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
......@@ -578,6 +592,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
int32_t code = TSDB_CODE_SUCCESS;
SJoinOperatorInfo* pJoinInfo = pOperator->info;
SArray* leftRowLocations = NULL;
SArray* rightRowLocations = NULL;
SArray* leftCreatedBlocks = NULL;
SArray* rightCreatedBlocks = NULL;
int32_t leftRowIdx = 0;
......@@ -588,6 +603,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
rightTableHash = pJoinInfo->rowCtx.buildTableTSRange;
rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
rightRowIdx = pJoinInfo->rowCtx.rightRowIdx;
......@@ -602,8 +618,11 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash);
taosArrayDestroy(rightRowLocations);
if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) {
mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash);
taosArrayDestroy(rightRowLocations);
rightRowLocations = NULL;
}
}
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
......@@ -617,12 +636,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
if (code == TSDB_CODE_SUCCESS) {
mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx,
rightRowIdx, rightTableHash, &reachThreshold);
rightRowIdx, rightTableHash, rightRowLocations, &reachThreshold);
}
if (!reachThreshold) {
mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks,
rightTableHash);
rightTableHash, rightRowLocations);
} else {
pJoinInfo->rowCtx.rowRemains = true;
......@@ -631,6 +650,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
pJoinInfo->rowCtx.buildTableTSRange = rightTableHash;
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
}
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册