diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 5d936fe7067a9ce13a590537c2ba6162cf2a6c83..68e3bf4b8a20106d37c0dcd9c0a5e449c634ed58 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -902,7 +902,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { // not belongs to the same group, return the result of current group; setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); - updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows); + updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv); { // reset output buffer for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { @@ -954,7 +954,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { // not belongs to the same group, return the result of current group setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); - updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor); + updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv); doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock); savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ba277b23018a58e3ed29122761aa65506c94078a..68ca7a1e3913ac3de6be448fee3f6229953dc15b 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -659,7 +659,7 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6346e743081a6594fcc9e8d8001ae18e3f90ac92..941a0e5710c263da31cbeabe096b62ad6a1877ea 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1432,7 +1432,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); if (pBlock->pDataBlock == NULL){ - tscError("pBlock->pDataBlock == NULL"); + qError("pBlock->pDataBlock == NULL"); return; } SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); @@ -3586,7 +3586,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); } -void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows) { +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv) { SSDataBlock* pDataBlock = pBInfo->pRes; int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer @@ -3602,7 +3602,9 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf pBInfo->pCtx[i].pOutput = pColInfo->pData; (*bufCapacity) = newSize; } else { - // longjmp + size_t allocateSize = newSize * pColInfo->info.bytes; + qError("can not allocate %zu bytes for output", allocateSize); + longjmp(runtimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } } @@ -5752,7 +5754,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); if (pTableQueryInfo != NULL) { @@ -5818,7 +5820,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); if (pTableQueryInfo != NULL) { @@ -6315,7 +6317,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa break; } - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); } } } @@ -6335,7 +6337,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { pRes->info.rows = 0; if (!pEveryInfo->groupDone) { - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); doTimeEveryImpl(pOperator, pInfo->pCtx, pEveryInfo->lastBlock, false); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); @@ -6371,7 +6373,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); doTimeEveryImpl(pOperator, pInfo->pCtx, pBlock, *newgroup); if (pEveryInfo->groupDone && pOperator->upstream[0]->notify) { @@ -6397,7 +6399,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { if (!pEveryInfo->groupDone) { pEveryInfo->allDone = true; - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { break; @@ -6418,7 +6420,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { // Return result of the previous group in the firstly. if (*newgroup) { if (!pEveryInfo->groupDone) { - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { pEveryInfo->existDataBlock = pBlock; @@ -6454,7 +6456,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); pEveryInfo->groupDone = false;