From 0b4d01227b16fd2d0dc04bca3cc6ac1903474b77 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Feb 2022 15:40:13 +0800 Subject: [PATCH] [td-11818] Refactor and add error check. --- include/common/tep.h | 4 + include/util/tlosertree.h | 27 +- include/util/tpagedfile.h | 6 +- source/common/src/tep.c | 90 +++- source/dnode/vnode/src/tsdb/tsdbRead.c | 8 +- source/libs/executor/inc/executorimpl.h | 23 +- source/libs/executor/src/executil.c | 8 +- source/libs/executor/src/executorimpl.c | 540 +++++------------------- source/libs/function/inc/thistogram.h | 2 +- source/libs/function/src/thistogram.c | 24 +- source/libs/function/src/tpercentile.c | 2 +- source/util/src/tlosertree.c | 91 ++-- source/util/src/tpagedfile.c | 6 +- 13 files changed, 278 insertions(+), 553 deletions(-) diff --git a/include/common/tep.h b/include/common/tep.h index b894d972d4..7d81b829bc 100644 --- a/include/common/tep.h +++ b/include/common/tep.h @@ -52,8 +52,12 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(const SSDataBlock* pBlock); + int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); +int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); +void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol); + #ifdef __cplusplus } #endif diff --git a/include/util/tlosertree.h b/include/util/tlosertree.h index 2977287be6..7be4270dcb 100644 --- a/include/util/tlosertree.h +++ b/include/util/tlosertree.h @@ -22,28 +22,25 @@ extern "C" { typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param); -typedef struct SLoserTreeNode { - int32_t index; - void *pData; // TODO remove it? -} SLoserTreeNode; - -typedef struct SLoserTreeInfo { - int32_t numOfEntries; - int32_t totalEntries; +typedef struct SMultiwayMergeTreeInfo { + int32_t numOfSources; + int32_t totalSources; __merge_compare_fn_t comparFn; void * param; - SLoserTreeNode *pNode; -} SLoserTreeInfo; + struct STreeNode *pNode; +} SMultiwayMergeTreeInfo; -int32_t tLoserTreeCreate(SLoserTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); +int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); -void tLoserTreeInit(SLoserTreeInfo *pTree); +void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx); -void tLoserTreeAdjust(SLoserTreeInfo *pTree, int32_t idx); +void tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree); -void tLoserTreeRebuild(SLoserTreeInfo *pTree); +void tMergeTreePrint(const SMultiwayMergeTreeInfo *pTree); -void tLoserTreeDisplay(SLoserTreeInfo *pTree); +int32_t tMergeTreeGetChosenIndex(const SMultiwayMergeTreeInfo* pTree); + +int32_t tMergeTreeAdjustIndex(const SMultiwayMergeTreeInfo* pTree); #ifdef __cplusplus } diff --git a/include/util/tpagedfile.h b/include/util/tpagedfile.h index c5c6c5cb59..fcf1f3c26f 100644 --- a/include/util/tpagedfile.h +++ b/include/util/tpagedfile.h @@ -47,7 +47,7 @@ typedef struct SFilePage { * @param handle * @return */ -int32_t createDiskbasedResultBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); +int32_t createDiskbasedBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); /** * @@ -113,14 +113,14 @@ void destroyResultBuf(SDiskbasedBuf* pResultBuf); * @param pList * @return */ -struct SPageInfo* getLastPageInfo(SIDList pList); +SPageInfo* getLastPageInfo(SIDList pList); /** * * @param pPgInfo * @return */ -int32_t getPageId(const struct SPageInfo* pPgInfo); +int32_t getPageId(const SPageInfo* pPgInfo); /** * Return the buffer page size. diff --git a/source/common/src/tep.c b/source/common/src/tep.c index 554dc4e552..a5a9e8dac7 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -359,17 +359,16 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) { int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize) { ASSERT(pBlock != NULL && stopIndex != NULL); - int32_t size = 0; int32_t numOfCols = pBlock->info.numOfCols; int32_t numOfRows = pBlock->info.rows; size_t headerSize = sizeof(int32_t); - + size_t colHeaderSize = sizeof(int32_t) * numOfCols; // TODO speedup by checking if the whole page can fit in firstly. if (!hasVarCol) { size_t rowSize = blockDataGetRowSize(pBlock); - int32_t capacity = ((pageSize - headerSize) / (rowSize * 8 + 1)) * 8; + int32_t capacity = ((pageSize - headerSize - colHeaderSize) / (rowSize * 8 + 1)) * 8; *stopIndex = startIndex + capacity; if (*stopIndex >= numOfRows) { @@ -379,7 +378,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd return TSDB_CODE_SUCCESS; } else { // iterate the rows that can be fit in this buffer page - size += headerSize; + int32_t size = (headerSize + colHeaderSize); for(int32_t j = startIndex; j < numOfRows; ++j) { for (int32_t i = 0; i < numOfCols; ++i) { @@ -423,6 +422,10 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 } SSDataBlock* pDst = calloc(1, sizeof(SSDataBlock)); + if (pDst == NULL) { + return NULL; + } + pDst->info = pBlock->info; pDst->info.rows = 0; @@ -472,7 +475,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 * @param pBlock * @return */ -int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { // TODO add the column length!! +int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { ASSERT(pBlock != NULL); // write the number of rows @@ -516,21 +519,9 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { size_t metaSize = pBlock->info.rows * sizeof(int32_t); if (IS_VAR_DATA_TYPE(pCol->info.type)) { - char* p = realloc(pCol->varmeta.offset, metaSize); - if (p == NULL) { - // TODO handle error - } - - pCol->varmeta.offset = (int32_t*)p; memcpy(pCol->varmeta.offset, pStart, metaSize); pStart += metaSize; } else { - char* p = realloc(pCol->nullbitmap, BitmapLen(pBlock->info.rows)); - if (p == NULL) { - // TODO handle error - } - - pCol->nullbitmap = p; memcpy(pCol->nullbitmap, pStart, BitmapLen(pBlock->info.rows)); pStart += BitmapLen(pBlock->info.rows); } @@ -538,13 +529,26 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { int32_t colLength = *(int32_t*) pStart; pStart += sizeof(int32_t); - if (pCol->pData == NULL) { - pCol->pData = malloc(pCol->info.bytes * 4096); // TODO refactor the memory mgmt + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + if (pCol->varmeta.allocLen < colLength) { + char* tmp = realloc(pCol->pData, colLength); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pCol->pData = tmp; + pCol->varmeta.allocLen = colLength; + } + + pCol->varmeta.length = colLength; + ASSERT(pCol->varmeta.length <= pCol->varmeta.allocLen); } memcpy(pCol->pData, pStart, colLength); pStart += colLength; } + + return TSDB_CODE_SUCCESS; } size_t blockDataGetRowSize(const SSDataBlock* pBlock) { @@ -759,3 +763,51 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs printf("sort:%ld, create:%ld, assign:%ld, copyback:%ld\n", p1-p0, p2 - p1, p3 - p2, p4-p3); destroyTupleIndex(index); } + +void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) { + pDataBlock->info.rows = 0; + + if (hasVarCol) { + for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + + if (IS_VAR_DATA_TYPE(p->info.type)) { + p->varmeta.length = 0; + } + } + } +} + +int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { + for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + if (IS_VAR_DATA_TYPE(p->info.type)) { + char* tmp = realloc(p->varmeta.offset, sizeof(int32_t) * numOfRows); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->varmeta.offset = (int32_t*)tmp; + memset(p->varmeta.offset, 0, sizeof(int32_t) * numOfRows); + + p->varmeta.length = 0; + p->varmeta.allocLen = 0; + tfree(p->pData); + } else { + char* tmp = realloc(p->nullbitmap, BitmapLen(numOfRows)); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->nullbitmap = tmp; + memset(p->nullbitmap, 0, BitmapLen(numOfRows)); + + tmp = realloc(p->pData, numOfRows * p->info.bytes); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->pData = tmp; + } + } +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0dcc7db33d..16d8bf74d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2171,8 +2171,8 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 sup.numOfTables = numOfQualTables; - SLoserTreeInfo* pTree = NULL; - uint8_t ret = tLoserTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); + SMultiwayMergeTreeInfo* pTree = NULL; + uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, dataBlockOrderCompar); if (ret != TSDB_CODE_SUCCESS) { cleanBlockOrderSupporter(&sup, numOfTables); return TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -2181,7 +2181,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu int32_t numOfTotal = 0; while (numOfTotal < cnt) { - int32_t pos = pTree->pNode[0].index; + int32_t pos = tMergeTreeGetChosenIndex(pTree); int32_t index = sup.blockIndexArray[pos]++; STableBlockInfo* pBlocksInfo = sup.pDataBlockInfo[pos]; @@ -2192,7 +2192,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu sup.blockIndexArray[pos] = sup.numOfBlocksPerTable[pos] + 1; } - tLoserTreeAdjust(pTree, pos + sup.numOfTables); + tMergeTreeAdjust(pTree, tMergeTreeAdjustIndex(pTree)); } /* diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b9c8012cff..25877468b4 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -558,17 +558,18 @@ typedef struct SMultiwayMergeInfo { } SMultiwayMergeInfo; typedef struct SOrderOperatorInfo { - int32_t sourceId; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* orderInfo; // SArray - SSDataBlock* pDataBlock; - bool nullFirst; // null value is put in the front - bool hasVarCol; // has variable length column, such as binary/varchar/nchar - SDiskbasedBuf* pSortInternalBuf; - int32_t numOfSources; - int32_t numOfCompleted; - SLoserTreeInfo *pMergeTree; - SArray *pSources; // SArray + int32_t sourceId; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray *orderInfo; // SArray + SSDataBlock *pDataBlock; + bool nullFirst; // null value is put in the front + bool hasVarCol; // has variable length column, such as binary/varchar/nchar + int32_t numOfSources; + int32_t numOfCompleted; + SDiskbasedBuf *pSortInternalBuf; + SMultiwayMergeTreeInfo *pMergeTree; + SArray *pSources; // SArray + int32_t capacity; } SOrderOperatorInfo; SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a6abe0662f..1ef934002a 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -533,7 +533,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv int32_t code = TSDB_CODE_SUCCESS; int32_t *posList = NULL; - SLoserTreeInfo *pTree = NULL; + SMultiwayMergeTreeInfo *pTree = NULL; STableQueryInfo **pTableQueryInfoList = NULL; size_t size = taosArrayGetSize(pTableList); @@ -566,7 +566,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv SCompSupporter cs = {pTableQueryInfoList, posList, pRuntimeEnv->pQueryAttr->order.order}; - int32_t ret = tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); + int32_t ret = tMergeTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn); if (ret != TSDB_CODE_SUCCESS) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _end; @@ -576,7 +576,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv int64_t startt = taosGetTimestampMs(); while (1) { - int32_t tableIndex = pTree->pNode[0].index; + int32_t tableIndex = tMergeTreeGetChosenIndex(pTree); SResultRowInfo *pWindowResInfo = &pTableQueryInfoList[tableIndex]->resInfo; SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.rowIndex[tableIndex]); @@ -612,7 +612,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv } } - tLoserTreeAdjust(pTree, tableIndex + pTree->numOfEntries); + tMergeTreeAdjust(pTree, tMergeTreeAdjustIndex(pTree)); } int64_t endt = taosGetTimestampMs(); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e59671b7cc..0e12963250 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4615,7 +4615,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize); int32_t TENMB = 1024*1024*10; - int32_t code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId, tsTempDir); + int32_t code = createDiskbasedBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId, tsTempDir); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5543,18 +5543,18 @@ typedef struct SExternalMemSource { SSDataBlock *pBlock; } SExternalMemSource; -typedef struct SCompareParam { +typedef struct SMsortComparParam { SExternalMemSource **pSources; int32_t num; SArray *orderInfo; // SArray bool nullFirst; -} SCompareParam; +} SMsortComparParam; -int32_t doMergeSortCompar(const void *pLeft, const void *pRight, void *param) { +int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { int32_t pLeftIdx = *(int32_t *)pLeft; int32_t pRightIdx = *(int32_t *)pRight; - SCompareParam *pParam = (SCompareParam *)param; + SMsortComparParam *pParam = (SMsortComparParam *)param; SExternalMemSource **pSources = pParam->pSources; SArray *pInfo = pParam->orderInfo; @@ -5613,51 +5613,46 @@ int32_t doMergeSortCompar(const void *pLeft, const void *pRight, void *param) { } } -int32_t loadNewDataBlock(SExternalMemSource *pSource, SOrderOperatorInfo* pInfo) { - pSource->rowIndex = 0; - pSource->pageIndex += 1; - - if (pSource->pageIndex < taosArrayGetSize(pSource->pageIdList)) { - struct SPageInfo* pPgInfo = *(struct SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); - - SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); - return blockDataFromBuf(pSource->pBlock, pPage->data); - } else { - pInfo->numOfCompleted += 1; - pSource->rowIndex = -1; - pSource->pageIndex = -1; - - return 0; - } -} - -void adjustLoserTreeFromNewData(SExternalMemSource *pSource, SLoserTreeInfo *pTree, SOrderOperatorInfo* pInfo) { +static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwayMergeTreeInfo *pTree, SOrderOperatorInfo* pInfo) { /* * load a new SDataBlock into memory of a given intermediate data-set source, * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree */ if (pSource->rowIndex >= pSource->pBlock->info.rows) { - // TODO check if has remain pages. - loadNewDataBlock(pSource, pInfo); + pSource->rowIndex = 0; + pSource->pageIndex += 1; + + if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { + pInfo->numOfCompleted += 1; + pSource->rowIndex = -1; + pSource->pageIndex = -1; + } else { + SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); + + SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); + int32_t code = blockDataFromBuf(pSource->pBlock, pPage->data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } } /* * Adjust loser tree otherwise, according to new candidate data * if the loser tree is rebuild completed, we do not need to adjust */ - int32_t leafNodeIdx = pTree->pNode[0].index + pInfo->numOfSources; + int32_t leafNodeIndex = tMergeTreeAdjustIndex(pTree); #ifdef _DEBUG_VIEW - printf("before adjust:\t"); - tLoserTreeDisplay(pTree); + printf("before adjust:\t"); + tMergeTreePrint(pTree); #endif - tLoserTreeAdjust(pTree, leafNodeIdx); + tMergeTreeAdjust(pTree, leafNodeIndex); #ifdef _DEBUG_VIEW - printf("\nafter adjust:\t"); - tLoserTreeDisplay(pTree); - printf("\n"); + printf("\nafter adjust:\t"); + tMergeTreePrint(pTree); #endif } @@ -5676,13 +5671,42 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou *rowIndex += 1; } -void addToDiskBasedBuf(SOrderOperatorInfo* pInfo) { +static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) { + SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource)); + if (pSource == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId); + pSource->sourceId = pInfo->sourceId; + + pSource->pBlock = calloc(1, sizeof(SSDataBlock)); + pSource->pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pSource->pBlock->info.numOfCols = numOfCols; + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfo = {0}; + SColumnInfoData* p = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); + colInfo.info = p->info; + taosArrayPush(pSource->pBlock->pDataBlock, &colInfo); + } + + taosArrayPush(pInfo->pSources, &pSource); + pInfo->sourceId += 1; + + return blockDataEnsureCapacity(pSource->pBlock, pInfo->capacity); +} + +void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { int32_t start = 0; while(start < pInfo->pDataBlock->info.rows) { int32_t stop = 0; blockDataSplitRows(pInfo->pDataBlock, pInfo->hasVarCol, start, &stop, getBufPageSize(pInfo->pSortInternalBuf)); SSDataBlock* p = blockDataExtractBlock(pInfo->pDataBlock, start, stop - start + 1); + if (p == NULL) { + longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } int32_t pageId = -1; SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId); @@ -5692,36 +5716,30 @@ void addToDiskBasedBuf(SOrderOperatorInfo* pInfo) { } int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; + blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol); - pInfo->pDataBlock->info.rows = 0; - if (pInfo->hasVarCol) { - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* p = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); - - if (IS_VAR_DATA_TYPE(p->info.type)) { - p->varmeta.length = 0; - } - } + int32_t code = doAddNewSource(pInfo, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + longjmp(env, code); } +} - pInfo->sourceId += 1; +static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorInfo* pInfo) { + cmpParam->nullFirst = pInfo->nullFirst; + cmpParam->orderInfo = pInfo->orderInfo; + cmpParam->num = pInfo->numOfSources; + cmpParam->pSources = pInfo->pSources->pData; - // TODO extract method - SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource)); - pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId - 1); - pSource->sourceId = pInfo->sourceId - 1; - pSource->pBlock = calloc(1, sizeof(SSDataBlock)); - pSource->pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - pSource->pBlock->info.numOfCols = numOfCols; + for(int32_t i = 0; i < pInfo->numOfSources; ++i) { + SExternalMemSource* pSource = cmpParam->pSources[i]; + SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData colInfo = {0}; - SColumnInfoData* p = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); - colInfo.info = p->info; - taosArrayPush(pSource->pBlock->pDataBlock, &colInfo); + SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); + int32_t code = blockDataFromBuf(cmpParam->pSources[i]->pBlock, pPage->data); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } - - taosArrayPush(pInfo->pSources, &pSource); } static SSDataBlock* doSort(void* param, bool* newgroup) { @@ -5730,6 +5748,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return NULL; } + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SOrderOperatorInfo* pInfo = pOperator->info; SSDataBlock* pBlock = NULL; @@ -5745,7 +5764,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { int32_t code = blockDataMerge(pInfo->pDataBlock, pBlock); if (code != TSDB_CODE_SUCCESS) { - // todo handle error + longjmp(pOperator->pTaskInfo->env, code); } size_t size = blockDataGetSize(pInfo->pDataBlock); @@ -5755,8 +5774,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst); printf("sort time:%ld\n", taosGetTimestampUs() - p); - // flush to disk - addToDiskBasedBuf(pInfo); + addToDiskbasedBuf(pInfo, pTaskInfo->env); } } @@ -5773,37 +5791,33 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock; } - // flush to disk - addToDiskBasedBuf(pInfo); + addToDiskbasedBuf(pInfo, pTaskInfo->env); } - SCompareParam cmpParam = {0}; - cmpParam.nullFirst = pInfo->nullFirst; - cmpParam.orderInfo = pInfo->orderInfo; - cmpParam.num = pInfo->numOfSources; - cmpParam.pSources = pInfo->pSources->pData; - - pInfo->numOfSources = taosArrayGetSize(pInfo->pSources); - for(int32_t i = 0; i < pInfo->numOfSources; ++i) { - SExternalMemSource* pSource = cmpParam.pSources[i]; - SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); - - SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); - int32_t code = blockDataFromBuf(cmpParam.pSources[i]->pBlock, pPage->data); + SMsortComparParam cmpParam = {0}; + int32_t code = sortComparInit(&cmpParam, pInfo); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); } - int32_t code = tLoserTreeCreate(&pInfo->pMergeTree, pInfo->numOfSources, &cmpParam, doMergeSortCompar); + code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->numOfSources, &cmpParam, msortComparFn); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } while(1) { if (pInfo->numOfSources == pInfo->numOfCompleted) { break; } - SExternalMemSource *pSource = cmpParam.pSources[pInfo->pMergeTree->pNode[0].index]; + SExternalMemSource *pSource = cmpParam.pSources[tMergeTreeGetChosenIndex(pInfo->pMergeTree)]; appendOneRowToDataBlock(pInfo->pDataBlock, pSource->pBlock, &pSource->rowIndex); - adjustLoserTreeFromNewData(pSource, pInfo->pMergeTree, pInfo); + code = adjustMergeTreeForNextTuple(pSource, pInfo->pMergeTree, pInfo); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } - if (pInfo->pDataBlock->info.rows >= 4096) { + if (pInfo->pDataBlock->info.rows >= pInfo->capacity) { return pInfo->pDataBlock; } } @@ -5838,7 +5852,8 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); pInfo->sortBufSize = 1024 * 1024; // 1MB - pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, 4096); + pInfo->capacity = 4096; + pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity); pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal); pInfo->pSources = taosArrayInit(4, POINTER_BYTES); @@ -5850,7 +5865,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI } } - int32_t code = createDiskbasedResultBuffer(&pInfo->pSortInternalBuf, 4096, 4096*1000, 1, "/tmp/"); + int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, 4096, 4096*1000, 1, "/tmp/"); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "Order"; @@ -7895,367 +7910,6 @@ _complete: return code; } -/** - * pQueryMsg->head has been converted before this function is called. - * - * @param pQueryMsg - * @param pTableIdList - * @param pExpr - * @return - */ -//int32_t convertQueryMsg(SQueryTableReq *pQueryMsg, STaskParam* param) { -// int32_t code = TSDB_CODE_SUCCESS; -// -//// if (taosCheckVersion(pQueryMsg->version, version, 3) != 0) { -//// return TSDB_CODE_QRY_INVALID_MSG; -//// } -// -// pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); -// pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); -// pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey); -// pQueryMsg->interval.interval = htobe64(pQueryMsg->interval.interval); -// pQueryMsg->interval.sliding = htobe64(pQueryMsg->interval.sliding); -// pQueryMsg->interval.offset = htobe64(pQueryMsg->interval.offset); -// pQueryMsg->limit = htobe64(pQueryMsg->limit); -// pQueryMsg->offset = htobe64(pQueryMsg->offset); -// pQueryMsg->vgroupLimit = htobe64(pQueryMsg->vgroupLimit); -// -// pQueryMsg->order = htons(pQueryMsg->order); -// pQueryMsg->orderColId = htons(pQueryMsg->orderColId); -// pQueryMsg->queryType = htonl(pQueryMsg->queryType); -//// pQueryMsg->tagNameRelType = htons(pQueryMsg->tagNameRelType); -// -// pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols); -// pQueryMsg->numOfOutput = htons(pQueryMsg->numOfOutput); -// pQueryMsg->numOfGroupCols = htons(pQueryMsg->numOfGroupCols); -// -// pQueryMsg->tagCondLen = htons(pQueryMsg->tagCondLen); -// pQueryMsg->colCondLen = htons(pQueryMsg->colCondLen); -// -// pQueryMsg->tsBuf.tsOffset = htonl(pQueryMsg->tsBuf.tsOffset); -// pQueryMsg->tsBuf.tsLen = htonl(pQueryMsg->tsBuf.tsLen); -// pQueryMsg->tsBuf.tsNumOfBlocks = htonl(pQueryMsg->tsBuf.tsNumOfBlocks); -// pQueryMsg->tsBuf.tsOrder = htonl(pQueryMsg->tsBuf.tsOrder); -// -// pQueryMsg->numOfTags = htonl(pQueryMsg->numOfTags); -//// pQueryMsg->tbnameCondLen = htonl(pQueryMsg->tbnameCondLen); -// pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput); -// pQueryMsg->sqlstrLen = htonl(pQueryMsg->sqlstrLen); -// pQueryMsg->prevResultLen = htonl(pQueryMsg->prevResultLen); -//// pQueryMsg->sw.gap = htobe64(pQueryMsg->sw.gap); -//// pQueryMsg->sw.primaryColId = htonl(pQueryMsg->sw.primaryColId); -// pQueryMsg->tableScanOperator = htonl(pQueryMsg->tableScanOperator); -// pQueryMsg->numOfOperator = htonl(pQueryMsg->numOfOperator); -// pQueryMsg->udfContentOffset = htonl(pQueryMsg->udfContentOffset); -// pQueryMsg->udfContentLen = htonl(pQueryMsg->udfContentLen); -// pQueryMsg->udfNum = htonl(pQueryMsg->udfNum); -// -// // query msg safety check -// if (!validateQueryMsg(pQueryMsg)) { -// code = TSDB_CODE_QRY_INVALID_MSG; -// goto _cleanup; -// } -// -// char *pMsg = (char *)(pQueryMsg->tableCols) + sizeof(SColumnInfo) * pQueryMsg->numOfCols; -// for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) { -// SColumnInfo *pColInfo = &pQueryMsg->tableCols[col]; -// -// pColInfo->colId = htons(pColInfo->colId); -// pColInfo->type = htons(pColInfo->type); -// pColInfo->bytes = htons(pColInfo->bytes); -// pColInfo->flist.numOfFilters = 0; -// -// if (!isValidDataType(pColInfo->type)) { -// //qDebug("qmsg:%p, invalid data type in source column, index:%d, type:%d", pQueryMsg, col, pColInfo->type); -// code = TSDB_CODE_QRY_INVALID_MSG; -// goto _cleanup; -// } -// -///* -// int32_t numOfFilters = pColInfo->flist.numOfFilters; -// if (numOfFilters > 0) { -// pColInfo->flist.filterInfo = calloc(numOfFilters, sizeof(SColumnFilterInfo)); -// if (pColInfo->flist.filterInfo == NULL) { -// code = TSDB_CODE_QRY_OUT_OF_MEMORY; -// goto _cleanup; -// } -// } -// -// code = deserializeColFilterInfo(pColInfo->flist.filterInfo, numOfFilters, &pMsg); -// if (code != TSDB_CODE_SUCCESS) { -// goto _cleanup; -// } -//*/ -// } -// -// if (pQueryMsg->colCondLen > 0) { -// param->colCond = calloc(1, pQueryMsg->colCondLen); -// if (param->colCond == NULL) { -// code = TSDB_CODE_QRY_OUT_OF_MEMORY; -// goto _cleanup; -// } -// -// memcpy(param->colCond, pMsg, pQueryMsg->colCondLen); -// pMsg += pQueryMsg->colCondLen; -// } -// -// -// param->tableScanOperator = pQueryMsg->tableScanOperator; -// param->pExpr = calloc(pQueryMsg->numOfOutput, POINTER_BYTES); -// if (param->pExpr == NULL) { -// code = TSDB_CODE_QRY_OUT_OF_MEMORY; -// goto _cleanup; -// } -// -// SSqlExpr *pExprMsg = (SSqlExpr *)pMsg; -// -// for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { -// param->pExpr[i] = pExprMsg; -// -//// pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); -//// pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); -//// pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); -//// pExprMsg->colBytes = htons(pExprMsg->colBytes); -//// pExprMsg->colType = htons(pExprMsg->colType); -// -//// pExprMsg->resType = htons(pExprMsg->resType); -//// pExprMsg->resBytes = htons(pExprMsg->resBytes); -// pExprMsg->interBytes = htonl(pExprMsg->interBytes); -// -//// pExprMsg->functionId = htons(pExprMsg->functionId); -// pExprMsg->numOfParams = htons(pExprMsg->numOfParams); -//// pExprMsg->resColId = htons(pExprMsg->resColId); -//// pExprMsg->flist.numOfFilters = htons(pExprMsg->flist.numOfFilters); -// pMsg += sizeof(SSqlExpr); -// -// for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { -// pExprMsg->param[j].nType = htonl(pExprMsg->param[j].nType); -// pExprMsg->param[j].nLen = htonl(pExprMsg->param[j].nLen); -// -// if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { -// pExprMsg->param[j].pz = pMsg; -// pMsg += pExprMsg->param[j].nLen; // one more for the string terminated char. -// } else { -// pExprMsg->param[j].i = htobe64(pExprMsg->param[j].i); -// } -// } -// -//// int16_t functionId = pExprMsg->functionId; -//// if (functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ || functionId == FUNCTION_TAG_DUMMY) { -//// if (!TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) { // ignore the column index check for arithmetic expression. -//// code = TSDB_CODE_QRY_INVALID_MSG; -//// goto _cleanup; -//// } -//// } -// -//// if (pExprMsg->flist.numOfFilters > 0) { -//// pExprMsg->flist.filterInfo = calloc(pExprMsg->flist.numOfFilters, sizeof(SColumnFilterInfo)); -//// } -//// -//// deserializeColFilterInfo(pExprMsg->flist.filterInfo, pExprMsg->flist.numOfFilters, &pMsg); -// pExprMsg = (SSqlExpr *)pMsg; -// } -// -// if (pQueryMsg->secondStageOutput) { -// pExprMsg = (SSqlExpr *)pMsg; -// param->pSecExpr = calloc(pQueryMsg->secondStageOutput, POINTER_BYTES); -// -// for (int32_t i = 0; i < pQueryMsg->secondStageOutput; ++i) { -// param->pSecExpr[i] = pExprMsg; -// -//// pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); -//// pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); -//// pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); -//// pExprMsg->resType = htons(pExprMsg->resType); -//// pExprMsg->resBytes = htons(pExprMsg->resBytes); -//// pExprMsg->colBytes = htons(pExprMsg->colBytes); -//// pExprMsg->colType = htons(pExprMsg->colType); -// -//// pExprMsg->functionId = htons(pExprMsg->functionId); -// pExprMsg->numOfParams = htons(pExprMsg->numOfParams); -// -// pMsg += sizeof(SSqlExpr); -// -// for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { -// pExprMsg->param[j].nType = htonl(pExprMsg->param[j].nType); -// pExprMsg->param[j].nLen = htonl(pExprMsg->param[j].nLen); -// -// if (pExprMsg->param[j].nType == TSDB_DATA_TYPE_BINARY) { -// pExprMsg->param[j].pz = pMsg; -// pMsg += pExprMsg->param[j].nLen; // one more for the string terminated char. -// } else { -// pExprMsg->param[j].i = htobe64(pExprMsg->param[j].i); -// } -// } -// -//// int16_t functionId = pExprMsg->functionId; -//// if (functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ || functionId == FUNCTION_TAG_DUMMY) { -//// if (!TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) { // ignore the column index check for arithmetic expression. -//// code = TSDB_CODE_QRY_INVALID_MSG; -//// goto _cleanup; -//// } -//// } -// -// pExprMsg = (SSqlExpr *)pMsg; -// } -// } -// -// pMsg = createTableIdList(pQueryMsg, pMsg, &(param->pTableIdList)); -// -// if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns -// param->pGroupColIndex = malloc(pQueryMsg->numOfGroupCols * sizeof(SColIndex)); -// if (param->pGroupColIndex == NULL) { -// code = TSDB_CODE_QRY_OUT_OF_MEMORY; -// goto _cleanup; -// } -// -// for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { -// param->pGroupColIndex[i].colId = htons(*(int16_t *)pMsg); -// pMsg += sizeof(param->pGroupColIndex[i].colId); -// -// param->pGroupColIndex[i].colIndex = htons(*(int16_t *)pMsg); -// pMsg += sizeof(param->pGroupColIndex[i].colIndex); -// -// param->pGroupColIndex[i].flag = htons(*(int16_t *)pMsg); -// pMsg += sizeof(param->pGroupColIndex[i].flag); -// -// memcpy(param->pGroupColIndex[i].name, pMsg, tListLen(param->pGroupColIndex[i].name)); -// pMsg += tListLen(param->pGroupColIndex[i].name); -// } -// -// pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx); -// pQueryMsg->orderType = htons(pQueryMsg->orderType); -// } -// -// pQueryMsg->fillType = htons(pQueryMsg->fillType); -// if (pQueryMsg->fillType != TSDB_FILL_NONE) { -// pQueryMsg->fillVal = (uint64_t)(pMsg); -// -// int64_t *v = (int64_t *)pMsg; -// for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { -// v[i] = htobe64(v[i]); -// } -// -// pMsg += sizeof(int64_t) * pQueryMsg->numOfOutput; -// } -// -// if (pQueryMsg->numOfTags > 0) { -// param->pTagColumnInfo = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags); -// if (param->pTagColumnInfo == NULL) { -// code = TSDB_CODE_QRY_OUT_OF_MEMORY; -// goto _cleanup; -// } -// -// for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) { -// SColumnInfo* pTagCol = (SColumnInfo*) pMsg; -// -// pTagCol->colId = htons(pTagCol->colId); -// pTagCol->bytes = htons(pTagCol->bytes); -// pTagCol->type = htons(pTagCol->type); -//// pTagCol->flist.numOfFilters = 0; -// -// param->pTagColumnInfo[i] = *pTagCol; -// pMsg += sizeof(SColumnInfo); -// } -// } -// -// // the tag query condition expression string is located at the end of query msg -// if (pQueryMsg->tagCondLen > 0) { -// param->tagCond = calloc(1, pQueryMsg->tagCondLen); -// if (param->tagCond == NULL) { -// code = TSDB_CODE_QRY_OUT_OF_MEMORY; -// goto _cleanup; -// } -// -// memcpy(param->tagCond, pMsg, pQueryMsg->tagCondLen); -// pMsg += pQueryMsg->tagCondLen; -// } -// -// if (pQueryMsg->prevResultLen > 0) { -// param->prevResult = calloc(1, pQueryMsg->prevResultLen); -// if (param->prevResult == NULL) { -// code = TSDB_CODE_QRY_OUT_OF_MEMORY; -// goto _cleanup; -// } -// -// memcpy(param->prevResult, pMsg, pQueryMsg->prevResultLen); -// pMsg += pQueryMsg->prevResultLen; -// } -// -//// if (pQueryMsg->tbnameCondLen > 0) { -//// param->tbnameCond = calloc(1, pQueryMsg->tbnameCondLen + 1); -//// if (param->tbnameCond == NULL) { -//// code = TSDB_CODE_QRY_OUT_OF_MEMORY; -//// goto _cleanup; -//// } -//// -//// strncpy(param->tbnameCond, pMsg, pQueryMsg->tbnameCondLen); -//// pMsg += pQueryMsg->tbnameCondLen; -//// } -// -// //skip ts buf -// if ((pQueryMsg->tsBuf.tsOffset + pQueryMsg->tsBuf.tsLen) > 0) { -// pMsg = (char *)pQueryMsg + pQueryMsg->tsBuf.tsOffset + pQueryMsg->tsBuf.tsLen; -// } -// -// param->pOperator = taosArrayInit(pQueryMsg->numOfOperator, sizeof(int32_t)); -// for(int32_t i = 0; i < pQueryMsg->numOfOperator; ++i) { -// int32_t op = htonl(*(int32_t*)pMsg); -// taosArrayPush(param->pOperator, &op); -// -// pMsg += sizeof(int32_t); -// } -// -// if (pQueryMsg->udfContentLen > 0) { -// // todo extract udf function in tudf.c -//// param->pUdfInfo = calloc(1, sizeof(SUdfInfo)); -//// param->pUdfInfo->contLen = pQueryMsg->udfContentLen; -//// -//// pMsg = (char*)pQueryMsg + pQueryMsg->udfContentOffset; -//// param->pUdfInfo->resType = *(int8_t*) pMsg; -//// pMsg += sizeof(int8_t); -//// -//// param->pUdfInfo->resBytes = htons(*(int16_t*)pMsg); -//// pMsg += sizeof(int16_t); -//// -//// tstr* name = (tstr*)(pMsg); -//// param->pUdfInfo->name = strndup(name->data, name->len); -//// -//// pMsg += varDataTLen(name); -//// param->pUdfInfo->funcType = htonl(*(int32_t*)pMsg); -//// pMsg += sizeof(int32_t); -//// -//// param->pUdfInfo->bufSize = htonl(*(int32_t*)pMsg); -//// pMsg += sizeof(int32_t); -//// -//// param->pUdfInfo->content = malloc(pQueryMsg->udfContentLen); -//// memcpy(param->pUdfInfo->content, pMsg, pQueryMsg->udfContentLen); -// -// pMsg += pQueryMsg->udfContentLen; -// } -// -// param->sql = strndup(pMsg, pQueryMsg->sqlstrLen); -// -// SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->tableCols}; -// if (!validateQueryTableCols(&info, param->pExpr, pQueryMsg->numOfOutput, param->pTagColumnInfo, pQueryMsg)) { -// code = TSDB_CODE_QRY_INVALID_MSG; -// goto _cleanup; -// } -// -// //qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " -//// "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64, -//// pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, -//// pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->interval.interval, -//// pQueryMsg->fillType, pQueryMsg->tsBuf.tsLen, pQueryMsg->tsBuf.tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset); -// -// //qDebug("qmsg:%p, sql:%s", pQueryMsg, param->sql); -// return TSDB_CODE_SUCCESS; -// -//_cleanup: -// freeParam(param); -// return code; -//} - int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int32_t filterNum) { if (filterNum <= 0) { return TSDB_CODE_SUCCESS; diff --git a/source/libs/function/inc/thistogram.h b/source/libs/function/inc/thistogram.h index 3b5c2b4cfb..cb6560325b 100644 --- a/source/libs/function/inc/thistogram.h +++ b/source/libs/function/inc/thistogram.h @@ -49,7 +49,7 @@ typedef struct SHistogramInfo { SHistBin* elems; #else tSkipList* pList; - SLoserTreeInfo* pLoserTree; + SMultiwayMergeTreeInfo* pLoserTree; int32_t maxIndex; bool ordered; #endif diff --git a/source/libs/function/src/thistogram.c b/source/libs/function/src/thistogram.c index 2229ac8561..49799aef7a 100644 --- a/source/libs/function/src/thistogram.c +++ b/source/libs/function/src/thistogram.c @@ -117,14 +117,14 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { if ((*pHisto)->ordered) { int32_t lastIndex = (*pHisto)->maxIndex; - SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; + SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree; (*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].pData = pResNode; pEntry1->index = (*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].index; // update the loser tree if ((*pHisto)->ordered) { - tLoserTreeAdjust(pTree, pEntry1->index + pTree->numOfEntries); + tMergeTreeAdjust(pTree, pEntry1->index + pTree->numOfEntries); } tSkipListKey kx = @@ -142,10 +142,10 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { SHistBin* pPrevEntry = (SHistBin*)pResNode->pBackward[0]->pData; pPrevEntry->delta = val - pPrevEntry->val; - SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; + SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree; if ((*pHisto)->ordered) { - tLoserTreeAdjust(pTree, pPrevEntry->index + pTree->numOfEntries); - tLoserTreeDisplay(pTree); + tMergeTreeAdjust(pTree, pPrevEntry->index + pTree->numOfEntries); + tMergeTreePrint(pTree); } } @@ -155,7 +155,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { if (!(*pHisto)->ordered) { SSkipListPrint((*pHisto)->pList, 1); - SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; + SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree; tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0]; tSkipListNode* p1 = pHead; @@ -183,13 +183,13 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { pTree->pNode[i].index = -1; } - tLoserTreeDisplay(pTree); + tMergeTreePrint(pTree); for (int32_t i = pTree->totalEntries - 1; i >= pTree->numOfEntries; i--) { - tLoserTreeAdjust(pTree, i); + tMergeTreeAdjust(pTree, i); } - tLoserTreeDisplay(pTree); + tMergeTreePrint(pTree); (*pHisto)->ordered = true; } @@ -219,7 +219,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { pPrevEntry->delta = pEntry->val - pPrevEntry->val; } - SLoserTreeInfo* pTree = (*pHisto)->pLoserTree; + SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree; if (pNextEntry->index != -1) { (*pHisto)->maxIndex = pNextEntry->index; @@ -230,12 +230,12 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { printf("disappear index is:%d\n", f); } - tLoserTreeAdjust(pTree, pEntry->index + pTree->numOfEntries); + tMergeTreeAdjust(pTree, pEntry->index + pTree->numOfEntries); // remove the next node in skiplist tSkipListRemoveNode((*pHisto)->pList, pNext); SSkipListPrint((*pHisto)->pList, 1); - tLoserTreeDisplay((*pHisto)->pLoserTree); + tMergeTreePrint((*pHisto)->pLoserTree); } else { // add to heap if (pResNode->pForward[0] != NULL) { pEntry1->delta = ((SHistBin*)pResNode->pForward[0]->pData)->val - val; diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index c90d8e209d..ab711d1f98 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -254,7 +254,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, resetSlotInfo(pBucket); - int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir); + int32_t ret = createDiskbasedBuffer(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, tsTempDir); if (ret != 0) { tMemBucketDestroy(pBucket); return NULL; diff --git a/source/util/src/tlosertree.c b/source/util/src/tlosertree.c index 5e1cf8bb1f..7ccd49a64e 100644 --- a/source/util/src/tlosertree.c +++ b/source/util/src/tlosertree.c @@ -17,79 +17,76 @@ #include "tlosertree.h" #include "ulog.h" -// set initial value for loser tree -void tLoserTreeInit(SLoserTreeInfo* pTree) { - assert((pTree->totalEntries & 0x01) == 0 && (pTree->numOfEntries << 1 == pTree->totalEntries)); +typedef struct STreeNode { + int32_t index; + void *pData; // TODO remove it? +} STreeNode; - for (int32_t i = 0; i < pTree->totalEntries; ++i) { - if (i < pTree->numOfEntries) { +// Set the initial value of the multiway merge tree. +static void tMergeTreeInit(SMultiwayMergeTreeInfo* pTree) { + assert((pTree->totalSources & 0x01) == 0 && (pTree->numOfSources << 1 == pTree->totalSources)); + + for (int32_t i = 0; i < pTree->totalSources; ++i) { + if (i < pTree->numOfSources) { pTree->pNode[i].index = -1; } else { - pTree->pNode[i].index = i - pTree->numOfEntries; + pTree->pNode[i].index = i - pTree->numOfSources; } } } -/* - * display whole loser tree on screen for debug purpose only. - */ -void tLoserTreeDisplay(SLoserTreeInfo* pTree) { - printf("the value of loser tree:\t"); - for (int32_t i = 0; i < pTree->totalEntries; ++i) printf("%d\t", pTree->pNode[i].index); - printf("\n"); -} - -int32_t tLoserTreeCreate(SLoserTreeInfo** pTree, uint32_t numOfSources, void* param, __merge_compare_fn_t compareFn) { +int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, void* param, __merge_compare_fn_t compareFn) { int32_t totalEntries = numOfSources << 1u; - *pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries); + SMultiwayMergeTreeInfo* pTreeInfo = (SMultiwayMergeTreeInfo*)calloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries); if ((*pTree) == NULL) { uError("allocate memory for loser-tree failed. reason:%s", strerror(errno)); return -1; } - (*pTree)->pNode = (SLoserTreeNode*)(((char*)(*pTree)) + sizeof(SLoserTreeInfo)); + pTreeInfo->pNode = (STreeNode*)(((char*)(*pTree)) + sizeof(SMultiwayMergeTreeInfo)); - (*pTree)->numOfEntries = numOfSources; - (*pTree)->totalEntries = totalEntries; - (*pTree)->param = param; - (*pTree)->comparFn = compareFn; + pTreeInfo->numOfSources = numOfSources; + pTreeInfo->totalSources = totalEntries; + pTreeInfo->param = param; + pTreeInfo->comparFn = compareFn; // set initial value for loser tree - tLoserTreeInit(*pTree); + tMergeTreeInit(pTreeInfo); #ifdef _DEBUG_VIEW printf("the initial value of loser tree:\n"); - tLoserTreeDisplay(*pTree); + tLoserTreeDisplaypTreeInfo; #endif for (int32_t i = totalEntries - 1; i >= numOfSources; i--) { - tLoserTreeAdjust(*pTree, i); + tMergeTreeAdjust(pTreeInfo, i); } #if defined(_DEBUG_VIEW) printf("after adjust:\n"); - tLoserTreeDisplay(*pTree); + tLoserTreeDisplaypTreeInfo; printf("initialize local reducer completed!\n"); #endif + *pTree = pTreeInfo; return 0; } -void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { - assert(idx <= pTree->totalEntries - 1 && idx >= pTree->numOfEntries && pTree->totalEntries >= 2); +void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) { + assert(idx <= pTree->totalSources - 1 && idx >= pTree->numOfSources && pTree->totalSources >= 2); - if (pTree->totalEntries == 2) { + if (pTree->totalSources == 2) { pTree->pNode[0].index = 0; pTree->pNode[1].index = 0; return; } int32_t parentId = idx >> 1; - SLoserTreeNode kLeaf = pTree->pNode[idx]; + STreeNode kLeaf = pTree->pNode[idx]; while (parentId > 0) { - SLoserTreeNode* pCur = &pTree->pNode[parentId]; + STreeNode* pCur = &pTree->pNode[parentId]; if (pCur->index == -1) { pTree->pNode[parentId] = kLeaf; return; @@ -97,7 +94,7 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { int32_t ret = pTree->comparFn(pCur, &kLeaf, pTree->param); if (ret < 0) { - SLoserTreeNode t = pTree->pNode[parentId]; + STreeNode t = pTree->pNode[parentId]; pTree->pNode[parentId] = kLeaf; kLeaf = t; } @@ -111,11 +108,31 @@ void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { } } -void tLoserTreeRebuild(SLoserTreeInfo* pTree) { - assert((pTree->totalEntries & 0x1) == 0); +void tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) { + assert((pTree->totalSources & 0x1) == 0); - tLoserTreeInit(pTree); - for (int32_t i = pTree->totalEntries - 1; i >= pTree->numOfEntries; i--) { - tLoserTreeAdjust(pTree, i); + tMergeTreeInit(pTree); + for (int32_t i = pTree->totalSources - 1; i >= pTree->numOfSources; i--) { + tMergeTreeAdjust(pTree, i); } } + +/* + * display whole loser tree on screen for debug purpose only. + */ +void tMergeTreePrint(const SMultiwayMergeTreeInfo* pTree) { + printf("the value of loser tree:\t"); + for (int32_t i = 0; i < pTree->totalSources; ++i) { + printf("%d\t", pTree->pNode[i].index); + } + + printf("\n"); +} + +int32_t tMergeTreeGetChosenIndex(const SMultiwayMergeTreeInfo* pTree) { + return pTree->pNode[0].index; +} + +int32_t tMergeTreeAdjustIndex(const SMultiwayMergeTreeInfo* pTree) { + return tMergeTreeGetChosenIndex(pTree) + pTree->numOfSources; +} diff --git a/source/util/src/tpagedfile.c b/source/util/src/tpagedfile.c index a8cb8563a4..3e9b99cb21 100644 --- a/source/util/src/tpagedfile.c +++ b/source/util/src/tpagedfile.c @@ -55,7 +55,7 @@ typedef struct SDiskbasedBuf { SDiskbasedBufStatis statis; } SDiskbasedBuf; -int32_t createDiskbasedResultBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { +int32_t createDiskbasedBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { *pResultBuf = calloc(1, sizeof(SDiskbasedBuf)); SDiskbasedBuf* pResBuf = *pResultBuf; @@ -492,13 +492,13 @@ void destroyResultBuf(SDiskbasedBuf* pResultBuf) { tfree(pResultBuf); } -struct SPageInfo* getLastPageInfo(SIDList pList) { +SPageInfo* getLastPageInfo(SIDList pList) { size_t size = taosArrayGetSize(pList); SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1); return pPgInfo; } -int32_t getPageId(const struct SPageInfo* pPgInfo) { +int32_t getPageId(const SPageInfo* pPgInfo) { ASSERT(pPgInfo != NULL); return pPgInfo->pageId; } -- GitLab