diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c2cdfc056af402858ca63e60cf42b9b3c05a3afa..f3b90a8fd2c2185dc4f332831c6bb87126ec9260 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -36,7 +36,7 @@ int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t num if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows; } else { - return pColumnInfoData->info.bytes * numOfRows + BitmapLen(numOfRows); + return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) + BitmapLen(numOfRows); } } diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 51440a7f59567daaf3d76929628946a1d4c0544a..cff568aebc0c26fb2e9c22c7a027fa9edb4dd81a 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -36,12 +36,13 @@ typedef struct SMultiMergeSource { typedef struct SSortSource { SMultiMergeSource src; - union { - struct { - SArray* pageIdList; - int32_t pageIndex; - }; + struct { + SArray* pageIdList; + int32_t pageIndex; + }; + struct { void* param; + bool onlyRef; }; } SSortSource; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 916b6df96971c88166bffe74f134deb66905d377..7128e0a2c9d5225ba7ef1c4f15d60851aac44522 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4532,6 +4532,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); ps->param = param; + ps->onlyRef = true; tsortAddSource(pInfo->pSortHandle, ps); } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 14e316345576b665c015288797d886d30cd44136..0e7644151a572c6f8f32193023ec6cd97a5d9ad0 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -176,10 +176,10 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = pOperator->pDownstream[0]; + ps->onlyRef = true; tsortAddSource(pInfo->pSortHandle, ps); int32_t code = tsortOpen(pInfo->pSortHandle); - taosMemoryFreeClear(ps); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); @@ -377,10 +377,10 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { param->childOpInfo = pOperator->pDownstream[0]; param->grpSortOpInfo = pInfo; ps->param = param; + ps->onlyRef = false; tsortAddSource(pInfo->pCurrSortHandle, ps); int32_t code = tsortOpen(pInfo->pCurrSortHandle); - taosMemoryFreeClear(ps); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); @@ -471,6 +471,9 @@ void destroyGroupSortOperatorInfo(void* param) { taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->matchInfo.pList); + tsortDestroySortHandle(pInfo->pCurrSortHandle); + pInfo->pCurrSortHandle = NULL; + taosMemoryFreeClear(param); } @@ -563,6 +566,7 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = pOperator->pDownstream[i]; + ps->onlyRef = true; tsortAddSource(pInfo->pSortHandle, ps); } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 9c10b51b1f2968500071cb0d14400ed631b7d887..1c31b550c64223b2afaf10499ff5512685b2ca68 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -110,6 +110,22 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { return TSDB_CODE_SUCCESS; } +void tsortClearOrderdSource(SArray *pOrderedSource) { + for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) { + SSortSource** pSource = taosArrayGet(pOrderedSource, i); + if (NULL == *pSource) { + continue; + } + + if ((*pSource)->param && !(*pSource)->onlyRef) { + taosMemoryFree((*pSource)->param); + } + taosMemoryFreeClear(*pSource); + } + + taosArrayClear(pOrderedSource); +} + void tsortDestroySortHandle(SSortHandle* pSortHandle) { if (pSortHandle == NULL) { return; @@ -123,10 +139,8 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { destroyDiskbasedBuf(pSortHandle->pBuf); taosMemoryFreeClear(pSortHandle->idStr); blockDataDestroy(pSortHandle->pDataBlock); - for (size_t i = 0; i < taosArrayGetSize(pSortHandle->pOrderedSource); i++) { - SSortSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i); - taosMemoryFreeClear(*pSource); - } + + tsortClearOrderdSource(pSortHandle->pOrderedSource); taosArrayDestroy(pSortHandle->pOrderedSource); taosMemoryFreeClear(pSortHandle); } @@ -561,7 +575,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } } - taosArrayClear(pHandle->pOrderedSource); + tsortClearOrderdSource(pHandle->pOrderedSource); taosArrayAddAll(pHandle->pOrderedSource, pResList); taosArrayDestroy(pResList); @@ -598,8 +612,11 @@ static int32_t createInitialSources(SSortHandle* pHandle) { size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; if (pHandle->type == SORT_SINGLESOURCE_SORT) { - SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); - taosArrayClear(pHandle->pOrderedSource); + SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); + SSortSource* source = *pSource; + *pSource = NULL; + + tsortClearOrderdSource(pHandle->pOrderedSource); while (1) { SSDataBlock* pBlock = pHandle->fetchfp(source->param); @@ -623,6 +640,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) { int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); if (code != 0) { + if (source->param && !source->onlyRef) { + taosMemoryFree(source->param); + } + taosMemoryFree(source); return code; } @@ -632,6 +653,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) { int64_t p = taosGetTimestampUs(); code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); if (code != 0) { + if (source->param && !source->onlyRef) { + taosMemoryFree(source->param); + } + taosMemoryFree(source); return code; } @@ -642,6 +667,11 @@ static int32_t createInitialSources(SSortHandle* pHandle) { } } + if (source->param && !source->onlyRef) { + taosMemoryFree(source->param); + } + taosMemoryFree(source); + if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) { size_t size = blockDataGetSize(pHandle->pDataBlock); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index c81888eb95c0a76fd8a6c36172ba54348bd6566c..79ea10552ccaf1fb44fa9847695e6424539c37ed 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -107,7 +107,7 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; } -static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + 2; } +static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } /** * +--------------------------+-------------------+--------------+