From 2ced335b04c2a75da8ddfb539e90e3e6c21798bc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 26 May 2022 17:48:07 +0800 Subject: [PATCH] fix(query): set the proper buffer page size. --- source/common/src/tdatablock.c | 2 +- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 30 +++++++++++++++--------- source/libs/executor/src/groupoperator.c | 17 +++++++------- 4 files changed, 30 insertions(+), 20 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5216922643..2d77905d86 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1246,7 +1246,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { } int32_t newRows = (payloadSize - additional) / rowSize; - ASSERT(newRows <= nRows && newRows > 1); + ASSERT(newRows <= nRows && newRows >= 1); return newRows; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1b26d666df..1c9a49b679 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -672,6 +672,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI SArray* pColList); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); +int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 593b79ecc8..07e774c7c4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3938,6 +3938,21 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { taosMemoryFreeClear(pOperator); } +int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) { + *defaultPgsz = 4096; + while (*defaultPgsz < rowSize * 4) { + *defaultPgsz <<= 1u; + } + + // at least four pages need to be in buffer + *defaultBufsz = 4096 * 256; + if ((*defaultBufsz) <= (*defaultPgsz)) { + (*defaultBufsz) = (*defaultPgsz) * 4; + } + + return 0; +} + int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -3950,18 +3965,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n return TSDB_CODE_OUT_OF_MEMORY; } - uint32_t defaultPgsz = 4096; - while (defaultPgsz < pAggSup->resultRowSize * 4) { - defaultPgsz <<= 1u; - } - - // at least four pages need to be in buffer - int32_t defaultBufsz = 4096 * 256; - if (defaultBufsz <= defaultPgsz) { - defaultBufsz = defaultPgsz * 4; - } + uint32_t defaultPgsz = 0; + uint32_t defaultBufsz = 0; + getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); - int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH); + int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2600c17060..3ab296f62e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -439,7 +439,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { memcpy(data + (*columnLen), src, varDataTLen(src)); int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); ASSERT(v > 0); - printf("len:%d\n", v); contentLen = varDataTLen(src); } @@ -490,16 +489,13 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf int32_t *rows = (int32_t*) pPage; if (*rows >= pInfo->rowCapacity) { + // release buffer + releaseBufPage(pInfo->pBuf, pPage); + // add a new page for current group int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); taosArrayPush(p->pPageList, &pageId); - -// // number of rows -// *(int32_t*) pPage = 0; -// -// uint64_t* groupId = (pPage + sizeof(int32_t)); -// *groupId = 0; memset(pPage, 0, getBufPageSize(pInfo->pBuf)); } } @@ -566,6 +562,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); pInfo->pageIndex += 1; + releaseBufPage(pInfo->pBuf, page); blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; @@ -631,7 +628,11 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, TD_TMP_DIR_PATH); + uint32_t defaultPgsz = 0; + uint32_t defaultBufsz = 0; + getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz); + + int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH); if (code != TSDB_CODE_SUCCESS) { goto _error; } -- GitLab