From c6555c3b3778808e910a2044714913cd62895d61 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 16 May 2023 16:25:55 +0800 Subject: [PATCH] fix: join operator finished --- source/libs/executor/src/joinoperator.c | 365 +++++++++++++++++++----- source/libs/planner/src/planOptimizer.c | 8 + 2 files changed, 305 insertions(+), 68 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 754b5f4737..a5591b22b3 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -30,11 +30,11 @@ typedef struct SJoinRowCtx { bool rowRemains; int64_t ts; SArray* leftRowLocations; - SArray* rightRowLocations; SArray* leftCreatedBlocks; SArray* rightCreatedBlocks; int32_t leftRowIdx; int32_t rightRowIdx; + SSHashObj* buildTableTSRange; } SJoinRowCtx; typedef struct SJoinOperatorInfo { @@ -50,6 +50,17 @@ typedef struct SJoinOperatorInfo { int32_t rightPos; SColumnInfo rightCol; SNode* pCondAfterMerge; + SNode* pTagEqualConditions; + + SArray* leftTagCols; + SArray* leftTagKeys; + char* leftTagKeyBuf; + int32_t leftTagKeyLen; + + SArray* rightTagCols; + SArray* rightTagKeys; + char* rightTagKeyBuf; + int32_t rightTagKeyLen; SJoinRowCtx rowCtx; } SJoinOperatorInfo; @@ -92,6 +103,147 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown setJoinColumnInfo(&pInfo->rightCol, rightTsCol); } +static void extractTagEqualColsFromOper(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstreams, SOperatorNode* pOperNode, + SColumn* pLeft, SColumn* pRight) { + SColumnNode* pLeftNode = (SColumnNode*)pOperNode->pLeft; + SColumnNode* pRightNode = (SColumnNode*)pOperNode->pRight; + if (pLeftNode->dataBlockId == pRightNode->dataBlockId || pLeftNode->dataBlockId == pDownstreams[0]->resultDataBlockId) { + *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); + *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); + } else { + *pLeft = extractColumnFromColumnNode((SColumnNode*)pOperNode->pRight); + *pRight = extractColumnFromColumnNode((SColumnNode*)pOperNode->pLeft); + } +} + +static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownStream, SNode* pTagEqualNode, + SArray* leftTagEqCols, SArray* rightTagEqCols) { + SColumn left = {0}; + SColumn right = {0}; + if (nodeType(pTagEqualNode) == QUERY_NODE_LOGIC_CONDITION && ((SLogicConditionNode*)pTagEqualNode)->condType == LOGIC_COND_TYPE_AND) { + SNode* pNode = NULL; + FOREACH(pNode, ((SLogicConditionNode*)pTagEqualNode)->pParameterList) { + SOperatorNode* pOperNode = (SOperatorNode*)pNode; + extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + taosArrayPush(leftTagEqCols, &left); + taosArrayPush(rightTagEqCols, &right); + } + return; + } + + if (nodeType(pTagEqualNode) == QUERY_NODE_OPERATOR) { + SOperatorNode* pOperNode = (SOperatorNode*)pTagEqualNode; + extractTagEqualColsFromOper(pInfo, pDownStream, pOperNode, &left, &right); + taosArrayPush(leftTagEqCols, &left); + taosArrayPush(rightTagEqCols, &right); + } +} + +static int32_t initTagColsGroupkeys(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { + *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); + if ((*pGroupColVals) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i); + (*keyLen) += pCol->bytes; // actual data + null_flag + + SGroupKeys key = {0}; + key.bytes = pCol->bytes; + key.type = pCol->type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, pCol->bytes); + if (key.pData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush((*pGroupColVals), &key); + } + + int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; + (*keyLen) += nullFlagSize; + + (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); + if ((*keyBuf) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +static void fillGroupKeyValsFromTagCols(SArray* pCols, SArray* pGroupKeys, SSDataBlock* pBlock, int32_t rowIndex) { + SColumnDataAgg* pColAgg = NULL; + + size_t numOfGroupCols = taosArrayGetSize(pCols); + + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = (SColumn*) taosArrayGet(pCols, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + + // valid range check. todo: return error code. + if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) { + continue; + } + + if (pBlock->pBlockAgg != NULL) { + pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + } + + SGroupKeys* pkey = taosArrayGet(pGroupKeys, i); + if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { + pkey->isNull = true; + } else { + pkey->isNull = false; + char* val = colDataGetData(pColInfoData, rowIndex); + if (pkey->type == TSDB_DATA_TYPE_JSON) { + if (tTagIsJson(val)) { + terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; + return; + } + int32_t dataLen = getJsonValueLen(val); + memcpy(pkey->pData, val, dataLen); + } else if (IS_VAR_DATA_TYPE(pkey->type)) { + memcpy(pkey->pData, val, varDataTLen(val)); + ASSERT(varDataTLen(val) <= pkey->bytes); + } else { + memcpy(pkey->pData, val, pkey->bytes); + } + } + } +} + +static int32_t combineGroupKeysIntoBuf(void* pKey, const SArray* pGroupKeys) { + size_t numOfGroupCols = taosArrayGetSize(pGroupKeys); + + char* isNull = (char*)pKey; + char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SGroupKeys* pkey = taosArrayGet(pGroupKeys, i); + if (pkey->isNull) { + isNull[i] = 1; + continue; + } + + isNull[i] = 0; + if (pkey->type == TSDB_DATA_TYPE_JSON) { + int32_t dataLen = getJsonValueLen(pkey->pData); + memcpy(pStart, (pkey->pData), dataLen); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pkey->type)) { + varDataCopy(pStart, pkey->pData); + pStart += varDataTLen(pkey->pData); + ASSERT(varDataTLen(pkey->pData) <= pkey->bytes); + } else { + memcpy(pStart, pkey->pData, pkey->bytes); + pStart += pkey->bytes; + } + } + + return (int32_t)(pStart - (char*)pKey); +} + SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); @@ -153,6 +305,13 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->inputOrder = TSDB_ORDER_DESC; } + pInfo->pTagEqualConditions = pJoinNode->pTagEqualCondtions; + pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); + pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); + extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); + initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); + initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { @@ -179,8 +338,22 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { pColumn->scale = pColumnNode->node.resType.scale; } +static void freeGroupKeyData(void* param) { + SGroupKeys* pKey = (SGroupKeys*)param; + taosMemoryFree(pKey->pData); +} + void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; + + taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); + taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData); + taosArrayDestroy(pJoinOperator->rightTagCols); + + taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); + taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData); + taosArrayDestroy(pJoinOperator->leftTagCols); + nodesDestroyNode(pJoinOperator->pCondAfterMerge); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); @@ -300,22 +473,121 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator return 0; } +static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj** ppHashObj) { + int32_t buildTableCap = MIN(taosArrayGetSize(rightRowLocations), 4096); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn); + for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { + SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); + fillGroupKeyValsFromTagCols(pInfo->rightTagCols, pInfo->rightTagKeys, rightRow->pDataBlock, rightRow->pos); + int32_t keyLen = combineGroupKeysIntoBuf(pInfo->rightTagKeyBuf, pInfo->rightTagKeys); + SArray** ppRows = tSimpleHashGet(buildTable, pInfo->rightTagKeyBuf, keyLen); + if (!ppRows) { + SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); + taosArrayPush(rows, rightRow); + tSimpleHashPut(buildTable, pInfo->rightTagKeyBuf, keyLen, &rows, POINTER_BYTES); + } else { + taosArrayPush(*ppRows, rightRow); + } + } + *ppHashObj = buildTable; + return TSDB_CODE_SUCCESS; +} + +static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, + const SArray* leftRowLocations, int32_t leftRowIdx, + int32_t rightRowIdx, SSHashObj* rightTableHash, bool* pReachThreshold) { + *pReachThreshold = false; + uint32_t limitRowNum = pOperator->resultInfo.threshold; + SJoinOperatorInfo* pJoinInfo = pOperator->info; + size_t leftNumJoin = taosArrayGetSize(leftRowLocations); + + int32_t i,j; + + for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { + SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); + fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos); + int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys); + SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); + if (!ppRightRows) { + continue; + } + SArray* pRightRows = *ppRightRows; + size_t rightRowsSize = taosArrayGetSize(pRightRows); + for (j = rightRowIdx; j < rightRowsSize; ++j) { + if (*nRows >= limitRowNum) { + *pReachThreshold = true; + break; + } + + SRowLocation* rightRow = taosArrayGet(pRightRows, j); + mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, + rightRow->pos); + ++*nRows; + } + if (*pReachThreshold) { + break; + } + } + + if (*pReachThreshold) { + pJoinInfo->rowCtx.rowRemains = true; + pJoinInfo->rowCtx.leftRowIdx = i; + pJoinInfo->rowCtx.rightRowIdx = j; + } + return TSDB_CODE_SUCCESS; +} + +static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { + void* p = NULL; + int32_t iter = 0; + + while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { + SArray* rows = (*(SArray**)p); + taosArrayDestroy(rows); + } + + tSimpleHashCleanup(pBuildTable); +} + +static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, + SArray* rightCreatedBlocks, SSHashObj* rightTableHash) { + for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(rightCreatedBlocks); + for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); + blockDataDestroy(pBlock); + } + mergeJoinDestoryBuildTable(rightTableHash); + + taosArrayDestroy(leftCreatedBlocks); + taosArrayDestroy(leftRowLocations); + + pJoinInfo->rowCtx.rowRemains = false; + pJoinInfo->rowCtx.leftRowLocations = NULL; + pJoinInfo->rowCtx.leftCreatedBlocks = NULL; + pJoinInfo->rowCtx.rightCreatedBlocks = NULL; + pJoinInfo->rowCtx.buildTableTSRange = NULL; +} + static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, int32_t* nRows) { int32_t code = TSDB_CODE_SUCCESS; SJoinOperatorInfo* pJoinInfo = pOperator->info; SArray* leftRowLocations = NULL; SArray* leftCreatedBlocks = NULL; - SArray* rightRowLocations = NULL; SArray* rightCreatedBlocks = NULL; int32_t leftRowIdx = 0; int32_t rightRowIdx = 0; - int32_t i, j; - + SSHashObj* rightTableHash = NULL; + if (pJoinInfo->rowCtx.rowRemains) { leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; - rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; + rightTableHash = pJoinInfo->rowCtx.buildTableTSRange; rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; rightRowIdx = pJoinInfo->rowCtx.rightRowIdx; @@ -323,85 +595,42 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); - rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); + mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash); + taosArrayDestroy(rightRowLocations); } size_t leftNumJoin = taosArrayGetSize(leftRowLocations); - size_t rightNumJoin = taosArrayGetSize(rightRowLocations); - uint32_t maxRowNum = *nRows + (leftNumJoin - leftRowIdx - 1) * rightNumJoin + rightNumJoin - rightRowIdx; - uint32_t limitRowNum = maxRowNum; - if (maxRowNum > pOperator->resultInfo.threshold) { - limitRowNum = pOperator->resultInfo.threshold; - if (!pJoinInfo->rowCtx.rowRemains) { - pJoinInfo->rowCtx.rowRemains = true; - pJoinInfo->rowCtx.ts = timestamp; - pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; - pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; - pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; - pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; - } - } - - code = blockDataEnsureCapacity(pRes, limitRowNum); + code = blockDataEnsureCapacity(pRes, pOperator->resultInfo.threshold); if (code != TSDB_CODE_SUCCESS) { - qError("%s can not ensure block capacity for join. left: %zu, right: %zu", GET_TASKID(pOperator->pTaskInfo), - leftNumJoin, rightNumJoin); + qError("%s can not ensure block capacity for join. left: %zu", GET_TASKID(pOperator->pTaskInfo), + leftNumJoin); } - - if (code == TSDB_CODE_SUCCESS) { - bool done = false; - for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { - for (j = rightRowIdx; j < rightNumJoin; ++j) { - if (*nRows >= limitRowNum) { - done = true; - break; - } + bool reachThreshold = false; - SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); - SRowLocation* rightRow = taosArrayGet(rightRowLocations, j); - mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, - rightRow->pos); - ++*nRows; - } - if (done) { - break; - } - } - - if (maxRowNum > pOperator->resultInfo.threshold) { - pJoinInfo->rowCtx.leftRowIdx = i; - pJoinInfo->rowCtx.rightRowIdx = j; - } + if (code == TSDB_CODE_SUCCESS) { + mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, + rightRowIdx, rightTableHash, &reachThreshold); } - if (maxRowNum <= pOperator->resultInfo.threshold) { - for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); - blockDataDestroy(pBlock); - } - taosArrayDestroy(rightCreatedBlocks); - taosArrayDestroy(rightRowLocations); - for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { - SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); - blockDataDestroy(pBlock); - } - taosArrayDestroy(leftCreatedBlocks); - taosArrayDestroy(leftRowLocations); + if (!reachThreshold) { + mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, + rightTableHash); - if (pJoinInfo->rowCtx.rowRemains) { - pJoinInfo->rowCtx.rowRemains = false; - pJoinInfo->rowCtx.leftRowLocations = NULL; - pJoinInfo->rowCtx.rightRowLocations = NULL; - pJoinInfo->rowCtx.leftCreatedBlocks = NULL; - pJoinInfo->rowCtx.rightCreatedBlocks = NULL; - } + } else { + pJoinInfo->rowCtx.rowRemains = true; + pJoinInfo->rowCtx.ts = timestamp; + pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; + pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; + pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; + pJoinInfo->rowCtx.buildTableTSRange = rightTableHash; } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index f5ef02e207..8c111133fc 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -759,6 +759,14 @@ static bool pushDownCondOptIsTagEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { if (OP_TYPE_EQUAL != pOper->opType) { return false; } + if (QUERY_NODE_COLUMN != nodeType(pOper->pLeft) || QUERY_NODE_COLUMN != nodeType(pOper->pRight)) { + return false; + } + SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft); + SColumnNode* pRight = (SColumnNode*)(pOper->pRight); + if (pLeft->node.resType.type != pRight->node.resType.type) { + return false; + } SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; if (pushDownCondOptIsTag(pOper->pLeft, pLeftCols)) { -- GitLab