diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 68e3bf4b8a20106d37c0dcd9c0a5e449c634ed58..02c5604ab427efca2227c90947c9ae80a84892fe 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -902,7 +902,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { // not belongs to the same group, return the result of current group; setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); - updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv); + updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv, true); { // reset output buffer for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { @@ -954,7 +954,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { // not belongs to the same group, return the result of current group setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); - updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv); + updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv, true); doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock); savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData); @@ -985,6 +985,8 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { } } + // shrink output memory on end + shrinkOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity); return (pRes->info.rows != 0)? pRes:NULL; } diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 138422c18ed3b7678abe0ce213f9836adb508196..fc222e8a81a088b0abead471376ae5f5810f9ab8 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -664,7 +664,8 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv); +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv, bool extendLarge); +void shrinkOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 48e52e078aa0d7a14b5f38c11fbd76609b1f6cd4..096f5695019b9fae5d9845a02241759d83be1f7e 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -2170,6 +2170,10 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { // set the corresponding tag data for each record // todo check malloc failure + if (pCtx->tagInfo.numOfTagCols == 0) { + return ; + } + char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES); for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; @@ -4694,6 +4698,10 @@ static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) { pOutput += pCtx->outputBytes; pTimestamp++; } + + if (pCtx->tagInfo.numOfTagCols == 0) { + return ; + } char **tagOutputs = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES); for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index a52d834be556ee7599efba7d12ccad49de526401..c4228c67ad9890f4902241fa04656744d8bca286 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3682,31 +3682,24 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); } -void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv) { +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv, bool extendLarge) { SSDataBlock* pDataBlock = pBInfo->pRes; int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer if ((*bufCapacity) < newSize) { for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); - - char* p = realloc(pColInfo->pData, ((size_t)newSize) * pColInfo->info.bytes); - if (p != NULL) { - pColInfo->pData = p; - - // it starts from the tail of the previously generated results. - pBInfo->pCtx[i].pOutput = pColInfo->pData; - (*bufCapacity) = newSize; - } else { + if (!extendColCapacity(pColInfo, newSize, &pBInfo->pCtx[i], bufCapacity, extendLarge)) { + // error throw except size_t allocateSize = ((size_t)(newSize)) * pColInfo->info.bytes; - qError("can not allocate %zu bytes for output. Rows: %d, colBytes %d", + qError("can not allocate %zu bytes for output. Rows: %d, colBytes %d", allocateSize, newSize, pColInfo->info.bytes); longjmp(runtimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + return ; } } } - for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); pBInfo->pCtx[i].pOutput = pColInfo->pData + (size_t)pColInfo->info.bytes * pDataBlock->info.rows; @@ -3726,6 +3719,26 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf } } +// shrink pBInfo->pRes memory +void shrinkOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity) { + SSDataBlock* pDataBlock = pBInfo->pRes; + int32_t rows = pDataBlock->info.rows + 5; // remain 5 buffer + + // shrink if only too large blank space + if (*bufCapacity - rows <= 200) { + return ; // no need shrink + } + + // bufCapcaity shrink to rows + for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); + void* pNew = realloc(pColInfo->pData, rows * pColInfo->info.bytes); + if (pNew) + pColInfo->pData = pNew; + } + *bufCapacity = rows; +} + void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput) { bool interpQuery = false; int32_t tsNum = 0; @@ -6019,7 +6032,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); + updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv, false); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); if (pTableQueryInfo != NULL) { @@ -6088,7 +6101,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); + updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv, false); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); if (pTableQueryInfo != NULL) { @@ -6603,7 +6616,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa break; } - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv, false); } } } @@ -6623,7 +6636,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { pRes->info.rows = 0; if (!pEveryInfo->groupDone) { - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv, false); doTimeEveryImpl(pOperator, pInfo->pCtx, pEveryInfo->lastBlock, false); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); @@ -6659,7 +6672,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv, false); doTimeEveryImpl(pOperator, pInfo->pCtx, pBlock, *newgroup); if (pEveryInfo->groupDone && pOperator->upstream[0]->notify) { @@ -6685,7 +6698,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { if (!pEveryInfo->groupDone) { pEveryInfo->allDone = true; - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv, false); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { break; @@ -6706,7 +6719,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { // Return result of the previous group in the firstly. if (*newgroup) { if (!pEveryInfo->groupDone) { - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv, false); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { pEveryInfo->existDataBlock = pBlock; @@ -6742,7 +6755,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv, false); pEveryInfo->groupDone = false;