From ee16b46112e6a83eb1434b9ab90cc104c8e89bcb Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 18 May 2023 10:46:19 +0800 Subject: [PATCH] fix: remove group keys from join operator --- source/libs/executor/src/joinoperator.c | 94 +++++-------------------- 1 file changed, 18 insertions(+), 76 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 07b8a4db11..68f6951c26 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -55,12 +55,10 @@ typedef struct SJoinOperatorInfo { SNode* pTagEqualConditions; SArray* leftTagCols; - SArray* leftTagKeys; char* leftTagKeyBuf; int32_t leftTagKeyLen; SArray* rightTagCols; - SArray* rightTagKeys; char* rightTagKeyBuf; int32_t rightTagKeyLen; @@ -141,27 +139,11 @@ static void extractTagEqualCondCols(SJoinOperatorInfo* pInfo, SOperatorInfo** pD } } -static int32_t initTagColsGroupkeys(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { - *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); - if ((*pGroupColVals) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - +static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); for (int32_t i = 0; i < numOfGroupCols; ++i) { SColumn* pCol = (SColumn*)taosArrayGet(pGroupColList, i); (*keyLen) += pCol->bytes; // actual data + null_flag - - SGroupKeys key = {0}; - key.bytes = pCol->bytes; - key.type = pCol->type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, pCol->bytes); - if (key.pData == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush((*pGroupColVals), &key); } int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; @@ -175,10 +157,11 @@ static int32_t initTagColsGroupkeys(SArray** pGroupColVals, int32_t* keyLen, cha return TSDB_CODE_SUCCESS; } -static void fillGroupKeyValsFromTagCols(SArray* pCols, SArray* pGroupKeys, SSDataBlock* pBlock, int32_t rowIndex) { +static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t rowIndex, void* pKey) { SColumnDataAgg* pColAgg = NULL; - size_t numOfGroupCols = taosArrayGetSize(pCols); + char* isNull = (char*)pKey; + char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; for (int32_t i = 0; i < numOfGroupCols; ++i) { SColumn* pCol = (SColumn*) taosArrayGet(pCols, i); @@ -193,56 +176,24 @@ static void fillGroupKeyValsFromTagCols(SArray* pCols, SArray* pGroupKeys, SSDat pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? } - SGroupKeys* pkey = taosArrayGet(pGroupKeys, i); if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { - pkey->isNull = true; + isNull[i] = 1; } else { - pkey->isNull = false; + isNull[i] = 0; char* val = colDataGetData(pColInfoData, rowIndex); - if (pkey->type == TSDB_DATA_TYPE_JSON) { - if (tTagIsJson(val)) { - terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; - return; - } + if (pCol->type == TSDB_DATA_TYPE_JSON) { int32_t dataLen = getJsonValueLen(val); - memcpy(pkey->pData, val, dataLen); - } else if (IS_VAR_DATA_TYPE(pkey->type)) { - memcpy(pkey->pData, val, varDataTLen(val)); - ASSERT(varDataTLen(val) <= pkey->bytes); + memcpy(pStart, val, dataLen); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pCol->type)) { + varDataCopy(pStart, val); + pStart += varDataTLen(val); } else { - memcpy(pkey->pData, val, pkey->bytes); + memcpy(pStart, val, pCol->bytes); + pStart += pCol->bytes; } } } -} - -static int32_t combineGroupKeysIntoBuf(void* pKey, const SArray* pGroupKeys) { - size_t numOfGroupCols = taosArrayGetSize(pGroupKeys); - - char* isNull = (char*)pKey; - char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols; - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SGroupKeys* pkey = taosArrayGet(pGroupKeys, i); - if (pkey->isNull) { - isNull[i] = 1; - continue; - } - - isNull[i] = 0; - if (pkey->type == TSDB_DATA_TYPE_JSON) { - int32_t dataLen = getJsonValueLen(pkey->pData); - memcpy(pStart, (pkey->pData), dataLen); - pStart += dataLen; - } else if (IS_VAR_DATA_TYPE(pkey->type)) { - varDataCopy(pStart, pkey->pData); - pStart += varDataTLen(pkey->pData); - ASSERT(varDataTLen(pkey->pData) <= pkey->bytes); - } else { - memcpy(pStart, pkey->pData, pkey->bytes); - pStart += pkey->bytes; - } - } - return (int32_t)(pStart - (char*)pKey); } @@ -312,8 +263,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->leftTagCols = taosArrayInit(4, sizeof(SColumn)); pInfo->rightTagCols = taosArrayInit(4, sizeof(SColumn)); extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); - initTagColsGroupkeys(&pInfo->leftTagKeys, &pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); - initTagColsGroupkeys(&pInfo->rightTagKeys, &pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); + initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); @@ -341,20 +292,13 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { pColumn->scale = pColumnNode->node.resType.scale; } -static void freeGroupKeyData(void* param) { - SGroupKeys* pKey = (SGroupKeys*)param; - taosMemoryFree(pKey->pData); -} - void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; if (pJoinOperator->pTagEqualConditions != NULL) { taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); - taosArrayDestroyEx(pJoinOperator->rightTagKeys, freeGroupKeyData); taosArrayDestroy(pJoinOperator->rightTagCols); taosMemoryFreeClear(pJoinOperator->leftTagKeyBuf); - taosArrayDestroyEx(pJoinOperator->leftTagKeys, freeGroupKeyData); taosArrayDestroy(pJoinOperator->leftTagCols); } nodesDestroyNode(pJoinOperator->pCondAfterMerge); @@ -482,8 +426,7 @@ static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* right SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn); for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); - fillGroupKeyValsFromTagCols(pInfo->rightTagCols, pInfo->rightTagKeys, rightRow->pDataBlock, rightRow->pos); - int32_t keyLen = combineGroupKeysIntoBuf(pInfo->rightTagKeyBuf, pInfo->rightTagKeys); + int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf); SArray** ppRows = tSimpleHashGet(buildTable, pInfo->rightTagKeyBuf, keyLen); if (!ppRows) { SArray* rows = taosArrayInit(4, sizeof(SRowLocation)); @@ -511,8 +454,7 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); SArray* pRightRows = NULL; if (rightTableHash != NULL) { - fillGroupKeyValsFromTagCols(pJoinInfo->leftTagCols, pJoinInfo->leftTagKeys, leftRow->pDataBlock, leftRow->pos); - int32_t keyLen = combineGroupKeysIntoBuf(pJoinInfo->leftTagKeyBuf, pJoinInfo->leftTagKeys); + int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf); SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); if (!ppRightRows) { continue; -- GitLab