From 035d51576a992c49f9334e2a3166f3c51e4db773 Mon Sep 17 00:00:00 2001 From: xywang Date: Wed, 12 Jan 2022 18:10:11 +0800 Subject: [PATCH] [TS-1029](query): added some memory allocation checks --- src/client/src/tscServer.c | 7 + src/client/src/tscSubquery.c | 5 +- src/query/src/qExecutor.c | 604 ++++++++++++++++++++++++++++++++--- src/query/src/qFill.c | 8 + 4 files changed, 584 insertions(+), 40 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 78b038ab19..146e0a1c8c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1861,6 +1861,13 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self); pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self); + if (pQueryInfo->pQInfo == NULL) { + taosHashCleanup(tableGroupInfo.map); + taosArrayDestroy(&group); + tscAsyncResultOnError(pSql); + pRes->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pRes->code; + } } uint64_t localQueryId = pSql->self; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b64184ea0b..3732e05df6 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3902,8 +3902,11 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr STsBufInfo bufInfo = {0}; SQueryParam param = {.pOperator = pa}; - /*int32_t code = */initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, ¶m, NULL, 0, merger); + int32_t code = initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, ¶m, NULL, 0, merger); taosArrayDestroy(&pa); + if (code != TSDB_CODE_SUCCESS) { + goto _cleanup; + } return pQInfo; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e0b192be2e..c1bd818a58 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -340,9 +340,17 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO const static int32_t minSize = 8; SSDataBlock *res = calloc(1, sizeof(SSDataBlock)); - res->info.numOfCols = numOfOutput; + if (res == NULL) { + qError("failed to allocate for output buffer"); + goto _clean; + } res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + if (res->pDataBlock == NULL) { + qError("failed to init arrary for data block of output buffer"); + goto _clean; + } + for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData idata = {{0}}; idata.info.type = pExpr[i].base.resType; @@ -351,10 +359,20 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO int32_t size = MAX(idata.info.bytes * numOfRows, minSize); idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform + if (idata.pData == NULL) { + qError("failed to allocate column buffer for output buffer"); + goto _clean; + } + taosArrayPush(res->pDataBlock, &idata); + res->info.numOfCols++; } return res; + +_clean: + destroyOutputBuf(res); + return NULL; } void* destroyOutputBuf(SSDataBlock* pBlock) { @@ -1810,15 +1828,15 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST) { // if param[2] is set value, input data come from client, order is no relation with pQueryAttr->order, so always return true if(pCtx->param[2].nType == TSDB_DATA_TYPE_INT) - return true; + return true; return QUERY_IS_ASC_QUERY(pQueryAttr); } // denote the order type if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) { - // if param[2] is set value, input data come from client, order is no relation with pQueryAttr->order , so always return true + // if param[2] is set value, input data come from client, order is no relation with pQueryAttr->order, so always return true if(pCtx->param[2].nType == TSDB_DATA_TYPE_INT) - return true; + return true; return pCtx->param[0].i64 == pQueryAttr->order.order; } @@ -2072,17 +2090,26 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf switch (*op) { case OP_TagScan: { pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } case OP_MultiTableTimeInterval: { pRuntimeEnv->proot = createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); break; } case OP_TimeWindow: { pRuntimeEnv->proot = createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput && opType != OP_Join) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2092,6 +2119,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_TimeEvery: { pRuntimeEnv->proot = createTimeEveryOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput && opType != OP_Join) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2101,7 +2131,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Groupby: { pRuntimeEnv->proot = createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2111,6 +2143,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_SessionWindow: { pRuntimeEnv->proot = createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2120,13 +2155,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_MultiTableAggregate: { pRuntimeEnv->proot = createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); break; } case OP_Aggregate: { pRuntimeEnv->proot = createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput && opType != OP_Join) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2146,11 +2186,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf assert(pQueryAttr->pExpr2 != NULL); pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); } + + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } case OP_StateWindow: { - pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; if (opType != OP_DummyInput) { setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); @@ -2160,6 +2207,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Limit: { pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } @@ -2171,12 +2221,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } } else { SColumnInfo* pColInfo = extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } } break; @@ -2185,11 +2241,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Fill: { SOperatorInfo* pInfo = pRuntimeEnv->proot; pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } case OP_MultiwayMergeSort: { pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 200, merger); // TD-10899 + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } @@ -2201,6 +2263,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } @@ -2208,11 +2273,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf int32_t num = pRuntimeEnv->proot->numOfOutput; SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } case OP_Distinct: { pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } @@ -2224,6 +2295,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); } + if (pRuntimeEnv->proot == NULL) { + goto _clean; + } break; } @@ -4824,18 +4898,30 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr switch(tbScanner) { case OP_TableBlockInfoScan: { pRuntimeEnv->proot = createTableBlockInfoScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + if (pRuntimeEnv->proot == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } break; } case OP_TableSeqScan: { pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv); + if (pRuntimeEnv->proot == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } break; } case OP_DataBlocksOptScan: { pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); + if (pRuntimeEnv->proot == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } break; } case OP_TableScan: { pRuntimeEnv->proot = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr)); + if (pRuntimeEnv->proot == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } break; } default: { // do nothing @@ -5152,6 +5238,10 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->times = repeatTime; pInfo->reverseTimes = 0; @@ -5159,6 +5249,11 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pInfo->current = 0; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + tfree(pInfo); + return NULL; + } + pOperator->name = "TableScanOperator"; pOperator->operatorType = OP_TableScan; pOperator->blockingOptr = false; @@ -5173,6 +5268,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->times = 1; @@ -5183,6 +5281,11 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE pRuntimeEnv->enableGroupData = true; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + tfree(pInfo); + return NULL; + } + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = OP_TableSeqScan; pOperator->blockingOptr = false; @@ -5197,9 +5300,15 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + if (pInfo->block.pDataBlock == NULL) { + goto _clean; + } SColumnInfoData infoData = {{0}}; infoData.info.type = TSDB_DATA_TYPE_BINARY; @@ -5208,6 +5317,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu taosArrayPush(pInfo->block.pDataBlock, &infoData); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + taosArrayDestroy(&pInfo->block.pDataBlock); + goto _clean; + } + pOperator->name = "TableBlockInfoScanOperator"; pOperator->operatorType = OP_TableBlockInfoScan; pOperator->blockingOptr = false; @@ -5218,6 +5332,11 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu pOperator->exec = doBlockInfoScan; return pOperator; + +_clean: + tfree(pInfo); + + return NULL; } void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { @@ -5285,6 +5404,10 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->times = repeatTime; pInfo->reverseTimes = reverseTime; @@ -5296,6 +5419,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime } SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); + if (pOptr == NULL) { + tfree(pInfo); + return NULL; + } + pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->operatorType = OP_DataBlocksOptScan; pOptr->pRuntimeEnv = pRuntimeEnv; @@ -5317,6 +5445,10 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) { pOrderColumns = taosArrayInit(4, sizeof(SColIndex)); } + if (pOrderColumns == NULL) { + return NULL; + } + if (pQuery->interval.interval > 0) { if (pOrderColumns == NULL) { pOrderColumns = taosArrayInit(1, sizeof(SColIndex)); @@ -5356,7 +5488,11 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) { pOrderColumns = taosArrayInit(4, sizeof(SColIndex)); } - for(int32_t i = 0; i < numOfCols; ++i) { + if (pOrderColumns == NULL) { + return NULL; + } + + for (int32_t i = 0; i < numOfCols; ++i) { SColIndex* index = taosArrayGet(pOrderColumns, i); bool found = false; @@ -5384,21 +5520,45 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { SMultiwayMergeInfo *pInfo = (SMultiwayMergeInfo*) param; destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput); - taosArrayDestroy(&pInfo->orderColumnList); - taosArrayDestroy(&pInfo->groupColumnList); - tfree(pInfo->prevRow); - tfree(pInfo->currentGroupColData); + if (pInfo->orderColumnList) { + taosArrayDestroy(&pInfo->orderColumnList); + } + + if (pInfo->groupColumnList) { + taosArrayDestroy(&pInfo->groupColumnList); + } + + if (pInfo->prevRow) { + tfree(pInfo->prevRow); + } + + if (pInfo->currentGroupColData) { + tfree(pInfo->currentGroupColData); + } } + static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; - taosArrayDestroy(&pInfo->orderColumnList); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); - tfree(pInfo->prevRow); + + if (pInfo->orderColumnList) { + taosArrayDestroy(&pInfo->orderColumnList); + } + + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } + + if (pInfo->prevRow) { + tfree(pInfo->prevRow); + } } SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->resultRowFactor = (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false)); @@ -5414,6 +5574,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->orderColumnList == NULL || pInfo->groupColumnList == NULL) { + goto _clean; + } + // TODO refactor int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { @@ -5433,6 +5597,10 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, numOfCols = (pInfo->groupColumnList != NULL)? (int32_t)taosArrayGetSize(pInfo->groupColumnList):0; pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len)); + if (pInfo->currentGroupColData == NULL) { + goto _clean; + } + offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { @@ -5443,11 +5611,18 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, } initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } pInfo->seed = rand(); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + return NULL; + } + pOperator->name = "GlobalAggregate"; pOperator->operatorType = OP_GlobalAggregate; pOperator->blockingOptr = true; @@ -5462,17 +5637,30 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyGlobalAggOperatorInfo((void *) pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, int32_t numOfRows, void *merger) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pMerge = merger; pInfo->bufCapacity = numOfRows; pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); + if (pInfo->orderColumnList == NULL || pInfo->binfo.pRes == NULL) { + goto _clean; + } + { // todo extract method to create prev compare buffer int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { @@ -5492,6 +5680,10 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "MultiwaySortOperator"; pOperator->operatorType = OP_MultiwayMergeSort; pOperator->blockingOptr = false; @@ -5503,6 +5695,12 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx pOperator->exec = doMultiwayMergeSort; pOperator->cleanup = destroyGlobalAggOperatorInfo; return pOperator; + +_clean: + destroyGlobalAggOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { @@ -5579,11 +5777,22 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } { SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); + if (pDataBlock == NULL) { + goto _clean; + } + pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); - for(int32_t i = 0; i < numOfOutput; ++i) { + if (pDataBlock->pDataBlock == NULL) { + goto _clean; + } + + for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData col = {{0}}; col.info.colId = pExpr[i].base.colInfo.colId; col.info.bytes = pExpr[i].base.resBytes; @@ -5601,6 +5810,10 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "InMemoryOrder"; pOperator->operatorType = OP_Order; pOperator->blockingOptr = true; @@ -5612,6 +5825,12 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyOrderOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { @@ -5891,8 +6110,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { if (pRuntimeEnv->currentOffset == 0) { break; - } - else if(srows > 0) { + } else if(srows > 0) { if(pRuntimeEnv->currentOffset - srows >= pBlock->info.rows) { pRuntimeEnv->currentOffset -= pBlock->info.rows; } else { @@ -6982,6 +7200,9 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); @@ -6991,10 +7212,18 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + pInfo->seed = rand(); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "TableAggregate"; pOperator->operatorType = OP_Aggregate; pOperator->blockingOptr = true; @@ -7009,31 +7238,53 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyAggOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { assert(pInfo != NULL); - destroySQLFunctionCtx(pInfo->pCtx, numOfOutput); - tfree(pInfo->rowCellInfoOffset); + if (pInfo->pCtx) { + destroySQLFunctionCtx(pInfo->pCtx, numOfOutput); + } + + if (pInfo->rowCellInfoOffset) { + tfree(pInfo->rowCellInfoOffset); + } + + if (pInfo->resultRowInfo.pResult) { + cleanupResultRowInfo(&pInfo->resultRowInfo); + } - cleanupResultRowInfo(&pInfo->resultRowInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } } static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param; doDestroyBasicInfo(pInfo, numOfOutput); } + static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); - tfree(pInfo->prevData); + + if (pInfo->prevData) { + tfree(pInfo->prevData); + } } + static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } + static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); @@ -7041,15 +7292,27 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; - pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); - tfree(pInfo->p); + + if (pInfo->pFillInfo) { + pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); + } + + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } + + if (pInfo->p) { + tfree(pInfo->p); + } } static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); - tfree(pInfo->prevData); + + if (pInfo->prevData) { + tfree(pInfo->prevData); + } } static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { @@ -7060,18 +7323,27 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTimeEveryOperatorInfo(void* param, int32_t numOfOutput) { STimeEveryOperatorInfo* pInfo = (STimeEveryOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); - taosHashCleanup(pInfo->rangeStart); + + if (pInfo->rangeStart) { + taosHashCleanup(pInfo->rangeStart); + } } static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { STagScanInfo* pInfo = (STagScanInfo*) param; - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } } static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; - pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); + + if (pInfo->pDataBlock) { + pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); + } } static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { @@ -7081,14 +7353,29 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param; - taosHashCleanup(pInfo->pSet); - tfree(pInfo->buf); - taosArrayDestroy(&pInfo->pDistinctDataInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + + if (pInfo->pSet) { + taosHashCleanup(pInfo->pSet); + } + + if (pInfo->buf) { + tfree(pInfo->buf); + } + + if (pInfo->pDistinctDataInfo) { + taosArrayDestroy(&pInfo->pDistinctDataInfo); + } + + if (pInfo->pRes) { + pInfo->pRes = destroyOutputBuf(pInfo->pRes); + } } SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); @@ -7096,7 +7383,15 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "MultiTableAggregate"; pOperator->operatorType = OP_MultiTableAggregate; pOperator->blockingOptr = true; @@ -7111,10 +7406,19 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyAggOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->seed = rand(); pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity; @@ -7124,9 +7428,18 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + + if (pInfo->binfo.pRes == NULL || pInfo->binfo.pCtx == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "ProjectOperator"; pOperator->operatorType = OP_Project; pOperator->blockingOptr = false; @@ -7141,6 +7454,12 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyProjectOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) { @@ -7175,12 +7494,18 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) { SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } assert(numOfFilter > 0 && pCols != NULL); doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0); pInfo->numOfFilterCols = numOfFilter; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "FilterOperator"; pOperator->operatorType = OP_Filter; @@ -7195,13 +7520,27 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyConditionOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + tfree(pInfo); + return NULL; + } pOperator->name = "LimitOperator"; pOperator->operatorType = OP_Limit; @@ -7217,12 +7556,22 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->pRes == NULL || pInfo->pCtx == NULL || pInfo->resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "TimeIntervalAggOperator"; pOperator->operatorType = OP_TimeWindow; @@ -7237,12 +7586,22 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyBasicOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STimeEveryOperatorInfo* pInfo = calloc(1, sizeof(STimeEveryOperatorInfo)); - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + if (pInfo == NULL) { + return NULL; + } + + SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; pInfo->seed = rand(); pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity; @@ -7258,9 +7617,20 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera } initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + + if (pBInfo->pRes == NULL || pBInfo->pCtx == NULL || pBInfo->resultRowInfo.pResult == NULL || + (pQueryAttr->needReverseScan && pInfo->rangeStart == NULL)) + { + goto _clean; + } + setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "TimeEveryOperator"; pOperator->operatorType = OP_TimeEvery; pOperator->blockingOptr = false; @@ -7275,18 +7645,36 @@ SOperatorInfo* createTimeEveryOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyTimeEveryOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->colIndex = -1; pInfo->reptScan = false; pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "StateWindowOperator"; pOperator->operatorType = OP_StateWindow; pOperator->blockingOptr = true; @@ -7300,17 +7688,34 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyStateWindowOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } + SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + pInfo->prevTs = INT64_MIN; pInfo->reptScan = false; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = OP_SessionWindow; @@ -7325,16 +7730,33 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyStateWindowOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->pCtx == NULL || pInfo->pRes == NULL || pInfo->resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "MultiTableTimeIntervalOperator"; pOperator->operatorType = OP_MultiTableTimeInterval; pOperator->blockingOptr = true; @@ -7349,14 +7771,22 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyBasicOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); - pInfo->colIndex = -1; // group by column index - + if (pInfo == NULL) { + return NULL; + } + pInfo->colIndex = -1; // group by column index pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -7367,7 +7797,15 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL || pInfo->binfo.resultRowInfo.pResult == NULL) { + goto _clean; + } + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; @@ -7381,16 +7819,34 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyGroupbyOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + if (pInfo->pRes == NULL) { + goto _clean; + } + pInfo->multigroupResult = multigroupResult; { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQueryAttr->fillVal); + if (pColInfo == NULL) { + goto _clean; + } + STimeWindow w = TSWINDOW_INITIALIZER; TSKEY sk = MIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); @@ -7401,11 +7857,20 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); + if (pInfo->pFillInfo == NULL) { + goto _clean; + } pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES); + if (pInfo->p == NULL) { + goto _clean; + } } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } pOperator->name = "FillOperator"; pOperator->blockingOptr = false; @@ -7420,14 +7885,27 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroySFillOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); + if (pInfo->orderColumnList == NULL) { + goto _clean; + } + pInfo->slimit = pQueryAttr->slimit; pInfo->limit = pQueryAttr->limit; pInfo->capacity = pRuntimeEnv->resultInfo.capacity; @@ -7444,6 +7922,9 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); + if (pInfo->prevRow == NULL) { + goto _clean; + } int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { @@ -7457,6 +7938,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo->pRes == NULL || pOperator == NULL) { + goto _clean; + } + pOperator->name = "SLimitOperator"; pOperator->operatorType = OP_SLimit; pOperator->blockingOptr = false; @@ -7468,6 +7953,12 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroySlimitOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static SSDataBlock* doTagScan(void* param, bool* newgroup) { @@ -7618,7 +8109,14 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + if (pInfo->pRes == NULL) { + goto _clean; + } size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); assert(numOfGroup == 0 || numOfGroup == 1); @@ -7627,6 +8125,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf pInfo->curPos = 0; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "SeqTableTagScan"; pOperator->operatorType = OP_TagScan; pOperator->blockingOptr = false; @@ -7639,7 +8141,14 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf pOperator->cleanup = destroyTagScanOperatorInfo; return pOperator; + +_clean: + destroyTagScanOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } + static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) { if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) { // distinct info already inited @@ -7756,6 +8265,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->totalBytes = 0; pInfo->buf = NULL; pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold @@ -7764,8 +8277,15 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); + if (pInfo->pDistinctDataInfo == NULL || pInfo->pSet == NULL || pInfo->pRes == NULL) { + goto _clean; + } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL) { + goto _clean; + } + pOperator->name = "DistinctOperator"; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; @@ -7780,6 +8300,12 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat appendUpstream(pOperator, upstream); return pOperator; + +_clean: + destroyDistinctOperatorInfo((void *)pInfo, numOfOutput); + tfree(pInfo); + + return NULL; } static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) { @@ -8991,9 +9517,9 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S doUpdateExprColumnIndex(pQueryAttr); - // calc skipOffset + // calc skipOffset if(pQueryMsg->offset > 0 && TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_PROJECTION_QUERY)) { - if(pQueryAttr->stableQuery) + if(pQueryAttr->stableQuery) pQueryAttr->skipOffset = false; else pQueryAttr->skipOffset = pQueryAttr->pFilters == NULL; diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 9694dac7db..dbe385e249 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -354,6 +354,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 } SFillInfo* pFillInfo = calloc(1, sizeof(SFillInfo)); + if (pFillInfo == NULL) { + return NULL; + } + taosResetFillInfo(pFillInfo, skey); pFillInfo->order = order; @@ -371,6 +375,10 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 pFillInfo->interval.slidingUnit = slidingUnit; pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); + if (pFillInfo->pData == NULL) { + tfree(pFillInfo); + return NULL; + } // if (numOfTags > 0) { pFillInfo->pTags = calloc(numOfCols, sizeof(SFillTagColInfo)); -- GitLab