From 39c06f13f658a8e0edc18765c8c5e893f7f57209 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 19 Feb 2022 15:20:20 +0800 Subject: [PATCH] [td-11818] refactor sort, and sort operator. --- include/common/tep.h | 5 +- include/util/tpagedbuf.h | 2 +- source/common/src/tep.c | 17 +- source/libs/executor/inc/executorimpl.h | 16 +- source/libs/executor/inc/tsort.h | 130 ++++ source/libs/executor/src/executorimpl.c | 437 ++++------- source/libs/executor/src/tsort.c | 694 ++++++++++++++++++ source/libs/executor/test/executorTests.cpp | 6 +- .../libs/executor/test/executorUtilTests.cpp | 269 +++++++ source/util/src/tpagedbuf.c | 4 +- 10 files changed, 1254 insertions(+), 326 deletions(-) create mode 100644 source/libs/executor/inc/tsort.h create mode 100644 source/libs/executor/src/tsort.c create mode 100644 source/libs/executor/test/executorUtilTests.cpp diff --git a/include/common/tep.h b/include/common/tep.h index 584b8a5a71..a4e28dbb7c 100644 --- a/include/common/tep.h +++ b/include/common/tep.h @@ -18,9 +18,6 @@ typedef struct SBlockOrderInfo { int32_t order; int32_t colIndex; SColumnInfoData *pColData; -// int32_t type; -// int32_t bytes; -// bool hasNull; } SBlockOrderInfo; int taosGetFqdnPortFromEp(const char *ep, SEp *pEp); @@ -93,6 +90,8 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock); double blockDataGetSerialRowSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); +SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols); + size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSize); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index e989c31cd6..708c6cf741 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -157,7 +157,7 @@ void setBufPageDirty(SFilePage* pPageInfo, bool dirty); * Print the statistics when closing this buffer * @param pBuf */ -void printStatisBeforeClose(SDiskbasedBuf* pBuf); +void setPrintStatis(SDiskbasedBuf* pBuf); /** * return buf statistics. diff --git a/source/common/src/tep.c b/source/common/src/tep.c index 970b6d954f..b7f7043d26 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -411,7 +411,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 return pDst; } - /** * * +------------------+---------------+--------------------+ @@ -522,6 +521,22 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t); } +SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols) { + SSchema* pSchema = calloc(pBlock->info.numOfCols, sizeof(SSchema)); + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + pSchema[i].bytes = pColInfoData->info.bytes; + pSchema[i].type = pColInfoData->info.type; + pSchema[i].colId = pColInfoData->info.colId; + } + + if (numOfCols != NULL) { + *numOfCols = pBlock->info.numOfCols; + } + + return pSchema; +} + double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { ASSERT(pBlock != NULL); double rowSize = 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4265864f09..adfed41f99 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -15,6 +15,7 @@ #ifndef TDENGINE_EXECUTORIMPL_H #define TDENGINE_EXECUTORIMPL_H +#include "tsort.h" #ifdef __cplusplus extern "C" { #endif @@ -557,10 +558,6 @@ typedef struct SSortMergeOperatorInfo { char** currentGroupColData; SArray* udfInfo; int32_t numOfSources; -// char** prevRow; -// int32_t resultRowFactor; -// bool multiGroupResults; -// bool hasGroupColData; } SSortMergeOperatorInfo; typedef struct SMsortComparParam { @@ -571,19 +568,16 @@ typedef struct SMsortComparParam { } SMsortComparParam; typedef struct SOrderOperatorInfo { - int32_t sourceId; uint32_t sortBufSize; // max buffer size for in-memory sort SSDataBlock *pDataBlock; bool hasVarCol; // has variable length column, such as binary/varchar/nchar - int32_t numOfCompleted; - SDiskbasedBuf *pSortInternalBuf; - SMultiwayMergeTreeInfo *pMergeTree; - SArray *pSources; // SArray + SArray *orderInfo; + bool nullFirst; + SSortHandle *pSortHandle; + int32_t bufPageSize; int32_t numOfRowsInRes; - SMsortComparParam cmpParam; - // TODO extact struct int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h new file mode 100644 index 0000000000..9114746ff9 --- /dev/null +++ b/source/libs/executor/inc/tsort.h @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TSORT_H +#define TDENGINE_TSORT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "common.h" +#include "os.h" + +enum { + SORT_MULTIWAY_MERGE = 0x1, + SORT_SINGLESOURCE = 0x2, +}; + +typedef struct SMultiMergeSource { + int32_t type; + int32_t rowIndex; + SSDataBlock *pBlock; +} SMultiMergeSource; + +typedef struct SExternalMemSource { + SMultiMergeSource src; + SArray* pageIdList; + int32_t pageIndex; +} SExternalMemSource; + +typedef struct SOperatorSource { + SMultiMergeSource src; + void* param; +} SOperatorSource; + +typedef struct SSortHandle SSortHandle; +typedef struct STupleHandle STupleHandle; + +typedef SSDataBlock* (*_sort_fetch_block_fn_t)(void* param); +typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* param); + +/** + * + * @param type + * @return + */ +SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr); + +/** + * + * @param pSortHandle + */ +void destroySortHandle(SSortHandle* pSortHandle); + +/** + * + * @param pHandle + * @return + */ +int32_t sortOpen(SSortHandle* pHandle); + +/** + * + * @param pHandle + * @return + */ +int32_t sortClose(SSortHandle* pHandle); + +/** + * + * @return + */ +int32_t setFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp); + +/** + * + * @param pHandle + * @param fp + * @return + */ +int32_t setComparFn(SSortHandle* pHandle, _sort_merge_compar_fn_t fp); + +/** + * + * @param pHandle + * @param pSource + * @return success or failed + */ +int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource); + +/** + * + * @param pHandle + * @return + */ +STupleHandle* sortNextTuple(SSortHandle* pHandle); + +/** + * + * @param pHandle + * @param colIndex + * @return + */ +bool sortIsValueNull(STupleHandle* pVHandle, int32_t colIndex); + +/** + * + * @param pHandle + * @param colIndex + * @return + */ +void* sortGetValue(STupleHandle* pVHandle, int32_t colIndex); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSORT_H diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4d8249d12f..730396c070 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include +#include #include "exception.h" #include "os.h" #include "parser.h" @@ -5598,7 +5599,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { return pOrderColumns; } -static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { +static void destroySortMergeOperatorInfo(void* param, int32_t numOfOutput) { SSortMergeOperatorInfo *pInfo = (SSortMergeOperatorInfo*) param; destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput); taosArrayDestroy(pInfo->groupColumnList); @@ -5632,11 +5633,21 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortMergeOperatorInfo* pInfo = pOperator->info; + SMsortComparParam resultParam = {.orderInfo = pInfo->orderInfo}; + SArray* pSource = taosArrayInit(4, POINTER_BYTES); + for(int32_t i = 0; i < pInfo->numOfSources; ++i) { SSDataBlock* pBlock = pOperator->pDownstream[i]->exec(pOperator->pDownstream[i], newgroup); - // TODO set the order input sources. +// doAddNewOperatorSource(pSource, pBlock, pInfo->binfo.capacity); } +// sortComparInit(&resultParam, pSource, 0, pInfo->numOfSources - 1, pOperator); + +// int32_t code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn); +// if (code != TSDB_CODE_SUCCESS) { +// longjmp(pTaskInfo->env, code); +// } + return NULL; } @@ -5679,24 +5690,17 @@ SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->pTaskInfo = pTaskInfo; pOperator->exec = doSortedMerge; - pOperator->cleanupFn = destroyGlobalAggOperatorInfo; + pOperator->cleanupFn = destroySortMergeOperatorInfo; appendDownstream(pOperator, downstream); return pOperator; } -typedef struct SExternalMemSource { - SArray* pageIdList; - int32_t pageIndex; - int32_t rowIndex; - SSDataBlock *pBlock; -} SExternalMemSource; - int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { int32_t pLeftIdx = *(int32_t *)pLeft; int32_t pRightIdx = *(int32_t *)pRight; - SMsortComparParam *pParam = (SMsortComparParam *)param; + SMsortComparParam *pParam = (SMsortComparParam *)param; SArray *pInfo = pParam->orderInfo; @@ -5704,16 +5708,16 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { SExternalMemSource* pRightSource = pParam->pSources[pRightIdx]; // this input is exhausted, set the special value to denote this - if (pLeftSource->rowIndex == -1) { + if (pLeftSource->src.rowIndex == -1) { return 1; } - if (pRightSource->rowIndex == -1) { + if (pRightSource->src.rowIndex == -1) { return -1; } - SSDataBlock* pLeftBlock = pLeftSource->pBlock; - SSDataBlock* pRightBlock = pRightSource->pBlock; + SSDataBlock* pLeftBlock = pLeftSource->src.pBlock; + SSDataBlock* pRightBlock = pRightSource->src.pBlock; for(int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); @@ -5722,13 +5726,13 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { bool leftNull = false; if (pLeftColInfoData->hasNull) { - leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->rowIndex, pLeftBlock->pBlockAgg); + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); } SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); bool rightNull = false; if (pRightColInfoData->hasNull) { - rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->rowIndex, pRightBlock->pBlockAgg); + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); } if (leftNull && rightNull) { @@ -5743,8 +5747,8 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { return pParam->nullFirst? -1:1; } - void* left1 = colDataGet(pLeftColInfoData, pLeftSource->rowIndex); - void* right1 = colDataGet(pRightColInfoData, pRightSource->rowIndex); + void* left1 = colDataGet(pLeftColInfoData, pLeftSource->src.rowIndex); + void* right1 = colDataGet(pRightColInfoData, pRightSource->src.rowIndex); switch(pLeftColInfoData->info.type) { case TSDB_DATA_TYPE_INT: { @@ -5767,137 +5771,108 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { } } -static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwayMergeTreeInfo *pTree, SOrderOperatorInfo* pInfo) { - /* - * load a new SDataBlock into memory of a given intermediate data-set source, - * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree - */ - if (pSource->rowIndex >= pSource->pBlock->info.rows) { - pSource->rowIndex = 0; - pSource->pageIndex += 1; - - if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { - pInfo->numOfCompleted += 1; - pSource->rowIndex = -1; - pSource->pageIndex = -1; - pSource->pBlock = blockDataDestroy(pSource->pBlock); - } else { - SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); - - SFilePage* pPage = getBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); - int32_t code = blockDataFromBuf(pSource->pBlock, pPage->data); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - releaseBufPage(pInfo->pSortInternalBuf, pPage); - } - } - - /* - * Adjust loser tree otherwise, according to new candidate data - * if the loser tree is rebuild completed, we do not need to adjust - */ - int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree); - -#ifdef _DEBUG_VIEW - printf("before adjust:\t"); - tMergeTreePrint(pTree); -#endif - - tMergeTreeAdjust(pTree, leafNodeIndex); - -#ifdef _DEBUG_VIEW - printf("\nafter adjust:\t"); - tMergeTreePrint(pTree); -#endif -} - -static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) { +static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHandle) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); - bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); - + bool isNull = sortIsValueNull(pTupleHandle, i); if (isNull) { colDataAppend(pColInfo, pBlock->info.rows, NULL, true); } else { - char* pData = colDataGet(pSrcColInfo, *rowIndex); + char* pData = sortGetValue(pTupleHandle, i); colDataAppend(pColInfo, pBlock->info.rows, pData, false); } } pBlock->info.rows += 1; - *rowIndex += 1; } -static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, SArray* pAllSources, int32_t numOfCols) { - SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource)); - if (pSource == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } +static SSDataBlock* createDataBlock(const SSDataBlock* pDataBlock) { + int32_t numOfCols = pDataBlock->info.numOfCols; - pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId); - pSource->pBlock = calloc(1, sizeof(SSDataBlock)); - pSource->pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - pSource->pBlock->info.numOfCols = numOfCols; + SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock)); + pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pBlock->info.numOfCols = numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; - SColumnInfoData* p = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); + SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); colInfo.info = p->info; - taosArrayPush(pSource->pBlock->pDataBlock, &colInfo); + taosArrayPush(pBlock->pDataBlock, &colInfo); } - taosArrayPush(pAllSources, &pSource); - - pInfo->sourceId += 1; - - int32_t rowSize = blockDataGetSerialRowSize(pSource->pBlock); - int32_t numOfRows = (getBufPageSize(pInfo->pSortInternalBuf) - blockDataGetSerialMetaSize(pInfo->pDataBlock))/rowSize; - - return blockDataEnsureCapacity(pSource->pBlock, numOfRows); + return pBlock; } -void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, SArray* pSources, jmp_buf env) { - int32_t start = 0; - - while(start < pInfo->pDataBlock->info.rows) { - int32_t stop = 0; - blockDataSplitRows(pInfo->pDataBlock, pInfo->hasVarCol, start, &stop, getBufPageSize(pInfo->pSortInternalBuf)); - SSDataBlock* p = blockDataExtractBlock(pInfo->pDataBlock, start, stop - start + 1); - if (p == NULL) { - longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - int32_t pageId = -1; - SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId); - if (pPage == NULL) { - assert(0); - longjmp(env, terrno); - } - - int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t); - assert(size <= getBufPageSize(pInfo->pSortInternalBuf)); - - blockDataToBuf(pPage->data, p); - - setBufPageDirty(pPage, true); - releaseBufPage(pInfo->pSortInternalBuf, pPage); - - blockDataDestroy(p); - start = stop + 1; - } +//static int32_t doAddNewExternalMemSource(SOrderOperatorInfo* pInfo, SArray* pAllSources, SSDataBlock* pBlock) { +// SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource)); +// if (pSource == NULL) { +// return TSDB_CODE_QRY_OUT_OF_MEMORY; +// } +// +// pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId); +// pSource->src.pBlock = pBlock; +// +// taosArrayPush(pAllSources, &pSource); +// +// pInfo->sourceId += 1; +// +// int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock); +// int32_t numOfRows = (getBufPageSize(pInfo->pSortInternalBuf) - blockDataGetSerialMetaSize(pInfo->pDataBlock))/rowSize; +// return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows); +//} - int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; - blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol); +//static int32_t doAddNewOperatorSource(SArray* pAllSources, SSDataBlock* pBlock, int32_t capacity) { +// SOperatorSource* pSource = calloc(1, sizeof(SOperatorSource)); +// if (pSource == NULL) { +// return TSDB_CODE_QRY_OUT_OF_MEMORY; +// } +// +// pSource->src.pBlock = pBlock; +// taosArrayPush(pAllSources, &pSource); +// +// return blockDataEnsureCapacity(pSource->src.pBlock, capacity); +//} - int32_t code = doAddNewSource(pInfo, pSources, numOfCols); - if (code != TSDB_CODE_SUCCESS) { - longjmp(env, code); - } -} +//void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, SArray* pSources, jmp_buf env) { +// int32_t start = 0; +// +// while(start < pInfo->pDataBlock->info.rows) { +// int32_t stop = 0; +// blockDataSplitRows(pInfo->pDataBlock, pInfo->hasVarCol, start, &stop, getBufPageSize(pInfo->pSortInternalBuf)); +// SSDataBlock* p = blockDataExtractBlock(pInfo->pDataBlock, start, stop - start + 1); +// if (p == NULL) { +// longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY); +// } +// +// int32_t pageId = -1; +// SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId); +// if (pPage == NULL) { +// assert(0); +// longjmp(env, terrno); +// } +// +// int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t); +// assert(size <= getBufPageSize(pInfo->pSortInternalBuf)); +// +// blockDataToBuf(pPage->data, p); +// +// setBufPageDirty(pPage, true); +// releaseBufPage(pInfo->pSortInternalBuf, pPage); +// +// blockDataDestroy(p); +// start = stop + 1; +// } +// +// int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; +// blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol); +// +// SSDataBlock* pBlock = createDataBlock(pInfo->pDataBlock); +// int32_t code = doAddNewExternalMemSource(pInfo, pSources, pBlock); +// if (code != TSDB_CODE_SUCCESS) { +// longjmp(env, code); +// } +//} static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SDiskbasedBuf* pBuf) { cmpParam->pSources = taosArrayGet(pSources, startIndex); @@ -5908,7 +5883,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int SPageInfo* pPgInfo = *(SPageInfo**) taosArrayGet(pSource->pageIdList, pSource->pageIndex); SFilePage* pPage = getBufPage(pBuf, getPageId(pPgInfo)); - int32_t code = blockDataFromBuf(cmpParam->pSources[i]->pBlock, pPage->data); + int32_t code = blockDataFromBuf(cmpParam->pSources[i]->src.pBlock, pPage->data); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5922,31 +5897,23 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int static int32_t sortComparClearup(SMsortComparParam* cmpParam) { for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { SExternalMemSource* pSource = cmpParam->pSources[i]; - blockDataDestroy(pSource->pBlock); + blockDataDestroy(pSource->src.pBlock); tfree(pSource); } cmpParam->numOfSources = 0; } -static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo, SMsortComparParam* cmpParam, int32_t capacity) { +static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SOrderOperatorInfo* pInfo, int32_t capacity) { blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol); while(1) { - if (cmpParam->numOfSources == pInfo->numOfCompleted) { - break; - } - - int32_t index = tMergeTreeGetChosenIndex(pInfo->pMergeTree); - - SExternalMemSource *pSource = (*cmpParam).pSources[index]; - appendOneRowToDataBlock(pInfo->pDataBlock, pSource->pBlock, &pSource->rowIndex); - - int32_t code = adjustMergeTreeForNextTuple(pSource, pInfo->pMergeTree, pInfo); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); - } + STupleHandle* pTupleHandle = sortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + appendOneRowToDataBlock(pInfo->pDataBlock, pTupleHandle); if (pInfo->pDataBlock->info.rows >= capacity) { return pInfo->pDataBlock; } @@ -5955,106 +5922,10 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } -static int32_t doInternalSort(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo) { - size_t numOfSources = taosArrayGetSize(pInfo->pSources); - - // Calculate the I/O counts to complete the data sort. - double sortCount = floorl(log2(numOfSources) / log2(getNumOfInMemBufPages(pInfo->pSortInternalBuf))); - - pInfo->totalElapsed = taosGetTimestampUs() - pInfo->startTs; - qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64, - GET_TASKID(pTaskInfo), (int32_t) (sortCount + 1), getTotalBufSize(pInfo->pSortInternalBuf), pInfo->sortElapsed, - pInfo->totalElapsed); - - size_t pgSize = getBufPageSize(pInfo->pSortInternalBuf); - int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pInfo->pDataBlock))/ blockDataGetSerialRowSize(pInfo->pDataBlock); - - blockDataEnsureCapacity(pInfo->pDataBlock, numOfRows); - - size_t numOfSorted = taosArrayGetSize(pInfo->pSources); - for(int32_t t = 0; t < sortCount; ++t) { - int64_t st = taosGetTimestampUs(); - - SArray* pResList = taosArrayInit(4, POINTER_BYTES); - SMsortComparParam resultParam = {.orderInfo = pInfo->cmpParam.orderInfo}; - - int32_t numOfInputSources = getNumOfInMemBufPages(pInfo->pSortInternalBuf); - int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources; - - // Only *numOfInputSources* can be loaded into buffer to perform the external sort. - for(int32_t i = 0; i < sortGroup; ++i) { - pInfo->sourceId += 1; - - int32_t end = (i + 1) * numOfInputSources - 1; - if (end > numOfSorted - 1) { - end = numOfSorted - 1; - } - - pInfo->cmpParam.numOfSources = end - i * numOfInputSources + 1; - - int32_t code = sortComparInit(&pInfo->cmpParam, pInfo->pSources, i * numOfInputSources, end, pInfo->pSortInternalBuf); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); - } - - code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); - } - - while (1) { - SSDataBlock* pDataBlock = getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, numOfRows); - if (pDataBlock == NULL) { - break; - } - - int32_t pageId = -1; - SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId); - if (pPage == NULL) { - assert(0); - longjmp(pTaskInfo->env, terrno); - } - - int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + pDataBlock->info.numOfCols * sizeof(int32_t); - assert(size <= getBufPageSize(pInfo->pSortInternalBuf)); - - blockDataToBuf(pPage->data, pDataBlock); - - setBufPageDirty(pPage, true); - releaseBufPage(pInfo->pSortInternalBuf, pPage); - - blockDataClearup(pDataBlock, pInfo->hasVarCol); - } - - tMergeTreeDestroy(pInfo->pMergeTree); - pInfo->numOfCompleted = 0; - - code = doAddNewSource(pInfo, pResList, pInfo->pDataBlock->info.numOfCols); - if (code != 0) { - longjmp(pTaskInfo->env, code); - } - } - - sortComparClearup(&pInfo->cmpParam); - - taosArrayClear(pInfo->pSources); - taosArrayAddAll(pInfo->pSources, pResList); - taosArrayDestroy(pResList); - - pInfo->cmpParam = resultParam; - numOfSorted = taosArrayGetSize(pInfo->pSources); - - int64_t el = taosGetTimestampUs() - st; - pInfo->totalElapsed += el; - - SDiskbasedBufStatis statis = getDBufStatis(pInfo->pSortInternalBuf); - - qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", GET_TASKID(pTaskInfo), t + 1, el, statis.loadBytes/1024.0, - statis.flushBytes/1024.0); - } - - pInfo->cmpParam.numOfSources = taosArrayGetSize(pInfo->pSources); - return 0; +static SSDataBlock* loadNextDataBlock(void* param) { + bool newgroup = false; + SOperatorInfo* pOperator = (SOperatorInfo*) param; + return pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], &newgroup); } static SSDataBlock* doSort(void* param, bool* newgroup) { @@ -6068,74 +5939,30 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo, pInfo->numOfRowsInRes); } int64_t st = taosGetTimestampUs(); - while(1) { - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - - // start to flush data into disk and try do multiway merge sort - if (pBlock == NULL) { - break; - } - - int32_t code = blockDataMerge(pInfo->pDataBlock, pBlock); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pOperator->pTaskInfo->env, code); - } - - size_t size = blockDataGetSize(pInfo->pDataBlock); - if (size > pInfo->sortBufSize) { - // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); - blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst); - - int64_t el = taosGetTimestampUs() - p; - pInfo->sortElapsed += el; - - addToDiskbasedBuf(pInfo, pInfo->pSources, pTaskInfo->env); - } - } - - if (pInfo->pDataBlock->info.rows > 0) { - // Perform the in-memory sort and then flush data in the buffer into disk. - blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst); - - // All sorted data are resident in memory, external memory sort is not needed. - // Return to the upstream operator directly - if (isAllDataInMemBuf(pInfo->pSortInternalBuf)) { - pOperator->status = OP_EXEC_DONE; - return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock; - } + SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL); + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE, pInfo->bufPageSize, + numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)"); - addToDiskbasedBuf(pInfo, pInfo->pSources, pTaskInfo->env); - } - - doInternalSort(pTaskInfo, pInfo); - - int32_t code = blockDataEnsureCapacity(pInfo->pDataBlock, pInfo->numOfRowsInRes); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); - } + tfree(p); + setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); + setComparFn(pInfo->pSortHandle, msortComparFn); - int32_t numOfSources = taosArrayGetSize(pInfo->pSources); - ASSERT(numOfSources <= getNumOfInMemBufPages(pInfo->pSortInternalBuf)); - code = sortComparInit(&pInfo->cmpParam, pInfo->pSources, 0, numOfSources - 1, pInfo->pSortInternalBuf); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); - } + sortAddSource(pInfo->pSortHandle, pOperator); - code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn); + // TODO set error code; + int32_t code = sortOpen(pInfo->pSortHandle); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + longjmp(pTaskInfo->env, terrno); } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo, pInfo->numOfRowsInRes); } static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { @@ -6176,8 +6003,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI pInfo->numOfRowsInRes = 1024; pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes); - pInfo->pSources = taosArrayInit(4, POINTER_BYTES); - pInfo->cmpParam.orderInfo = createBlockOrder(pExprInfo, pOrderVal); + pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal); for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); @@ -6187,8 +6013,8 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI } } - int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->sortBufSize, 1, "/tmp/"); - if (pInfo->pSources == NULL || code != 0 || pInfo->cmpParam.orderInfo == NULL || pInfo->pDataBlock == NULL) { +// int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->sortBufSize, 1, "/tmp/"); + if (pInfo->orderInfo == NULL || pInfo->pDataBlock == NULL) { tfree(pOperator); destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo)); tfree(pInfo); @@ -7236,10 +7062,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); - taosArrayDestroy(pInfo->cmpParam.orderInfo); - destroyResultBuf(pInfo->pSortInternalBuf); - - tMergeTreeDestroy(pInfo->pMergeTree); + taosArrayDestroy(pInfo->orderInfo); } static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c new file mode 100644 index 0000000000..4911146b54 --- /dev/null +++ b/source/libs/executor/src/tsort.c @@ -0,0 +1,694 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "common.h" +#include "query.h" + +#include "tsort.h" +#include "tep.h" +#include "tcfg.h" +#include "tlosertree.h" +#include "tpagedbuf.h" +#include "tutil.h" + +typedef struct SMsortComparParam { + void **pSources; + int32_t numOfSources; + SArray *orderInfo; // SArray + bool nullFirst; +} SMsortComparParam; + +typedef struct STupleHandle { + SSDataBlock* pBlock; + int32_t rowIndex; +} STupleHandle; + +typedef struct SSortHandle { + int32_t type; + + int32_t pageSize; + int32_t numOfPages; + SDiskbasedBuf *pBuf; + + SArray *pOrderInfo; + bool nullFirst; + bool hasVarCol; + + SArray *pSources; + SArray *pOrderedSource; + + _sort_fetch_block_fn_t fetchfp; + _sort_merge_compar_fn_t comparFn; + + void *pParam; + SMultiwayMergeTreeInfo *pMergeTree; + int32_t numOfCols; + + int64_t startTs; + uint64_t sortElapsed; + uint64_t totalElapsed; + + int32_t sourceId; + SSDataBlock *pDataBlock; + SMsortComparParam cmpParam; + int32_t numOfCompletedSources; + bool opened; + const char *idStr; + + bool inMemSort; + bool needAdjust; + STupleHandle tupleHandle; +} SSortHandle; + +SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) { + SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock)); + pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pBlock->info.numOfCols = numOfCols; + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfo = {0}; + + colInfo.info.type = pSchema[i].type; + colInfo.info.bytes = pSchema[i].bytes; + colInfo.info.colId = pSchema[i].colId; + taosArrayPush(pBlock->pDataBlock, &colInfo); + } + + return pBlock; +} + +/** + * + * @param type + * @return + */ +SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr) { + SSortHandle* pSortHandle = calloc(1, sizeof(SSortHandle)); + + pSortHandle->type = type; + pSortHandle->pageSize = pageSize; + pSortHandle->numOfPages = numOfPages; + pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); + pSortHandle->pSources = taosArrayInit(4, POINTER_BYTES); + pSortHandle->pOrderInfo = pOrderInfo; + pSortHandle->nullFirst = nullFirst; + pSortHandle->cmpParam.orderInfo = pOrderInfo; + + pSortHandle->pDataBlock = createDataBlock_rv(pSchema, numOfCols); + + if (idstr != NULL) { + pSortHandle->idStr = strdup(idstr); + } + + return pSortHandle; +} + +void destroySortHandle(SSortHandle* pSortHandle) { + sortClose(pSortHandle); + + taosArrayDestroy(pSortHandle->pSources); + if (pSortHandle->pMergeTree != NULL) { + tMergeTreeDestroy(pSortHandle->pMergeTree); + } + + tfree(pSortHandle->idStr); + tfree(pSortHandle); +} + +int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource) { + if (pSortHandle->type == SORT_SINGLESOURCE) { + pSortHandle->pParam = pSource; + } else { + taosArrayPush(pSortHandle->pOrderedSource, &pSource); + } +} + +static SSDataBlock* createDataBlock(const SSDataBlock* pDataBlock) { + int32_t numOfCols = pDataBlock->info.numOfCols; + + SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock)); + pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pBlock->info.numOfCols = numOfCols; + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfo = {0}; + SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + colInfo.info = p->info; + taosArrayPush(pBlock->pDataBlock, &colInfo); + } + + return pBlock; +} + +static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) { + SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource)); + if (pSource == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + pSource->pageIdList = getDataBufPagesIdList(pBuf, (*sourceId)); + pSource->src.pBlock = pBlock; + + taosArrayPush(pAllSources, &pSource); + + (*sourceId) += 1; + + int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock); + int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(pBlock))/rowSize; + + return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows); +} + +static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { + int32_t start = 0; + + if (pHandle->pBuf == NULL) { + int32_t code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + + while(start < pDataBlock->info.rows) { + int32_t stop = 0; + blockDataSplitRows(pDataBlock, pHandle->hasVarCol, start, &stop, pHandle->pageSize); + SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); + if (p == NULL) { + return terrno; + } + + int32_t pageId = -1; + SFilePage* pPage = getNewDataBuf(pHandle->pBuf, pHandle->sourceId, &pageId); + if (pPage == NULL) { + return terrno; + } + + int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t); + assert(size <= getBufPageSize(pHandle->pBuf)); + + blockDataToBuf(pPage->data, p); + + setBufPageDirty(pPage, true); + releaseBufPage(pHandle->pBuf, pPage); + + blockDataDestroy(p); + start = stop + 1; + } + + blockDataClearup(pDataBlock, pHandle->hasVarCol); + + SSDataBlock* pBlock = createDataBlock(pDataBlock); + int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); + if (code != TSDB_CODE_SUCCESS) { + return code; + } +} + +static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) { + cmpParam->pSources = taosArrayGet(pSources, startIndex); + cmpParam->numOfSources = (endIndex - startIndex + 1); + + int32_t code = 0; + + if (pHandle->type == SORT_SINGLESOURCE) { + for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { + SExternalMemSource* pSource = cmpParam->pSources[i]; + SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); + + SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo)); + code = blockDataFromBuf(pSource->src.pBlock, pPage->data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + releaseBufPage(pHandle->pBuf, pPage); + } + } else { + // multi-pass internal merge sort is required + if (pHandle->pBuf == NULL) { + code = createDiskbasedBuffer(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + + for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { + SOperatorSource* pSource = cmpParam->pSources[i]; + pSource->src.pBlock = pHandle->fetchfp(pSource->param); + } + } + + return code; +} + +static int32_t sortComparClearup(SMsortComparParam* cmpParam) { + for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { + SExternalMemSource* pSource = cmpParam->pSources[i]; + blockDataDestroy(pSource->src.pBlock); + tfree(pSource); + } + + cmpParam->numOfSources = 0; +} + +static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); + bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); + + if (isNull) { + colDataAppend(pColInfo, pBlock->info.rows, NULL, true); + } else { + char* pData = colDataGet(pSrcColInfo, *rowIndex); + colDataAppend(pColInfo, pBlock->info.rows, pData, false); + } + } + + pBlock->info.rows += 1; + *rowIndex += 1; +} + +static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) { + /* + * load a new SDataBlock into memory of a given intermediate data-set source, + * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree + */ + if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) { + pSource->src.rowIndex = 0; + pSource->pageIndex += 1; + + if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { + (*numOfCompleted) += 1; + pSource->src.rowIndex = -1; + pSource->pageIndex = -1; + pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); + } else { + if (pHandle->type == SORT_SINGLESOURCE) { + SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); + + SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo)); + int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage->data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + releaseBufPage(pHandle->pBuf, pPage); + } else { + pSource->src.pBlock = pHandle->fetchfp(((SOperatorSource*)pSource)->param); + if (pSource->src.pBlock == NULL) { + (*numOfCompleted) += 1; + pSource->src.rowIndex = -1; + } + } + } + } + + /* + * Adjust loser tree otherwise, according to new candidate data + * if the loser tree is rebuild completed, we do not need to adjust + */ + int32_t leafNodeIndex = tMergeTreeGetAdjustIndex(pTree); + +#ifdef _DEBUG_VIEW + printf("before adjust:\t"); + tMergeTreePrint(pTree); +#endif + + tMergeTreeAdjust(pTree, leafNodeIndex); + +#ifdef _DEBUG_VIEW + printf("\nafter adjust:\t"); + tMergeTreePrint(pTree); +#endif +} + +static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) { + blockDataClearup(pHandle->pDataBlock, pHandle->hasVarCol); + + while(1) { + if (cmpParam->numOfSources == pHandle->numOfCompletedSources) { + break; + } + + int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree); + + SExternalMemSource *pSource = (*cmpParam).pSources[index]; + appendOneRowToDataBlock(pHandle->pDataBlock, pSource->src.pBlock, &pSource->src.rowIndex); + + int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + if (pHandle->pDataBlock->info.rows >= capacity) { + return pHandle->pDataBlock; + } + } + + return (pHandle->pDataBlock->info.rows > 0)? pHandle->pDataBlock:NULL; +} + +static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { + int32_t pLeftIdx = *(int32_t *)pLeft; + int32_t pRightIdx = *(int32_t *)pRight; + + SMsortComparParam *pParam = (SMsortComparParam *)param; + + SArray *pInfo = pParam->orderInfo; + + SExternalMemSource* pLeftSource = pParam->pSources[pLeftIdx]; + SExternalMemSource* pRightSource = pParam->pSources[pRightIdx]; + + // this input is exhausted, set the special value to denote this + if (pLeftSource->src.rowIndex == -1) { + return 1; + } + + if (pRightSource->src.rowIndex == -1) { + return -1; + } + + SSDataBlock* pLeftBlock = pLeftSource->src.pBlock; + SSDataBlock* pRightBlock = pRightSource->src.pBlock; + + for(int32_t i = 0; i < pInfo->size; ++i) { + SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); + + SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); + + bool leftNull = false; + if (pLeftColInfoData->hasNull) { + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); + } + + SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); + bool rightNull = false; + if (pRightColInfoData->hasNull) { + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); + } + + if (leftNull && rightNull) { + continue; // continue to next slot + } + + if (rightNull) { + return pParam->nullFirst? 1:-1; + } + + if (leftNull) { + return pParam->nullFirst? -1:1; + } + + void* left1 = colDataGet(pLeftColInfoData, pLeftSource->src.rowIndex); + void* right1 = colDataGet(pRightColInfoData, pRightSource->src.rowIndex); + + switch(pLeftColInfoData->info.type) { + case TSDB_DATA_TYPE_INT: { + int32_t leftv = *(int32_t*)left1; + int32_t rightv = *(int32_t*)right1; + + if (leftv == rightv) { + break; + } else { + if (pOrder->order == TSDB_ORDER_ASC) { + return leftv < rightv? -1 : 1; + } else { + return leftv < rightv? 1 : -1; + } + } + } + default: + assert(0); + } + } +} + +static int32_t doInternalMergeSort(SSortHandle* pHandle) { + size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource); + + // Calculate the I/O counts to complete the data sort. + double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages)); + + pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs; + qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64, + pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed); + + size_t pgSize = pHandle->pageSize; + int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pHandle->pDataBlock))/ blockDataGetSerialRowSize(pHandle->pDataBlock); + + blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); + + size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource); + for(int32_t t = 0; t < sortPass; ++t) { + int64_t st = taosGetTimestampUs(); + + SArray* pResList = taosArrayInit(4, POINTER_BYTES); + + int32_t numOfInputSources = pHandle->numOfPages; + int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources; + + // Only *numOfInputSources* can be loaded into buffer to perform the external sort. + for(int32_t i = 0; i < sortGroup; ++i) { + pHandle->sourceId += 1; + + int32_t end = (i + 1) * numOfInputSources - 1; + if (end > numOfSorted - 1) { + end = numOfSorted - 1; + } + + pHandle->cmpParam.numOfSources = end - i * numOfInputSources + 1; + + int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + while (1) { + SSDataBlock* pDataBlock = getSortedBlockData(pHandle, &pHandle->cmpParam, numOfRows); + if (pDataBlock == NULL) { + break; + } + + int32_t pageId = -1; + SFilePage* pPage = getNewDataBuf(pHandle->pBuf, pHandle->sourceId, &pageId); + if (pPage == NULL) { + return terrno; + } + + int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + pDataBlock->info.numOfCols * sizeof(int32_t); + assert(size <= getBufPageSize(pHandle->pBuf)); + + blockDataToBuf(pPage->data, pDataBlock); + + setBufPageDirty(pPage, true); + releaseBufPage(pHandle->pBuf, pPage); + + blockDataClearup(pDataBlock, pHandle->hasVarCol); + } + + tMergeTreeDestroy(pHandle->pMergeTree); + pHandle->numOfCompletedSources = 0; + + SSDataBlock* pBlock = createDataBlock(pHandle->pDataBlock); + code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId); + if (code != 0) { + return code; + } + } + + sortComparClearup(&pHandle->cmpParam); + + taosArrayClear(pHandle->pOrderedSource); + taosArrayAddAll(pHandle->pOrderedSource, pResList); + taosArrayDestroy(pResList); + + numOfSorted = taosArrayGetSize(pHandle->pOrderedSource); + + int64_t el = taosGetTimestampUs() - st; + pHandle->totalElapsed += el; + + SDiskbasedBufStatis statis = getDBufStatis(pHandle->pBuf); + qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el, statis.loadBytes/1024.0, + statis.flushBytes/1024.0); + + if (pHandle->type == SORT_MULTIWAY_MERGE) { + pHandle->type = SORT_SINGLESOURCE; + pHandle->comparFn = msortComparFn; + } + } + + pHandle->cmpParam.numOfSources = taosArrayGetSize(pHandle->pOrderedSource); + return 0; +} + +int32_t sortOpen(SSortHandle* pHandle) { + if (pHandle->opened) { + return 0; + } + + if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { + return -1; + } + + pHandle->opened = true; + + size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; + if (pHandle->type == SORT_SINGLESOURCE) { + if (pHandle->pParam == NULL) { + qError("%s sort source not set yet", pHandle->idStr); + return -1; + } + + while (1) { + SSDataBlock* pBlock = pHandle->fetchfp(pHandle->pParam); + if (pBlock == NULL) { + break; + } + + if (pHandle->pDataBlock == NULL) { + pHandle->pDataBlock = createDataBlock(pBlock); + } + + int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); + if (code != 0) { + return code; + } + + size_t size = blockDataGetSize(pHandle->pDataBlock); + if (size > sortBufSize) { + // Perform the in-memory sort and then flush data in the buffer into disk. + int64_t p = taosGetTimestampUs(); + blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); + + int64_t el = taosGetTimestampUs() - p; + pHandle->sortElapsed += el; + + doAddToBuf(pHandle->pDataBlock, pHandle); + } + } + + if (pHandle->pDataBlock->info.rows > 0) { + size_t size = blockDataGetSize(pHandle->pDataBlock); + + // Perform the in-memory sort and then flush data in the buffer into disk. + blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); + + // All sorted data can fit in memory, external memory sort is not needed. Return to directly + if (size <= sortBufSize) { + pHandle->cmpParam.numOfSources = 1; + pHandle->inMemSort = true; + + pHandle->tupleHandle.rowIndex = -1; + pHandle->tupleHandle.pBlock = pHandle->pDataBlock; + return 0; + } else { + doAddToBuf(pHandle->pDataBlock, pHandle); + } + } + } else { + // do nothing + } + + // do internal sort + int32_t code = doInternalMergeSort(pHandle); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int32_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource); + ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf)); + code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); + if (code != TSDB_CODE_SUCCESS) { + return code; + } +} + +int32_t sortClose(SSortHandle* pHandle) { + // do nothing +} + +int32_t setFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) { + pHandle->fetchfp = fp; +} + +int32_t setComparFn(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) { + pHandle->comparFn = fp; +} + +STupleHandle* sortNextTuple(SSortHandle* pHandle) { + if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) { + return NULL; + } + + if (pHandle->inMemSort) { + pHandle->tupleHandle.rowIndex += 1; + if (pHandle->tupleHandle.rowIndex == pHandle->pDataBlock->info.rows) { + pHandle->numOfCompletedSources = 1; + return NULL; + } + + return &pHandle->tupleHandle; + } + + int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree); + SExternalMemSource *pSource = pHandle->cmpParam.pSources[index]; + + if (pHandle->needAdjust) { + int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + } + + if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) { + return NULL; + } + + index = tMergeTreeGetChosenIndex(pHandle->pMergeTree); + pSource = pHandle->cmpParam.pSources[index]; + + assert(pSource->src.pBlock != NULL); + + pHandle->tupleHandle.rowIndex = pSource->src.rowIndex; + pHandle->tupleHandle.pBlock = pSource->src.pBlock; + + pHandle->needAdjust = true; + pSource->src.rowIndex += 1; + + return &pHandle->tupleHandle; +} + +bool sortIsValueNull(STupleHandle* pVHandle, int32_t colIndex) { + return false; +} + +void* sortGetValue(STupleHandle* pVHandle, int32_t colIndex) { + SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex); + return colDataGet(pColInfo, pVHandle->rowIndex); +} diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index ebea6755d7..22df894cdf 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -121,6 +121,7 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } +#if 0 TEST(testCase, build_executor_tree_Test) { const char* msg = "{\n" "\t\"Id\":\t{\n" @@ -330,7 +331,7 @@ TEST(testCase, external_sort_Test) { } } - printStatisBeforeClose(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf); +// setPrintStatis(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf); int64_t s2 = taosGetTimestampUs(); printf("total:%ld\n", s2 - s1); @@ -341,4 +342,7 @@ TEST(testCase, external_sort_Test) { taosArrayDestroy(pExprInfo); taosArrayDestroy(pOrderVal); } + +#endif + #pragma GCC diagnostic pop diff --git a/source/libs/executor/test/executorUtilTests.cpp b/source/libs/executor/test/executorUtilTests.cpp new file mode 100644 index 0000000000..a8ddafd7cd --- /dev/null +++ b/source/libs/executor/test/executorUtilTests.cpp @@ -0,0 +1,269 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#include "os.h" + +#include "executor.h" +#include "stub.h" +#include "taos.h" +#include "tdef.h" +#include "tep.h" +#include "trpc.h" +#include "tvariant.h" + +namespace { +typedef struct { + int32_t startVal; + int32_t count; + int32_t pageRows; +} _info; + +SSDataBlock* getSingleColDummyBlock(void* param) { + _info* pInfo = (_info*) param; + if (--pInfo->count < 0) { + return NULL; + } + + SSDataBlock* pBlock = static_cast(calloc(1, sizeof(SSDataBlock))); + pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + + SColumnInfoData colInfo = {0}; + colInfo.info.type = TSDB_DATA_TYPE_INT; + colInfo.info.bytes = sizeof(int32_t); + colInfo.info.colId = 1; + colInfo.pData = static_cast(calloc(pInfo->pageRows, sizeof(int32_t))); + colInfo.nullbitmap = static_cast(calloc(1, (pInfo->pageRows + 7) / 8)); + + taosArrayPush(pBlock->pDataBlock, &colInfo); + + for (int32_t i = 0; i < pInfo->pageRows; ++i) { + SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); + + int32_t v = ++pInfo->startVal; + colDataAppend(pColInfo, i, reinterpret_cast(&v), false); + } + + pBlock->info.rows = pInfo->pageRows; + pBlock->info.numOfCols = 1; + return pBlock; +} + +int32_t docomp(const void* p1, const void* p2, void* param) { + int32_t pLeftIdx = *(int32_t *)p1; + int32_t pRightIdx = *(int32_t *)p2; + + SMsortComparParam *pParam = (SMsortComparParam *)param; + SOperatorSource** px = reinterpret_cast(pParam->pSources); + + SArray *pInfo = pParam->orderInfo; + + SOperatorSource* pLeftSource = px[pLeftIdx]; + SOperatorSource* pRightSource = px[pRightIdx]; + + // this input is exhausted, set the special value to denote this + if (pLeftSource->src.rowIndex == -1) { + return 1; + } + + if (pRightSource->src.rowIndex == -1) { + return -1; + } + + SSDataBlock* pLeftBlock = pLeftSource->src.pBlock; + SSDataBlock* pRightBlock = pRightSource->src.pBlock; + + for(int32_t i = 0; i < pInfo->size; ++i) { + SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i); + + SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); + + bool leftNull = false; + if (pLeftColInfoData->hasNull) { + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); + } + + SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); + bool rightNull = false; + if (pRightColInfoData->hasNull) { + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); + } + + if (leftNull && rightNull) { + continue; // continue to next slot + } + + if (rightNull) { + return pParam->nullFirst? 1:-1; + } + + if (leftNull) { + return pParam->nullFirst? -1:1; + } + + void* left1 = colDataGet(pLeftColInfoData, pLeftSource->src.rowIndex); + void* right1 = colDataGet(pRightColInfoData, pRightSource->src.rowIndex); + + switch(pLeftColInfoData->info.type) { + case TSDB_DATA_TYPE_INT: { + int32_t leftv = *(int32_t*)left1; + int32_t rightv = *(int32_t*)right1; + + if (leftv == rightv) { + break; + } else { + if (pOrder->order == TSDB_ORDER_ASC) { + return leftv < rightv? -1 : 1; + } else { + return leftv < rightv? 1 : -1; + } + } + } + default: + assert(0); + } + } +} +} // namespace + +//TEST(testCase, inMem_sort_Test) { +// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); +// SOrder o = {.order = TSDB_ORDER_ASC}; +// o.col.info.colId = 1; +// o.col.info.type = TSDB_DATA_TYPE_INT; +// taosArrayPush(pOrderVal, &o); +// +// int32_t numOfRows = 1000; +// SBlockOrderInfo oi = {0}; +// oi.order = TSDB_ORDER_ASC; +// oi.colIndex = 0; +// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); +// taosArrayPush(orderInfo, &oi); +// +// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE, 1024, 5, "test_abc"); +// setFetchRawDataFp(phandle, getSingleColDummyBlock); +// sortAddSource(phandle, &numOfRows); +// +// int32_t code = sortOpen(phandle); +// int32_t row = 1; +// +// while(1) { +// STupleHandle* pTupleHandle = sortNextTuple(phandle); +// if (pTupleHandle == NULL) { +// break; +// } +// +// void* v = sortGetValue(pTupleHandle, 0); +// printf("%d: %d\n", row++, *(int32_t*) v); +// +// } +// destroySortHandle(phandle); +//} +// +//TEST(testCase, external_mem_sort_Test) { +// totalcount = 50; +// startVal = 100000; +// +// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); +// SOrder o = {.order = TSDB_ORDER_ASC}; +// o.col.info.colId = 1; +// o.col.info.type = TSDB_DATA_TYPE_INT; +// taosArrayPush(pOrderVal, &o); +// +// int32_t numOfRows = 1000; +// SBlockOrderInfo oi = {0}; +// oi.order = TSDB_ORDER_ASC; +// oi.colIndex = 0; +// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); +// taosArrayPush(orderInfo, &oi); +// +// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE, 1024, 5, "test_abc"); +// setFetchRawDataFp(phandle, getSingleColDummyBlock); +// sortAddSource(phandle, &numOfRows); +// +// int32_t code = sortOpen(phandle); +// int32_t row = 1; +// +// while(1) { +// STupleHandle* pTupleHandle = sortNextTuple(phandle); +// if (pTupleHandle == NULL) { +// break; +// } +// +// void* v = sortGetValue(pTupleHandle, 0); +// printf("%d: %d\n", row++, *(int32_t*) v); +// +// } +// destroySortHandle(phandle); +//} + +TEST(testCase, ordered_merge_sort_Test) { + SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); + SOrder o = {.order = TSDB_ORDER_ASC}; + o.col.info.colId = 1; + o.col.info.type = TSDB_DATA_TYPE_INT; + taosArrayPush(pOrderVal, &o); + + int32_t numOfRows = 1000; + SBlockOrderInfo oi = {0}; + oi.order = TSDB_ORDER_ASC; + oi.colIndex = 0; + SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); + taosArrayPush(orderInfo, &oi); + + SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4}; + SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTIWAY_MERGE, 1024, 5, &s, 1,"test_abc"); + setFetchRawDataFp(phandle, getSingleColDummyBlock); + setComparFn(phandle, docomp); + + + for(int32_t i = 0; i < 10; ++i) { + SOperatorSource* p = static_cast(calloc(1, sizeof(SOperatorSource))); + _info* c = static_cast<_info*>(calloc(1, sizeof(_info))); + c->count = 1; + c->pageRows = 1000; + c->startVal = 0; + + p->param = c; + sortAddSource(phandle, p); + } + + int32_t code = sortOpen(phandle); + int32_t row = 1; + + while(1) { + STupleHandle* pTupleHandle = sortNextTuple(phandle); + if (pTupleHandle == NULL) { + break; + } + + void* v = sortGetValue(pTupleHandle, 0); + printf("%d: %d\n", row++, *(int32_t*) v); + + } + destroySortHandle(phandle); +} + +#pragma GCC diagnostic pop diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 0e8d85492c..d5415c219f 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -53,7 +53,7 @@ typedef struct SDiskbasedBuf { static void printStatisData(const SDiskbasedBuf* pBuf); - int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { +int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { *pBuf = calloc(1, sizeof(SDiskbasedBuf)); SDiskbasedBuf* pResBuf = *pBuf; @@ -569,7 +569,7 @@ void setBufPageDirty(SFilePage* pPage, bool dirty) { ppi->dirty = dirty; } -void printStatisBeforeClose(SDiskbasedBuf* pBuf) { +void setPrintStatis(SDiskbasedBuf* pBuf) { pBuf->printStatis = true; } -- GitLab