diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 5d936fe7067a9ce13a590537c2ba6162cf2a6c83..68e3bf4b8a20106d37c0dcd9c0a5e449c634ed58 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -902,7 +902,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { // not belongs to the same group, return the result of current group; setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); - updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows); + updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv); { // reset output buffer for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { @@ -954,7 +954,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { // not belongs to the same group, return the result of current group setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); - updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor); + updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv); doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock); savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 57362499a4fcaaa1500b199de8f63c07a03af898..7f8e9066af2ea05dd2cc50d8a9e156a5b44cb6cc 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1866,6 +1866,7 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { bool convertJson = true; if (pQueryInfo->isStddev == true) convertJson = false; convertQueryResult(pRes, pQueryInfo, pSql->self, true, convertJson); + pRes->code = pQueryInfo->pQInfo->code; code = pRes->code; if (pRes->code == TSDB_CODE_SUCCESS) { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ba277b23018a58e3ed29122761aa65506c94078a..68ca7a1e3913ac3de6be448fee3f6229953dc15b 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -659,7 +659,7 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6346e743081a6594fcc9e8d8001ae18e3f90ac92..9ea0c07f68eb91fa3dfad7404ea9b2c13329c2d4 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1432,7 +1432,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); if (pBlock->pDataBlock == NULL){ - tscError("pBlock->pDataBlock == NULL"); + qError("window border interpolation: pBlock->pDataBlock == NULL"); return; } SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); @@ -3586,7 +3586,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); } -void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows) { +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv) { SSDataBlock* pDataBlock = pBInfo->pRes; int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer @@ -3594,7 +3594,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); - char* p = realloc(pColInfo->pData, newSize * pColInfo->info.bytes); + char* p = realloc(pColInfo->pData, ((size_t)newSize) * pColInfo->info.bytes); if (p != NULL) { pColInfo->pData = p; @@ -3602,7 +3602,10 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf pBInfo->pCtx[i].pOutput = pColInfo->pData; (*bufCapacity) = newSize; } else { - // longjmp + size_t allocateSize = ((size_t)(newSize)) * pColInfo->info.bytes; + qError("can not allocate %zu bytes for output. Rows: %d, colBytes %d", + allocateSize, newSize, pColInfo->info.bytes); + longjmp(runtimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } } @@ -3610,7 +3613,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); - pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; + pBInfo->pCtx[i].pOutput = pColInfo->pData + (size_t)pColInfo->info.bytes * pDataBlock->info.rows; // set the correct pointer after the memory buffer reallocated. int32_t functionId = pBInfo->pCtx[i].functionId; @@ -5752,7 +5755,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); if (pTableQueryInfo != NULL) { @@ -5818,7 +5821,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); if (pTableQueryInfo != NULL) { @@ -6315,7 +6318,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa break; } - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); } } } @@ -6335,7 +6338,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { pRes->info.rows = 0; if (!pEveryInfo->groupDone) { - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); doTimeEveryImpl(pOperator, pInfo->pCtx, pEveryInfo->lastBlock, false); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); @@ -6371,7 +6374,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); doTimeEveryImpl(pOperator, pInfo->pCtx, pBlock, *newgroup); if (pEveryInfo->groupDone && pOperator->upstream[0]->notify) { @@ -6397,7 +6400,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { if (!pEveryInfo->groupDone) { pEveryInfo->allDone = true; - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { break; @@ -6418,7 +6421,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { // Return result of the previous group in the firstly. if (*newgroup) { if (!pEveryInfo->groupDone) { - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { pEveryInfo->existDataBlock = pBlock; @@ -6454,7 +6457,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows); + updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv); pEveryInfo->groupDone = false;