未验证 提交 1535da13 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #9490 from taosdata/szhou/fix/2.4/td-12275

TD-12275: fix coredump due to colbytes * numOfRows exceeds INT32_MAX
...@@ -902,7 +902,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -902,7 +902,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
// not belongs to the same group, return the result of current group; // not belongs to the same group, return the result of current group;
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv);
{ // reset output buffer { // reset output buffer
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
...@@ -954,7 +954,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -954,7 +954,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
// not belongs to the same group, return the result of current group // not belongs to the same group, return the result of current group
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv);
doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock); doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock);
savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData); savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData);
......
...@@ -1866,6 +1866,7 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { ...@@ -1866,6 +1866,7 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
bool convertJson = true; bool convertJson = true;
if (pQueryInfo->isStddev == true) convertJson = false; if (pQueryInfo->isStddev == true) convertJson = false;
convertQueryResult(pRes, pQueryInfo, pSql->self, true, convertJson); convertQueryResult(pRes, pQueryInfo, pSql->self, true, convertJson);
pRes->code = pQueryInfo->pQInfo->code;
code = pRes->code; code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) { if (pRes->code == TSDB_CODE_SUCCESS) {
......
...@@ -659,7 +659,7 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil ...@@ -659,7 +659,7 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput); int32_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv);
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
......
...@@ -1432,7 +1432,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc ...@@ -1432,7 +1432,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
if (pBlock->pDataBlock == NULL){ if (pBlock->pDataBlock == NULL){
tscError("pBlock->pDataBlock == NULL"); qError("window border interpolation: pBlock->pDataBlock == NULL");
return; return;
} }
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
...@@ -3586,7 +3586,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i ...@@ -3586,7 +3586,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols);
} }
void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows) { void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows, SQueryRuntimeEnv* runtimeEnv) {
SSDataBlock* pDataBlock = pBInfo->pRes; SSDataBlock* pDataBlock = pBInfo->pRes;
int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer
...@@ -3594,7 +3594,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3594,7 +3594,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
char* p = realloc(pColInfo->pData, newSize * pColInfo->info.bytes); char* p = realloc(pColInfo->pData, ((size_t)newSize) * pColInfo->info.bytes);
if (p != NULL) { if (p != NULL) {
pColInfo->pData = p; pColInfo->pData = p;
...@@ -3602,7 +3602,10 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3602,7 +3602,10 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
pBInfo->pCtx[i].pOutput = pColInfo->pData; pBInfo->pCtx[i].pOutput = pColInfo->pData;
(*bufCapacity) = newSize; (*bufCapacity) = newSize;
} else { } else {
// longjmp size_t allocateSize = ((size_t)(newSize)) * pColInfo->info.bytes;
qError("can not allocate %zu bytes for output. Rows: %d, colBytes %d",
allocateSize, newSize, pColInfo->info.bytes);
longjmp(runtimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
} }
...@@ -3610,7 +3613,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3610,7 +3613,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; pBInfo->pCtx[i].pOutput = pColInfo->pData + (size_t)pColInfo->info.bytes * pDataBlock->info.rows;
// set the correct pointer after the memory buffer reallocated. // set the correct pointer after the memory buffer reallocated.
int32_t functionId = pBInfo->pCtx[i].functionId; int32_t functionId = pBInfo->pCtx[i].functionId;
...@@ -5752,7 +5755,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -5752,7 +5755,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows); updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) { if (pTableQueryInfo != NULL) {
...@@ -5818,7 +5821,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -5818,7 +5821,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows); updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv);
projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) { if (pTableQueryInfo != NULL) {
...@@ -6315,7 +6318,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa ...@@ -6315,7 +6318,7 @@ static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDa
break; break;
} }
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv);
} }
} }
} }
...@@ -6335,7 +6338,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6335,7 +6338,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
pRes->info.rows = 0; pRes->info.rows = 0;
if (!pEveryInfo->groupDone) { if (!pEveryInfo->groupDone) {
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv);
doTimeEveryImpl(pOperator, pInfo->pCtx, pEveryInfo->lastBlock, false); doTimeEveryImpl(pOperator, pInfo->pCtx, pEveryInfo->lastBlock, false);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
...@@ -6371,7 +6374,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6371,7 +6374,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv);
doTimeEveryImpl(pOperator, pInfo->pCtx, pBlock, *newgroup); doTimeEveryImpl(pOperator, pInfo->pCtx, pBlock, *newgroup);
if (pEveryInfo->groupDone && pOperator->upstream[0]->notify) { if (pEveryInfo->groupDone && pOperator->upstream[0]->notify) {
...@@ -6397,7 +6400,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6397,7 +6400,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
if (!pEveryInfo->groupDone) { if (!pEveryInfo->groupDone) {
pEveryInfo->allDone = true; pEveryInfo->allDone = true;
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv);
doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
break; break;
...@@ -6418,7 +6421,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6418,7 +6421,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
// Return result of the previous group in the firstly. // Return result of the previous group in the firstly.
if (*newgroup) { if (*newgroup) {
if (!pEveryInfo->groupDone) { if (!pEveryInfo->groupDone) {
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, 0, pOperator->pRuntimeEnv);
doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false); doTimeEveryImpl(pOperator, pInfo->pCtx, NULL, false);
if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) {
pEveryInfo->existDataBlock = pBlock; pEveryInfo->existDataBlock = pBlock;
...@@ -6454,7 +6457,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) { ...@@ -6454,7 +6457,7 @@ static SSDataBlock* doTimeEvery(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows); updateOutputBuf(&pEveryInfo->binfo, &pEveryInfo->bufCapacity, pBlock->info.rows, pOperator->pRuntimeEnv);
pEveryInfo->groupDone = false; pEveryInfo->groupDone = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册