未验证 提交 5e4863bb 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #9428 from taosdata/hotfix/TD-12275

TD-12275: fix core dump caused by num of rows multiple by column bytes exceeds INT32_MAX
......@@ -902,7 +902,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
// not belongs to the same group, return the result of current group;
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv);
{ // reset output buffer
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
......@@ -954,7 +954,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
// not belongs to the same group, return the result of current group
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv);
doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock);
savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData);
......
......@@ -1866,6 +1866,7 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
bool convertJson = true;
if (pQueryInfo->isStddev == true) convertJson = false;
convertQueryResult(pRes, pQueryInfo, pSql->self, true, convertJson);
pRes->code = pQueryInfo->pQInfo->code;
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
......
......@@ -659,7 +659,7 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
......
......@@ -1432,7 +1432,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
if (pBlock->pDataBlock == NULL){
tscError("pBlock->pDataBlock == NULL");
qError("window border interpolation: pBlock->pDataBlock == NULL");
return;
}
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
......@@ -3586,7 +3586,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols);
}
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows) {
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv) {
SSDataBlock* pDataBlock = pBInfo->pRes;
int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer
......@@ -3594,7 +3594,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
char* p = realloc(pColInfo->pData, newSize * pColInfo->info.bytes);
char* p = realloc(pColInfo->pData, ((size_t)newSize) * pColInfo->info.bytes);
if (p != NULL) {
pColInfo->pData = p;
......@@ -3602,7 +3602,10 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
pBInfo->pCtx[i].pOutput = pColInfo->pData;
(*bufCapacity) = newSize;
} else {
// longjmp
size_t allocateSize = ((size_t)(newSize)) * pColInfo->info.bytes;
qError("can not allocate %zu bytes for output. Rows: %d, colBytes %d",
allocateSize, newSize, pColInfo->info.bytes);
longjmp(runtimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
}
......@@ -3610,7 +3613,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
pBInfo->pCtx[i].pOutput = pColInfo->pData + (size_t)pColInfo->info.bytes * pDataBlock->info.rows;
// set the correct pointer after the memory buffer reallocated.
int32_t functionId = pBInfo->pCtx[i].functionId;
......@@ -5752,7 +5755,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows);
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) {
......@@ -5818,7 +5821,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows);
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) {
......@@ -6315,7 +6318,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa
break;
}
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv);
}
}
}
......@@ -6335,7 +6338,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
pRes->info.rows = 0;
if (!pEveryInfo->groupDone) {
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv);
doTimeEveryImpl(pOperator, pInfo->pCtx, pEveryInfo->lastBlock, false);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
......@@ -6371,7 +6374,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv);
doTimeEveryImpl(pOperator, pInfo->pCtx, pBlock, *newgroup);
if (pEveryInfo->groupDone && pOperator->upstream[0]->notify) {
......@@ -6397,7 +6400,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
if (!pEveryInfo->groupDone) {
pEveryInfo->allDone = true;
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv);
doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
break;
......@@ -6418,7 +6421,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
// Return result of the previous group in the firstly.
if (*newgroup) {
if (!pEveryInfo->groupDone) {
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv);
doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
pEveryInfo->existDataBlock = pBlock;
......@@ -6454,7 +6457,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv);
pEveryInfo->groupDone = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册