diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 6cb7d8852310082e0f1431c265c01752f5d527b7..a4b39645a213238c8291f21838e507cdd2d31025 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -180,6 +180,8 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData); int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue); +int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, + uint32_t numOfRows, bool isNull); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b0085605b77e02a01f6d50cc4d79917c2ef9b66c..03edb1be73604d045f2356c625aa7443548d4503 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -271,7 +271,7 @@ int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t curren if (numOfRows > 1) { int32_t* pOffset = pColumnInfoData->varmeta.offset; - memset(&pOffset[currentRow + 1], &pOffset[currentRow], sizeof(pOffset[0]) * (numOfRows - 1)); + for (uint32_t i = 1; i < numOfRows; ++i) { pOffset[currentRow + i] = pOffset[currentRow]; } /* memset() replicates a single byte, so the int32_t offset must be copied element by element */ pColumnInfoData->reassigned = true; } diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 136444ed394b294d83469fe489dd8994ebc91e42..a4a180d542b95c8580c10c9cfb08ab96aa9c6607 100755 --- a/source/libs/executor/inc/hashjoin.h +++ 
b/source/libs/executor/inc/hashjoin.h @@ -21,6 +21,14 @@ extern "C" { #define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760 +#pragma pack(push, 1) +typedef struct SBufRowInfo { + void* next; + uint16_t pageId; + int32_t offset; +} SBufRowInfo; +#pragma pack(pop) + typedef struct SHJoinCtx { bool rowRemains; SBufRowInfo* pBuildRow; @@ -42,7 +50,6 @@ typedef struct SHJoinColInfo { int32_t bytes; char* data; char* bitMap; - char* dataInBuf; } SHJoinColInfo; typedef struct SBufPageInfo { @@ -51,13 +58,6 @@ typedef struct SBufPageInfo { char* data; } SBufPageInfo; -#pragma pack(push, 1) -typedef struct SBufRowInfo { - void* next; - uint16_t pageId; - int32_t offset; -} SBufRowInfo; -#pragma pack(pop) typedef struct SGroupData { SBufRowInfo* rows; @@ -96,9 +96,6 @@ typedef struct SHJoinOperatorInfo { SHJoinCtx ctx; } SHJoinOperatorInfo; -static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator); -static void destroyHashJoinOperator(void* param); - #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 1d2685b8c6f98aa8309c2e9900c4d378ff227e48..c18edd870f58342bc5ad3233de4dca7b07e623b2 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -126,6 +126,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); @@ -163,4 +165,4 @@ int32_t 
getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SA } #endif -#endif // TDENGINE_OPERATOR_H \ No newline at end of file +#endif // TDENGINE_OPERATOR_H diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 4e1b2d9e5e4fe8024b7fe3f149123dbb87881844..fca4a86f8d931d11da1dee9dcfb93cf58705ce00 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -27,7 +27,7 @@ #include "ttypes.h" #include "hashjoin.h" -int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { +static int32_t initHJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { pTable->keyNum = LIST_LENGTH(pList); pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SHJoinColInfo)); @@ -40,9 +40,9 @@ int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { SNode* pNode = NULL; FOREACH(pNode, pList) { SColumnNode* pColNode = (SColumnNode*)pNode; - pTable->keyCols[i]->srcSlot = pColNode->slotId; - pTable->keyCols[i]->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); - pTable->keyCols[i]->bytes = pColNode->node.resType.bytes; + pTable->keyCols[i].srcSlot = pColNode->slotId; + pTable->keyCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); + pTable->keyCols[i].bytes = pColNode->node.resType.bytes; bufSize += pColNode->node.resType.bytes; ++i; } @@ -57,7 +57,7 @@ int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { return TSDB_CODE_SUCCESS; } -void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { +static void getHJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { *colNum = 0; SNode* pNode = NULL; @@ -70,7 +70,7 @@ void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { } } -bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) { +static bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* 
pKeyIdx) { for (int32_t i = 0; i < keyNum; ++i) { if (pKeys[i].srcSlot == slotId) { *pKeyIdx = i; @@ -81,8 +81,8 @@ bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32 return false; } -int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { - getJoinValColNum(pList, pTable->blkId, &pTable->valNum); +static int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { + getHJoinValColNum(pList, pTable->blkId, &pTable->valNum); if (pTable->valNum == 0) { return TSDB_CODE_SUCCESS; } @@ -133,7 +133,7 @@ int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { } -int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { +static int32_t initHJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { SNodeList* pKeyList = NULL; SHJoinTableInfo* pTable = &pJoin->tbs[idx]; pTable->downStream = pDownstream[idx]; @@ -144,11 +144,11 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo pKeyList = pJoinNode->pOnRight; } - int32_t code = initJoinKeyColsInfo(pTable, pKeyList); + int32_t code = initHJoinKeyColsInfo(pTable, pKeyList); if (code) { return code; } - int32_t code = initJoinValColsInfo(pTable, pJoinNode->pTargets); + code = initJoinValColsInfo(pTable, pJoinNode->pTargets); if (code) { return code; } @@ -158,7 +158,7 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo return TSDB_CODE_SUCCESS; } -void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { +static void setHJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { int32_t buildIdx = 0; int32_t probeIdx = 1; @@ -190,7 +190,7 @@ void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJ pInfo->pProbe = &pInfo->tbs[probeIdx]; } -void 
buildJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { +static int32_t buildHJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { pInfo->pResColNum = pJoinNode->pTargets->length; pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t)); if (NULL == pInfo->pResColMap) { @@ -213,7 +213,7 @@ void buildJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode } -FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) { +static FORCE_INLINE int32_t addPageToHJoinBuf(SArray* pRowBufs) { SBufPageInfo page; page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE; page.offset = 0; @@ -226,126 +226,70 @@ FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) { return TSDB_CODE_SUCCESS; } -int32_t initJoinBufPages(SHJoinOperatorInfo* pInfo) { +static int32_t initHJoinBufPages(SHJoinOperatorInfo* pInfo) { pInfo->pRowBufs = taosArrayInit(32, sizeof(SBufPageInfo)); if (NULL == pInfo->pRowBufs) { return TSDB_CODE_OUT_OF_MEMORY; } - return addPageToJoinBuf(pInfo->pRowBufs); + return addPageToHJoinBuf(pInfo->pRowBufs); } +static void freeHJoinTableInfo(SHJoinTableInfo* pTable) { + taosMemoryFreeClear(pTable->keyCols); + taosMemoryFreeClear(pTable->keyBuf); + taosMemoryFreeClear(pTable->valCols); + taosArrayDestroy(pTable->valVarCols); +} -SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, - SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { - SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - - int32_t code = TSDB_CODE_SUCCESS; - if (pOperator == NULL || pInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - int32_t numOfCols = 0; - pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - initResultSizeInfo(&pOperator->resultInfo, 4096); - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - - 
setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - - initJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); - initJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - - setJoinBuildAndProbeTable(pInfo, pJoinNode); - code = buildJoinResColMap(pInfo, pJoinNode); - if (code) { - goto _error; - } - - code = initJoinBufPages(pInfo); - if (code) { - goto _error; - } +static void freeHJoinBufPage(void* param) { + SBufPageInfo* pInfo = (SBufPageInfo*)param; + taosMemoryFree(pInfo->data); +} - size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? (pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024; - pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); - if (pInfo->pKeyHash == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; +static void destroyHJoinKeyHash(SSHashObj** ppHash) { + if (NULL == ppHash || NULL == (*ppHash)) { + return; } - if (pJoinNode->pFilterConditions != NULL && pJoinNode->node.pConditions != NULL) { - pInfo->pCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); - if (pInfo->pCond == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(*ppHash, pIte, &iter)) != NULL) { + SGroupData* pGroup = pIte; + SBufRowInfo* pRow = pGroup->rows; + SBufRowInfo* pNext = NULL; + while (pRow) { + pNext = pRow->next; + taosMemoryFree(pRow); + pRow = pNext; } - - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCond); - pLogicCond->pParameterList = nodesMakeList(); - if (pLogicCond->pParameterList == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFilterConditions)); - nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); - pLogicCond->condType = LOGIC_COND_TYPE_AND; - } else 
if (pJoinNode->pFilterConditions != NULL) { - pInfo->pCond = nodesCloneNode(pJoinNode->pFilterConditions); - } else if (pJoinNode->node.pConditions != NULL) { - pInfo->pCond = nodesCloneNode(pJoinNode->node.pConditions); - } else { - pInfo->pCond = NULL; - } - - code = filterInitFromNode(pInfo->pCond, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroHashJoinOperator, optrDefaultBufFn, NULL); - - return pOperator; - -_error: - if (pInfo != NULL) { - destroyHashJoinOperator(pInfo); - } - - taosMemoryFree(pOperator); - pTaskInfo->code = code; - return NULL; + tSimpleHashCleanup(*ppHash); + *ppHash = NULL; } -void destroHashJoinOperator(void* param) { +static void destroyHashJoinOperator(void* param) { SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; - if (pJoinOperator->pColEqualOnConditions != NULL) { - mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); - taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->rightEqOnCondCols); - taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->leftEqOnCondCols); - } - nodesDestroyNode(pJoinOperator->pCondAfterMerge); - - taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations); - taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations); + destroyHJoinKeyHash(&pJoinOperator->pKeyHash); + freeHJoinTableInfo(&pJoinOperator->tbs[0]); + freeHJoinTableInfo(&pJoinOperator->tbs[1]); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); + taosMemoryFreeClear(pJoinOperator->pResColMap); + taosArrayDestroyEx(pJoinOperator->pRowBufs, freeHJoinBufPage); + nodesDestroyNode(pJoinOperator->pCond); + taosMemoryFreeClear(param); } -FORCE_INLINE char* retrieveColDataFromRowBufs(SArray* pRowBufs, 
SBufRowInfo* pRow) { +static FORCE_INLINE char* retrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId); return pPage->data + pRow->offset; } -FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { +static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { SHJoinTableInfo* pBuild = pJoin->pBuild; SHJoinTableInfo* pProbe = pJoin->pProbe; int32_t buildIdx = 0, buildValIdx = 0; @@ -356,11 +300,17 @@ FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t r for (int32_t r = 0; r < rowNum; ++r) { char* pData = retrieveColDataFromRowBufs(pJoin->pRowBufs, pRow); char* pValData = pData + pBuild->valBitMapSize; + char* pKeyData = pProbe->keyData; + buildIdx = buildValIdx = probeIdx = 0; for (int32_t i = 0; i < pJoin->pResColNum; ++i) { if (pJoin->pResColMap[i]) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot); if (pBuild->valCols[buildIdx].keyCol) { - + code = colDataSetVal(pDst, pRes->info.rows + r, pKeyData, false); + if (code) { + return code; + } + pKeyData += pBuild->valCols[buildIdx].vardata ? 
varDataTLen(pKeyData) : pBuild->valCols[buildIdx].bytes; } else { if (colDataIsNull_f(pData, buildValIdx)) { code = colDataSetVal(pDst, pRes->info.rows + r, NULL, true); @@ -377,8 +327,8 @@ FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t r buildValIdx++; } buildIdx++; - } else { - SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].srcSlot); + } else if (0 == r) { /* colDataCopyNItems() below writes all rowNum rows of a probe column at once, so do it only while handling the first row */ + SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot); code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx)); @@ -395,20 +345,20 @@ FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t r } -FORCE_INLINE void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { +static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinCtx* pCtx = &pJoin->ctx; SBufRowInfo* pStart = pCtx->pBuildRow; int32_t rowNum = 0; - int32_t resNum = pRes.info.rows; + int32_t resNum = pRes->info.rows; - while (pCtx->pBuildRow && resNum < pRes.info.capacity) { + while (pCtx->pBuildRow && resNum < pRes->info.capacity) { rowNum++; resNum++; pCtx->pBuildRow = pCtx->pBuildRow->next; } - int32_t code = copyJoinResRowsToBlock(pJoin, rowNum, pStart, pRes); + int32_t code = copyHJoinResRowsToBlock(pJoin, rowNum, pStart, pRes); if (code) { pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, code); @@ -418,7 +368,42 @@ FORCE_INLINE void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBl pCtx->rowRemains = pCtx->pBuildRow ? 
true : false; } -void doHashJoinImpl(struct SOperatorInfo* pOperator) { + +static FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { + char *pData = NULL; + size_t bufLen = 0; + + if (1 == pTable->keyNum) { + if (pTable->keyCols[0].vardata) { + pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx]; + bufLen = varDataTLen(pData); + } else { + pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx; + bufLen = pTable->keyCols[0].bytes; + } + pTable->keyData = pData; + } else { + for (int32_t i = 0; i < pTable->keyNum; ++i) { + if (pTable->keyCols[i].vardata) { + pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; + memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); + bufLen += varDataTLen(pData); + } else { + pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx; + memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); + bufLen += pTable->keyCols[i].bytes; + } + } + pTable->keyData = pTable->keyBuf; + } + + if (pBufLen) { + *pBufLen = bufLen; + } +} + + +static void doHashJoinImpl(struct SOperatorInfo* pOperator) { SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinTableInfo* pProbe = pJoin->pProbe; SHJoinCtx* pCtx = &pJoin->ctx; @@ -426,7 +411,7 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) { size_t bufLen = 0; if (pJoin->ctx.pBuildRow) { - appendJoinResToBlock(pOperator, pRes); + appendHJoinResToBlock(pOperator, pRes); return; } @@ -435,15 +420,15 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) { SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); if (pGroup) { pCtx->pBuildRow = pGroup->rows; - appendJoinResToBlock(pOperator, pRes); - if (pRes->info.rows >= pRes.info.capacity) { + appendHJoinResToBlock(pOperator, pRes); + if (pRes->info.rows >= pRes->info.capacity) { break; } } } } -int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { +static int32_t setKeyColsData(SSDataBlock* 
pBlock, SHJoinTableInfo* pTable) { for (int32_t i = 0; i < pTable->keyNum; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { @@ -463,7 +448,7 @@ int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { return TSDB_CODE_SUCCESS; } -int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { +static int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { if (!pTable->valColExist) { return TSDB_CODE_SUCCESS; } @@ -480,7 +465,7 @@ int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->valCols[i].srcSlot, pCol->info.bytes, pTable->valCols[i].bytes); return TSDB_CODE_INVALID_PARA; } - if (!pTable->valCols[i].vardata)) { + if (!pTable->valCols[i].vardata) { pTable->valCols[i].bitMap = pCol->nullbitmap; } pTable->valCols[i].data = pCol->pData; @@ -493,40 +478,8 @@ int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { } -FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { - char *pData = NULL; - size_t bufLen = 0; - - if (1 == pTable->keyNum) { - if (pTable->keyCols[0].vardata) { - pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx]; - bufLen = varDataTLen(pData); - } else { - pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx; - bufLen = pTable->keyCols[0].bytes; - } - pTable->keyData = pData; - } else { - for (int32_t i = 0; i < pTable->keyNum; ++i) { - if (pTable->keyCols[i].vardata) { - pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; - memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); - bufLen += varDataTLen(pData); - } else { - pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx; - memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); - bufLen += pTable->keyCols[i].bytes; - 
} - } - pTable->keyData = pTable->keyBuf; - } - - if (pBufLen) { - *pBufLen = bufLen; - } -} -FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) { +static FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) { if (!pTable->valColExist) { return; } @@ -559,7 +512,7 @@ FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) } -FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) { +static FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) { if (0 == bufSize) { pRow->pageId = -1; return TSDB_CODE_SUCCESS; @@ -580,14 +533,14 @@ FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** return TSDB_CODE_SUCCESS; } - int32_t code = addPageToJoinBuf(pPages); + int32_t code = addPageToHJoinBuf(pPages); if (code) { return code; } } while (true); } -FORCE_INLINE int32_t getJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) { +static FORCE_INLINE int32_t getHJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) { if (NULL == pTable->valVarCols) { return pTable->valBufSize; } @@ -605,7 +558,7 @@ FORCE_INLINE int32_t getJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) } -int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) { +static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) { SGroupData group = {0}; SBufRowInfo* pRow = NULL; @@ -616,13 +569,13 @@ int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTa } pRow = group.rows; } else { - pRow = taosMemoryMalloc(sizeof(SBufRowInfo)) + pRow = taosMemoryMalloc(sizeof(SBufRowInfo)); if (NULL == pRow) { return TSDB_CODE_OUT_OF_MEMORY; } } - int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), 
&pTable->valData, pRow); + int32_t code = getValBufFromPages(pJoin->pRowBufs, getHJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow); if (code) { return code; } @@ -640,7 +593,7 @@ int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTa return TSDB_CODE_SUCCESS; } -int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) { +static int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) { SHJoinTableInfo* pBuild = pJoin->pBuild; int32_t code = setValColsData(pBlock, pBuild); if (code) { @@ -658,7 +611,7 @@ int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyL return TSDB_CODE_SUCCESS; } -int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { +static int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { SHJoinTableInfo* pBuild = pJoin->pBuild; int32_t code = setKeyColsData(pBlock, pBuild); if (code) { @@ -677,8 +630,7 @@ int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { return code; } -int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { - SHJoinOperatorInfo* pJoin = pOperator->info; +static int32_t buildHJoinKeyHash(SHJoinOperatorInfo* pJoin) { SSDataBlock* pBlock = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -697,7 +649,7 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -void launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { +static int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinTableInfo* pProbe = pJoin->pProbe; int32_t code = setKeyColsData(pBlock, pProbe); @@ -714,9 +666,19 @@ void launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { pJoin->ctx.pProbeData = pBlock; doHashJoinImpl(pOperator); + + return TSDB_CODE_SUCCESS; } -SSDataBlock* doHashJoin(struct SOperatorInfo* 
pOperator) { +static void setHJoinDone(struct SOperatorInfo* pOperator) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + pOperator->status = OP_EXEC_DONE; + + SHJoinOperatorInfo* pInfo = pOperator->info; + destroyHJoinKeyHash(&pInfo->pKeyHash); +} + +static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { SHJoinOperatorInfo* pJoin = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = TSDB_CODE_SUCCESS; @@ -728,15 +690,14 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { } if (NULL == pJoin->pKeyHash) { - code = buildJoinKeyHash(pJoin); + code = buildHJoinKeyHash(pJoin); if (code) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - pOperator->status = OP_EXEC_DONE; + setHJoinDone(pOperator); return NULL; } } @@ -755,12 +716,15 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { while (true) { SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream); if (NULL == pBlock) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - pOperator->status = OP_EXEC_DONE; + setHJoinDone(pOperator); break; } - launchBlockHashJoin(pOperator, pBlock); + code = launchBlockHashJoin(pOperator, pBlock); + if (code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } if (pRes->info.rows < pOperator->resultInfo.threshold) { continue; @@ -776,3 +740,88 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { return (pRes->info.rows > 0) ? 
pRes : NULL; } + +SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { + SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + int32_t code = TSDB_CODE_SUCCESS; + if (pOperator == NULL || pInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + int32_t numOfCols = 0; + pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + initResultSizeInfo(&pOperator->resultInfo, 4096); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + + setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + + initHJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); + initHJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); + + setHJoinBuildAndProbeTable(pInfo, pJoinNode); + code = buildHJoinResColMap(pInfo, pJoinNode); + if (code) { + goto _error; + } + + code = initHJoinBufPages(pInfo); + if (code) { + goto _error; + } + + size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? 
(pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024; + pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + if (pInfo->pKeyHash == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + if (pJoinNode->pFilterConditions != NULL && pJoinNode->node.pConditions != NULL) { + pInfo->pCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (pInfo->pCond == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCond); + pLogicCond->pParameterList = nodesMakeList(); + if (pLogicCond->pParameterList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pFilterConditions)); + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); + pLogicCond->condType = LOGIC_COND_TYPE_AND; + } else if (pJoinNode->pFilterConditions != NULL) { + pInfo->pCond = nodesCloneNode(pJoinNode->pFilterConditions); + } else if (pJoinNode->node.pConditions != NULL) { + pInfo->pCond = nodesCloneNode(pJoinNode->node.pConditions); + } else { + pInfo->pCond = NULL; + } + + code = filterInitFromNode(pInfo->pCond, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL); + + return pOperator; + +_error: + if (pInfo != NULL) { + destroyHashJoinOperator(pInfo); + } + + taosMemoryFree(pOperator); + pTaskInfo->code = code; + return NULL; +} + +