diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5a2aaed74e5e101d97b82388f199607fa646967e..6a4ddacf60be025a205c1fb1229d5f3f5859cca3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1226,6 +1226,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { SColumnInfoData colInfo = {0}; SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); colInfo.info = p->info; + colInfo.hasNull = true; taosArrayPush(pBlock->pDataBlock, &colInfo); } diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index fd3581e2bfa86d9761eae6448ad249f178c96f15..e731c55a7df6b5a21fb383f041516abadf9d7cdd 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -151,6 +151,13 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle); */ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle); +/** + * get proper sort buffer pages according to the row size + * @param rowSize + * @return + */ +int32_t getProperSortPageSize(size_t rowSize); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 1008a5263cac638f35e5f8f80f368f16d30e1164..98a1aacbb651d95c5e3d808576b8581e0631b50f 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -420,24 +420,29 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, goto _error; } - pInfo->binfo.pRes = pResBlock; initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = pSortInfo; + pInfo->binfo.pRes = pResBlock; + pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; - pInfo->pInputBlock = pInputBlock; - pOperator->name = "MultiwaySortMerge"; + pInfo->pInputBlock = pInputBlock; + pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - - pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; - pInfo->sortBufSize = pInfo->bufPageSize * 16; - pInfo->hasGroupId = false; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pInfo->hasGroupId = false; pInfo->prefetchedTuple = NULL; pOperator->pTaskInfo = pTaskInfo; + + pInfo->bufPageSize = getProperSortPageSize(rowSize); + + uint32_t numOfSources = taosArrayGetSize(pSortInfo); + numOfSources = MAX(2, numOfSources); + + pInfo->sortBufSize = numOfSources * pInfo->bufPageSize; + pOperator->fpSet = createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL, destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 1473bd81bb23d1384fe60356573d6a69cb4e9f70..daaba74d7cb3f32b473f81e23904cc13b0fa20c4 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -532,6 +532,19 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return 0; } +int32_t getProperSortPageSize(size_t rowSize) { + uint32_t defaultPageSize = 4096; + + uint32_t pgSize = 0; + if (rowSize * 4 > defaultPageSize) { + pgSize = rowSize * 4; + } else { + pgSize = defaultPageSize; + } + + return pgSize; +} + static int32_t createInitialSources(SSortHandle* pHandle) { size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; @@ -557,14 +570,9 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (!hasGroupId) { // calculate the buffer pages according to the total available buffers. - int32_t rowSize = blockDataGetRowSize(pBlock); - if (rowSize * 4 > 4096) { - pHandle->pageSize = rowSize * 4; - } else { - pHandle->pageSize = 4096; - } - - // todo!! + pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock)); + + // todo, number of pages are set according to the total available sort buffer pHandle->numOfPages = 1024; sortBufSize = pHandle->numOfPages * pHandle->pageSize; @@ -577,7 +585,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (pHandle->beforeFp != NULL) { pHandle->beforeFp(pBlock, pHandle->param); } - // todo relocate the columns + int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); if (code != 0) { return code;