From 63f3da038309c9aec65e33acc78c04cb6362e94e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 15 Feb 2022 14:40:29 +0800 Subject: [PATCH] [td-11818] refactor and fix bug. --- include/common/tep.h | 4 + include/util/tpagedbuf.h | 14 ++ source/common/src/tep.c | 29 +++ source/libs/executor/inc/executorimpl.h | 8 +- source/libs/executor/src/executil.c | 1 - source/libs/executor/src/executorimpl.c | 204 ++++++++++++++++---- source/libs/executor/test/executorTests.cpp | 13 +- source/util/src/tpagedbuf.c | 32 +-- 8 files changed, 239 insertions(+), 66 deletions(-) diff --git a/include/common/tep.h b/include/common/tep.h index d5ad2bca98..584b8a5a71 100644 --- a/include/common/tep.h +++ b/include/common/tep.h @@ -90,6 +90,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(const SSDataBlock* pBlock); +double blockDataGetSerialRowSize(const SSDataBlock* pBlock); +size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); + +size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSize); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index 461547ad79..e989c31cd6 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -37,6 +37,15 @@ typedef struct SFilePage { char data[]; } SFilePage; +typedef struct SDiskbasedBufStatis { + int64_t flushBytes; + int64_t loadBytes; + int32_t loadPages; + int32_t getPages; + int32_t releasePages; + int32_t flushPages; +} SDiskbasedBufStatis; + /** * create disk-based result buffer * @param pBuf @@ -150,6 +159,11 @@ void setBufPageDirty(SFilePage* pPageInfo, bool dirty); */ void printStatisBeforeClose(SDiskbasedBuf* pBuf); +/** + * return buf statistics. + */ +SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tep.c b/source/common/src/tep.c index c015f0ac21..970b6d954f 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -512,6 +512,35 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock) { return rowSize; } +/** + * @refitem blockDataToBuf for the meta size + * + * @param pBlock + * @return + */ +size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { + return sizeof(int32_t) + pBlock->info.numOfCols * sizeof(int32_t); +} + +double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { + ASSERT(pBlock != NULL); + double rowSize = 0; + + size_t numOfCols = pBlock->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + rowSize += pColInfo->info.bytes; + + if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { + rowSize += sizeof(int32_t); + } else { + rowSize += 1/8.0; + } + } + + return rowSize; +} + typedef struct SSDataBlockSortHelper { SArray *orderInfo; // SArray SSDataBlock *pDataBlock; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2baa5055a6..232b54554f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -389,7 +389,7 @@ typedef struct SExchangeInfo { tsem_t ready; void* pTransporter; SSDataBlock* pResult; - bool seqLoadData; + bool seqLoadData; // sequential load data or not, false by default int32_t current; uint64_t totalSize; // total load bytes from remote uint64_t totalRows; // total number of rows @@ -591,6 +591,12 @@ typedef struct SOrderOperatorInfo { int32_t numOfRowsInRes; SMsortComparParam cmpParam; + + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time } SOrderOperatorInfo; SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 7ef2c03fda..52ab8493f1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -358,7 +358,6 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES); pGroupResInfo->index = 0; - assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 07832c76ca..f5dc7a82b1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5604,6 +5604,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { tfree(pInfo->prevRow); tfree(pInfo->currentGroupColData); } + static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; taosArrayDestroy(pInfo->orderColumnList); @@ -5854,16 +5855,20 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); - char* pData = colDataGet(pSrcColInfo, *rowIndex); - colDataAppend(pColInfo, pBlock->info.rows, pData, isNull); + if (isNull) { + colDataAppend(pColInfo, pBlock->info.rows, NULL, true); + } else { + char* pData = colDataGet(pSrcColInfo, *rowIndex); + colDataAppend(pColInfo, pBlock->info.rows, pData, false); + } } pBlock->info.rows += 1; *rowIndex += 1; } -static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) { +static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, SArray* pAllSources, int32_t numOfCols) { SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource)); if (pSource == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -5883,24 +5888,17 @@ static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) { taosArrayPush(pSource->pBlock->pDataBlock, &colInfo); } - taosArrayPush(pInfo->pSources, &pSource); + taosArrayPush(pAllSources, &pSource); pInfo->sourceId += 1; - pInfo->cmpParam.numOfSources += 1; - - if (pInfo->cmpParam.numOfSources > getNumOfInMemBufPages(pInfo->pSortInternalBuf)) { - // TODO sort memory not enough, return with error code. - } - ASSERT(pInfo->cmpParam.numOfSources == taosArrayGetSize(pInfo->pSources)); - - int32_t rowSize = blockDataGetRowSize(pSource->pBlock); - int32_t numOfRows = getBufPageSize(pInfo->pSortInternalBuf)/rowSize; + int32_t rowSize = blockDataGetSerialRowSize(pSource->pBlock); + int32_t numOfRows = (getBufPageSize(pInfo->pSortInternalBuf) - blockDataGetSerialMetaSize(pInfo->pDataBlock))/rowSize; return blockDataEnsureCapacity(pSource->pBlock, numOfRows); } -void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { +void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, SArray* pSources, jmp_buf env) { int32_t start = 0; while(start < pInfo->pDataBlock->info.rows) { @@ -5933,36 +5931,47 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol); - int32_t code = doAddNewSource(pInfo, numOfCols); + int32_t code = doAddNewSource(pInfo, pSources, numOfCols); if (code != TSDB_CODE_SUCCESS) { longjmp(env, code); } } -static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorInfo* pInfo) { - cmpParam->pSources = pInfo->pSources->pData; +static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SDiskbasedBuf* pBuf) { + cmpParam->pSources = taosArrayGet(pSources, startIndex); + cmpParam->numOfSources = (endIndex - startIndex + 1); - for(int32_t i = 0; i < pInfo->cmpParam.numOfSources; ++i) { + for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { SExternalMemSource* pSource = cmpParam->pSources[i]; - SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); + SPageInfo* pPgInfo = *(SPageInfo**) taosArrayGet(pSource->pageIdList, pSource->pageIndex); - SFilePage* pPage = getBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); + SFilePage* pPage = getBufPage(pBuf, getPageId(pPgInfo)); int32_t code = blockDataFromBuf(cmpParam->pSources[i]->pBlock, pPage->data); if (code != TSDB_CODE_SUCCESS) { return code; } - releaseBufPage(pInfo->pSortInternalBuf, pPage); + releaseBufPage(pBuf, pPage); } return TSDB_CODE_SUCCESS; } -static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo, SMsortComparParam* cmpParam) { +static int32_t sortComparClearup(SMsortComparParam* cmpParam) { + for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { + SExternalMemSource* pSource = cmpParam->pSources[i]; + blockDataDestroy(pSource->pBlock); + tfree(pSource); + } + + cmpParam->numOfSources = 0; +} + +static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo, SMsortComparParam* cmpParam, int32_t capacity) { blockDataClearup(pInfo->pDataBlock, pInfo->hasVarCol); while(1) { - if (pInfo->cmpParam.numOfSources == pInfo->numOfCompleted) { + if (cmpParam->numOfSources == pInfo->numOfCompleted) { break; } @@ -5976,7 +5985,7 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI longjmp(pTaskInfo->env, code); } - if (pInfo->pDataBlock->info.rows >= pInfo->numOfRowsInRes) { + if (pInfo->pDataBlock->info.rows >= capacity) { return pInfo->pDataBlock; } } @@ -5984,6 +5993,108 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } +static int32_t doInternalSort(SExecTaskInfo* pTaskInfo, SOrderOperatorInfo* pInfo) { + size_t numOfSources = taosArrayGetSize(pInfo->pSources); + + // Calculate the I/O counts to complete the data sort. + double sortCount = floorl(log2(numOfSources) / log2(getNumOfInMemBufPages(pInfo->pSortInternalBuf))); + + pInfo->totalElapsed = taosGetTimestampUs() - pInfo->startTs; + qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64, + GET_TASKID(pTaskInfo), (int32_t) (sortCount + 1), getTotalBufSize(pInfo->pSortInternalBuf), pInfo->sortElapsed, + pInfo->totalElapsed); + + size_t pgSize = getBufPageSize(pInfo->pSortInternalBuf); + int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pInfo->pDataBlock))/ blockDataGetSerialRowSize(pInfo->pDataBlock); + + blockDataEnsureCapacity(pInfo->pDataBlock, numOfRows); + + size_t numOfSorted = taosArrayGetSize(pInfo->pSources); + for(int32_t t = 0; t < sortCount; ++t) { + int64_t st = taosGetTimestampUs(); + + SArray* pResList = taosArrayInit(4, POINTER_BYTES); + SMsortComparParam resultParam = {.orderInfo = pInfo->cmpParam.orderInfo}; + + int32_t numOfInputSources = getNumOfInMemBufPages(pInfo->pSortInternalBuf); + int32_t sortGroup = (numOfSorted + numOfInputSources - 1) / numOfInputSources; + + // Only *numOfInputSources* can be loaded into buffer to perform the external sort. + for(int32_t i = 0; i < sortGroup; ++i) { + pInfo->sourceId += 1; + + int32_t end = (i + 1) * numOfInputSources - 1; + if (end > numOfSorted - 1) { + end = numOfSorted - 1; + } + + pInfo->cmpParam.numOfSources = end - i * numOfInputSources + 1; + + int32_t code = sortComparInit(&pInfo->cmpParam, pInfo->pSources, i * numOfInputSources, end, pInfo->pSortInternalBuf); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + code = tMergeTreeCreate(&pInfo->pMergeTree, pInfo->cmpParam.numOfSources, &pInfo->cmpParam, msortComparFn); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + while (1) { + SSDataBlock* pDataBlock = getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, numOfRows); + if (pDataBlock == NULL) { + break; + } + + int32_t pageId = -1; + SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId); + if (pPage == NULL) { + assert(0); + longjmp(pTaskInfo->env, terrno); + } + + int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + pDataBlock->info.numOfCols * sizeof(int32_t); + assert(size <= getBufPageSize(pInfo->pSortInternalBuf)); + + blockDataToBuf(pPage->data, pDataBlock); + + setBufPageDirty(pPage, true); + releaseBufPage(pInfo->pSortInternalBuf, pPage); + + blockDataClearup(pDataBlock, pInfo->hasVarCol); + } + + tMergeTreeDestroy(pInfo->pMergeTree); + pInfo->numOfCompleted = 0; + + code = doAddNewSource(pInfo, pResList, pInfo->pDataBlock->info.numOfCols); + if (code != 0) { + longjmp(pTaskInfo->env, code); + } + } + + sortComparClearup(&pInfo->cmpParam); + + taosArrayClear(pInfo->pSources); + taosArrayAddAll(pInfo->pSources, pResList); + taosArrayDestroy(pResList); + + pInfo->cmpParam = resultParam; + numOfSorted = taosArrayGetSize(pInfo->pSources); + + int64_t el = taosGetTimestampUs() - st; + pInfo->totalElapsed += el; + + SDiskbasedBufStatis statis = getDBufStatis(pInfo->pSortInternalBuf); + + qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", GET_TASKID(pTaskInfo), t + 1, el, statis.loadBytes/1024.0, + statis.flushBytes/1024.0); + } + + pInfo->cmpParam.numOfSources = taosArrayGetSize(pInfo->pSources); + return 0; +} + static SSDataBlock* doSort(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -5995,9 +6106,11 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam); + return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, pInfo->numOfRowsInRes); } + int64_t st = taosGetTimestampUs(); + while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); @@ -6018,9 +6131,11 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { // Perform the in-memory sort and then flush data in the buffer into disk. int64_t p = taosGetTimestampUs(); blockDataSort(pInfo->pDataBlock, pInfo->cmpParam.orderInfo, pInfo->cmpParam.nullFirst); - printf("sort time:%ld\n", taosGetTimestampUs() - p); - addToDiskbasedBuf(pInfo, pTaskInfo->env); + int64_t el = taosGetTimestampUs() - p; + pInfo->sortElapsed += el; + + addToDiskbasedBuf(pInfo, pInfo->pSources, pTaskInfo->env); } } @@ -6035,14 +6150,19 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock; } - addToDiskbasedBuf(pInfo, pTaskInfo->env); + addToDiskbasedBuf(pInfo, pInfo->pSources, pTaskInfo->env); } - int32_t rowSize = blockDataGetRowSize(pInfo->pDataBlock); - int32_t numOfRows = getBufPageSize(pInfo->pSortInternalBuf)/rowSize; - blockDataEnsureCapacity(pInfo->pDataBlock, numOfRows); + doInternalSort(pTaskInfo, pInfo); + + int32_t code = blockDataEnsureCapacity(pInfo->pDataBlock, pInfo->numOfRowsInRes); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } - int32_t code = sortComparInit(&pInfo->cmpParam, pInfo); + int32_t numOfSources = taosArrayGetSize(pInfo->pSources); + ASSERT(numOfSources <= getNumOfInMemBufPages(pInfo->pSortInternalBuf)); + code = sortComparInit(&pInfo->cmpParam, pInfo->pSources, 0, numOfSources - 1, pInfo->pSortInternalBuf); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -6053,7 +6173,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam); + return getSortedBlockData(pTaskInfo, pInfo, &pInfo->cmpParam, pInfo->numOfRowsInRes); } static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { @@ -6089,9 +6209,10 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI return NULL; } - pInfo->sortBufSize = 1024 * 1024 * 50; // 1MB - pInfo->bufPageSize = 64 * 1024; - pInfo->numOfRowsInRes = 4096; + pInfo->sortBufSize = 1024 * 16; // 1MB + pInfo->bufPageSize = 1024; + pInfo->numOfRowsInRes = 1024; + pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes); pInfo->pSources = taosArrayInit(4, POINTER_BYTES); pInfo->cmpParam.orderInfo = createBlockOrder(pExprInfo, pOrderVal); @@ -6104,7 +6225,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI } } - int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->bufPageSize*1000, 1, "/tmp/"); + int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->sortBufSize, 1, "/tmp/"); if (pInfo->pSources == NULL || code != 0 || pInfo->cmpParam.orderInfo == NULL || pInfo->pDataBlock == NULL) { tfree(pOperator); destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo)); @@ -6190,8 +6311,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { } // table scan order - int32_t order = TSDB_ORDER_ASC;//pQueryAttr->order.order; - + int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { @@ -6229,10 +6349,10 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { closeAllResultRows(&pInfo->resultRowInfo); updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); -// initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo); + initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo); + toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity); -// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); - if (pInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { + if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index c0e22ad555..ebea6755d7 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -86,7 +86,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { for(int32_t i = 0; i < numOfRows; ++i) { SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); - int32_t v = rand();//(++pInfo->startVal); + int32_t v = (--pInfo->startVal); colDataAppend(pColInfo, i, reinterpret_cast(&v), false); // sprintf(buf, "this is %d row", i); @@ -110,7 +110,7 @@ SOperatorInfo* createDummyOperator(int32_t numOfBlocks) { SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); pInfo->max = numOfBlocks; - pInfo->startVal = 5000000; + pInfo->startVal = 1500000; pOperator->info = pInfo; return pOperator; @@ -298,7 +298,7 @@ TEST(testCase, external_sort_Test) { exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); // taosArrayPush(pExprInfo, &exp1); - SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(50000), pExprInfo, pOrderVal); + SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(1500), pExprInfo, pOrderVal); bool newgroup = false; SSDataBlock* pRes = NULL; @@ -321,12 +321,13 @@ TEST(testCase, external_sort_Test) { break; } -// SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); + SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); // SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); -// for (int32_t i = 0; i < pRes->info.rows; ++i) { + for (int32_t i = 0; i < pRes->info.rows; ++i) { // char* p = colDataGet(pCol2, i); + printf("%d: %d\n", total++, ((int32_t*)pCol1->pData)[i]); // printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); -// } + } } printStatisBeforeClose(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 52a9fdd522..0e8d85492c 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -28,15 +28,6 @@ typedef struct SPageInfo { bool dirty:1; // set current buffer page is dirty or not } SPageInfo; -typedef struct SDiskbasedBufStatis { - int64_t flushBytes; - int64_t loadBytes; - int32_t loadPages; - int32_t getPages; - int32_t releasePages; - int32_t flushPages; -} SDiskbasedBufStatis; - typedef struct SDiskbasedBuf { int32_t numOfPages; int64_t totalBufSize; @@ -56,8 +47,8 @@ typedef struct SDiskbasedBuf { uint64_t nextPos; // next page flush position uint64_t qId; // for debug purpose - SDiskbasedBufStatis statis; bool printStatis; // Print statistics info when closing this buffer. + SDiskbasedBufStatis statis; } SDiskbasedBuf; static void printStatisData(const SDiskbasedBuf* pBuf); @@ -130,7 +121,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba return data; } - *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0); + *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize+sizeof(SFilePage), ONE_STAGE_COMP, NULL, 0); if (*dst > 0) { memcpy(data, pBuf->assistBuf, *dst); } @@ -164,7 +155,11 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { assert(!pg->used && pg->pData != NULL); int32_t size = -1; - char* t = doCompressData(GET_DATA_PAYLOAD(pg), pBuf->pageSize, &size, pBuf); + char* t = NULL; + if (pg->offset == -1 || pg->dirty) { + SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg); + t = doCompressData(pPage->data, pBuf->pageSize, &size, pBuf); + } // this page is flushed to disk for the first time if (pg->offset == -1) { @@ -225,7 +220,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { } char* pDataBuf = pg->pData; - memset(pDataBuf, 0, pBuf->pageSize); + memset(pDataBuf, 0, pBuf->pageSize + sizeof(SFilePage)); pg->pData = NULL; // this means the data is not in buffer pg->length = size; @@ -256,7 +251,8 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return ret; } - ret = (int32_t)fread(GET_DATA_PAYLOAD(pg), 1, pg->length, pBuf->file); + SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg); + ret = (int32_t)fread(pPage->data, 1, pg->length, pBuf->file); if (ret != pg->length) { ret = TAOS_SYSTEM_ERROR(errno); return ret; @@ -266,7 +262,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { pBuf->statis.loadPages += 1; int32_t fullSize = 0; - doDecompressData(GET_DATA_PAYLOAD(pg), pg->length, &fullSize, pBuf); + doDecompressData(pPage->data, pg->length, &fullSize, pBuf); return 0; } @@ -558,7 +554,7 @@ int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { } int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) { - return pBuf->inMemPages; + return pBuf->inMemPages; } bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { @@ -577,6 +573,10 @@ void printStatisBeforeClose(SDiskbasedBuf* pBuf) { pBuf->printStatis = true; } +SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) { + return pBuf->statis; +} + void printStatisData(const SDiskbasedBuf* pBuf) { if (!pBuf->printStatis) { return; -- GitLab